Setup


This section covers the setup and use of Apache Kafka in the Skills Network IDE. A local installation is covered in a later section.

  • Start a New Terminal

Download Kafka

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz

Extract Kafka

  • Extract Kafka from the downloaded archive into a new directory named kafka_2.12-3.7.0
tar -xzf kafka_2.12-3.7.0.tgz

Generate Cluster ID

  • Navigate to that directory
  • Generate a cluster UUID that will uniquely identify the Kafka cluster
  • The cluster ID will be used by the KRaft controller
cd kafka_2.12-3.7.0
# generate a cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Configure Directories

  • KRaft requires the log directories to be configured. Run the following command to format the log directories, passing in the cluster ID.
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Start Kafka Server

  • Now that KRaft is configured, you can start the Kafka server by running the following command.
bin/kafka-server-start.sh config/kraft/server.properties

Create Topic

  • Start a new terminal and change to the kafka_2.12-3.7.0 directory
  • create topic named news
cd kafka_2.12-3.7.0
bin/kafka-topics.sh --create --topic news --bootstrap-server localhost:9092

# OUTPUT
Created topic news
:/home/project/kafka_2.12-3.7.0$

Start a Producer

  • You need a producer to send messages to Kafka. Run the command below to start one.
  • After the producer starts and you get the ‘>’ prompt, type any text message and press Enter.
bin/kafka-console-producer.sh   --bootstrap-server localhost:9092   --topic news

#  OUTPUT
:/home/project/kafka_2.12-3.7.0$ bin/kafka-console-producer.sh   --bootstrap-server localhost:9092   --topic news
>Good Morning
>Howdy
>How's the snow?

Start a Consumer

  • Now that the producer has sent messages, we need a consumer to read them from Kafka
  • Start a new terminal and change to the kafka_2.12-3.7.0 directory
  • Start a consumer to listen for the messages
  • You can now go back to the producer, type any message, and see it displayed here in the consumer
cd kafka_2.12-3.7.0
bin/kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic news   --from-beginning

#  OUTPUT
Good Morning
Howdy
How's the snow?

Explore Directories

Kafka uses the /tmp/kraft-combined-logs directory to store the messages.

  • Start a new terminal and change to the kafka_2.12-3.7.0 directory
  • Explore the directory with ls
  • Notice that the message logs are not stored here: the kraft-combined-logs directory inside /tmp contains all the logs
  • To check the logs generated for the topic news, run the commands below

Note: All messages are stored in the news-0 directory under the /tmp/kraft-combined-logs directory.

cd kafka_2.12-3.7.0

ls
#  OUTPUT
LICENSE  NOTICE  bin  config  libs  licenses  logs  site-docs

ls /tmp/kraft-combined-logs/news-0
#  OUTPUT
00000000000000000000.index      leader-epoch-checkpoint
00000000000000000000.log        partition.metadata
00000000000000000000.timeindex

Clean Up

Stop the producer

In the terminal where the producer is running, press CTRL+C.

Stop the consumer

In the terminal where the consumer is running, press CTRL+C.

Stop the Kafka server

In the terminal where the Kafka server is running, press CTRL+C.

Case Study 1


Create Topic

  • Create a new topic named weather.
#  Change to the kafka_2.12-3.7.0 directory and run 
bin/kafka-topics.sh --create --topic weather --bootstrap-server localhost:9092

Post Messages

  • Post messages to the topic weather
bin/kafka-console-producer.sh   --bootstrap-server localhost:9092   --topic weather

Read Messages

  • Read messages from the topic weather.
bin/kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic weather

Case Study 2


  • Use message keys to keep message streams in their original publication order
  • Use consumer offsets to control and track the sequential positions of messages in topic partitions
  • Start a new terminal and change to the kafka_2.12-3.7.0 directory

Create Topic

  • Create a bankbranch topic to process messages from bank branch ATMs.

  • Suppose the messages come from the ATM in the form of a simple JSON object, including an ATM ID and a transaction ID like the following example:

    {"atmid": 1, "transid": 100}

  • Create a new topic using the --topic argument with the name bankbranch.

  • To simplify the topic configuration and better illustrate how message keys and consumer offsets work, specify the --partitions 2 argument to create two partitions for this topic. To compare the differences, you may try other partition settings later.

    cd kafka_2.12-3.7.0
    # create the topic
    bin/kafka-topics.sh --create --topic bankbranch --partitions 2 --bootstrap-server localhost:9092
    
    # OUTPUT
    Created topic bankbranch

List Topics

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# OUTPUT
__consumer_offsets
bankbranch
news

Describe Details

  • You can also use the --describe command to check the details of the topic bankbranch.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bankbranch

#  OUTPUT
Topic: bankbranch       TopicId: 1kQHLybTRZSA4WRY9M33mw PartitionCount: 2    ReplicationFactor: 1 
Configs: segment.bytes=1073741824
        Topic: bankbranch       Partition: 0    Leader: 1       Replicas: 1  Isr: 1
        Topic: bankbranch       Partition: 1    Leader: 1       Replicas: 1  Isr: 1

You can view bankbranch as two partitions, Partition 0 and Partition 1. If no message keys are specified, messages are published to the two partitions in an alternating sequence, like this:

Partition 0 -> Partition 1 -> Partition 0 -> Partition 1
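As a toy model of this alternating behavior (illustration only; recent Kafka clients may instead use "sticky" partitioning, batching unkeyed messages to one partition at a time), consider the Python sketch below. Run on the five transaction IDs used later, it reproduces the partition split described in Case Study 3:

from itertools import cycle

partitions = cycle([0, 1])  # alternate between the two partitions
for transid in [100, 101, 200, 102, 201]:
    print('transid', transid, '-> partition', next(partitions))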

Create Producer

  • Run the following command in the same terminal to create a producer for the topic bankbranch.
  • At the > prompt, enter the ATM messages shown below
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch

#  OUTPUT
>  {"atmid": 1, "transid": 100}
>  {"atmid": 1, "transid": 101}
>  {"atmid": 2, "transid": 200}
>  {"atmid": 1, "transid": 102}
>  {"atmid": 2, "transid": 201}

Create Consumer

  • Create a consumer in a new terminal to consume these messages
  • Open a new terminal and change to the kafka_2.12-3.7.0 directory
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --from-beginning

#  OUTPUT
{"atmid": 1, "transid": 100}
{"atmid": 1, "transid": 101}
{"atmid": 2, "transid": 200}
{"atmid": 1, "transid": 102}
{"atmid": 2, "transid": 201}

Case Study 3


In this step, we will use message keys to ensure that messages with the same key are consumed in the same order as they were published. In the back end, messages with the same key are published into the same partition and will always be consumed by the same consumer. As such, the original publication order is kept on the consumer side.
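To make the routing concrete, here is a minimal sketch of the idea behind Kafka's default partitioner for keyed messages. The real client hashes the serialized key bytes with murmur2; zlib.crc32 below is only a stand-in for illustration.

import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # stand-in for Kafka's murmur2 hash of the key bytes:
    # same key -> same hash -> same partition, every time
    return zlib.crc32(key) % num_partitions

# with 2 partitions, every message keyed b"1" always lands in one
# partition, and every message keyed b"2" always lands in one partition
# (possibly the same one; the guarantee is consistency, not spreading)
for key in [b"1", b"1", b"2", b"2", b"1"]:
    print(key, "-> partition", pick_partition(key, 2))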

At this point, we have the following three terminals open in Cloud IDE as we’ll be switching frequently between these terminals:

  • Kafka Server terminal
  • Producer terminal
  • Consumer terminal

Stop Consumer & Producer

First, stop both the consumer and the producer terminals with CTRL+C.

Next, we will start a new producer and consumer using message keys. You can start a new producer with the following message key options:

  • --property parse.key=true makes the producer parse message keys
  • --property key.separator=: defines the key separator to be the : character, so a keyed message now looks like the following key-value pair example:
    • 1:{"atmid": 1, "transid": 102}
      Here, the message key is 1, which corresponds to the ATM ID, and the value is the transaction JSON object, {"atmid": 1, "transid": 102}.

Producer w Message Key

  • Start a new producer with the message key enabled
  • You should now see the > prompt
  • Produce the messages below
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
        --topic bankbranch --property parse.key=true --property key.separator=:
        
#  OUTPUT
> 1:{"atmid": 1, "transid": 103}
> 1:{"atmid": 1, "transid": 104}
> 2:{"atmid": 2, "transid": 202}
> 2:{"atmid": 2, "transid": 203}
> 1:{"atmid": 1, "transid": 105}

Start Consumer

  • Switch to the consumer terminal and start a new consumer
  • Pass print.key=true and the same key.separator so that the keys are displayed
  • As soon as it starts, you should see the messages from the producer above
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic bankbranch --from-beginning --property print.key=true --property key.separator=:
                
#  OUTPUT
1:{"atmid": 1, "transid": 103}
1:{"atmid": 1, "transid": 104}
1:{"atmid": 1, "transid": 105}
2:{"atmid": 2, "transid": 202}
2:{"atmid": 2, "transid": 203}               

Now, you should see that messages with the same key are being consumed in the same order (for example: trans103 -> trans104 -> trans105) as they were published.

Each topic partition maintains its own message queue, and new messages are enqueued (appended to the end of the queue) as they are published to the partition. Consuming a message does not remove it from the partition; instead, the consumer's offset advances past it, so the message is not delivered to that consumer again.

Recall that with two partitions and no message keys specified, the transaction messages were published to the two partitions in rotation:

  • Partition 0: [{"atmid": 1, "transid": 100}, {"atmid": 2, "transid": 200}, {"atmid": 2, "transid": 201}]
  • Partition 1: [{"atmid": 1, "transid": 101}, {"atmid": 1, "transid": 102}]

As you can see, the transaction messages from atm1 and atm2 got scattered across both partitions. It would be difficult to unravel this and consume messages from one ATM in the same order as they were published.

However, with the message key specified as the atmid value, the messages from the two ATMs will look like the following:

  • Partition 0: [{"atmid": 1, "transid": 103}, {"atmid": 1, "transid": 104}, {"atmid": 1, "transid": 105}]
  • Partition 1: [{"atmid": 2, "transid": 202}, {"atmid": 2, "transid": 203}]

Messages with the same key will always be published to the same partition so that their published order will be preserved within the message queue of each partition.

As such, you can keep the states or orders of the transactions for each ATM.

Consumer Offset


Topic partitions keep published messages in a sequence, such as a list. Message offset indicates a message’s position in the sequence.

For example, an empty Partition 0 of bankbranch has a log-end offset of 0; the first message published to the partition sits at offset 0, and the log-end offset then becomes 1.

Using offsets in the consumer, you can specify the starting position for message consumption, such as from the beginning to retrieve all messages or from some later point to retrieve only the latest messages.
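As a sketch of the same idea in Python, using the kafka-python package (installed in the Kafka Python section below) and assuming the broker at localhost:9092 and the bankbranch topic from above, a consumer can pin itself to an explicit starting offset with assign() and seek():

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition('bankbranch', 0)
consumer.assign([tp])         # manual partition assignment, no group
consumer.seek(tp, 0)          # start from the beginning of partition 0
# consumer.seek(tp, 3) would skip the first three messages instead
print(consumer.position(tp))  # the offset the next poll will read from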

Consumer group

In addition, you normally group related consumers together as a consumer group. For example, you may want to create a consumer for each ATM in the bank and manage all ATM-related consumers together in a group.
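For reference, the same idea in kafka-python is just a matter of passing group_id; a minimal sketch, assuming the bankbranch topic from above:

from kafka import KafkaConsumer

# every consumer created with the same group_id shares the work:
# each bankbranch partition is assigned to exactly one group member
consumer = KafkaConsumer('bankbranch',
                         group_id='atm-app',
                         bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print(msg.value.decode('utf-8'))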

Creating a consumer group from the command line is just as easy with the --group argument.

  • In the consumer terminal, stop the previous consumer if it is still running.
  • Run the following command to create a new consumer within a consumer group called atm-app:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

After the consumer in the atm-app consumer group starts, you should not expect any messages to be consumed. This is because the offsets for both partitions already point to the end of the log: a new consumer group without --from-beginning starts at the latest offset, and all of the existing messages were published before it joined.

You can verify that by checking consumer group details.

  • Stop the consumer.
  • Show the details of the consumer group atm-app:
CTRL + C
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group atm-app

#  OUTPUT

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
atm-app         bankbranch      0          2               2               0               -               -               -
atm-app         bankbranch      1          8               8               0               -               -               -
  • You should see that CURRENT-OFFSET equals LOG-END-OFFSET for both partitions, and the LAG columns are 0.
  • This means there are no new messages left for the group to consume.
  • Start the consumer again to confirm.
  • You'll notice nothing is consumed, as both partitions have reached the end once again
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

#  OUTPUT
 < blank >

Reset Offset


Let’s look at how you can consume the messages again from the beginning by resetting the offsets.

You can reset the offsets with the --reset-offsets argument.

First, let’s try resetting the offset to the earliest position (beginning) using --reset-offsets --to-earliest.

  • Stop the previous consumer if it is still running, then run the command below to reset the offsets
  • You'll notice the offsets have been set to 0
CTRL + C
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --topic bankbranch --group atm-app --reset-offsets --to-earliest --execute

#  OUTPUT
GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
atm-app                        bankbranch                     0          0              
atm-app                        bankbranch                     1          0   
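kafka-python has no single call equivalent to --reset-offsets, but a rough sketch of the same effect is to join the group, seek to the beginning, and commit the new positions (this assumes no other consumer in atm-app is running at the time):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(group_id='atm-app',
                         bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=False)
partitions = [TopicPartition('bankbranch', p) for p in (0, 1)]
consumer.assign(partitions)
consumer.seek_to_beginning(*partitions)
for tp in partitions:
    print(tp.partition, '->', consumer.position(tp))  # resolves each position to 0
consumer.commit()  # persist the new positions for the atm-app group
consumer.close()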

Start Consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

#  OUTPUT
{"atmid": 1, "transid": 100}
{"atmid": 1, "transid": 101}
{"atmid": 2, "transid": 200}
{"atmid": 1, "transid": 102}
{"atmid": 2, "transid": 201}
{"atmid": 1, "transid": 103}
{"atmid": 1, "transid": 104}
{"atmid": 1, "transid": 105}
{"atmid": 2, "transid": 202}
{"atmid": 2, "transid": 203}
  • Now you can see that all the messages have been consumed again

Reset to Consume the Last Two Messages

  • Stop the consumer
  • Shift the offset to the left by two using --reset-offsets --shift-by -2:
CTRL + C

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --topic bankbranch --group atm-app --reset-offsets --shift-by -2 --execute

#  OUTPUT
GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
atm-app                        bankbranch                     0          0              
atm-app                        bankbranch                     1          6  
  • Start the consumer again
  • You should see four messages, two from each partition
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app

#  OUTPUT
{"atmid": 1, "transid": 104}
{"atmid": 1, "transid": 105}
{"atmid": 2, "transid": 202}
{"atmid": 2, "transid": 203}

Kafka Python


Since the cluster is already configured and the Kafka server is running (from the examples above), we can start using it from Python.

  • Open a new terminal and navigate to the kafka_2.12-3.7.0 directory
  • Install the kafka-python package
  • Create a file named admin.py with touch
cd kafka_2.12-3.7.0

pip3 install kafka-python

touch admin.py

Python Code

  • Insert this code into admin.py

Create Topic

  • The code will create the topic bankbranch with two partitions
from kafka.admin import KafkaAdminClient, NewTopic

# connect to the local Kafka server as an admin client
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

# define the new topic: two partitions, replication factor 1
topic_list = []
new_topic = NewTopic(name="bankbranch", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)
admin_client.create_topics(new_topics=topic_list)
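Note that if you worked through Case Study 2, bankbranch already exists and create_topics will raise a TopicAlreadyExistsError. A small guard (a sketch) keeps admin.py re-runnable; wrap the last line like this:

from kafka.errors import TopicAlreadyExistsError

try:
    admin_client.create_topics(new_topics=topic_list)
except TopicAlreadyExistsError:
    print("topic bankbranch already exists, skipping creation")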

Create Producer

  • We need a producer to send messages. The Python code below creates one
  • First create a file named producer.py
  • Insert the code below into it
  • The producer sends two messages through this code
  • These messages will be received by the consumer
touch producer.py
from kafka import KafkaProducer
import json

# connects to localhost:9092 by default and serializes each
# message value as JSON-encoded UTF-8 bytes
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# send two transaction messages to the bankbranch topic
producer.send("bankbranch", {'atmid': 1, 'transid': 100})
producer.send("bankbranch", {'atmid': 2, 'transid': 101})

producer.flush()

producer.close()
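To mirror the message-key example from Case Study 3 in Python, here is a sketch of a keyed producer. It serializes the atmid as the message key, matching the console producer's 1:{...} format; the transaction IDs are just example values.

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda k: str(k).encode('utf-8'),
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# same key -> same partition, so per-ATM ordering is preserved
producer.send("bankbranch", key=1, value={'atmid': 1, 'transid': 106})
producer.send("bankbranch", key=2, value={'atmid': 2, 'transid': 204})
producer.flush()
producer.close()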

Create Consumer

  • We’ll create a consumer to read messages
  • Create a file named consumer.py
  • Insert the code in it
touch consumer.py

from kafka import KafkaConsumer

# read bankbranch from the earliest offset, without joining a consumer group
consumer = KafkaConsumer('bankbranch',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
print("Hello")
print(consumer)

for msg in consumer:
    print(msg.value.decode("utf-8"))
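Each record the consumer yields also carries its metadata. A small variant of the loop (a sketch) prints the partition, offset, and key alongside the value:

for msg in consumer:
    key = msg.key.decode('utf-8') if msg.key else None
    print(f"partition={msg.partition} offset={msg.offset} "
          f"key={key} value={msg.value.decode('utf-8')}")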

Execute Admin & Producer

  • Execute admin.py and then producer.py with the commands below
python3 admin.py
python3 producer.py

Execute Consumer

  • Let’s execute consumer.py in a new terminal
  • Change to the kafka_2.12-3.7.0 directory first
cd kafka_2.12-3.7.0
python3 consumer.py