DAG ETL Server Data
Note: this is the same project as DAG Bash ETL Server Data, which was done using the BashOperator
Case Study - Python
Write a DAG named ETL_Server_Access_Log_Processing that will:
- download a file from a remote server
- extract the file
- transform the content
- load the transformed data into a file
- verify/check that the DAG succeeded
Data
- Data: the web-server-access-log.txt file, downloaded from the URL used in the download_file function below.
- The server access log file contains these fields (an illustrative record follows this list):
  a. timestamp - TIMESTAMP
  b. latitude - float
  c. longitude - float
  d. visitorid - char(37)
  e. accessed_from_mobile - boolean
  f. browser_code - int
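As a quick illustration of the layout the extract step relies on (the function below splits each line on '#'), here is a hypothetical record; the values are made up, only the field order matches the list above:
# Hypothetical record, purely for illustration of the '#'-delimited layout:
# timestamp#latitude#longitude#visitorid#accessed_from_mobile#browser_code
sample = "1621350000#49.68#-94.75#3B9B1C2E-7A8B-4C2D-9C3E-1F2A3B4C5D6E#True#12"

fields = sample.split('#')                     # split on the '#' delimiter
timestamp, visitorid = fields[0], fields[3]    # the two fields this DAG extracts
print(timestamp, visitorid)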
Tasks
- Add tasks in the DAG file to:
  - download the file
  - read the file
  - extract the fields timestamp and visitorid from the web-server-access-log.txt
  - transform the data by capitalizing the visitorid for all the records and store it in a local variable
  - load the data into a new file capitalized.txt
Requirements
- Create the imports block
- Create the DAG Arguments block. You can use the default settings
- Create the DAG definition block. The DAG should run daily
- Create the tasks extract, transform, and load to call the respective Python functions
- Create the task pipeline block
- Submit the DAG
- Verify that the DAG was submitted
Solution
Create DAG
Script file
- Create a new file by going to File -> New File from the menu and name it ETL_Server_Access_Log_Processing.py.
Imports
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# Library used to download the log file
import requests
Set Variables
- Define input and output files
# ___ DEFINE input & output files
input_file = 'web-server-access-log.txt'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'capitalized.txt'
Arguments
# ___ ARGUMENTS
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Definition
# ___ DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
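schedule_interval=timedelta(days=1) gives the required daily cadence. As an aside, Airflow also accepts the '@daily' preset string; a minimal alternative sketch of the same definition (not an addition, just an equivalent form):
# Alternative DAG definition using the '@daily' preset instead of timedelta(days=1)
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval='@daily',
)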
Create Functions
Here are the five tasks that we need to create functions for:
- download
- execute_extract
- execute_transform
- execute_load
- execute_check
Function 1: download_file
- Download the file from the remote server (the URL is set inside the download_file function below)
# ___ CREATE FUNCTION 1 FOR TASK 1
def download_file():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"
    # Send a GET request to the URL
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, 'wb') as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")
Function 2: extract
# ___ CREATE FUNCTION 2 FOR TASK 2
def extract():
    global input_file
    print("Inside Extract")
    # Read the input file and keep only the timestamp and visitorid fields
    with open(input_file, 'r') as infile, \
         open(extracted_file, 'w') as outfile:
        for line in infile:
            # Fields are '#'-delimited: timestamp#latitude#longitude#visitorid#accessed_from_mobile#browser_code
            fields = line.split('#')
            if len(fields) >= 4:
                field_1 = fields[0]   # timestamp
                field_4 = fields[3]   # visitorid
                outfile.write(field_1 + "#" + field_4 + "\n")
Function 3: transform
# ___ CREATE FUNCTION 3 FOR TASK 3
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    # Capitalize every record (this upper-cases the visitorid values)
    with open(extracted_file, 'r') as infile, \
         open(transformed_file, 'w') as outfile:
        for line in infile:
            processed_line = line.strip().upper()
            outfile.write(processed_line + '\n')
Function 4: load
# ___ CREATE FUNCTION 4 FOR TASK 4
def load():
    global transformed_file, output_file
    print("Inside Load")
    # Copy the transformed records into the final output file
    with open(transformed_file, 'r') as infile, \
         open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line)
Function 5: check
# ___ CREATE FUNCTION 5 FOR TASK 5
def check():
    global output_file
    print("Inside Check")
    # Print the contents of the output file so the run can be verified in the task log
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line)
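Optionally, you can smoke-test these functions outside Airflow before wiring them into tasks. A minimal sketch (not part of the lab solution; it is safe to leave in the DAG file because Airflow imports the module rather than running it as __main__):
# Optional local smoke test: run the five callables in order, outside Airflow.
if __name__ == "__main__":
    download_file()
    extract()
    transform()
    load()
    check()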
Define Tasks
From the functions we’ve created we can now define our five tasks:
Task 1: download
# ___ TASK DEFINITION : DOWNLOAD to call the `download_file` function
download = PythonOperator(
    task_id='download',
    python_callable=download_file,
    dag=dag,
)
Task 2: extract
# ___ TASK DEFINITION : EXTRACT to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
Task 3: transform
# ___ TASK DEFINITION : TRANSFORM to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
Task 4: load
# ___ TASK DEFINITION : LOAD to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)
Task 5: check
# ___ TASK DEFINITION : CHECK to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)
Define Pipeline
# ___ PIPELINE
download >> execute_extract >> execute_transform >> execute_load >> execute_check
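The >> operator chains each task to the next. Assuming Airflow 2.x, the same linear pipeline can also be declared with the chain helper; an equivalent sketch:
# Equivalent dependency declaration using Airflow's chain() helper
from airflow.models.baseoperator import chain

chain(download, execute_extract, execute_transform, execute_load, execute_check)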
Submit DAG
# ___ SUBMIT DAG file into the directory
cp ETL_Server_Access_Log_Processing.py $AIRFLOW_HOME/dags
Verify DAG
# ___ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag
List Tasks
# ___ LIST All Tasks
airflow tasks list my-first-python-etl-dag
# ___ CHECK for errors
airflow dags list-import-errors
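As an optional programmatic check (an assumption-laden sketch: it must run in the same environment and AIRFLOW_HOME as the scheduler), Airflow's DagBag can confirm that the file parses cleanly and the DAG was picked up:
# Optional programmatic verification: parse the dags folder and inspect the results.
from airflow.models import DagBag

dag_bag = DagBag()                                  # parses $AIRFLOW_HOME/dags
print(dag_bag.import_errors)                        # expect an empty dict
print('my-first-python-etl-dag' in dag_bag.dags)    # expect True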
Complete Script
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# Library used to download the log file
import requests
# ___ DEFINE input & output files
input_file = 'web-server-access-log.txt'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'capitalized.txt'
# ___ CREATE functions
def download_file():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"
    # Send a GET request to the URL
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, 'wb') as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")
def extract():
    global input_file
    print("Inside Extract")
    # Read the input file and keep only the timestamp and visitorid fields
    with open(input_file, 'r') as infile, \
         open(extracted_file, 'w') as outfile:
        for line in infile:
            # Fields are '#'-delimited: timestamp#latitude#longitude#visitorid#accessed_from_mobile#browser_code
            fields = line.split('#')
            if len(fields) >= 4:
                field_1 = fields[0]   # timestamp
                field_4 = fields[3]   # visitorid
                outfile.write(field_1 + "#" + field_4 + "\n")
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    # Capitalize every record (this upper-cases the visitorid values)
    with open(extracted_file, 'r') as infile, \
         open(transformed_file, 'w') as outfile:
        for line in infile:
            processed_line = line.strip().upper()
            outfile.write(processed_line + '\n')
def load():
    global transformed_file, output_file
    print("Inside Load")
    # Copy the transformed records into the final output file
    with open(transformed_file, 'r') as infile, \
         open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line)
def check():
    global output_file
    print("Inside Check")
    # Print the contents of the output file so the run can be verified in the task log
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line)
# ___ ARGUMENTS
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# ___ DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
# ___ TASK DEFINITION : DOWNLOAD to call the `download_file` function
download = PythonOperator(
    task_id='download',
    python_callable=download_file,
    dag=dag,
)
# ___ TASK DEFINITION : EXTRACT to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
# ___ TASK DEFINITION : TRANSFORM to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
# ___ TASK DEFINITION : LOAD to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)
# ___ TASK DEFINITION : CHECK to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)
# ___ PIPELINE
download >> execute_extract >> execute_transform >> execute_load >> execute_check
Submit & Verify DAG
# ___ SUBMIT DAG file into the directory
cp ETL_Server_Access_Log_Processing.py $AIRFLOW_HOME/dags
# ___ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag
# ___ LIST All Tasks
airflow tasks list my-first-python-etl-dag
# ___ CHECK for errors
airflow dags list-import-errors