# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# ___ ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'your_name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# ___ DAG DEFINITION
dag = DAG(
    'Bash_ETL_Server_Log_Processing',
    default_args=default_args,
    description='Bash DAG',
    schedule_interval=timedelta(days=1),
)
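# With schedule_interval=timedelta(days=1), Airflow schedules one run of this DAG per day.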
# ___ TASK DEFINITION : DOWNLOAD
download = BashOperator(
    task_id='download',
    bash_command='curl "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt" -o /home/project/airflow/dags/web-server-access-log.txt',
    dag=dag,
)
# ___ TASK DEFINITION : EXTRACT
extract = BashOperator(
    task_id='extract',
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)
# ___ TASK DEFINITION : TRANSFORM
transform = BashOperator(
    task_id='transform',
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)
# ___ TASK DEFINITION : LOAD
load = BashOperator(
    task_id='load',
    bash_command='cd /home/project/airflow/dags && zip log.zip capitalized.txt',
    dag=dag,
)
# ___ PIPELINE
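# '>>' sets task dependencies: extract runs after download, transform after extract, and load after transform.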
download >> extract >> transform >> load
DAG ETL Server Data
Note: this is the same project as DAG Python ETL Server Data, which was implemented with PythonOperator.
Case Study - Bash
Note: this is the same project as DAG Python ETL Case 1, implemented here with BashOperator instead of PythonOperator.
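For comparison, here is a minimal sketch of how the download step could be written with PythonOperator instead of BashOperator. This is illustrative only: the callable name and the use of urllib are assumptions, not the code from the Python version of the lab, and the import path follows the older 1.10-style imports used above.

from airflow.operators.python_operator import PythonOperator
import urllib.request

# Hypothetical callable that replaces the curl-based BashOperator task above
def download_log():
    url = ('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/'
           'IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/'
           'Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt')
    urllib.request.urlretrieve(url, '/home/project/airflow/dags/web-server-access-log.txt')

download_py = PythonOperator(
    task_id='download',
    python_callable=download_log,
    dag=dag,
)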
Write a DAG named Bash_ETL_Server_Log_Processing.py that will:
- download a file from a remote server at this URL
- extract the file
- transform the content
- load the transformed data into a file
- verify/check that the DAG succeeded
The server access log file contains these fields:
a. timestamp - TIMESTAMP
b. latitude - float
c. longitude - float
d. visitorid - char(37)
e. accessed_from_mobile - boolean
f. browser_code - int
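Per the cut command in the extract task above, the log file is '#'-delimited, so fields 1 and 4 are timestamp and visitorid. Below is a small sketch of the same selection in Python; the sample line is made up for illustration and is not real data.

# Hypothetical '#'-delimited line in the field order listed above
line = "2021-06-01 12:00:00#44.93#7.54#a1b2c3d4-0000-1111-2222-333344445555#True#3"

fields = line.split("#")
timestamp = fields[0]   # field 1 -> timestamp
visitorid = fields[3]   # field 4 -> visitorid
print(timestamp, visitorid)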
Tasks
- Add tasks in the DAG file to:
  - download the file
  - read the file
  - extract the fields timestamp and visitorid from the web-server-access-log.txt
  - transform the data by capitalizing the visitorid for all the records and store it in a local variable (see the sketch after this list)
  - load the data into a new file capitalized.txt
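As an illustration only, and not part of the required solution, the extract and transform steps could also be combined into a single BashOperator by piping cut into tr; the task_id below is an assumption:

# Hypothetical combined task: select timestamp and visitorid, then capitalize, in one pipeline
extract_transform = BashOperator(
    task_id='extract_transform',
    bash_command=(
        'cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt '
        '| tr "[a-z]" "[A-Z]" > /home/project/airflow/dags/capitalized.txt'
    ),
    dag=dag,
)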
Steps
- Create the imports block
- Create the DAG Arguments block. You can use the default settings
- Create the DAG definition block. The DAG should run daily
- Create the tasks download, extract, transform, and load using BashOperator
- Create the task pipeline block
- Submit the DAG
- Verify that the DAG was submitted
Solution
Create a new file by going to File -> New File from the menu and name it Bash_ETL_Server_Log_Processing.py.
- Data is here
Here is the code for the Python file (shown at the top of this section). It contains your DAG with four tasks:
- download
- extract
- transform
- load
The verify/check step is performed from the command line after the DAG is submitted (see Verify DAG below).
Submit DAG
# ___ SUBMIT the DAG file into the dags directory
export AIRFLOW_HOME=/home/project/airflow
cp Bash_ETL_Server_Log_Processing.py $AIRFLOW_HOME/dags
# ___ LIST all tasks in the DAG
airflow tasks list Bash_ETL_Server_Log_Processing
# ___ OUTPUT
download
extract
load
transform
Verify DAG
# ___ VERIFY the DAG was submitted
airflow dags list | grep "Bash_ETL_Server_Log_Processing"
# ___ OUTPUT
Bash_ETL_Server_Log_Processing | /home/project/airflow/dags/Bash_ETL_Server_Log_Processing.py | your_name | True
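As an optional extra check, not part of the lab steps, you can also confirm from Python that the scheduler can parse the DAG. DagBag and its import_errors/get_dag members are standard Airflow APIs; the expected task list is taken from the code above.

from airflow.models import DagBag

# Load all DAG files from the configured dags folder and report any parse errors
dag_bag = DagBag()
print(dag_bag.import_errors)   # an empty dict means every DAG file parsed cleanly

dag = dag_bag.get_dag('Bash_ETL_Server_Log_Processing')
print(dag.task_ids)            # expected (order may vary): ['download', 'extract', 'transform', 'load']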