DAG ETL Server Data


Note: this is the same project as DAG Python ETL Server Data, which was done using the PythonOperator

Case Study - Bash


Note: this is the same project as DAG Python ETL Case 1, but here it is done using the BashOperator

Write a DAG named Bash_ETL_Server_Log_Processing.py that will:

  • download a file from a remote server (the URL is given in the download task in the solution)
  • extract the required fields from the file
  • transform the content
  • load the transformed data into a file
  • verify that the DAG succeeded

The server access log file contains these fields.

a. timestamp - TIMESTAMP
b. latitude - float
c. longitude - float
d. visitorid - char(37)
e. accessed_from_mobile - boolean
f. browser_code - int
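
For illustration, a single record might look like the line below. The values are made up, and the layout assumes the fields are separated by the "#" character, which is the delimiter the extract task uses later in the solution.

# ___ EXAMPLE record layout (hypothetical values; "#" is the field delimiter used by the extract task)
timestamp#latitude#longitude#visitorid#accessed_from_mobile#browser_code
2021-05-17 03:12:49#43.234#-78.123#cd868f18-...-9b1d#True#12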

Tasks

Add tasks to the DAG file to:

  1. download the file
  2. read the file
  3. extract the fields timestamp and visitorid from web-server-access-log.txt (the commands used for the extract and transform steps are sketched after this list)
  4. transform the data by capitalizing the visitorid for all the records
  5. load the transformed data into a new file, capitalized.txt
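
The extract and transform tasks boil down to two standard shell commands, cut and tr. Here is a minimal sketch for trying them out in a terminal first, assuming the log file has already been downloaded into the current directory:

# ___ TRY the commands in a terminal before building the DAG (sketch)
# keep field 1 (timestamp) and field 4 (visitorid); fields are "#"-delimited
cut -f1,4 -d"#" web-server-access-log.txt > extracted.txt

# upper-case every character in the extracted records
tr "[a-z]" "[A-Z]" < extracted.txt > capitalized.txt

# inspect the result
head capitalized.txt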

Steps

  1. Create the imports block
  2. Create the DAG Arguments block. You can use the default settings
  3. Create the DAG definition block. The DAG should run daily
  4. Create the tasks download, extract, transform, and load using the BashOperator
  5. Create the task pipeline block
  6. Submit the DAG
  7. Verify that the DAG was submitted

Solution

Create a new file by going to File -> New File from the menu and name it Bash_ETL_Server_Log_Processing.py.

Here is the code for the Python file. It will contain your DAG with four tasks:

  • download
  • extract
  • transform
  • load
# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.bash_operator import BashOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago


# ___  ARGUMENTS
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'your_name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# ___  DAG DEFINITION
dag = DAG(
    'Bash_ETL_Server_Log_Processing',
    default_args=default_args,
    description='Bash DAG',
    schedule_interval=timedelta(days=1),
)


# ___ TASK DEFINITION : DOWNLOAD
download = BashOperator(
    task_id='download',
    # each BashOperator runs in its own temporary working directory,
    # so download the file to a fixed path that the later tasks can reference
    bash_command='curl "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt" -o /home/project/airflow/dags/web-server-access-log.txt',
    dag=dag,
)


# ___ TASK DEFINITION : EXTRACT
extract = BashOperator(
    task_id='extract',
    # extract field 1 (timestamp) and field 4 (visitorid); fields are "#"-delimited
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)


# ___ TASK DEFINITION : TRANSFORM
transform = BashOperator(
    task_id='transform',
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)


# ___ TASK DEFINITION : LOAD
load = BashOperator(
    task_id='load',
    # the transform task writes capitalized.txt into the dags folder, so zip it there
    bash_command='cd /home/project/airflow/dags && zip log.zip capitalized.txt',
    dag=dag,
)

# ___ PIPELINE
download >> extract >> transform >> load
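
Before copying the file into the dags folder, it is worth a quick sanity check that the file has no syntax or import errors. A simple way, assuming python3 and Airflow are available on the PATH:

# ___ SANITY CHECK the DAG file for syntax/import errors
python3 Bash_ETL_Server_Log_Processing.py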

Submit DAG

# ___ SUBMIT DAG file into the directory
export AIRFLOW_HOME=/home/project/airflow 
cp  Bash_ETL_Server_Log_Processing.py $AIRFLOW_HOME/dags

# ___ LIST all tasks in the DAG
airflow tasks list Bash_ETL_Server_Log_Processing
 
# ___ OUTPUT
download
extract
load
transform
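
Optionally, each task can also be exercised on its own with airflow tasks test before the scheduler picks up the DAG. A sketch, where the date argument is just an example execution date:

# ___ TEST individual tasks (optional; the date is an example execution date)
airflow tasks test Bash_ETL_Server_Log_Processing download 2022-01-01
airflow tasks test Bash_ETL_Server_Log_Processing extract 2022-01-01
airflow tasks test Bash_ETL_Server_Log_Processing transform 2022-01-01
airflow tasks test Bash_ETL_Server_Log_Processing load 2022-01-01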

Verify DAG

# ___ VERIFY the DAG was submitted
airflow dags list|grep "Bash_ETL_Server_Log_Processing"
# OUTPUT

Bash_ETL_Server_Log_Processing | /home/project/airflow/dags/Bash_ETL_Server_Log_Processing.py | your_name | True
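
The True in the last column typically indicates that the DAG is still paused. To run it right away, it can be unpaused and triggered from the CLI (this can also be done from the web UI shown below):

# ___ UNPAUSE and TRIGGER the DAG (optional)
airflow dags unpause Bash_ETL_Server_Log_Processing
airflow dags trigger Bash_ETL_Server_Log_Processing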

UI View