DAG with Python


This document describes an example of a DAG created using the PythonOperator.

Remember from AA Intro that an Apache Airflow DAG is a Python script consisting of the following logical blocks (a minimal skeleton illustrating them follows the list):

  1. Library imports
  2. DAG arguments
  3. DAG definition
  4. Task definitions
  5. Task pipeline
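
As a quick illustration only (the DAG id, owner, and the hello/goodbye tasks below are placeholders, not part of the case study that follows), the five blocks map onto a DAG file roughly like this:
# 1. Library imports
from datetime import timedelta
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def say_hello():
    print("hello")

def say_goodbye():
    print("goodbye")

# 2. DAG arguments
default_args = {'owner': 'Your name', 'start_date': days_ago(0)}

# 3. DAG definition
dag = DAG('skeleton-dag', default_args=default_args,
          schedule_interval=timedelta(days=1))

# 4. Task definitions
hello = PythonOperator(task_id='hello', python_callable=say_hello, dag=dag)
goodbye = PythonOperator(task_id='goodbye', python_callable=say_goodbye, dag=dag)

# 5. Task pipeline
hello >> goodbye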



Case Study

We will create a DAG, which will define a pipeline of tasks:

  • extract, transform, load, and check
  • each task implemented with PythonOperator
  • the DAG scheduled to run daily

Solution

Create DAG

  • Create a DAG file my_first_dag.py, which will run daily.
  • The my_first_dag.py file defines tasks execute_extract, execute_transform, execute_load, and execute_check to call the respective Python functions.
  • Here are the steps coded in my_first_dag.py, shown below:
    • Import libraries
    • Define functions to be used by Tasks
    • Arguments
    • DAG definition
    • Task definition
    • Pipeline dependencies
# ___ IMPORT the libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# Define the paths for the input and output files
input_file = '/etc/passwd'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'data_for_analytics.csv'

# ___  CREATE functions
def extract():
    global input_file
    print("Inside Extract")
    # Read /etc/passwd line by line and keep the username (field 1),
    # user id (field 3), and home directory (field 6)
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(':')
            if len(fields) >= 6:
                field_1 = fields[0]
                field_3 = fields[2]
                field_6 = fields[5]
                outfile.write(field_1 + ":" + field_3 + ":" + field_6 + "\n")


def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    # Replace the ':' delimiters with ',' so the file becomes comma-separated
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        for line in infile:
            processed_line = line.strip().replace(':', ',')
            outfile.write(processed_line + '\n')


def load():
    global transformed_file, output_file
    print("Inside Load")
    # Copy the transformed data into the final CSV output file
    with open(transformed_file, 'r') as infile, \
            open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line)


def check():
    global output_file
    print("Inside Check")
    # Print the contents of the final output file for verification
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line)


# ___ ARGUMENTS: can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
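
# NOTE (illustrative, not used below): any of these defaults can be overridden
# on a specific task when the operator is created, for example:
#   PythonOperator(task_id='extract', python_callable=extract, retries=3, dag=dag)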

# ___ DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
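    # run the DAG once per day; the preset string '@daily' would be equivalent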
    schedule_interval=timedelta(days=1),
)

# ___ TASK EXTRACT DEFINITION to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

# ___ TASK TRANSFORM DEFINITION to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

# ___ TASK LOAD DEFINITION to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# ___ TASK CHECK DEFINITION to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)

# ___ PIPELINE
execute_extract >> execute_transform >> execute_load >> execute_check
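
Before submitting, you can sanity-check the ETL functions outside of Airflow by temporarily adding the block below to the end of my_first_dag.py and running the file with python. This is only an optional sketch; it assumes the input and output paths defined above are readable and writable on your machine.
if __name__ == '__main__':
    # Run the pipeline functions in order, without the scheduler,
    # to confirm the file-handling logic works end to end
    extract()
    transform()
    load()
    check()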

Submit, Verify, and List Tasks

Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.

  • Open a terminal and run the commands below to set AIRFLOW_HOME and confirm its value.
export AIRFLOW_HOME=/home/project/airflow
echo $AIRFLOW_HOME
  • Run the commands below to submit the DAG file created above, verify it was registered, and list its tasks.
# ____ SUBMIT: copy the DAG file into the dags directory
cp my_first_dag.py $AIRFLOW_HOME/dags
 
# ____ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag
 
# ___ LIST All Tasks in my-first-python-etl-dag
airflow tasks list my-first-python-etl-dag
 
# ___ OUTPUT
check
extract
load
transform


# ___ CHECK for errors
airflow dags list-import-errors