# DAG ETL Toll Data
Case Study - Bash

Note: This is the same project solved in *DAG Python ETL Toll Data* using the `PythonOperator`.
You have been assigned a project to decongest the national highways by analyzing the road traffic data from different toll plazas.
- Each highway is operated by a different toll operator with a different IT setup that uses different file formats.
- Your job is to collect data available in different formats and consolidate it into a single file.
We will develop an Apache Airflow DAG that will:

- Use `BashOperator` for this project
- Download the data from an online repository
- Extract data from a csv file
- Extract data from a tsv file
- Extract data from a fixed-width file
- Transform the data
- Load the transformed data into the staging area
(Screenshots: dag_args.jpg, dag_definition.jpg, unzip_data.jpg)
## Solution

We will be using the Skills Network Cloud IDE by IBM, so we start by activating Airflow from the IDE.
### Create Directory

```bash
# ___ Open a terminal and create a directory
sudo mkdir -p /home/project/airflow/dags/finalassignment/staging
```
### Set Permissions

```bash
# ___ Set permissions rwx for everyone on the directory
sudo chmod -R 777 /home/project/airflow/dags/finalassignment
```
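If you want to confirm the change took effect, an optional check (not part of the assignment):

```bash
# ___ Verify the permissions; expect drwxrwxrwx after chmod 777
ls -ld /home/project/airflow/dags/finalassignment
```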
### Download Data

```bash
# ___ Download the data and save it in a new file: tolldata.tgz
sudo curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz -o /home/project/airflow/dags/finalassignment/tolldata.tgz
```
## Create DAG

### Script File

- Using the IDE, create a new file named `ETL_toll_data.py` in the `/home/project` directory and open it in the file editor.
### Imports

```python
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
```
### Arguments

```python
# ___ ARGUMENTS
default_args = {
    'owner': 'santa',
    'start_date': days_ago(0),  # today
    'email': ['santa@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```
### Definition

```python
# ___ DAG DEFINITION
dag = DAG(
    dag_id='ETL_toll_data',
    description='Apache Airflow Final Assignment',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # once daily
)
```
## Create Tasks

### Review Data

Before I unzip the data, let me review the files.
```bash
# ___ To LIST the contents of the .tgz file
tar -ztvf airflow/dags/finalassignment/tolldata.tgz

# ___ OUTPUT
-rw-r--r-- sr/staff   1704 2021-08-22 10:33 fileformats.txt
-rw-r--r-- sr/staff 680000 2021-08-22 09:51 payment-data.txt
-rw-r--r-- sr/staff    344 2021-08-22 10:25 ._tollplaza-data.tsv
-rw-r--r-- sr/staff 602524 2021-08-22 10:25 tollplaza-data.tsv
-rw-r--r-- sr/staff    288 2021-08-22 09:27 ._vehicle-data.csv
-rw-r--r-- sr/staff 512524 2021-08-22 09:27 vehicle-data.csv

# ___ To EXTRACT the contents of the .tgz file
tar -xvzf airflow/dags/finalassignment/tolldata.tgz

# ___ To EXTRACT to a specific directory use -C
tar -xzvf airflow/dags/finalassignment/tolldata.tgz -C </directory/folder>
```
### Task 1: unzip_data

- Create a task named `unzip_data` to unzip data.
- Use the data downloaded in the first part of this assignment in *Set up the lab environment* and
- uncompress it into the destination directory using `tar`.
```python
# ___ TASK1 DEFINITION unzip_data
unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command='tar -xvzf airflow/dags/finalassignment/tolldata.tgz',
    dag=dag,
)
```
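One caveat: in many Airflow setups `BashOperator` runs the command from a temporary working directory rather than `/home/project`, so relative paths can resolve somewhere unexpected. A more defensive variant (a sketch under the assumption that the lab paths above exist; not the graded solution) pins both the archive and the destination:

```bash
# Hypothetical safer variant: absolute archive path plus -C for the destination
tar -xvzf /home/project/airflow/dags/finalassignment/tolldata.tgz \
    -C /home/project/airflow/dags/finalassignment
```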
### Task 2: extract_data_from_csv

- Create a task named `extract_data_from_csv` to
- extract the fields `Rowid`, `Timestamp`, `Anonymized Vehicle number`, and `Vehicle type` from the `vehicle-data.csv` file and
- save them into a file named `csv_data.csv`
#### Review vehicle-data.csv

- Before we extract the fields from the file, we need to know which positions they occupy.
- We will use `head -n 5` to view the first 5 lines, so we learn about the delimiter and the field order.
```bash
# ___ Read through the first few lines of vehicle-data.csv
head -n 5 vehicle-data.csv

# ___ OUTPUT
1,Thu Aug 19 21:54:38 2021,125094,car,2,VC965
2,Sat Jul 31 04:09:44 2021,174434,car,2,VC965
3,Sat Aug 14 17:19:04 2021,8538286,car,2,VC965
4,Mon Aug 2 18:23:45 2021,5521221,car,2,VC965
5,Thu Jul 29 22:44:20 2021,3267767,car,2,VC965
```
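As a quick sanity check (optional; assumes the layout shown above), you can count the comma-separated fields on the first line:

```bash
# Count comma-separated fields on line 1 (expect 6 for vehicle-data.csv)
head -n 1 vehicle-data.csv | awk -F',' '{print NF}'
```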
```python
# ___ TASK2 DEFINITION extract_data_from_csv
extract_data_from_csv = BashOperator(
    task_id='extract_data_from_csv',
    bash_command='cut -d, -f1-4 vehicle-data.csv > csv_data.csv',
    dag=dag,
)
```
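To preview what `cut -d, -f1-4` will keep, you can pipe a sample row through it (a throwaway one-liner, not part of the DAG):

```bash
echo "1,Thu Aug 19 21:54:38 2021,125094,car,2,VC965" | cut -d, -f1-4
# → 1,Thu Aug 19 21:54:38 2021,125094,car
```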
### Task 3: extract_data_from_tsv

- Create a task named `extract_data_from_tsv` to
- extract the fields `Number of axles`, `Tollplaza id`, and `Tollplaza code` from the `tollplaza-data.tsv` file and
- save it into a file named `tsv_data.csv`
#### Review tollplaza-data.tsv

```bash
# ___ Read through the first few lines of tollplaza-data.tsv
head -n 5 tollplaza-data.tsv

# ___ OUTPUT
1 Thu Aug 19 21:54:38 2021 125094 car 2 4856 PC7C042B7
2 Sat Jul 31 04:09:44 2021 174434 car 2 4154 PC2C2EF9E
3 Sat Aug 14 17:19:04 2021 8538286 car 2 4070 PCEECA8B2
4 Mon Aug 2 18:23:45 2021 5521221 car 2 4095 PC3E1512A
5 Thu Jul 29 22:44:20 2021 3267767 car 2 4135 PCC943ECD
```
```python
# ___ TASK3 DEFINITION extract_data_from_tsv
extract_data_from_tsv = BashOperator(
    task_id='extract_data_from_tsv',
    bash_command='cut -f5-7 tollplaza-data.tsv | tr "\t" "," > tsv_data.csv',
    dag=dag,
)
```
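Again, a quick throwaway check of the command logic (the `printf` row is a hypothetical stand-in for a real tab-delimited line):

```bash
printf '1\tThu Aug 19 21:54:38 2021\t125094\tcar\t2\t4856\tPC7C042B7\n' \
  | cut -f5-7 | tr '\t' ','
# → 2,4856,PC7C042B7
```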
### Task 4: extract_data_from_fixed_width

- Create a task named `extract_data_from_fixed_width` to
- extract the fields `Type of Payment code` and `Vehicle Code` from the fixed-width file `payment-data.txt` and
- save it into a file named `fixed_width_data.csv`
#### Review payment-data.txt

```bash
# ___ Read through the first few lines of payment-data.txt
head -n 5 payment-data.txt

# ___ OUTPUT
1 Thu Aug 19 21:54:38 2021 125094 4856 PC7C042B7 PTE VC965
2 Sat Jul 31 04:09:44 2021 174434 4154 PC2C2EF9E PTP VC965
3 Sat Aug 14 17:19:04 2021 8538286 4070 PCEECA8B2 PTE VC965
4 Mon Aug 2 18:23:45 2021 5521221 4095 PC3E1512A PTP VC965
5 Thu Jul 29 22:44:20 2021 3267767 4135 PCC943ECD PTE VC965

# ___ Max line length
wc -L payment-data.txt

# ___ OUTPUT
67 payment-data.txt
```
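`wc -L` reports the longest line as 67 characters, so the last nine characters (positions 59-67) hold `Type of Payment code` and `Vehicle Code`. If you prefer not to depend on exact character positions, a field-based alternative (a sketch, not the assignment's required approach) would be:

```bash
# Grab the last two whitespace-separated fields of each line
awk '{print $(NF-1) "," $NF}' payment-data.txt | head -n 3
# → PTE,VC965
# → PTP,VC965
# → PTE,VC965
```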
```python
# ___ TASK4 DEFINITION extract_data_from_fixed_width
extract_data_from_fixed_width = BashOperator(
    task_id='extract_data_from_fixed_width',
    bash_command='cut -c59-67 payment-data.txt | tr " " "," > fixed_width_data.csv',
    dag=dag,
)
```
### Task 5: consolidate_data

- Create a task named `consolidate_data` to consolidate data extracted from the previous tasks.
- This task should create a single csv file named `extracted_data.csv` by combining data from the following files:
  - `csv_data.csv`
  - `tsv_data.csv`
  - `fixed_width_data.csv`
- The final csv file should use the fields in the order given below:
  `Rowid`, `Timestamp`, `Anonymized Vehicle number`, `Vehicle type`, `Number of axles`, `Tollplaza id`, `Tollplaza code`, `Type of Payment code`, and `Vehicle Code`
- Use the bash `paste` command, which merges the columns of the files passed as command-line parameters and sends the output to the new file specified. You can use the command `man paste` to explore more.
- The order given above corresponds to pasting the 3 files in this order:
  1. `csv_data.csv`
  2. `tsv_data.csv`
  3. `fixed_width_data.csv`
```bash
# ___ Since all files were set up with "," as the delimiter, let's paste them and set the delimiter to , (default is tab)
paste -d "," csv_data.csv tsv_data.csv fixed_width_data.csv > extracted_data.csv
```
```python
# ___ TASK5 DEFINITION consolidate_data
consolidate_data = BashOperator(
    task_id='consolidate_data',
    bash_command='paste -d "," csv_data.csv tsv_data.csv fixed_width_data.csv > extracted_data.csv',
    dag=dag,
)
```
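If `paste` is new to you, here is a toy demonstration with two hypothetical throwaway files:

```bash
printf 'a\nb\n' > left.txt
printf '1\n2\n' > right.txt
paste -d "," left.txt right.txt
# → a,1
# → b,2
```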
### Task 6: transform_data

- Create a task named `transform_data` to transform the vehicle type field (field #4 in `extracted_data.csv`) into capital letters and
- save it into a file named `transformed_data.csv` in the staging directory.
```python
# ___ TASK6 DEFINITION transform_data
transform_data = BashOperator(
    task_id='transform_data',
    bash_command='cut -d"," -f4 extracted_data.csv | tr "[a-z]" "[A-Z]" > \
/home/project/airflow/dags/finalassignment/staging/transformed_data.csv',
    dag=dag,
)
```
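Note that this command writes only the upper-cased fourth column to `transformed_data.csv`, which is how this solution reads the instructions. If you instead wanted all nine fields with only the vehicle type capitalized, an `awk` variant (a sketch, not the graded answer) could do it:

```bash
# Upper-case field 4 in place, keep all other fields and the comma delimiter
awk 'BEGIN{FS=OFS=","} {$4=toupper($4)} 1' extracted_data.csv \
  > /home/project/airflow/dags/finalassignment/staging/transformed_data.csv
```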
## Define Pipeline

- Define the pipeline per the table below.

| Task | Functionality |
| --- | --- |
| First task | unzip_data |
| Second task | extract_data_from_csv |
| Third task | extract_data_from_tsv |
| Fourth task | extract_data_from_fixed_width |
| Fifth task | consolidate_data |
| Sixth task | transform_data |
```python
# ___ PIPELINE
unzip_data >> extract_data_from_csv >> extract_data_from_tsv >> extract_data_from_fixed_width >> \
consolidate_data >> transform_data
```
## Submit DAG

- Submitting a DAG is as simple as copying the DAG Python file into the `dags` folder in the `AIRFLOW_HOME` directory.
- Airflow searches for Python source files within the specified `DAGS_FOLDER`. The location of `DAGS_FOLDER` can be found in the `airflow.cfg` file, where it has been configured as `/home/project/airflow/dags`.
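You can also query this location from the terminal (an optional check; uses the standard Airflow 2 CLI):

```bash
# ___ Confirm where Airflow looks for DAG files
airflow config get-value core dags_folder
# → /home/project/airflow/dags
```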
```bash
# ___ SUBMIT DAG file into the /dags directory
cp ETL_toll_data.py airflow/dags

# ___ In case of errors use
airflow dags list-import-errors
```
## Verify DAG

```bash
airflow dags list | grep "ETL_toll_data.py"
```
## List Tasks

```bash
airflow tasks list ETL_toll_data

# ___ OUTPUT
consolidate_data
extract_data_from_csv
extract_data_from_fixed_width
extract_data_from_tsv
transform_data
unzip_data
```
## Unpause & Trigger DAG

- Use the slider on the left (first column) to unpause the DAG.
- Use the play arrow (>) on the far right, next to the links, to manually trigger the DAG.
- Or use the terminal:

```bash
# ___ UNPAUSE DAG
airflow dags unpause ETL_toll_data
```
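The manual trigger also has a CLI equivalent (standard Airflow 2 command):

```bash
# ___ TRIGGER DAG from the terminal
airflow dags trigger ETL_toll_data
```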
## Look at Runs

- Trigger the DAG again, so now we should have 2 runs.