DAG ETL Toll Data
Case Study - Python
Note: This is the same project solved in DAG Bash ETL Toll Data using BashOperator
You have been assigned a project to decongest the national highways by analyzing the road traffic data from different toll plazas.
- Each highway is operated by a different toll operator with a different IT setup that uses different file formats.
- Your job is to collect data available in different formats and consolidate it into a single file.
We will develop an Apache Airflow DAG that will:
- Use PythonOperator for this project
- Download the data from an online repository found here
- Unzip the file
- Extract data from a csv file
- Extract data from a tsv file
- Extract data from a fixed-width file
- Transform the data
- Load the transformed data into the staging area
Extra Powershell
To view the intermediate and final files with Windows PowerShell:
# ___ Search for PowerShell in Windows and open it
PS C:\Users\EMHRC>
# ___ Change directories
PS C:\Users\EMHRC> cd D:\py_projects
# ___ Store the file content in a variable
PS D:\py_projects> $view = Get-Content "data\extracted\vehicle-data.csv"
# ___ Read line 6
PS D:\py_projects> $view[5]
6,Sat Aug 14 03:57:47 2021,8411850,car,2,VC965
# ___ Read the first 5 lines
PS D:\py_projects> $view | Select-Object -First 5
# ___ OUTPUT
1,Thu Aug 19 21:54:38 2021,125094,car,2,VC965
2,Sat Jul 31 04:09:44 2021,174434,car,2,VC965
3,Sat Aug 14 17:19:04 2021,8538286,car,2,VC965
4,Mon Aug 2 18:23:45 2021,5521221,car,2,VC965
5,Thu Jul 29 22:44:20 2021,3267767,car,2,VC965
# ___ Read the last 5 lines
PS D:\py_projects> $view | Select-Object -Last 5
# ___ OUTPUT
9996,Sun Aug 15 03:02:27 2021,3369206,car,2,VC965
9997,Sat Aug 7 04:24:08 2021,5886709,truck,4,VCB43
9998,Mon Aug 9 02:58:26 2021,917421,truck,4,VCB43
9999,Tue Aug 17 18:41:00 2021,2601682,truck,4,VCB43
10000,Mon Aug 9 22:56:50 2021,4833266,car,2,VC965
# ___ Read the extracted csv_data.csv
PS D:\py_projects> $view = Get-Content "data\extracted\csv_data.csv"
PS D:\py_projects> $view | Select-Object -First 5
1,Thu Aug 19 21:54:38 2021,125094,car
2,Sat Jul 31 04:09:44 2021,174434,car
3,Sat Aug 14 17:19:04 2021,8538286,car
4,Mon Aug 2 18:23:45 2021,5521221,car
5,Thu Jul 29 22:44:20 2021,3267767,car
# ___ Read the extracted tsv_data.csv
PS D:\py_projects> $view = Get-Content "data\extracted\tsv_data.csv"
PS D:\py_projects> $view | Select-Object -First 5
2,4856,PC7C042B7
2,4154,PC2C2EF9E
2,4070,PCEECA8B2
2,4095,PC3E1512A
2,4135,PCC943ECD
# ___ Read the extracted fixed_width_data.csv
PS D:\py_projects> $view = Get-Content "data\extracted\fixed_width_data.csv"
PS D:\py_projects> $view | Select-Object -First 5
PTE,VC965
PTP,VC965
PTE,VC965
PTP,VC965
PTE,VC965
# ___ Read the combined extracted_data.csv
PS D:\py_projects> $view = Get-Content "data\extracted\extracted_data.csv"
PS D:\py_projects> $view | Select-Object -First 5
1,Thu Aug 19 21:54:38 2021,125094,car,2,4856,PC7C042B7,PTE,VC965
2,Sat Jul 31 04:09:44 2021,174434,car,2,4154,PC2C2EF9E,PTP,VC965
3,Sat Aug 14 17:19:04 2021,8538286,car,2,4070,PCEECA8B2,PTE,VC965
4,Mon Aug 2 18:23:45 2021,5521221,car,2,4095,PC3E1512A,PTP,VC965
5,Thu Jul 29 22:44:20 2021,3267767,car,2,4135,PCC943ECD,PTE,VC965
# ___ Read transformed_data.csv
PS D:\py_projects> $view = Get-Content "data\extracted\transformed_data.csv"
PS D:\py_projects> $view | Select-Object -First 5
1,Thu Aug 19 21:54:38 2021,125094,CAR,2,4856,PC7C042B7,PTE,VC965
2,Sat Jul 31 04:09:44 2021,174434,CAR,2,4154,PC2C2EF9E,PTP,VC965
3,Sat Aug 14 17:19:04 2021,8538286,CAR,2,4070,PCEECA8B2,PTE,VC965
4,Mon Aug 2 18:23:45 2021,5521221,CAR,2,4095,PC3E1512A,PTP,VC965
5,Thu Jul 29 22:44:20 2021,3267767,CAR,2,4135,PCC943ECD,PTE,VC965
Solution
We will be using the Skills Network Cloud IDE by IBM, so we start by activating Airflow from the IDE.
Create Directory
# ___ Open a terminal and create a directory
sudo mkdir -p /home/project/airflow/dags/finalassignment/staging
Set Permissions
# ___ Set permissions rwx for everyone to the directory
sudo chmod -R 777 /home/project/airflow/dags/finalassignment
Create DAG
Script File
- Using the IDE, create a new file named ETL_toll_data.py in the /home/project directory and open it in the file editor.
Imports
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
import os  # used by untar_dataset to list the extracted files
import requests
import tarfile
import pandas as pd
Variables
# ___ DEFINE input & output files
input_file = '/airflow/dags/python_etl/staging/tolldata.tgz'
extracted_dir = '/airflow/dags/python_etl/staging'
vehicle_file = '/airflow/dags/python_etl/staging/vehicle-data.csv'
tsv_in = '/airflow/dags/python_etl/staging/tollplaza-data.tsv'
tsv_out = '/airflow/dags/python_etl/staging/tsv_data.csv'
fixed_in = '/airflow/dags/python_etl/staging/payment-data.txt'
fixed_out = '/airflow/dags/python_etl/staging/fixed_width_data.csv'
extracted_vehicle = 'csv_data.csv'
combined_file = 'extracted_data.csv'       # consolidated output, named per the assignment
transformed_file = 'transformed_data.csv'  # final output, named per the assignment
Arguments
# ___ ARGUMENTS
default_args = {
    'owner': 'santa',
    'start_date': days_ago(0),  # today
    'email': ['santa@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Definition
# ___ DAG DEFINITION
dag = DAG(
    dag_id='ETL_toll_data',
    description='Apache Airflow Final Assignment',
    default_args=default_args,
    schedule_interval=timedelta(days=1)  # once daily
)
Create Functions
Function 1: download_dataset
- Create a Python function named download_dataset to download the data set from the source to the destination
- Source: the tolldata.tgz URL shown in the function below
- Destination: /home/project/airflow/dags/python_etl/staging
# ___ CREATE FUNCTION 1 FOR TASK 1 download_dataset
def download_dataset():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz"
    # Send a GET request to the URL and stream the response
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, 'wb') as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")
Function 2: untar_dataset
- Create a Python function named untar_dataset to untar the downloaded data set.
# ___ CREATE FUNCTION 2 FOR TASK 2 untar_dataset
def untar_dataset():
    with tarfile.open(input_file, 'r') as tar:
        tar.extractall(path=extracted_dir)
    print(f"Extracted all files to {extracted_dir}")
    print(os.listdir(extracted_dir))
# ___ OUTPUT
Extracted all files to /data/extracted
['fileformats.txt', 'payment-data.txt', '._tollplaza-data.tsv', 'tollplaza-data.tsv', '._vehicle-data.csv', 'vehicle-data.csv']
Function 3: extract_data_from_csv
- Let’s look at the data first
- Create a function named extract_data_from_csv to extract the fields Rowid, Timestamp, Anonymized Vehicle number, and Vehicle type from the vehicle-data.csv file and
- save them into a file named csv_data.csv
Review vehicle-data.csv
Here is the information provided with the file:
- vehicle-data.csv is a comma-separated values file.
- It has the below 6 fields
- Rowid - This uniquely identifies each row. This is consistent across all the three files.
- Timestamp - What time did the vehicle pass through the toll gate.
- Anonymized Vehicle number - Anonymized registration number of the vehicle
- Vehicle type - Type of the vehicle
- Number of axles - Number of axles of the vehicle
- Vehicle code - Category of the vehicle as per the toll plaza.
# ___ Read through the first few lines of vehicle-data.csv
head -n 5 vehicle-data.csv
# ___ OUTPUT
1,Thu Aug 19 21:54:38 2021,125094,car,2,VC965
2,Sat Jul 31 04:09:44 2021,174434,car,2,VC965
3,Sat Aug 14 17:19:04 2021,8538286,car,2,VC965
4,Mon Aug 2 18:23:45 2021,5521221,car,2,VC965
5,Thu Jul 29 22:44:20 2021,3267767,car,2,VC965
# ___ CREATE FUNCTION FOR TASK 3 extract_data_from_csv
def extract_data_from_csv():
    print("Column Extract from CSV")
    # Read the file line by line and keep the first 4 comma-separated fields
    with open(vehicle_file, 'r') as infile, \
         open(extracted_vehicle, 'w') as outfile:
        for line in infile:
            fields = line.split(',')
            if len(fields) >= 4:
                field_1 = fields[0]
                field_2 = fields[1]
                field_3 = fields[2]
                field_4 = fields[3]
                outfile.write(field_1 + "," + field_2 + "," + field_3 + "," + field_4 + "\n")
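Since pandas is already imported, the same column extraction can also be sketched with read_csv. This variant is an addition for comparison, not part of the original solution, and assumes vehicle-data.csv has no header row:
# ___ Hypothetical pandas variant of extract_data_from_csv
def extract_data_from_csv_pandas():
    # usecols keeps only Rowid, Timestamp, Anonymized Vehicle number, Vehicle type
    df = pd.read_csv(vehicle_file, header=None, usecols=[0, 1, 2, 3])
    df.to_csv(extracted_vehicle, index=False, header=False)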
Function 4: extract_data_from_tsv
- Let’s look at the data first
- Create a function named extract_data_from_tsv to extract the fields Number of axles, Tollplaza id, and Tollplaza code (the last 3 fields) from the tollplaza-data.tsv file and
- save it into a file named tsv_data.csv
Review tollplaza-data.tsv
Here is the information provided with the file:
- tollplaza-data.tsv is a tab-separated values file
- It has the below 7 fields
- Rowid - This uniquely identifies each row. This is consistent across all the three files.
- Timestamp - What time did the vehicle pass through the toll gate.
- Anonymized Vehicle number - Anonymized registration number of the vehicle
- Vehicle type - Type of the vehicle
- Number of axles - Number of axles of the vehicle
- Tollplaza id - Id of the toll plaza
- Tollplaza code - Tollplaza accounting code
# ___ Read through the first few lines of tollplaza-data.tsv Using Bash
head -n 5 tollplaza-data.tsv
# ___ OUTPUT
1 Thu Aug 19 21:54:38 2021 125094 car 2 4856 PC7C042B7
2 Sat Jul 31 04:09:44 2021 174434 car 2 4154 PC2C2EF9E
3 Sat Aug 14 17:19:04 2021 8538286 car 2 4070 PCEECA8B2
4 Mon Aug 2 18:23:45 2021 5521221 car 2 4095 PC3E1512A
5 Thu Jul 29 22:44:20 2021 3267767 car 2 4135 PCC943ECD
# ___ CREATE FUNCTION FOR TASK 4 extract_data_from_tsv
def extract_data_from_tsv():
    print("Column Extract from TSV")
    # Read the file line by line and keep the last 3 tab-separated fields
    with open(tsv_in, 'r') as infile, \
         open(tsv_out, 'w') as outfile:
        for line in infile:
            fields = line.split('\t')
            if len(fields) >= 7:
                field_1 = fields[4]
                field_2 = fields[5]
                field_3 = fields[6]
                # fields[6] still ends with the line's "\n", so writing another
                # "\n" here would add a blank line between records
                outfile.write(field_1 + "," + field_2 + "," + field_3)
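A quick standalone illustration of that newline behavior: splitting on tabs leaves the trailing newline attached to the last field.
# ___ Demo only: the last field keeps the line's newline
print("2\t4856\tPC7C042B7\n".split('\t'))
# ['2', '4856', 'PC7C042B7\n']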
Function 5: extract_data_from_fixed_width
- Create a function named extract_data_from_fixed_width to extract the fields Type of Payment code and Vehicle Code from the fixed-width file payment-data.txt and
- save it into a file named fixed_width_data.csv
- Let's look at the data first
Review payment-data.txt
Here is the information provided with the file:
- payment-data.txt is a fixed width file
- Each field occupies a fixed number of characters.
- It has the below 7 fields:
- Rowid - This uniquely identifies each row. This is consistent across all the three files
- Timestamp - What time did the vehicle pass through the toll gate
- Anonymized Vehicle number - Anonymized registration number of the vehicle
- Tollplaza id - Id of the toll plaza
- Tollplaza code - Tollplaza accounting code
- Type of Payment code - Code to indicate the type of payment. Example : Prepaid, Cash
- Vehicle Code - Category of the vehicle as per the toll plaza
# ___ Read through the first few lines of payment-data.txt
head -n 5 payment-data.txt
# ___ OUTPUT
1 Thu Aug 19 21:54:38 2021 125094 4856 PC7C042B7 PTE VC965
2 Sat Jul 31 04:09:44 2021 174434 4154 PC2C2EF9E PTP VC965
3 Sat Aug 14 17:19:04 2021 8538286 4070 PCEECA8B2 PTE VC965
4 Mon Aug 2 18:23:45 2021 5521221 4095 PC3E1512A PTP VC965
5 Thu Jul 29 22:44:20 2021 3267767 4135 PCC943ECD PTE VC965
# ___ Max line length
wc -L payment-data.txt
# ___ OUTPUT
67 payment-data.txt
# ___ CREATE FUNCTION 5 FOR TASK 5 extract_data_from_fixed_width
# ___ wc -L reports 67 characters per line; the last two fields occupy positions 58-67
def extract_data_from_fixed_width():
    print("Column Extract from Fixed Width")
    start_pos = 58
    end_pos = 67
    with open(fixed_in, 'r') as infile, \
         open(fixed_out, 'w') as outfile:
        for line in infile:
            # Slice out the last two columns and split on the space between them
            wanted_line = line[start_pos:end_pos].strip()
            fields = wanted_line.split(' ')
            if len(fields) >= 2:
                field_1 = fields[0]
                field_2 = fields[1]
                outfile.write(field_1 + "," + field_2 + "\n")
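If the column layout ever changes, a position-independent sketch (an added variant, not the assignment's approach) is to split on whitespace and keep the last two tokens, which are always the Type of Payment code and Vehicle Code:
# ___ Hypothetical position-independent variant
def extract_data_from_fixed_width_v2():
    with open(fixed_in, 'r') as infile, \
         open(fixed_out, 'w') as outfile:
        for line in infile:
            tokens = line.split()  # split on any run of whitespace
            if len(tokens) >= 2:
                outfile.write(tokens[-2] + "," + tokens[-1] + "\n")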
Function 6: consolidate_data
- Create a function named consolidate_data to create a single csv file named extracted_data.csv by combining data from the following files:
- csv_data.csv
- tsv_data.csv
- fixed_width_data.csv
The final csv file should use the fields in the order given below:
Rowid, Timestamp, Anonymized Vehicle number, Vehicle type, Number of axles, Tollplaza id, Tollplaza code, Type of Payment code, and Vehicle Code
Sample Code Using Directory
- combine_csv_files(input_directory, output_file)
- Import Libraries: The script starts by importing the necessary libraries (pandas and os).
- Define Function: The combine_csv_files function takes two arguments: input_directory (the directory containing the CSV files) and output_file (the path to the output CSV file).
- List Dataframes: Initializes an empty list to hold individual dataframes.
- Iterate Files: Iterates over all files in the input directory, checking if they end with .csv.
- Read and Append: Reads each CSV file into a pandas dataframe and appends it to the list.
- Concatenate Dataframes: Uses pd.concat to concatenate all dataframes in the list into a single dataframe.
- Write Output: Writes the combined dataframe to the specified output file.
This script will create a single CSV file containing all the data from the CSV files in the specified directory.
import pandas as pd
import os
def combine_csv_files(input_directory, output_file):
    # List to hold individual dataframes
    dataframes = []

    # Iterate over all files in the input directory
    for filename in os.listdir(input_directory):
        if filename.endswith(".csv"):
            file_path = os.path.join(input_directory, filename)
            # Read the CSV file
            df = pd.read_csv(file_path)
            # Append the dataframe to the list
            dataframes.append(df)

    # Concatenate all dataframes
    combined_df = pd.concat(dataframes, ignore_index=True)

    # Write the combined dataframe to the output file
    combined_df.to_csv(output_file, index=False)

# Example usage
input_directory = 'path/to/your/csv/files'
output_file = 'path/to/output/combined.csv'
Here is our function
# ___ CREATE FUNCTION 6 FOR TASK 6 consolidate_data: csv_data.csv tsv_data.csv fixed_width_data.csv
def consolidate_data():
    # Read the three extracted files; they have no header row
    df1 = pd.read_csv(extracted_vehicle, header=None)
    df2 = pd.read_csv(tsv_out, header=None)
    df3 = pd.read_csv(fixed_out, header=None)
    # Combine the dfs column-wise; rows are matched by position
    combined_df = pd.concat([df1, df2, df3], axis=1)
    # Write the combined df without pandas' index or numeric header
    combined_df.to_csv(combined_file, index=False, header=False)
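Because pd.concat(axis=1) matches rows purely by position, it assumes all three files preserved the original row order and length. A small added sanity check (not in the original solution) can guard that assumption:
# ___ Hypothetical sanity check: the three extracted files must have equal row counts
def check_row_counts():
    counts = [len(pd.read_csv(f, header=None)) for f in (extracted_vehicle, tsv_out, fixed_out)]
    assert len(set(counts)) == 1, f"row counts differ: {counts}"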
Function 7: transform_data
- Create a function named transform_data to transform the vehicle_type field in extracted_data.csv into capital letters and
- save it into a file named transformed_data.csv in the staging directory.
# ___ CREATE FUNCTION 7 FOR TASK 7 transform_data
def transform_data():
    print("Inside Transform File")
    # Read the combined file line by line and uppercase the vehicle_type field
    with open(combined_file, 'r') as infile, \
         open(transformed_file, 'w') as outfile:
        for line in infile:
            fields = line.split(',')
            fields[3] = fields[3].upper()
            # the last field keeps the line's "\n", so no extra newline is written
            outfile.write(",".join(fields))
Create Tasks
- Create 7 tasks using the PythonOperator that do the following, using the Python functions created in Create Functions:
- download_dataset
- untar_dataset
- extract_data_from_csv
- extract_data_from_tsv
- extract_data_from_fixed_width
- consolidate_data
- transform_data
- Define the task pipeline based on the details given below:
Task | Functionality |
---|---|
First task | download_data |
Second task | unzip_data |
Third task | extract_data_from_csv |
Fourth task | extract_data_from_tsv |
Fifth task | extract_data_from_fixed_width |
Sixth task | consolidate_data |
Seventh task | transform_data |
Task 1: download
# ___ TASK 1 DEFINITION :
download_data = PythonOperator(
    task_id='download_data',
    python_callable=download_dataset,
    dag=dag,
)
Task 2: unzip
# ___ TASK 2 DEFINITION :
unzip = PythonOperator(
    task_id='unzip',
    python_callable=untar_dataset,
    dag=dag,
)
Task 3: extract_from_csv
# ___ TASK 3 DEFINITION :
extract_from_csv = PythonOperator(
    task_id='extract_from_csv',
    python_callable=extract_data_from_csv,
    dag=dag,
)
Task 4: extract_from_tsv
# ___ TASK 4 DEFINITION :
extract_from_tsv = PythonOperator(
    task_id='extract_from_tsv',
    python_callable=extract_data_from_tsv,
    dag=dag,
)
Task 5: extract_from_fixed
# ___ TASK 5 DEFINITION :
extract_from_fixed = PythonOperator(
    task_id='extract_from_fixed',
    python_callable=extract_data_from_fixed_width,
    dag=dag,
)
Task 6: consolidate
# ___ TASK 6 DEFINITION :
consolidate = PythonOperator(
    task_id='consolidate',
    python_callable=consolidate_data,
    dag=dag,
)
Task 7: transform
# ___ TASK 7 DEFINITION :
transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)
Create Pipeline
# ___ PIPELINE
download_data >> unzip >> extract_from_csv >> extract_from_tsv >> extract_from_fixed >> consolidate >> transform
Submit DAG
# ___ SUBMIT DAG file into the directory
cp ETL_toll_data.py airflow/dags
# ___ In case of errors use
airflow dags list-import-errors
Verify DAG
| grep "ETL_toll_data.py" airflow dags list
List Tasks
- You can view the tasks in the Web UI in the Graph tab or
- Use CLI as shown below
# ___ List Tasks
airflow tasks list ETL_toll_data
Observe DAG
Unpause DAG
- You can unpause the DAG from within the Web UI or
- Use CLI as shown below
# ___ UNPAUSE DAG
airflow dags unpause ETL_toll_data
Trigger DAG
- Use the Web UI to trigger the DAG
- The Manual Trigger button is on the far right, next to Links
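Alternatively, the DAG can be triggered from the CLI, mirroring the unpause command above:
# ___ TRIGGER DAG from the CLI
airflow dags trigger ETL_toll_data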