Kafka ETL Toll Data
Case Study
You have been assigned to a project that aims to de-congest the national highways by analyzing the road traffic data from different toll plazas. As a vehicle passes a toll plaza, the vehicle's data, such as vehicle_id, vehicle_type, toll_plaza_id, and timestamp, are streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.
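For reference, each record in this lab reaches Kafka as a single comma-separated line of the form timestamp,vehicle_id,vehicle_type,toll_plaza_id; for example (values drawn from the sample output later in this lab):
Wed Apr 19 16:22:53 2023,4098571,car,4008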
Steps
In this project we will create a streaming data pipeline by performing these steps:
- Start a MySQL database server
- Create a table to hold the toll data
- Start the Kafka server
- Install the Kafka Python driver
- Install the MySQL Python driver
- Create a topic named toll in Kafka
- Download the streaming data generator program
- Customize the generator program to stream to the toll topic
- Download and customize streaming data consumer
- Customize the consumer program to write into a MySQL database table
- Verify that streamed data is being collected in the database table
Setup Environment
Download Kafka
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
Extract Kafka
tar -xzf kafka_2.12-3.7.0.tgz
Change to directory
cd kafka_2.12-3.7.0
Generate Cluster
- Generate a cluster UUID that will uniquely identify the Kafka cluster
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Configure Log Directory
- KRaft requires the log directories to be configured
- Run the following command to configure the log directories, passing the cluster ID.
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
Start Kafka
- Now that KRaft is configured, you can start the Kafka server by running the following command.
bin/kafka-server-start.sh config/kraft/server.properties
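Optionally, as a quick sanity check (a sketch assuming the broker listens on the default localhost:9092), you can list the broker's topics from another terminal:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092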
MySQL Setup
Start MySQL server
- Launch MySQL Web UI
- On the launching page, click the Start button.
- Once the MySQL server has started, select the Connection Information tab. From there, copy the password.
Connect to MySQL
- Save the password for later use.
- Using the password from above, connect as shown below.
mysql --host=mysql --port=3306 --user=root --password=<password>
Create DB
- At the mysql> prompt, run:
create database tolldata;
Create Table
- Create the table livetolldata in the tolldata database created above, with the schema shown below.
- This is the table where you will store all streamed data that comes from Kafka.
- Each row is a record of when a vehicle passed through a certain toll plaza, along with its type and anonymized id.
use tolldata;
create table livetolldata(timestamp datetime, vehicle_id int, vehicle_type char(15), toll_plaza_id smallint);
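Optionally, you can confirm the table definition at the mysql> prompt before moving on:
describe livetolldata;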
Disconnect from Server
exit
Install Python Packages
- Install the Python module kafka-python.
- This Python module will help you to communicate with the Kafka server.
- It can be used to send and receive messages from Kafka (see the short sketch after the install command).
pip3 install kafka-python
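A minimal sketch of how kafka-python can be used, assuming a broker on localhost:9092 and a topic named toll (the topic is created later in this lab):

from kafka import KafkaProducer, KafkaConsumer

# Publish one test message to the toll topic (assumes a broker at localhost:9092)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('toll', b'test message')
producer.flush()

# Read messages back from the same topic (stop with Ctrl+C)
consumer = KafkaConsumer('toll', bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg.value.decode('utf-8'))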
- Install the Python module mysql-connector-python using the pip command.
- It will aid in interacting with the MySQL server (a short sketch follows the install command).
pip3 install mysql-connector-python==8.0.31
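Similarly, a minimal sketch of how mysql-connector-python can be used; the host, port, and password below are placeholders for the connection details from the MySQL setup above:

import mysql.connector

# Placeholder credentials - substitute the values from the Connection Information tab
connection = mysql.connector.connect(host='mysql', port=3306, user='root',
                                     password='<password>', database='tolldata')
cursor = connection.cursor()
cursor.execute("select database()")
print(cursor.fetchone())
connection.close()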
Create Kafka Topic
Create a Kafka topic named toll.
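A sketch of the topic-creation command, run from the Kafka directory and assuming the broker is listening on the default localhost:9092:
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092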
Download the toll_traffic_generator.py from the URL given below using wget.
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
Open the code in the editor using the “Menu –> File –> Open” option.
Open the toll_traffic_generator.py and set the topic to toll.
Run the toll_traffic_generator.py.
python3 toll_traffic_generator.py
- Download the streaming-data-reader.py from the URL below using wget.
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/vVxmU5uatDowvAIKRZrFjg/streaming-data-reader.py
Open the streaming-data-reader.py and modify the following details so that the program can connect to your MySQL server (a sketch of these assignments follows the list):
- TOPIC
- DATABASE
- USERNAME
- PASSWORD
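With the database created in this lab, these assignments near the top of the script would look roughly like the following; the password placeholder stands for the value you copied from the Connection Information tab.
TOPIC = 'toll'
DATABASE = 'tolldata'
USERNAME = 'root'
PASSWORD = '<password>'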
Run the streaming-data-reader.py.
python3 streaming-data-reader.py
- If you completed all the steps correctly, the streaming toll data will get stored in the table livetolldata.
- As a last step in this lab, open the mysql CLI and list the top 10 rows in the table livetolldata, as shown below.
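For example, at the mysql> prompt:
SELECT * FROM tolldata.livetolldata LIMIT 10;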
# Start Kafka
# Task 2.1 - Start Zookeeper
# $ cd kafka_2.12-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties
# Task 2.2 - Start Kafka server
# new terminal
# $ cd kafka_2.12-2.8.0
bin/kafka-server-start.sh config/server.properties
# Task 2.3 - Create a topic named toll
# new terminal
# $ cd kafka_2.12-2.8.0
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092
# Task 2.4 - Download the Toll Traffic Simulator
# $ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
# open toll_traffic_generator.py
# Task 2.5 - Configure the Toll Traffic Simulator
# set the topic to toll
"""
Toll Traffic Simulator
"""
from time import sleep, time, ctime
from random import random, randint, choice
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

TOPIC = 'toll'

VEHICLE_TYPES = ("car", "car", "car", "car", "car", "car", "car", "car",
                 "car", "car", "car", "truck", "truck", "truck",
                 "truck", "van", "van")

for _ in range(100000):
    vehicle_id = randint(10000, 10000000)
    vehicle_type = choice(VEHICLE_TYPES)
    now = ctime(time())
    plaza_id = randint(4000, 4010)
    message = f"{now},{vehicle_id},{vehicle_type},{plaza_id}"
    message = bytearray(message.encode("utf-8"))
    print(f"A {vehicle_type} has passed by the toll plaza {plaza_id} at {now}.")
    producer.send(TOPIC, message)
    sleep(random() * 2)
# save
# Task 2.6 - Run the Toll Traffic Simulator
# run toll_traffic_generator.py
# $ python3 toll_traffic_generator.py
# output:
## A car has passed by the toll plaza 4006 at Wed Apr 19 16:16:08 2023.
## A car has passed by the toll plaza 4002 at Wed Apr 19 16:16:10 2023.
## A truck has passed by the toll plaza 4002 at Wed Apr 19 16:16:11 2023.
## A car has passed by the toll plaza 4004 at Wed Apr 19 16:16:11 2023.
## A car has passed by the toll plaza 4006 at Wed Apr 19 16:16:11 2023.
# Task 2.7 - Configure streaming_data_reader.py
# download streaming_data_reader.py
# new terminal
# $ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py
# open streaming_data_reader.py
# modify: TOPIC DATABASE USERNAME PASSWORD
"""
Streaming data consumer
"""
from datetime import datetime
from kafka import KafkaConsumer
import mysql.connector
TOPIC = 'toll'
DATABASE = 'tolldata'
USERNAME = 'root'
PASSWORD = 'MTI3NDUtYnJldGFu'

print("Connecting to the database")
try:
    connection = mysql.connector.connect(host='localhost', database=DATABASE, user=USERNAME, password=PASSWORD)
except Exception:
    print("Could not connect to database. Please check credentials")
else:
    print("Connected to database")
cursor = connection.cursor()

print("Connecting to Kafka")
consumer = KafkaConsumer(TOPIC)
print("Connected to Kafka")
print(f"Reading messages from the topic {TOPIC}")
for msg in consumer:
    # Extract information from kafka
    message = msg.value.decode("utf-8")

    # Transform the date format to suit the database schema
    (timestamp, vehicle_id, vehicle_type, plaza_id) = message.split(",")
    dateobj = datetime.strptime(timestamp, '%a %b %d %H:%M:%S %Y')
    timestamp = dateobj.strftime("%Y-%m-%d %H:%M:%S")

    # Loading data into the database table
    sql = "insert into livetolldata values(%s,%s,%s,%s)"
    cursor.execute(sql, (timestamp, vehicle_id, vehicle_type, plaza_id))
    print(f"A {vehicle_type} was inserted into the database")
    connection.commit()
connection.close()
# Task 2.8 - Run streaming_data_reader.py
# $ python3 streaming_data_reader.py
# output:
## Reading messages from the topic toll
## A car was inserted into the database
## A truck was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database
## A car was inserted into the database...
# Task 2.9 - Health check of the streaming data pipeline
# list the top 10 rows in the table livetolldata where streaming toll data is stored
# reconnect
# $ mysql --host=127.0.0.1 --port=3306 --user=root --password=MTI3NDUtYnJldGFu
# mysql> SELECT * FROM tolldata.livetolldata LIMIT 10;
# output:
+---------------------+------------+--------------+---------------+
| timestamp | vehicle_id | vehicle_type | toll_plaza_id |
+---------------------+------------+--------------+---------------+
| 2023-04-19 16:22:53 | 4098571 | car | 4008 |
| 2023-04-19 16:22:54 | 3537739 | truck | 4007 |
| 2023-04-19 16:22:55 | 9724284 | car | 4005 |
| 2023-04-19 16:22:55 | 5307225 | car | 4002 |
| 2023-04-19 16:22:55 | 1096233 | car | 4008 |
| 2023-04-19 16:22:57 | 2925913 | car | 4000 |
| 2023-04-19 16:22:59 | 8914529 | car | 4002 |
| 2023-04-19 16:23:00 | 7849247 | van | 4005 |
| 2023-04-19 16:23:01 | 4702309 | car | 4003 |
| 2023-04-19 16:23:01 | 1590871 | car | 4001 |
+---------------------+------------+--------------+---------------+