DAG with Bash
Remember from AA Intro that an Apache Airflow DAG is a Python script consisting of the following logical blocks (a minimal skeleton follows the list):
- Library imports
- DAG arguments
- DAG definition
- Task definitions
- Task pipeline
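As a quick orientation, the sketch below shows how those five blocks typically line up in a single DAG file. The DAG id, task ids, and bash commands are placeholders, not part of the case studies that follow, and depending on your Airflow version the scheduling parameter may be named schedule or schedule_interval.

# Library imports
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# DAG arguments
default_args = {'owner': 'someone', 'start_date': datetime(2024, 1, 1)}

# DAG definition
dag = DAG('skeleton_dag', default_args=default_args, schedule=timedelta(days=1))

# Task definitions
say_hello = BashOperator(task_id='say_hello', bash_command='echo hello', dag=dag)
say_goodbye = BashOperator(task_id='say_goodbye', bash_command='echo goodbye', dag=dag)

# Task pipeline
say_hello >> say_goodbye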
Bash Simple - 1
Case Study
We will create a simple pipeline using the BashOperator
- in a file called simple_example_DAG.py that
- prints a greeting and then
- prints the current date and time.
- We will also schedule it to repeat the process every 5 seconds.
Solution
Let’s go through the list of logical blocks one by one:
Import
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
- Begin by importing the Python libraries you will need for your DAG:
- the DAG class from the airflow.models library,
- the BashOperator, which you will use to create the two print tasks, and
- the datetime and timedelta modules from the datetime package, which you will need for specifying several time-related parameters.
Arguments
default_args = {
    'owner': 'name_of_the_owner',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
- The next block is for specifying the default DAG arguments.
- Notice they are specified as a Python dictionary, which is just a collection of key-value pairs enclosed by curly braces. These are used to specify such things as
- the owner of the DAG, which is you, and
- its start date, in this case January 1, 2024
- the number of retries, that is, how many times Airflow should keep trying if a task fails; here, just once,
- and the retry_delay, or the time to wait between subsequent tries, which in this case is five minutes.
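These defaults apply to every task in the DAG, but any of them can be overridden on an individual task when the operator is instantiated. A minimal sketch is shown below; the task name and command are hypothetical and not part of this case study.

# Hypothetical task overriding the DAG-level defaults (sketch only)
fragile_task = BashOperator(
    task_id='fragile_task',            # illustrative task id, not part of the example
    bash_command='exit 1',             # a command that always fails, to exercise retries
    retries=3,                         # overrides the default of 1 for this task only
    retry_delay=timedelta(minutes=1),  # overrides the default 5-minute delay
    dag=dag,                           # the DAG object defined in the next block
)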
DAG Definition
dag = DAG(
    'simple_example',
    description='A simple example DAG',
    default_args=default_args,
    schedule=timedelta(seconds=5),
)
- The DAG definition block is used for instantiating your workflow as a DAG object. You pass it:
- the name of your DAG, such as simple_example,
- a description for your workflow, for example 'A simple example DAG',
- the default arguments to apply to your DAG, which in this case are specified by the default_args dict you already defined in the previous block, and finally
- scheduling instructions.
- The DAG will run repeatedly on a schedule of every 5 seconds once it is deployed.
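For reference, recent Airflow versions also support defining the same DAG with a with statement; tasks created inside the block are attached to the DAG automatically, so the dag=... argument on each operator can be dropped. A sketch, assuming the same default_args as above:

with DAG(
    'simple_example',
    description='A simple example DAG',
    default_args=default_args,
    schedule=timedelta(seconds=5),
) as dag:
    # tasks defined inside this block pick up the enclosing DAG automatically
    ...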
Task Definition
task1 = BashOperator(
    task_id='print_hello',
    bash_command='echo \'Greetings, the date and time are \'',
    dag=dag,
)

task2 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
- Here we define two tasks: task1 and task2, both of which are bash operators.
- Their respective ids are specified as print_hello and print_date.
- They each call a bash command, where the first task will echo “Greetings, the date and time are”, and
- The second task will print the current date and time using the bash command date.
- Finally, each task is assigned to the DAG you instantiated in the DAG definition block above.
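As an aside, bash_command is a templated field, so a single task could also print a greeting together with the run's logical date using Airflow's Jinja templating. The task below is a hypothetical sketch, not part of this case study; {{ ds }} renders the logical date of the DAG run, not the wall-clock time.

# Hypothetical single task combining the greeting and the date via templating
print_greeting_and_date = BashOperator(
    task_id='print_greeting_and_date',                      # illustrative task id
    bash_command='echo "Greetings, the date is {{ ds }}"',  # {{ ds }} is rendered by Airflow
    dag=dag,
)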
Dependencies
task1 >> task2
- The final block is where you specify the dependencies for your workflow.
- Here, the double greater than notation specifies that task2 is downstream from task1.
- This means that task1, which we named print_hello, will run first.
- Once print_hello runs successfully, task2, or print_date, will run.
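The bit-shift notation is shorthand for the set_upstream and set_downstream methods on an operator, so the same dependency can be written in several equivalent ways; the lines below are a sketch, and each one on its own expresses "task1 runs before task2".

task1 >> task2               # task2 is downstream of task1
task2 << task1               # same relationship, written from task2's side
task1.set_downstream(task2)  # explicit method form
task2.set_upstream(task1)    # explicit method form, from task2's side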
Bash ETL - 3
Case Study
Let’s create a DAG that
- runs daily
- extracts user information from /etc/passwd
- transforms it
- loads it into a file
This DAG will have two tasks:
- extract, which extracts fields from /etc/passwd
- transform_and_load, which transforms and loads the data into a file
Solution
Create DAG
- Open the AA Webserver UI.
- Create a new file by choosing File->New File and naming it my_second_dag.py.
- Insert this code in it:
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# ___ ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'your_name_here',
    'start_date': days_ago(0),
    'email': ['your_email_here'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# ___ DAG DEFINITION
dag = DAG(
    'my-second-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
# ___ TASK DEFINITION : EXTRACT
extract = BashOperator(
    task_id='extract',
    bash_command='cut -d":" -f1,3,6 /etc/passwd > /home/project/airflow/dags/extracted-data.txt',
    dag=dag,
)
# ___ TASK DEFINITION : TRANSFORM
transform_and_load = BashOperator(
    task_id='transform',
    bash_command='tr ":" "," < /home/project/airflow/dags/extracted-data.txt > /home/project/airflow/dags/transformed-data.csv',
    dag=dag,
)
# ___ PIPELINE
extract >> transform_and_load
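To make the two shell commands concrete, the snippet below is a rough pure-Python equivalent of what they do: cut -d":" -f1,3,6 keeps the username, user ID, and home directory fields of /etc/passwd, and tr ":" "," turns the remaining colons into commas. This is an illustration only; the DAG itself runs the shell commands above, and the output filename here is hypothetical.

# Rough Python equivalent of the extract (cut) and transform (tr) steps, for illustration only
with open('/etc/passwd') as src, open('transformed-data.csv', 'w') as dst:
    for line in src:
        fields = line.rstrip('\n').split(':')
        username, user_id, home_dir = fields[0], fields[2], fields[5]  # fields 1, 3 and 6
        dst.write(','.join([username, user_id, home_dir]) + '\n')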
Submit DAG
- Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.
- Airflow searches for Python source files within the specified DAGS_FOLDER. The location of DAGS_FOLDER can be found in the airflow.cfg file, where it has been configured as /home/project/airflow/dags.
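If you want to confirm the configured location without opening airflow.cfg, the value can also be read through Airflow's configuration API; a minimal sketch, assuming a standard Airflow installation:

from airflow.configuration import conf

# Prints the dags_folder value from airflow.cfg, e.g. /home/project/airflow/dags
print(conf.get('core', 'dags_folder'))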
Set AIRFLOW_HOME
# ___ EXPORT sets a temporary environment var
export AIRFLOW_HOME=/home/project/airflow
echo $AIRFLOW_HOME
Submit DAG
# ___ SUBMIT DAG file into the directory
export AIRFLOW_HOME=/home/project/airflow
cp my_second_dag.py $AIRFLOW_HOME/dags
Verify DAG
# ___ VERIFY the DAG was submitted
airflow dags list | grep "my-second-dag"
# ___ OUTPUT
my-second-dag | /home/project/airflow/dags/my_second_dag.py | your_name_here | True
List Tasks
# ___ LIST All Tasks
airflow tasks list my-second-dag
# ___ OUTPUT
extract
transform
Bash Simple - 4
Case Study
Let’s create a dummy DAG with three tasks.
- Task1 does nothing but sleep for 1 second.
- Task2 sleeps for 2 seconds.
- Task3 sleeps for 3 seconds.
- This DAG is scheduled to run every 1 minute.
Solution
- Using Menu -> File -> New File, create a new file named dummy_dag.py.
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# ___ ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# ___ DAG DEFINITION
dag = DAG(
    'dummy_dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(minutes=1),
)
# ___ TASK DEFINITION : TASK1
task1 = BashOperator(
    task_id='task1',
    bash_command='sleep 1',
    dag=dag,
)
# ___ TASK DEFINITION : TASK2
task2 = BashOperator(
    task_id='task2',
    bash_command='sleep 2',
    dag=dag,
)
# ___ TASK DEFINITION : TASK3
task3 = BashOperator(
    task_id='task3',
    bash_command='sleep 3',
    dag=dag,
)
# ___ PIPELINE
task1 >> task2 >> task3
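For longer linear pipelines like this one, Airflow 2.x also offers a chain helper that expresses the same dependencies without a long run of >> operators; a sketch, equivalent to the line above:

from airflow.models.baseoperator import chain

# Equivalent to task1 >> task2 >> task3
chain(task1, task2, task3)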
Set AIRFLOW_HOME
# ___ EXPORT sets a temporary environment var
export AIRFLOW_HOME=/home/project/airflow
Submit DAG
# ___ SUBMIT DAG file into the directory
export AIRFLOW_HOME=/home/project/airflow
cp dummy_dag.py $AIRFLOW_HOME/dags
Verify DAG
# ___ VERIFY the DAG was submitted
airflow dags list | grep dummy_dag
# ___ OUTPUT
dummy_dag | /home/project/airflow/dags/dummy_dag.py | Your name | True
List Tasks
# ___ LIST All Tasks
airflow tasks list dummy_dag
# ___ OUTPUT
task1
task2
task3
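As an alternative to the CLI, you can also check from Python that the file parses and that its tasks were registered, using Airflow's DagBag; a minimal sketch, assuming the AIRFLOW_HOME used above:

from airflow.models import DagBag

# Load the DAG files from the dags folder and look up dummy_dag
dag_bag = DagBag(dag_folder='/home/project/airflow/dags')
print(dag_bag.import_errors)                 # should be an empty dict
dag = dag_bag.get_dag('dummy_dag')
print([task.task_id for task in dag.tasks])  # expected: ['task1', 'task2', 'task3']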