DAG ETL Toll Data

Case Study - Bash


Note: This is the same project solved with the PythonOperator in DAG Python ETL Toll Data.

You have been assigned a project to decongest the national highways by analyzing the road traffic data from different toll plazas.

  • Each highway is operated by a different toll operator with a different IT setup that uses different file formats.
  • Your job is to collect data available in different formats and consolidate it into a single file.

We will develop an Apache Airflow DAG that will:

  • Use BashOperator for this project
  • Download the data from an online repository
  • Extract data from a csv file
  • Extract data from a tsv file
  • Extract data from a fixed-width file
  • Transform the data
  • Load the transformed data into the staging area

Solution


We will be using the Skills Network Cloud IDE by IBM, so we start by activating Airflow from the IDE.

Create Directory

# ___ Open a terminal and create a directory
sudo mkdir -p /home/project/airflow/dags/finalassignment/staging

Set Permissions

# ___ Give everyone read/write/execute (rwx) permissions on the directory
sudo chmod -R 777 /home/project/airflow/dags/finalassignment

Download Data

# ___  Download the data and save it in a new file: tolldata.tgz
sudo curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz \
     -o /home/project/airflow/dags/finalassignment/tolldata.tgz
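
A quick sanity check that the archive landed where expected:

# ___ Verify the download (optional)
ls -lh /home/project/airflow/dags/finalassignment/tolldata.tgz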

Create DAG


Script File

  • Using the IDE, create a new file named ETL_toll_data.py in the /home/project directory and open it in the file editor.

Imports

# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
# (On Airflow 2.x the preferred path is: from airflow.operators.bash import BashOperator)
from airflow.operators.bash_operator import BashOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

Arguments

# ___  ARGUMENTS
default_args = {
    'owner': 'santa',
    'start_date': days_ago(0),        # today
    'email': ['santa@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Definition

# ___  DAG DEFINITION
dag = DAG(
          dag_id = 'ETL_toll_data',
          description = 'Apache Airflow Final Assignment',
          default_args = default_args,
          schedule_interval=timedelta(days=1)  # once daily
          )
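
As a side note, the same DAG can be written with a context manager (the Airflow 2.x with DAG(...) pattern), which removes the need to pass dag=dag to every task. A minimal sketch; we keep the explicit dag=dag style below to match the assignment:

# ___ Sketch: equivalent DAG definition using a context manager (Airflow 2.x style)
with DAG(
        dag_id='ETL_toll_data',
        description='Apache Airflow Final Assignment',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
) as dag:
    pass  # tasks defined inside this block attach to the DAG automatically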

Create Tasks


Review Data

Before we unzip the data, let's review the files inside the archive.

# ___ To LIST the contents in .tgz file
tar -ztvf airflow/dags/finalassignment/tolldata.tgz

# ___ OUTPUT
-rw-r--r-- sr/staff       1704 2021-08-22 10:33 fileformats.txt
-rw-r--r-- sr/staff     680000 2021-08-22 09:51 payment-data.txt
-rw-r--r-- sr/staff        344 2021-08-22 10:25 ._tollplaza-data.tsv
-rw-r--r-- sr/staff     602524 2021-08-22 10:25 tollplaza-data.tsv
-rw-r--r-- sr/staff        288 2021-08-22 09:27 ._vehicle-data.csv
-rw-r--r-- sr/staff     512524 2021-08-22 09:27 vehicle-data.csv

# ___ To EXTRACT the contents of .tgz file
tar -xvzf airflow/dags/finalassignment/tolldata.tgz

# ___ To EXTRACT to a specific directory use -C
tar -xzvf airflow/dags/finalassignment/tolldata.tgz -C </directory/folder>

Task 1: unzip_data

  • Create a task named unzip_data to unzip data.
  • Use the data downloaded in the first part of this assignment (Set up the lab environment) and
  • uncompress it into the destination directory using tar.
# ___ TASK1 DEFINITION unzip_data
unzip_data = BashOperator(
        task_id ='unzip_data',
        bash_command = 'tar -xvzf airflow/dags/finalassignment/tolldata.tgz',
        dag=dag,
)
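
One caveat: BashOperator runs each command from a temporary working directory by default, so a relative path like airflow/dags/... only resolves if it happens to match the worker's working directory. A more defensive variant (a sketch, reusing the absolute paths set up earlier) extracts directly into the assignment directory with -C; the same reasoning applies to the relative file names used in the tasks below:

# ___ Sketch: unzip with absolute paths so the task does not depend on the working directory
unzip_data = BashOperator(
        task_id = 'unzip_data',
        bash_command = 'tar -xzf /home/project/airflow/dags/finalassignment/tolldata.tgz '
                       '-C /home/project/airflow/dags/finalassignment',
        dag=dag,
)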

Task 2: extract_data_from_csv

  • Create a task named extract_data_from_csv to extract the fields
  • Rowid, Timestamp, Anonymized Vehicle number, and Vehicle type from the vehicle-data.csv file and
  • save them into a file named csv_data.csv

Review vehicle_data.csv

  • Before we extract the fields from the file, we need to know which positions they occupy
  • We'll use head -n 5 to view the first five lines; this also shows the delimiter (a comma) and that the file has no header row
# ___  Read through the first few lines of vehicle-data.csv
head -n 5 vehicle-data.csv

# ___ OUTPUT
1,Thu Aug 19 21:54:38 2021,125094,car,2,VC965
2,Sat Jul 31 04:09:44 2021,174434,car,2,VC965
3,Sat Aug 14 17:19:04 2021,8538286,car,2,VC965
4,Mon Aug  2 18:23:45 2021,5521221,car,2,VC965
5,Thu Jul 29 22:44:20 2021,3267767,car,2,VC965
# ___ TASK2 DEFINITION extract_data_from_csv
extract_data_from_csv = BashOperator(
        task_id = 'extract_data_from_csv',
        bash_command = 'cut -d, -f1-4 vehicle-data.csv > csv_data.csv',
        dag=dag,
)
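
Once the task has run, a quick check confirms that only the first four comma-separated fields were kept:

# ___ Inspect the extracted CSV fields
head -n 3 csv_data.csv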

Task 3: extract_data_from_tsv

  • Create a task named extract_data_from_tsv to extract the fields
  • Number of axles, Tollplaza id, and Tollplaza code from the tollplaza-data.tsv file and
  • save it into a file named tsv_data.csv

Review tollplaza-data.tsv

# ___  Read through the first few lines of tollplaza-data.tsv
head -n 5 tollplaza-data.tsv

# ___ OUTPUT
1       Thu Aug 19 21:54:38 2021        125094  car     2       4856    PC7C042B7
2       Sat Jul 31 04:09:44 2021        174434  car     2       4154    PC2C2EF9E
3       Sat Aug 14 17:19:04 2021        8538286 car     2       4070    PCEECA8B2
4       Mon Aug  2 18:23:45 2021        5521221 car     2       4095    PC3E1512A
5       Thu Jul 29 22:44:20 2021        3267767 car     2       4135    PCC943ECD
# ___ TASK3 DEFINITION extract_data_from_tsv
extract_data_from_tsv = BashOperator(
        task_id = 'extract_data_from_tsv',
        bash_command = 'cut -f5-7 tollplaza-data.tsv | tr "\t" "," > tsv_data.csv',
        dag=dag,
)
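
Equivalently, the extraction can be done with awk; and if the TSV happens to carry Windows-style carriage returns, stripping them avoids a stray \r at the end of the last column (a sketch, not required by the assignment):

# ___ Sketch: awk alternative that also drops any trailing carriage returns
awk -F'\t' 'BEGIN{OFS=","} {gsub(/\r/, ""); print $5, $6, $7}' tollplaza-data.tsv > tsv_data.csv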

Task 4: extract_data_from_fixed_width

  • Create a task named extract_data_from_fixed_width to extract the fields
  • Type of Payment code, and Vehicle Code from the fixed width file payment-data.txt and
  • save it into a file named fixed_width_data.csv
# ___  Read through the first few lines of payment-data.txt
head -n 5 payment-data.txt

# ___ OUTPUT
     1 Thu Aug 19 21:54:38 2021 125094     4856 PC7C042B7 PTE VC965
     2 Sat Jul 31 04:09:44 2021 174434     4154 PC2C2EF9E PTP VC965
     3 Sat Aug 14 17:19:04 2021 8538286    4070 PCEECA8B2 PTE VC965
     4 Mon Aug  2 18:23:45 2021 5521221    4095 PC3E1512A PTP VC965
     5 Thu Jul 29 22:44:20 2021 3267767    4135 PCC943ECD PTE VC965
     
# ___ Max line length
wc -L payment-data.txt

# ___ OUTPUT
67 payment-data.txt
# ___ TASK4 DEFINITION extract_data_from_fixed_width
extract_data_from_fixed_width = BashOperator(
        task_id = 'extract_data_from_fixed_width',
        bash_command = 'cut -c59-67 payment-data.txt | tr " " "," > fixed_width_data.csv',
        dag=dag,
)
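
Since the payment code and vehicle code are simply the last two whitespace-separated fields, an awk variant that does not depend on fixed character positions is also possible (a sketch):

# ___ Sketch: position-independent alternative using the last two fields
awk '{print $(NF-1) "," $NF}' payment-data.txt > fixed_width_data.csv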

Task 5: consolidate_data

  • Create a task named consolidate_data to consolidate data extracted from previous tasks.
  • This task should create a single csv file named extracted_data.csv by combining data from the following files:
    • csv_data.csv
    • tsv_data.csv
    • fixed_width_data.csv
  • The final csv file should use the fields in the order given below:
    • Rowid
    • Timestamp
    • Anonymized Vehicle number
    • Vehicle type
    • Number of axles
    • Tollplaza id
    • Tollplaza code
    • Type of Payment code, and
    • Vehicle Code
  • Use the bash paste command, which merges the columns of the files passed as command-line parameters and sends the output to the file specified. You can use the command man paste to explore more.
  • The order given above corresponds to pasting the 3 files in this order:
    • csv_data.csv
    • tsv_data.csv
    • fixed_width_data.csv
# ___  Since all three extracted files are comma-delimited, paste them with the delimiter set to , (default is tab)
paste -d "," csv_data.csv tsv_data.csv fixed_width_data.csv > extracted_data.csv
# ___ TASK5 DEFINITION consolidate_data
consolidate_data = BashOperator(
        task_id = 'consolidate_data',
        bash_command = 'paste -d "," csv_data.csv tsv_data.csv fixed_width_data.csv > extracted_data.csv',
        dag=dag,
)
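
A quick check on the consolidated file: each row should now hold all nine comma-separated fields.

# ___ Verify the consolidated output
head -n 2 extracted_data.csv
awk -F',' '{print NF; exit}' extracted_data.csv    # expect 9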

Task 6: transform_data

  • Create a task named transform_data to transform the vehicle_type field (field #4) of extracted_data.csv into capital letters and
  • save it into a file named transformed_data.csv in the staging directory.
# ___ TASK6 DEFINITION transform_data
transform_data = BashOperator(
        task_id = 'transform_data',
        bash_command = 'cut -d"," -f4 extracted_data.csv | tr "[a-z]" "[A-Z]" '
                       '> /home/project/airflow/dags/finalassignment/staging/transformed_data.csv',
        dag=dag,
)
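
The command above writes only the uppercased vehicle_type column, which is what this solution submits. If instead you want to keep all nine columns and uppercase only field 4, an awk variant (a sketch) would be:

# ___ Sketch: keep every column, uppercase only the 4th (vehicle_type)
awk 'BEGIN{FS=OFS=","} {$4 = toupper($4)} 1' extracted_data.csv \
    > /home/project/airflow/dags/finalassignment/staging/transformed_data.csv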

Define Pipeline


  • Define the pipeline per the table below:

    Task           Functionality
    First task     unzip_data
    Second task    extract_data_from_csv
    Third task     extract_data_from_tsv
    Fourth task    extract_data_from_fixed_width
    Fifth task     consolidate_data
    Sixth task     transform_data
# ___  PIPELINE
unzip_data >> extract_data_from_csv >> extract_data_from_tsv >> extract_data_from_fixed_width >> \
consolidate_data >> transform_data
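
The assignment asks for the strictly serial chain above. Since the three extract tasks only depend on unzip_data, a fan-out/fan-in variant is also valid Airflow (a sketch):

# ___ Sketch: run the three extract tasks in parallel
unzip_data >> [extract_data_from_csv, extract_data_from_tsv, extract_data_from_fixed_width]
[extract_data_from_csv, extract_data_from_tsv, extract_data_from_fixed_width] >> consolidate_data >> transform_data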

Submit DAG


  • Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.
  • Airflow searches for Python source files within the specified DAGS_FOLDER. The DAGS_FOLDER path is set in the airflow.cfg file, where it has been configured as /home/project/airflow/dags.
# ___ SUBMIT DAG file into the directory
cp ETL_toll_data.py airflow/dags

# ___ In case of errors use
airflow dags list-import-errors

Verify DAG

airflow dags list | grep "ETL_toll_data.py"

List Tasks

airflow tasks list ETL_toll_data

# ___ OUTPUT
consolidate_data
extract_data_from_csv
extract_data_from_fixed_width
extract_data_from_tsv
transform_data
unzip_data

Unpause & Trigger DAG

  • Use the slider on the left (first column) to unpause the DAG
  • Use the > (play) arrow on the far right, under Links, to manually trigger the DAG

  • or use the terminal:
# ___  UNPAUSE DAG
airflow dags unpause ETL_toll_data
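
and trigger it from the terminal as well:

# ___  TRIGGER DAG
airflow dags trigger ETL_toll_data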

Look at Runs

  • Trigger the DAG again, so now we should have two runs.