DAG with Bash


Recall from the AA Intro that an Apache Airflow DAG is a Python script consisting of the following logical blocks:

  1. Library imports
  2. DAG arguments
  3. DAG definition
  4. Task definitions
  5. Task pipeline
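
As a preview, here is a minimal sketch that puts the five blocks together (the DAG and task names here are placeholders, not part of any exercise):

# ___ 1. Library imports
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# ___ 2. DAG arguments
default_args = {'owner': 'me', 'start_date': datetime(2024, 1, 1)}

# ___ 3. DAG definition
dag = DAG('skeleton_example', default_args=default_args,
          schedule_interval=timedelta(days=1))

# ___ 4. Task definitions
say_hello = BashOperator(task_id='say_hello', bash_command='echo hello', dag=dag)
show_date = BashOperator(task_id='show_date', bash_command='date', dag=dag)

# ___ 5. Task pipeline
say_hello >> show_date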

Bash Simple - 1


Case Study

We will create a simple pipeline using the BashOperator

  • in a file called simple_example_DAG.py that
  • prints a greeting and then
  • prints the current date and time
  • scheduled to repeat the process every 5 seconds

Solution

Let’s go through the list of logical blocks one by one:

Import

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
  • Begin by importing the Python libraries you will need for your DAG:
  • the DAG class from airflow.models,
  • the BashOperator, which you will use to create the two print tasks, and
  • the datetime and timedelta modules from the datetime package, which you will need for specifying several time-related parameters.
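
Note that airflow.operators.bash_operator is the legacy import path; in Airflow 2 the operator lives in airflow.operators.bash (the old path remains as a deprecated alias), so on a newer installation you would write:

# Airflow 2.x import path (the examples in this section use the legacy one)
from airflow.operators.bash import BashOperator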

Arguments

default_args = {
        'owner':'name_of_the_owner',
        'start_date': datetime(2024, 1, 1),
        'retries':1,
        'retry_delay': timedelta(minutes=5),
}
  • The next block is for specifying the default DAG arguments.
  • Notice they are specified as a Python dictionary, which is just a collection of key-value pairs enclosed by curly braces. These are used to specify such things as
  • the owner of the DAG, which is you, and
  • its start date, in this case January 1, 2024
  • the number of retries to attempt if the DAG run fails; here, one retry, and
  • the retry_delay, the time to wait between subsequent tries, which in this case is five minutes.
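
These defaults apply to every task in the DAG, but any of them can be overridden on an individual operator. A minimal sketch, assuming the dag object defined in the next block (the task itself is a made-up example):

# this task overrides the default retries=1 from default_args
fragile_task = BashOperator(
        task_id='fragile_task',
        bash_command='exit 1',            # always fails, to exercise retries
        retries=3,                        # per-task override of the default
        retry_delay=timedelta(minutes=1),
        dag=dag,
)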

DAG Definition

dag = DAG('simple_example',
          description = 'A simple example DAG',
          default_args = default_args,
          schedule_interval = timedelta(seconds=5)
          )
  • The DAG definition block is used for instantiating your workflow as a DAG object. You pass it:
  • the name of your DAG, such as simple_example,
  • a description for your workflow, for example, a simple example DAG,
  • the default arguments to apply to your DAG, which in this case are specified by the default_args dict you already defined in the previous block, and finally
  • scheduling instructions.
    • The DAG will run repeatedly on a schedule of every 5 seconds once it is deployed.
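
Besides a timedelta, schedule_interval also accepts cron expressions and presets such as '@daily' or '@hourly'. A sketch with a hypothetical DAG name:

# the schedule can also be a cron preset instead of a timedelta
daily_dag = DAG('daily_example',
                default_args=default_args,
                schedule_interval='@daily')   # runs once a day, at midnight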

Task Definition

task1 = BashOperator(
        task_id = 'print_hello',
        bash_command='echo \'Greetings, the date and time are \'',
        dag = dag,
)
task2 = BashOperator(
        task_id = 'print_date',
        bash_command='date',
        dag = dag,
)
  • Here we define two tasks: task1 and task2, both of which are bash operators.
  • Their respective ids are specified as print_hello and print_date.
  • They each call a bash command, where the first task will echo “Greetings, the date and time are”, and
  • The second task will print the current date and time using the bash command date.
  • Finally, each task is assigned to the DAG you instantiated in the DAG definition block above.
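
bash_command is also a Jinja-templated field, so Airflow can inject runtime values into the command. A small sketch using the built-in ds macro, which expands to the run's logical date as YYYY-MM-DD (the task name is hypothetical):

# bash_command supports Jinja templating; {{ ds }} is the run's date
templated_task = BashOperator(
        task_id='print_run_date',
        bash_command='echo "This run is for {{ ds }}"',
        dag=dag,
)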

Dependencies

task1 >> task2
  • The final block is where you specify the dependencies for your workflow.
  • Here, the double greater than notation specifies that task2 is downstream from task1.
  • This means that task1, with task id print_hello, will run first.
  • Once print_hello completes successfully, task2 (print_date) will run.
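
The >> notation is shorthand; the same dependency can be declared with the set_downstream and set_upstream methods, or with the mirrored << operator:

# three equivalent ways to say "task1 runs before task2"
task1 >> task2
task1.set_downstream(task2)
task2 << task1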

Bash ETL - 3


Case Study

Let’s create a DAG that

  • runs daily
  • extracts user information from the /etc/passwd file
  • transforms it (replacing the : field delimiter with commas)
  • loads it into a CSV file

This DAG will have two tasks

  • extract, which extracts fields from the /etc/passwd file, and
  • transform_and_load, which transforms the extracted data and loads it into a file.

Solution

Create DAG

  1. Open the AA Webserver UI.
  2. Create a new file by choosing File->New File and naming it my_second_dag.py.
  3. Insert this code in it:
# ___  IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago


# ___  ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'your_name_here',
    'start_date': days_ago(0),
    'email': ['your_email_here'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


# ___  DAG DEFINITION
dag = DAG(
    'my-second-dag',
    default_args=default_args,
    description='My second DAG',
    schedule_interval=timedelta(days=1),
)

# ___ TASK DEFINITION : EXTRACT
extract = BashOperator(
    task_id='extract',
    bash_command='cut -d":" -f1,3,6 /etc/passwd > /home/project/airflow/dags/extracted-data.txt',
    dag=dag,
)

# ___ TASK DEFINITION : TRANSFORM
transform_and_load = BashOperator(
    task_id='transform',
    bash_command='tr ":" "," < /home/project/airflow/dags/extracted-data.txt > /home/project/airflow/dags/transformed-data.csv',
    dag=dag,
)

# ___ PIPELINE
extract >> transform_and_load
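
Before deploying, it can help to check what the two bash commands actually do. A rough pure-Python equivalent of the pipeline, purely for illustration (it writes to /tmp rather than the DAG's output path):

# rough Python equivalent of the extract and transform steps
with open('/etc/passwd') as src:
    lines = src.read().splitlines()

# extract: keep fields 1, 3 and 6 (username, UID, home directory)
extracted = [':'.join(line.split(':')[i] for i in (0, 2, 5)) for line in lines]

# transform: replace the ':' delimiter with ','
transformed = [row.replace(':', ',') for row in extracted]

with open('/tmp/transformed-data.csv', 'w') as dst:
    dst.write('\n'.join(transformed) + '\n')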

Submit DAG

  • Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.
  • Airflow searches for Python source files within the configured DAGS_FOLDER. Its location is set in the airflow.cfg file; in this lab it is configured as /home/project/airflow/dags.
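
Since airflow.cfg is a standard INI file, you can also read the configured value programmatically; a quick sketch with Python's configparser, assuming the config lives at the path used in this lab:

import configparser

# dags_folder lives in the [core] section of airflow.cfg
cfg = configparser.ConfigParser()
cfg.read('/home/project/airflow/airflow.cfg')
print(cfg['core']['dags_folder'])   # expected: /home/project/airflow/dags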

Set AIRFLOW_HOME

# ___ EXPORT sets a temporary environment var
export AIRFLOW_HOME=/home/project/airflow
echo $AIRFLOW_HOME

Submit DAG

# ___ SUBMIT DAG file into the directory
export AIRFLOW_HOME=/home/project/airflow
cp my_second_dag.py $AIRFLOW_HOME/dags

Verify DAG

# ___ VERIFY the DAG was submitted
airflow dags list | grep "my-second-dag"

# ___ OUTPUT (dag_id | filepath | owner | paused)
my-second-dag | /home/project/airflow/dags/my_second_dag.py | your_name_here | True

List Tasks

# ___ LIST All Tasks
airflow tasks list my-second-dag

# ___ OUTPUT
extract
transform
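
The same checks can be done from Python: Airflow's DagBag class parses the DAGs folder the same way the scheduler does, so you can confirm the DAG loaded and list its tasks programmatically (a sketch; run it where Airflow is installed and AIRFLOW_HOME is set):

from airflow.models import DagBag

dagbag = DagBag()                        # parses the configured DAGs folder
print(dagbag.import_errors)              # should be an empty dict: {}

dag = dagbag.get_dag('my-second-dag')
print(dag.task_ids)                      # expected: ['extract', 'transform']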

Bash Simple - 4


Case Study

Let’s create a dummy DAG with three tasks.

  • Task1 does nothing but sleep for 1 second.
  • Task2 sleeps for 2 seconds.
  • Task3 sleeps for 3 seconds.
  • This DAG is scheduled to run every 1 minute.

Solution

  • Using Menu->File->New File, create a new file named dummy_dag.py, then insert the code below.
# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago


# ___  ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# ___  DAG DEFINITION
dag = DAG(
    'dummy_dag',
    default_args=default_args,
    description='A dummy DAG with three sleep tasks',
    schedule_interval=timedelta(minutes=1),
)

# ___ TASK DEFINITION : TASK1
task1 = BashOperator(
    task_id='task1',
    bash_command='sleep 1',
    dag=dag,
)

# ___ TASK DEFINITION : TASK2
task2 = BashOperator(
    task_id='task2',
    bash_command='sleep 2',
    dag=dag,
)

# ___ TASK DEFINITION : TASK3
task3 = BashOperator(
    task_id='task3',
    bash_command='sleep 3',
    dag=dag,
)

# ___ PIPELINE
task1 >> task2 >> task3
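
As wired above, the three tasks run strictly one after another, so a full run takes at least six seconds. The >> operator also accepts a list on either side, which lets independent tasks run in parallel; a hypothetical alternative wiring:

# fan out: task2 and task3 both depend only on task1,
# so (executor permitting) they can run in parallel
task1 >> [task2, task3]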

Set AIRFLOW_HOME

# ___ EXPORT sets a temporary environment var
export AIRFLOW_HOME=/home/project/airflow

Submit DAG

# ___ SUBMIT DAG file into the directory
export AIRFLOW_HOME=/home/project/airflow
cp dummy_dag.py $AIRFLOW_HOME/dags

Verify DAG

# ___ VERIFY the DAG was submitted
airflow dags list | grep dummy_dag

# ___ OUTPUT (dag_id | filepath | owner | paused)
dummy_dag | /home/project/airflow/dags/dummy_dag.py | Your name | True

List Tasks

# ___ LIST All Tasks
airflow tasks list dummy_dag

# ___ OUTPUT
task1
task2
task3