DAG ETL Server Data


Note: this is the same project as DAG Bash ETL Server Data, which was done with the BashOperator; this version uses the PythonOperator.

Case Study - Python


Write a DAG named ETL_Server_Access_Log_Processing that will

  • download a file from a remote server
  • extract the file
  • transform the content
  • load the transformed data into a file
  • verify/check that the DAG succeeded

Data

  • The data is the web-server-access-log.txt file, downloaded from the URL shown in the download_file function below.
  • The server access log file contains these fields.

a. timestamp - TIMESTAMP
b. latitude - float
c. longitude - float
d. visitorid - char(37)
e. accessed_from_mobile - boolean
f. browser_code - int
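
For reference, each record is assumed to be a single line with the six fields above separated by the # character (the delimiter the extract step splits on); a purely hypothetical example line:

2023-10-24 10:27:58#-35.9862#-135.4475#af2a0f61-9e4c-4d6a-b6a3-5c72cdab1234#True#4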

Tasks

Add tasks in the DAG file to

  1. download the file
  2. read the file
  3. extract the fields timestamp and visitorid from the web-server-access-log.txt
  4. transform the data by capitalizing the visitorid for all the records and store it in a local variable
  5. load the data into a new file capitalized.txt

Requirements

  1. Create the imports block
  2. Create the DAG Arguments block. You can use the default settings
  3. Create the DAG definition block. The DAG should run daily
  4. Create the tasks download, extract, transform, load, and check to call the Python functions
  5. Create the task pipeline block
  6. Submit the DAG
  7. Verify that the DAG was submitted

Solution


Create DAG


Script file

  • Create a new file by going to File -> New File from the menu and name it ETL_Server_Access_Log_Processing.py.

Imports

# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# For downloading the source file over HTTP
import requests

Set Variables

  • Define input and output files
# ___  DEFINE input & output files
input_file = 'web-server-access-log.txt'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'capitalized.txt'

Arguments

# ___  ARGUMENTS
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Definition

# ___  DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
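
As an aside, the same definition can also be written with Airflow's context-manager style; a minimal sketch (an alternative to, not an addition to, the block above):

# ___  ALTERNATIVE DAG DEFINITION (context-manager style) - optional
with DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
) as dag:
    # Tasks instantiated inside this block are attached to the DAG automatically,
    # so the dag=dag argument on each PythonOperator could be dropped.
    pass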

Create Functions


We need to create a function for each of the five tasks:

  • download_file
  • extract
  • transform
  • load
  • check

Function 1: download_file

  • Download the file from the URL shown in the code below
# ___  CREATE FUNCTION 1 FOR TASK 1
def download_file():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM
-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%
20Airflow/web-server-access-log.txt"
    # Send a GET request to the URL
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, 'wb') as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")

Function 2: extract

# ___  CREATE FUNCTION 2 FOR TASK 2
def extract():
    global input_file
    print("Inside Extract")
    # Read the '#'-delimited log line by line and keep the timestamp and visitorid fields
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split('#')
            if len(fields) >= 4:
                field_1 = fields[0]
                field_4 = fields[3]
                outfile.write(field_1 + "#" + field_4 + "\n")

Function 3: transform

# ___  CREATE FUNCTION 3 FOR TASK 3
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        for line in infile:
            # Upper-case the whole record, which capitalizes the visitorid
            processed_line = line.strip().upper()
            outfile.write(processed_line + '\n')

Function 4: load

# ___  CREATE FUNCTION 4 FOR TASK 4
def load():
    global transformed_file, output_file
    print("Inside Load")
    # Copy the transformed records into the final output file
    with open(transformed_file, 'r') as infile, \
            open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line)

Function 5: check

# ___  CREATE FUNCTION 5 FOR TASK 5
def check():
    global output_file
    print("Inside Check")
    # Print the contents of the final output file
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line.strip())
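
Optionally, before wiring these functions into Airflow tasks, you can smoke-test them end to end by running the script directly; a minimal sketch (the __main__ guard keeps it from executing when the scheduler parses the DAG file):

# ___  OPTIONAL: local smoke test (skipped when Airflow imports this file)
if __name__ == '__main__':
    download_file()
    extract()
    transform()
    load()
    check()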

Define Tasks


From the functions we’ve created, we can now define our five tasks:

Task 1: download

# ___ TASK DEFINITION : DOWNLOAD to call the `download_file` function
download = PythonOperator(
    task_id='download',
    python_callable=download_file,
    dag=dag,
)

Task 2: extract

# ___ TASK DEFINITION : EXTRACT to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

Task 3: transform

# ___ TASK DEFINITION : TRANSFORM to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

Task 4: load

# ___ TASK DEFINITION : LOAD to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

Task 5: check

# ___ TASK DEFINITION : CHECK to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)

Define Pipeline


# ___ PIPELINE
download >> execute_extract >> execute_transform >> execute_load >> execute_check

Submit DAG


# ___ SUBMIT DAG file into the directory
cp ETL_Server_Access_Log_Processing.py $AIRFLOW_HOME/dags

Verify DAG

# ___ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag

List Tasks

# ___ LIST All Tasks
airflow tasks list my-first-python-etl-dag

# ___ CHECK for errors
airflow dags list-import-errors
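
Optionally, you can exercise a single task or the whole DAG from the command line; the date below is only an example logical date:

# ___ OPTIONAL: run one task once, without the scheduler
airflow tasks test my-first-python-etl-dag download 2024-01-01

# ___ OPTIONAL: unpause and trigger the whole DAG
airflow dags unpause my-first-python-etl-dag
airflow dags trigger my-first-python-etl-dag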

Complete Script


# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# For downloading the source file over HTTP
import requests


# ___  DEFINE input & output files
input_file = 'web-server-access-log.txt'
extracted_file = 'extracted-data.txt'
transformed_file = 'transformed.txt'
output_file = 'capitalized.txt'


# ___  CREATE functions
def download_file():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM
-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%
20Airflow/web-server-access-log.txt"
    # Send a GET request to the URL
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, 'wb') as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")


def extract():
    global input_file
    print("Inside Extract")
    # Read the '#'-delimited log line by line and keep the timestamp and visitorid fields
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split('#')
            if len(fields) >= 4:
                field_1 = fields[0]
                field_4 = fields[3]
                outfile.write(field_1 + "#" + field_4 + "\n")


def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        for line in infile:
            # Upper-case the whole record, which capitalizes the visitorid
            processed_line = line.strip().upper()
            outfile.write(processed_line + '\n')


def load():
    global transformed_file, output_file
    print("Inside Load")
    # Copy the transformed records into the final output file
    with open(transformed_file, 'r') as infile, \
            open(output_file, 'w') as outfile:
        for line in infile:
            outfile.write(line)


def check():
    global output_file
    print("Inside Check")
    # Print the contents of the final output file
    with open(output_file, 'r') as infile:
        for line in infile:
            print(line.strip())


# ___  ARGUMENTS
default_args = {
    'owner': 'Your name',
    'start_date': days_ago(0),
    'email': ['your email'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# ___  DAG DEFINITION
dag = DAG(
    'my-first-python-etl-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)

# ___ TASK DEFINITION : DOWNLOAD to call the `download_file` function
download = PythonOperator(
    task_id='download',
    python_callable=download_file,
    dag=dag,
)

# ___ TASK DEFINITION : EXTRACT to call the `extract` function
execute_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

# ___ TASK DEFINITION : TRANSFORM to call the `transform` function
execute_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

# ___ TASK DEFINITION : LOAD to call the `load` function
execute_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# ___ TASK DEFINITION : CHECK to call the `check` function
execute_check = PythonOperator(
    task_id='check',
    python_callable=check,
    dag=dag,
)

# ___ PIPELINE
download >> execute_extract >> execute_transform >> execute_load >> execute_check

Submit & Verify DAG

# ___ SUBMIT DAG file into the directory
cp ETL_Server_Access_Log_Processing.py $AIRFLOW_HOME/dags

# ___ VERIFY the DAG was submitted
airflow dags list | grep my-first-python-etl-dag

# ___ LIST All Tasks
airflow tasks list my-first-python-etl-dag

# ___ CHECK for errors
airflow dags list-import-errors