Kafka ETL Toll Data

Case Study


You have been assigned to a project that aims to decongest the national highways by analyzing road traffic data from different toll plazas. As a vehicle passes a toll plaza, its data (vehicle_id, vehicle_type, toll_plaza_id, and timestamp) is streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.

Steps

In this project we will create a streaming data pipeline by performing these steps:

  • Start a MySQL database server
  • Create a table to hold the toll data
  • Start the Kafka server
  • Install the Kafka Python driver
  • Install the MySQL Python driver
  • Create a topic named toll in Kafka
  • Download streaming data generator program
  • Customize the generator program to stream to the toll topic
  • Download and customize streaming data consumer
  • Customize the consumer program to write into a MySQL database table
  • Verify that streamed data is being collected in the database table

Setup Environment


Download Kafka

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz

Extract Kafka

# Extract the archive; this creates the kafka_2.12-3.7.0 directory
tar -xzf kafka_2.12-3.7.0.tgz

Change to directory

cd kafka_2.12-3.7.0

Generate Cluster

  • Generate a cluster UUID that will uniquely identify the Kafka cluster
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Configure Log Directory

  • KRaft requires the log directories to be configured
  • Run the following command to configure the log directories passing the cluster id.
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Start Kafka

  • Now that KRaft is configured, you can start the Kafka server by running the following command.
bin/kafka-server-start.sh config/kraft/server.properties
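
Because the start command keeps running in the foreground, it can help to confirm from a second terminal that the broker is accepting connections. The sketch below is a minimal check, assuming the broker listens on localhost:9092 (the address the scripts later in this lab use). The helper name port_is_open is hypothetical, and it only tests TCP reachability, not whether Kafka itself is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts
        return False


if __name__ == "__main__":
    # Assumption: the broker listens on localhost:9092
    status = "reachable" if port_is_open("localhost", 9092) else "not reachable"
    print(f"Kafka broker at localhost:9092 is {status}")
```

If the port is not reachable, check the terminal running kafka-server-start.sh for startup errors before moving on.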

MySQL Setup


Start MySQL server

  • Launch MySQL Web UI
  • On the launching page, click the Start button.

  • Once the MySQL server has started, select the Connection Information tab and copy the password from it.

Connect to MySQL

  • Save the password for later use
  • Using the password from above, connect as shown below, replacing <password> with the value you copied (no space after the equals sign)
mysql --host=mysql --port=3306 --user=root --password=<password>

Create DB

  • At the mysql> prompt run
create database tolldata;

Create Table

  • In the tolldata database created above, create a table named livetolldata with the schema shown below.
  • This is the table where you will store all streamed data that comes from Kafka.
  • Each row records when a vehicle passed through a toll plaza, along with the vehicle's type and anonymized id.
use tolldata;

create table livetolldata(timestamp datetime,vehicle_id int,vehicle_type char(15),toll_plaza_id smallint);
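
The column types above put limits on what a row can hold. The generator used later in this lab emits vehicle ids between 10000 and 10000000 and plaza ids between 4000 and 4010, so its values should fit. The sketch below is an illustrative check only, assuming MySQL's standard signed ranges for INT and SMALLINT; the helper name fits_schema is hypothetical and not part of the lab's scripts.

```python
# Value ranges for the MySQL column types used in livetolldata
INT_RANGE = (-2_147_483_648, 2_147_483_647)   # vehicle_id int (signed)
SMALLINT_RANGE = (-32_768, 32_767)            # toll_plaza_id smallint (signed)
VEHICLE_TYPE_MAX_LEN = 15                     # vehicle_type char(15)


def fits_schema(vehicle_id: int, vehicle_type: str, toll_plaza_id: int) -> bool:
    """Return True if the values fit the livetolldata column types."""
    return (
        INT_RANGE[0] <= vehicle_id <= INT_RANGE[1]
        and len(vehicle_type) <= VEHICLE_TYPE_MAX_LEN
        and SMALLINT_RANGE[0] <= toll_plaza_id <= SMALLINT_RANGE[1]
    )


if __name__ == "__main__":
    # Largest values the generator can produce
    print(fits_schema(10_000_000, "truck", 4010))
    # A plaza id that would overflow smallint
    print(fits_schema(10_000_000, "truck", 40_000))
```

A check like this matters mainly if the generator's ranges are changed, for example if plaza ids were ever extended beyond 32767.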

Disconnect from Server

exit

Install Python Package

  • Install the Python module kafka-python.
  • This Python module will help you communicate with the Kafka server.
  • It can be used to send messages to and receive messages from Kafka.
pip3 install kafka-python
  • Install the Python module mysql-connector-python using the pip command.
  • It helps you interact with the MySQL server.
pip3 install mysql-connector-python==8.0.31
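
Before going further, it may be worth confirming that both packages import in the interpreter you will use to run the scripts. The following is a minimal sketch; the helper name is_installed is hypothetical. Note that the import names differ from the pip package names: kafka-python is imported as kafka, and mysql-connector-python as mysql.connector.

```python
import importlib


def is_installed(module_name: str) -> bool:
    """Return True if module_name can be imported in this interpreter."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    # Import names used by the generator and consumer scripts in this lab
    for name in ("kafka", "mysql.connector"):
        print(f"{name}: {'OK' if is_installed(name) else 'missing'}")
```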

Create Kafka Topic


  1. Create a Kafka topic named toll.

  2. Download toll_traffic_generator.py from the URL given below using wget.

wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py

  3. Open the code in the editor using the “Menu –> File –> Open” option.

  4. In toll_traffic_generator.py, set the topic to toll.

  5. Run toll_traffic_generator.py.

python3 toll_traffic_generator.py

  6. Download streaming-data-reader.py from the URL below using wget.

wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/vVxmU5uatDowvAIKRZrFjg/streaming-data-reader.py

  7. Open streaming-data-reader.py and modify the following details so that the program reads from your topic and can connect to your MySQL server.

    TOPIC

    DATABASE

    USERNAME

    PASSWORD

  8. Run streaming-data-reader.py.

python3 streaming-data-reader.py

  9. If you completed all the steps correctly, the streaming toll data will be stored in the table livetolldata. As a last step in this lab, open the mysql CLI and list the first 10 rows in the table livetolldata.

# Start Kafka

# Task 2.1 - Start Zookeeper
# $ cd kafka_2.12-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties

# Task 2.2 - Start Kafka server
# new terminal
# $ cd kafka_2.12-2.8.0
bin/kafka-server-start.sh config/server.properties

# Task 2.3 - Create a topic named toll
# new terminal
# $ cd kafka_2.12-2.8.0
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092

# Task 2.4 - Download the Toll Traffic Simulator
# $ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
# open toll_traffic_generator.py

# Task 2.5 - Configure the Toll Traffic Simulator
# set the topic to toll
"""
Toll Traffic Simulator
"""
from time import sleep, time, ctime
from random import random, randint, choice
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

TOPIC = 'toll'

VEHICLE_TYPES = ("car", "car", "car", "car", "car", "car", "car", "car",
                 "car", "car", "car", "truck", "truck", "truck",
                 "truck", "van", "van")
for _ in range(100000):
    vehicle_id = randint(10000, 10000000)
    vehicle_type = choice(VEHICLE_TYPES)
    now = ctime(time())
    plaza_id = randint(4000, 4010)
    message = f"{now},{vehicle_id},{vehicle_type},{plaza_id}"
    message = bytearray(message.encode("utf-8"))
    print(f"A {vehicle_type} has passed by the toll plaza {plaza_id} at {now}.")
    producer.send(TOPIC, message)
    sleep(random() * 2)
# save 
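
The generator weights vehicle types by repeating entries in VEHICLE_TYPES, so choice() picks cars far more often than vans. The sketch below only illustrates that weighting: it counts the entries to show the implied probabilities and draws a type using random.choices with explicit weights. The helper name pick_vehicle_type is hypothetical and is not part of the downloaded script.

```python
from collections import Counter
from random import choices

# Same tuple as in toll_traffic_generator.py
VEHICLE_TYPES = ("car", "car", "car", "car", "car", "car", "car", "car",
                 "car", "car", "car", "truck", "truck", "truck",
                 "truck", "van", "van")

# How often each type appears, and the probability that implies
counts = Counter(VEHICLE_TYPES)
total = len(VEHICLE_TYPES)
probabilities = {vtype: n / total for vtype, n in counts.items()}


def pick_vehicle_type() -> str:
    """Draw one vehicle type using the same weights, stated explicitly."""
    return choices(list(counts), weights=list(counts.values()), k=1)[0]


if __name__ == "__main__":
    for vtype, p in probabilities.items():
        print(f"{vtype}: {counts[vtype]}/{total} = {p:.2f}")
```

Stating the weights explicitly makes the traffic mix easier to adjust than editing a long tuple of repeated strings.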

# Task 2.6 - Run the Toll Traffic Simulator
# run toll_traffic_generator.py
# $ python3 toll_traffic_generator.py
# output:
## A car has passed by the toll plaza 4006 at Wed Apr 19 16:16:08 2023.
## A car has passed by the toll plaza 4002 at Wed Apr 19 16:16:10 2023.
## A truck has passed by the toll plaza 4002 at Wed Apr 19 16:16:11 2023.
## A car has passed by the toll plaza 4004 at Wed Apr 19 16:16:11 2023.
## A car has passed by the toll plaza 4006 at Wed Apr 19 16:16:11 2023.

# Task 2.7 - Configure streaming_data_reader.py
# download streaming_data_reader.py
# new terminal
# $ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py
# open streaming_data_reader.py
# modify: TOPIC DATABASE USERNAME PASSWORD
"""
Streaming data consumer
"""
from datetime import datetime
from kafka import KafkaConsumer
import mysql.connector

TOPIC='toll'
DATABASE = 'tolldata'
USERNAME = 'root'
PASSWORD = 'MTI3NDUtYnJldGFu'

print("Connecting to the database")
try:
    connection = mysql.connector.connect(host='localhost', database=DATABASE, user=USERNAME, password=PASSWORD)
except mysql.connector.Error as err:
    # Stop here: without a connection there is no cursor to insert with
    raise SystemExit(f"Could not connect to database. Please check credentials: {err}")
else:
    print("Connected to database")
cursor = connection.cursor()

print("Connecting to Kafka")
consumer = KafkaConsumer(TOPIC)
print("Connected to Kafka")
print(f"Reading messages from the topic {TOPIC}")
for msg in consumer:

    # Extract information from Kafka

    message = msg.value.decode("utf-8")

    # Split the comma-separated record into its four fields
    (timestamp, vehicle_id, vehicle_type, plaza_id) = message.split(",")

    # Transform the date format to suit the database schema
    dateobj = datetime.strptime(timestamp, '%a %b %d %H:%M:%S %Y')
    timestamp = dateobj.strftime("%Y-%m-%d %H:%M:%S")

    # Loading data into the database table

    sql = "insert into livetolldata values(%s,%s,%s,%s)"
    result = cursor.execute(sql, (timestamp, vehicle_id, vehicle_type, plaza_id))
    print(f"A {vehicle_type} was inserted into the database")
    connection.commit()
connection.close()
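
The transform step in the consumer can be tried on its own, without Kafka or MySQL running. The sketch below restates it as a standalone function: it splits one message, converts the ctime-style timestamp the generator emits into the YYYY-MM-DD HH:MM:SS form that a datetime column accepts, and casts the numeric fields. The function name parse_toll_message is hypothetical; the consumer script above does this work inline and passes the ids as strings.

```python
from datetime import datetime
from typing import Tuple


def parse_toll_message(message: str) -> Tuple[str, int, str, int]:
    """Turn one generator message into a (timestamp, id, type, plaza) row."""
    timestamp, vehicle_id, vehicle_type, plaza_id = message.split(",")
    # Input looks like 'Wed Apr 19 16:16:08 2023' (the format of time.ctime)
    dateobj = datetime.strptime(timestamp, "%a %b %d %H:%M:%S %Y")
    return (
        dateobj.strftime("%Y-%m-%d %H:%M:%S"),
        int(vehicle_id),
        vehicle_type,
        int(plaza_id),
    )


if __name__ == "__main__":
    row = parse_toll_message("Wed Apr 19 16:16:08 2023,4098571,car,4008")
    print(row)
```

A message with a missing field or a malformed timestamp raises ValueError, which makes bad records easy to spot when testing the transform in isolation.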

# Task 2.8 - Run streaming_data_reader.py
# $ python3 streaming_data_reader.py
# output: 
## Reading messages from the topic toll
## A car was inserted into the database
## A truck was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database...

# Task 2.9 - Health check of the streaming data pipeline
# list the top 10 rows in the table livetolldata where streaming toll data is stored
# reconnect
# $ mysql --host=127.0.0.1 --port=3306 --user=root --password=MTI3NDUtYnJldGFu
# mysql> SELECT * FROM tolldata.livetolldata LIMIT 10;
# output:
+---------------------+------------+--------------+---------------+
| timestamp           | vehicle_id | vehicle_type | toll_plaza_id |
+---------------------+------------+--------------+---------------+
| 2023-04-19 16:22:53 |    4098571 | car          |          4008 |
| 2023-04-19 16:22:54 |    3537739 | truck        |          4007 |
| 2023-04-19 16:22:55 |    9724284 | car          |          4005 |
| 2023-04-19 16:22:55 |    5307225 | car          |          4002 |
| 2023-04-19 16:22:55 |    1096233 | car          |          4008 |
| 2023-04-19 16:22:57 |    2925913 | car          |          4000 |
| 2023-04-19 16:22:59 |    8914529 | car          |          4002 |
| 2023-04-19 16:23:00 |    7849247 | van          |          4005 |
| 2023-04-19 16:23:01 |    4702309 | car          |          4003 |
| 2023-04-19 16:23:01 |    1590871 | car          |          4001 |
+---------------------+------------+--------------+---------------+