# ___ IMPORT the libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# Define the path for the input and output files
input_file = '/etc/passwd'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'data_for_analytics.csv'
# ___ CREATE functions
def extract():
    global input_file
    print("Inside Extract")
    # Read the contents of the file into a string
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(':')
            if len(fields) >= 6:
                field_1 = fields[0]
                field_3 = fields[2]
                field_6 = fields[5]
                outfile.write(field_1 + ":" + field_3 + ":" + field_6 + "\n")
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    # Replace the colon delimiters with commas
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        for line in infile:
            processed_line = line.replace(':', ',')
            outfile.write(processed_line + '\n')
def load():
    global transformed_file, output_file
    print("Inside Load")
    # Save the transformed data to the output CSV file
    with open(transformed_file, 'r') as infile, \
            open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line + '\n')
def check():
    global output_file
    print("Inside Check")
    # Print the contents of the output file
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line)
# ___ ARGUMENTS: can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# ___ DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
# ___ TASK EXTRACT DEFINITION to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
# ___ TASK TRANSFORM DEFINITION to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
# ___ TASK LOAD DEFINITION to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)
# ___ TASK CHECK DEFINITION to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)
# ___ PIPELINE
execute_extract >> execute_transform >> execute_load >> execute_check
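# NOTE (illustration only): the >> operator above is Airflow's shorthand for
# declaring downstream dependencies. An equivalent, more verbose form using
# set_downstream is shown commented out so the same edges are not declared twice:
# execute_extract.set_downstream(execute_transform)
# execute_transform.set_downstream(execute_load)
# execute_load.set_downstream(execute_check)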
DAG with Python
This document describes a few examples of DAGs created using PythonOperator.
Remember from AA Intro that an Apache Airflow DAG is a Python script consisting of the following logical blocks (sketched in the skeleton after this list):
- Library imports
- DAG arguments
- DAG definition
- Task definitions
- Task pipeline
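For orientation, here is a minimal skeleton that labels each of those blocks. It is only a sketch: the DAG id, owner, and the two trivial callables are placeholder values used for illustration, not part of the ETL example in this document.

# (1) Library imports
from datetime import timedelta
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# (2) DAG arguments
default_args = {
    'owner': 'Your name',  # placeholder
    'start_date': days_ago(0),
}

# (3) DAG definition
dag = DAG(
    'skeleton-dag',  # placeholder DAG id
    default_args=default_args,
    description='Skeleton showing the five logical blocks',
    schedule_interval=timedelta(days=1),
)

# (4) Task definitions
def first():
    print("first task")

def second():
    print("second task")

task_first = PythonOperator(task_id='first', python_callable=first, dag=dag)
task_second = PythonOperator(task_id='second', python_callable=second, dag=dag)

# (5) Task pipeline
task_first >> task_second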
Python ETL - 2
Case Study
We will create a DAG which will define a pipeline of tasks - extract, transform, load, and check:
- using PythonOperator
- running the DAG daily
Solution
Create DAG
- Create a DAG file my_first_dag.py, which will run daily.
- The my_first_dag.py file defines tasks execute_extract, execute_transform, execute_load, and execute_check to call the respective Python functions.
- Here are the steps coded in my_first_dag.py:
  - Import libraries
  - Define functions to be used by Tasks
  - Arguments
  - DAG definition
  - Task definition
  - Pipeline dependencies
Submit DAG
Verify DAG
List Tasks
Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.
- Open a terminal and run the command below to set the AIRFLOW_HOME.
export AIRFLOW_HOME=/home/project/airflow
echo $AIRFLOW_HOME
- Run the command below to submit the DAG file that was created above
# ____ SUBMIT DAG: cp copies the DAG file into the dags directory
cp my_first_dag.py $AIRFLOW_HOME/dags
# ____ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag
# ___ LIST All Tasks in my-first-python-etl-dag
airflow tasks list my-first-python-etl-dag
# ___ OUTPUT
check
extract
load
transform
# ___ CHECK for errors
airflow dags list-import-errors
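Optionally, a single task can be exercised on the spot with airflow tasks test, which runs one task instance locally without recording state in the metadata database; the date argument below is just an example value.

# ___ TEST a single task locally (the date is an example value)
airflow tasks test my-first-python-etl-dag extract 2024-01-01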