A Kafka consumer is an application that reads messages from a Kafka topic and processes them. In this post, we will create a Kafka consumer in Python.

Kafka Consumer in Python

Prerequisites

Install Python Dependencies

We need to install one Python module that lets us consume messages from the Kafka topic (the json module used later is part of the Python standard library):

  • kafka-python: Python client for the Apache Kafka distributed stream processing system
pip install kafka-python

Configuration Needed by the Kafka Consumer

You need to provide a few configuration values so that the Kafka consumer can connect to the Kafka bootstrap server and consume messages.

  • bootstrap_servers: The address of the Kafka broker you connect to. If you have followed the Kafka installation prerequisite, the value will be ip-address:9092. The code below uses 192.168.0.10:9093; use the host and port your broker actually listens on
    • Here the IP address is your machine’s IP address
  • topic: The name of the topic from which messages will be consumed. In our case, it is registered_user
  • value_deserializer: Only serialized messages can be sent to a Kafka topic over the network. The producer serializes each message (we use JSON), and the consumer must deserialize it. In the code below we parse each message with json.loads; alternatively, KafkaConsumer accepts a value_deserializer callable that does this automatically
  • auto_offset_reset
    • Decides where the consumer starts reading when its consumer group has no committed offset: from the earliest offset still present in the topic or from the latest one
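The settings above can also be gathered in one place. The sketch below assumes the topic and broker address used in this post and relies on the value_deserializer keyword argument of kafka-python's KafkaConsumer, so each msg.value arrives already decoded. The names CONSUMER_CONFIG, json_deserializer and create_consumer are hypothetical helpers for illustration, not part of the library.

```python
import json
from typing import Any, Optional

# Assumed values taken from this post's example; adjust them for your setup.
TOPIC = "registered_user"
BOOTSTRAP_SERVER = "192.168.0.10:9093"


def json_deserializer(raw: Optional[bytes]) -> Any:
    """Decode the raw bytes of a message value into a Python object."""
    if raw is None:  # guard against messages that carry no value
        return None
    return json.loads(raw.decode("utf-8"))


# Keyword arguments for kafka-python's KafkaConsumer (hypothetical helper dict).
CONSUMER_CONFIG = {
    "bootstrap_servers": BOOTSTRAP_SERVER,
    "auto_offset_reset": "earliest",  # or "latest"
    "group_id": "consumer-group-a",
    "value_deserializer": json_deserializer,
}


def create_consumer():
    """Build the consumer (needs kafka-python installed and a reachable broker)."""
    # Imported here so the helpers above can be used without kafka-python.
    from kafka import KafkaConsumer

    return KafkaConsumer(TOPIC, **CONSUMER_CONFIG)
```

With value_deserializer set, each msg.value yielded by `for msg in create_consumer():` is already a Python object, so the loop would print it directly instead of calling json.loads a second time.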

Write the Final Kafka Consumer in Python

Create a new file for the consumer:

touch consumer.py

Add the following code to consumer.py:

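from kafka import KafkaConsumer
import json

# Subscribe to the "registered_user" topic.
# Replace the IP address and port with those of your Kafka broker.
consumer = KafkaConsumer(
    "registered_user",
    bootstrap_servers="192.168.0.10:9093",
    auto_offset_reset="earliest",  # used when the group has no valid committed offset
    group_id="consumer-group-a",   # consumers sharing this id split the partitions and share offsets
)

if __name__ == "__main__":
    print("starting the consumer")
    try:
        # Iterating over the consumer blocks and yields messages as they arrive.
        for msg in consumer:
            # msg.value is raw bytes containing stringified JSON.
            print("Registered User = {}".format(json.loads(msg.value)))
    finally:
        # Close the network connections cleanly, e.g. after Ctrl+C.
        consumer.close()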

Consumer Instance

  • consumer
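    • The Kafka consumer instance, created from the KafkaConsumer class of the kafka-python library
    • It accepts multiple parameters, such as the topic name, the bootstrap server address, the offset reset policy (auto_offset_reset) and the consumer group (group_id)
    • Note: The IP address will be different for your machine
    • auto_offset_reset is set to earliest, so when the consumer group has no committed offset, the consumer starts from the earliest message in the topic. Once the group has committed offsets (kafka-python commits them automatically by default), a restarted consumer resumes from its last committed position instead
      • To read only messages published after the consumer starts (again, when no committed offset exists), set the value to latest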
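The auto_offset_reset setting only matters when the consumer group has no usable committed offset. The function below is a simplified model of that decision for a single partition, not kafka-python's actual implementation; the name starting_offset and its parameters are hypothetical. It covers only the two policies discussed here, earliest and latest, and defaults to latest, which is also kafka-python's default.

```python
from typing import Optional

VALID_POLICIES = ("earliest", "latest")


def starting_offset(committed: Optional[int], earliest: int, latest: int,
                    auto_offset_reset: str = "latest") -> int:
    """Simplified model of where a consumer group starts reading one partition.

    committed: offset the group committed earlier, or None if it never committed
    earliest:  oldest offset still retained in the partition
    latest:    offset the next published message will receive (log end)
    """
    if auto_offset_reset not in VALID_POLICIES:
        raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")
    # A committed offset that is still in range always wins;
    # the reset policy is not consulted at all.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # No usable committed offset: fall back to the reset policy.
    return earliest if auto_offset_reset == "earliest" else latest


if __name__ == "__main__":
    print(starting_offset(None, 0, 42, "earliest"))  # first run of the group: 0
    print(starting_offset(None, 0, 42, "latest"))    # first run, only new messages: 42
    print(starting_offset(17, 0, 42, "earliest"))    # restart after committing 17: 17
```

The third call shows why restarting the consumer from this post does not replay every message: once consumer-group-a has committed an offset, earliest no longer applies.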

Run the consumer

python consumer.py

The consumer runs indefinitely and prints each new message to the console as soon as it is published. Each message value is stringified JSON, which we parse with json.loads.
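Because the loop body only touches msg.value, the parsing step can be tried without a running broker. In this sketch, FakeRecord is a hypothetical namedtuple standing in for kafka-python's ConsumerRecord, format_record is a hypothetical helper, and the payload fields are invented, since this post does not show the producer's message schema.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord; only a few fields are modeled.
FakeRecord = namedtuple("FakeRecord", ["topic", "offset", "value"])


def format_record(msg) -> str:
    """Parse the stringified JSON in msg.value and format it like consumer.py does."""
    user = json.loads(msg.value)  # json.loads accepts UTF-8 bytes as well as str
    return "Registered User = {}".format(user)


if __name__ == "__main__":
    # Invented payload; the real fields depend on what the producer sends.
    sample = FakeRecord("registered_user", 0, b'{"name": "Jane Doe", "city": "Berlin"}')
    print(format_record(sample))
    # prints: Registered User = {'name': 'Jane Doe', 'city': 'Berlin'}
```

In consumer.py, the loop could then call print(format_record(msg)) for each real message, which keeps the processing logic testable on its own.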
