Setup
This section will cover the setup and use of Apache Kafka on the Skills Network IDE. A local install will follow in another section.
- Start a new terminal

Download Kafka

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz

Extract Kafka
- Extract Kafka from the downloaded archive into the new kafka_2.12-3.7.0 directory

tar -xzf kafka_2.12-3.7.0.tgz
Generate Cluster ID
- Navigate to that directory
- Generate a cluster UUID that will uniquely identify the Kafka cluster
- The cluster ID will be used by the KRaft controller

cd kafka_2.12-3.7.0

# generate a cluster ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Configure Directories
- KRaft requires the log directories to be configured. Run the following command to format the log directories, passing in the cluster ID.

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
Start Kafka Server
- Now that KRaft is configured, you can start the Kafka server by running the following command.
bin/kafka-server-start.sh config/kraft/server.properties
Create Topic
- Start a new terminal and change to the kafka_2.12-3.7.0 directory
- Create a topic named news

cd kafka_2.12-3.7.0
bin/kafka-topics.sh --create --topic news --bootstrap-server localhost:9092

# OUTPUT
Created topic news.
Start a Producer
- You need a producer to send messages to Kafka. Run the command below to start a producer.
- After the producer starts and you get the '>' prompt, type any text message and press Enter.
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic news

# OUTPUT
>Good Morning
>Howdy
>How's the snow?
Start a Consumer
- Now that the producer has sent messages, we need to start a consumer to read the messages from Kafka
- Start a terminal and change to the kafka_2.12-3.7.0 directory
- Start a consumer to listen for the messages
- Now you can go back to the producer, type any message, and it will be displayed here in the consumer

cd kafka_2.12-3.7.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic news --from-beginning

# OUTPUT
Good Morning
Howdy
How's the snow?
Explore Directories
Kafka uses the /tmp/kraft-combined-logs directory to store the messages.
- Start a new terminal and change to the kafka_2.12-3.7.0 directory
- Explore the root directory with ls
- Notice there is a tmp directory. The kraft-combined-logs directory inside the tmp directory contains all the logs.
- To check the logs generated for the topic news, run the command below

Note: All messages are stored in the news-0 directory under the /tmp/kraft-combined-logs directory.
cd kafka_2.12-3.7.0
ls

# OUTPUT
LICENSE  NOTICE  bin  config  libs  licenses  logs  site-docs

ls /tmp/kraft-combined-logs/news-0

# OUTPUT
00000000000000000000.index  leader-epoch-checkpoint
00000000000000000000.log    partition.metadata
00000000000000000000.timeindex
Clean Up
Stop the producer
- In the terminal where you are running the producer, press CTRL+C.

Stop the consumer
- In the terminal where you are running the consumer, press CTRL+C.

Stop the Kafka server
- In the terminal where you are running the Kafka server, press CTRL+C.
Case Study 1
Create Topic
- Create a new topic named weather.

# Change to the kafka_2.12-3.7.0 directory and run
bin/kafka-topics.sh --create --topic weather --bootstrap-server localhost:9092
Post Messages
- Post messages to the topic weather

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic weather
Read Messages
- Read messages from the topic weather.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic weather
Case Study 2
- Use message keys to keep message streams in their original publication order
- Use consumer offsets to control and track the sequential position of messages in topic partitions
- Start a new terminal and change to the kafka_2.12-3.7.0 directory
Create Topic
- Create a bankbranch topic to process messages from bank branch ATM machines.
- Suppose the messages come from the ATM as a simple JSON object including an ATM ID and a transaction ID, like the following example: {"atmid": 1, "transid": 100}
- Create the new topic using the --topic argument with the name bankbranch.
- To simplify the topic configuration and better explain how message keys and consumer offsets work, specify the --partitions 2 argument to create two partitions for this topic. To compare the differences, you may try other partitions settings for this topic.

cd kafka_2.12-3.7.0

# CREATE Topic
bin/kafka-topics.sh --create --topic bankbranch --partitions 2 --bootstrap-server localhost:9092

# OUTPUT
Created topic bankbranch.
List Topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# OUTPUT
__consumer_offsets
bankbranch
news
Describe Details
- You can also use the --describe option to check the details of the topic bankbranch.

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bankbranch

# OUTPUT
Topic: bankbranch  TopicId: 1kQHLybTRZSA4WRY9M33mw  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: bankbranch  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
  Topic: bankbranch  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
You can view the bankbranch topic as two partitions, Partition 0 and Partition 1. If no message keys are specified, messages will be published to these two partitions in an alternating sequence like this:

Partition 0 -> Partition 1 -> Partition 0 -> Partition 1 -> ...
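To make the rotation concrete, here is a small illustrative Python sketch (a simplified model, not Kafka code) that distributes the five ATM messages you will produce in the next step across the two partitions:

# round_robin_sketch.py -- simplified model of unkeyed publishing
messages = [
    {"atmid": 1, "transid": 100},
    {"atmid": 1, "transid": 101},
    {"atmid": 2, "transid": 200},
    {"atmid": 1, "transid": 102},
    {"atmid": 2, "transid": 201},
]

num_partitions = 2
partitions = {p: [] for p in range(num_partitions)}
for i, msg in enumerate(messages):
    # alternate 0, 1, 0, 1, ... just like the rotation described above
    partitions[i % num_partitions].append(msg)

for p, msgs in partitions.items():
    print(p, [m["transid"] for m in msgs])
# 0 [100, 200, 201]
# 1 [101, 102]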
Create Producer
- Run the following command in the same terminal window to create a producer for the topic bankbranch.
- At the > prompt, add the ATM messages as seen below

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch
# OUTPUT
> {"atmid": 1, "transid": 100}
> {"atmid": 1, "transid": 101}
> {"atmid": 2, "transid": 200}
> {"atmid": 1, "transid": 102}
> {"atmid": 2, "transid": 201}
Create Consumer
- To consume these messages, create a consumer in a new terminal
- Open a new terminal and change to the kafka_2.12-3.7.0 directory

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --from-beginning
# OUTPUT
"atmid": 1, "transid": 100}
{"atmid": 1, "transid": 101}
{"atmid": 2, "transid": 200}
{"atmid": 1, "transid": 102}
{"atmid": 2, "transid": 201} {
Case Study 3
In this step, we will use message keys to ensure that messages with the same key are consumed in the same order as they were published. In the back end, messages with the same key are published into the same partition and will always be consumed by the same consumer. As such, the original publication order is kept on the consumer side.
At this point, we have the following three terminals open in Cloud IDE as we’ll be switching frequently between these terminals:
- Kafka Server terminal
- Producer terminal
- Consumer terminal
Stop C & P
First off, let's stop both the consumer and producer terminals with CTRL + C.

Next, we will start a new producer and consumer using message keys. You can start a new producer with the following message key options:

- --property parse.key=true to make the producer parse message keys
- --property key.separator=: to define the key separator to be the : character

A message with a key now looks like the following key-value pair example: 1:{"atmid": 1, "transid": 102}. Here, the message key is 1, which also corresponds to the ATM ID, and the value is the transaction JSON object, {"atmid": 1, "transid": 102}.
Producer with Message Key
- Start a new producer with the message key enabled
- You should now see the > prompt
- Produce the messages below

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch --property parse.key=true --property key.separator=:
# OUTPUT
> 1:{"atmid": 1, "transid": 103}
> 1:{"atmid": 1, "transid": 104}
> 2:{"atmid": 2, "transid": 202}
> 2:{"atmid": 2, "transid": 203}
> 1:{"atmid": 1, "transid": 105}
Start Consumer
- Switch to the consumer terminal and start a new consumer that prints the keys, using the same key.separator
- As soon as it starts, you should see the messages from the producer above

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --from-beginning --property print.key=true --property key.separator=:
# OUTPUT
1:{"atmid": 1, "transid": 103}
1:{"atmid": 1, "transid": 104}
1:{"atmid": 1, "transid": 105}
2:{"atmid": 2, "transid": 202}
2:{"atmid": 2, "transid": 203}
Now, you should see that messages with the same key are being consumed in the same order (for example: transid 103 -> 104 -> 105) as they were published.
Each topic partition maintains its own message queue, and new messages are enqueued (appended to the end of the queue) as they are published to the partition. Consuming a message does not remove it from the partition; instead, the consumer's offset advances past it, so the earliest messages are no longer delivered to that consumer.
Recall that with two partitions and no message keys specified, the transaction messages were published to the two partitions in rotation:
- Partition 0:
[{"atmid": 1, "transid": 100}, {"atmid": 2, "transid": 200}, {"atmid": 2, "transid": 201}]
- Partition 1:
[{"atmid": 1, "transid": 101}, {"atmid": 1, "transid": 102}]
As you can see, the transaction messages from atm1
and atm2
got scattered across both partitions. It would be difficult to unravel this and consume messages from one ATM in the same order as they were published.
However, with the message key specified as the atmid
value, the messages from the two ATMs will look like the following:
- Partition 0:
[{"atmid": 1, "transid": 103}, {"atmid": 1, "transid": 104}, {"atmid": 1, "transid": 105}]
- Partition 1:
[{"atmid": 2, "transid": 202}, {"atmid": 2, "transid": 203}]
Messages with the same key will always be published to the same partition so that their published order will be preserved within the message queue of each partition.
As such, you can keep the states or orders of the transactions for each ATM.
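As a rough sketch of why this works: the producer hashes each message key and maps it to a partition, so the same key always yields the same partition. Kafka's default partitioner uses a murmur2 hash of the serialized key; the illustration below substitutes Python's built-in hash() purely for readability:

# key_partition_sketch.py -- simplified model; Kafka really computes
# murmur2(serialized key) % num_partitions, not Python's hash()
def partition_for(key: str, num_partitions: int = 2) -> int:
    return hash(key) % num_partitions

for key in ["1", "1", "2", "2", "1"]:
    print(f"key {key} -> partition {partition_for(key)}")
# Every "1" lands in one fixed partition and every "2" in one fixed
# partition (possibly the same one); only per-key consistency matters.
# Note: Python salts str hashes per process, so the mapping can differ
# between runs; within one run it is stable, which is the property shown.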
Consumer Offset
Topic partitions keep published messages in a sequence, such as a list. Message offset indicates a message’s position in the sequence.
For example, in an empty Partition 0 of bankbranch, the next offset is 0. After you publish the first message to the partition, that message sits at offset 0 and the partition's log-end offset advances to 1.
Using offsets in the consumer, you can specify the starting position for message consumption, such as from the beginning to retrieve all messages or from some later point to retrieve only the latest messages.
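To see offsets from code, here is a minimal sketch using the kafka-python package (installed later in the Kafka Python section), assuming the bankbranch topic above and a broker on localhost:9092. It manually assigns a partition and starts consuming at an explicit offset:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition('bankbranch', 0)
consumer.assign([tp])   # manual assignment, no consumer group
consumer.seek(tp, 3)    # begin consuming partition 0 at offset 3

# poll once and print the offset and raw value of each record
records = consumer.poll(timeout_ms=1000)
for batch in records.values():
    for record in batch:
        print(record.offset, record.value)
consumer.close()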
Consumer group
In addition, you normally group related consumers together as a consumer group. For example, you may want to create a consumer for each ATM in the bank and manage all ATM-related consumers together in a group.
So let’s see how to create a consumer group, which is actually very easy with the --group
argument.
- In the consumer terminal, stop the previous consumer if it is still running.
- Run the following command to create a new consumer within a consumer group called
atm-app
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
After the consumer within the atm-app consumer group is started, you should not expect any messages to be consumed. This is because the offsets for both partitions already point to the end of the log; in other words, all of the previous messages have already been consumed.
You can verify that by checking consumer group details.
- Stop the consumer.
- Show the details of the consumer group
atm-app
:
CTRL + C

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group atm-app

# OUTPUT
GROUP    TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
atm-app  bankbranch  0          2               2               0    -            -     -
atm-app  bankbranch  1          8               8               0    -            -     -
- You should see that the CURRENT-OFFSET matches the LOG-END-OFFSET and the LAG column for both partitions is 0.
- This means there are no new messages waiting to be consumed.
- Start the consumer again and see whether anything is consumed.
- You'll notice nothing is consumed, as both partitions have reached the end once again.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

# OUTPUT
< blank >
Reset Offset
Let's look at how you can make the partitions consumable again from the beginning by resetting the offsets.

You can reset the offsets with the --reset-offsets argument.

First, let's try resetting the offset to the earliest position (the beginning) using --reset-offsets --to-earliest.
- Stop the previous consumer if it is still running
- Run the command below to reset the offsets
- You'll notice the offsets have been set to 0
CTRL + C

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --to-earliest --execute

# OUTPUT
GROUP    TOPIC       PARTITION  NEW-OFFSET
atm-app  bankbranch  0          0
atm-app  bankbranch  1          0
Start Consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

# OUTPUT
{"atmid": 1, "transid": 100}
{"atmid": 1, "transid": 101}
{"atmid": 2, "transid": 200}
{"atmid": 1, "transid": 102}
{"atmid": 2, "transid": 201}
{"atmid": 1, "transid": 103}
{"atmid": 1, "transid": 104}
{"atmid": 1, "transid": 105}
{"atmid": 2, "transid": 202}
{"atmid": 2, "transid": 203}

- Now you can see that all messages have been consumed again
Reset to Consume the last 2 messages
- Stop the consumer
- Shift the offset to the left by two using
--reset-offsets --shift-by -2
:
CTRL + C

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --shift-by -2 --execute

# OUTPUT
GROUP    TOPIC       PARTITION  NEW-OFFSET
atm-app  bankbranch  0          0
atm-app  bankbranch  1          6
- Start the consumer again
- You should see four messages, two for each partition

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

# OUTPUT
{"atmid": 1, "transid": 104}
{"atmid": 1, "transid": 105}
{"atmid": 2, "transid": 202}
{"atmid": 2, "transid": 203}
Kafka Python
Since a cluster UUID has already been configured (in the examples above), we can keep using the running Kafka server.
- Open a new terminal and navigate to the kafka_2.12-3.7.0 directory
- Install the kafka-python package
- Create a file named admin.py with touch

cd kafka_2.12-3.7.0
pip3 install kafka-python
touch admin.py
Python Code
- Insert this code into admin.py

Create Topic
- The code will create a topic named "bankbranch"

from kafka.admin import KafkaAdminClient, NewTopic

# connect to the local Kafka server with an admin client
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

# define the new topic: two partitions, replication factor 1
topic_list = []
new_topic = NewTopic(name="bankbranch", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)

# create the topic (raises TopicAlreadyExistsError if it already exists)
admin_client.create_topics(new_topics=topic_list)
Create Producer
- We need a producer to send messages; the Python code below creates one
- First create a file named producer.py
- Insert this code in it
- In the code, the producer sends two messages
- These messages will be received by the consumer
touch producer.py
from kafka import KafkaProducer
import json

# serialize each message value to JSON before sending
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send("bankbranch", {'atmid': 1, 'transid': 100})
producer.send("bankbranch", {'atmid': 2, 'transid': 101})
producer.flush()    # ensure both messages are actually transmitted
producer.close()
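If you want to mirror the keyed console-producer example from Case Study 3 in Python, a hedged variant (not part of the lab script) adds a key serializer and passes a key with each send:

from kafka import KafkaProducer
import json

# same producer as above, plus a key serializer so "1" becomes b"1"
producer = KafkaProducer(
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# messages sharing a key are routed to the same partition
producer.send("bankbranch", key="1", value={'atmid': 1, 'transid': 103})
producer.send("bankbranch", key="2", value={'atmid': 2, 'transid': 202})
producer.flush()
producer.close()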
Create Consumer
- We’ll create a consumer to read messages
- Create a file named consumer.py
- Insert the code in it
touch consumer.py
from kafka import KafkaConsumer

# read bankbranch from the earliest available offset, outside any group
consumer = KafkaConsumer('bankbranch',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
print("Hello")
print(consumer)
for msg in consumer:
    print(msg.value.decode("utf-8"))
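Because group_id is None, this consumer reads anonymously and no offsets are committed. As a sketch (assuming the atm-app group from the CLI examples above), you could instead join a consumer group so offsets are tracked across restarts:

from kafka import KafkaConsumer

# joining a group makes Kafka track this consumer's offsets per partition
consumer = KafkaConsumer('bankbranch',
                         group_id='atm-app',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value.decode("utf-8"))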
Execute Admin & Producer
- Execute admin.py and producer.py with:

python3 admin.py
python3 producer.py
Execute Consumer
- Let's execute consumer.py in a new terminal
- Move to the kafka_2.12-3.7.0 directory

cd kafka_2.12-3.7.0
python3 consumer.py