A Kafka consumer is an application that reads messages from a Kafka topic and processes them. In this article, we will create a Kafka consumer in Python.
Prerequisites
- Kafka must be installed
- Python project must be created
Install Python Dependencies
We need to install the following Python module, which will help us consume messages from the Kafka topic:
- kafka-python: Python client for the Apache Kafka distributed stream processing system
pip install kafka-python
Configuration Needed by the Python Kafka Consumer
You need to provide a few configuration values so that the Kafka consumer can connect to the Kafka bootstrap server and consume messages.
- bootstrap server: This is the address of the Kafka server you connect to. If you have followed the prerequisite Kafka installation, the value will be ip-address:9092
- Here, ip-address is your machine's IP address
- topic: This is the name of the topic from which messages will be consumed. In our case, it is registered_user
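- Deserialization: Only serialized (byte) messages can be sent over the network to the Kafka topic. The producer sends serialized messages, and the consumer must deserialize them. Our messages are JSON, so we parse each one with json.loads (kafka-python's KafkaConsumer also accepts a value_deserializer parameter for this purpose)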
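- auto_offset_reset
- It controls where the consumer starts reading when its consumer group has no committed offset (for example, on its first run): earliest starts from the oldest message still present in the topic, while latest (the kafka-python default) reads only messages published after the consumer starts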
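As an alternative to calling json.loads inside the loop, you can hand the deserialization step to the consumer itself through the value_deserializer parameter, a callable that kafka-python applies to each message's raw bytes. The sketch below assumes the producer writes UTF-8 encoded JSON. The helper names serialize_json, deserialize_json, and build_consumer are hypothetical, and the broker address is a placeholder.

```python
import json
from typing import Any


def serialize_json(value: Any) -> bytes:
    """Producer side: turn a Python object into UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize_json(raw: bytes) -> Any:
    """Consumer side: turn UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


def build_consumer(bootstrap: str = "192.168.0.10:9092"):
    """Hypothetical helper: build a consumer that deserializes values itself.

    kafka-python is imported inside the function so the helpers above can be
    used without a broker or without the library being installed.
    """
    from kafka import KafkaConsumer

    return KafkaConsumer(
        "registered_user",
        bootstrap_servers=bootstrap,  # placeholder: use your broker's host:port
        auto_offset_reset="earliest",
        group_id="consumer-group-a",
        value_deserializer=deserialize_json,  # msg.value is now a Python object
    )
```

With a consumer built this way, the loop can print msg.value directly. Keep in mind that deserialize_json raises json.JSONDecodeError for a value that is not valid JSON, so decide how you want to handle malformed messages if your topic can contain them.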
Write the Final Kafka Consumer in Python
Create a new consumer file
touch consumer.py
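from kafka import KafkaConsumer
import json

# Consumer subscribed to the registered_user topic.
# Replace the address with your Kafka server's ip-address:port.
consumer = KafkaConsumer(
    "registered_user",
    bootstrap_servers="192.168.0.10:9092",
    auto_offset_reset="earliest",
    group_id="consumer-group-a",
)

if __name__ == "__main__":
    print("starting the consumer")
    # Blocks forever, handling each message as it arrives
    for msg in consumer:
        # msg.value holds the raw bytes of the JSON string
        print("Registered User = {}".format(json.loads(msg.value)))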
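Each msg yielded by the loop is a kafka-python ConsumerRecord, which carries metadata alongside value, including topic, partition, offset, and key. The sketch below formats a record together with that metadata, which makes the offsets that auto_offset_reset refers to visible. So that it runs without a broker, FakeRecord is a hypothetical stand-in namedtuple containing only the fields the helper reads; describe is likewise a hypothetical helper name.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord, limited to the
# attributes used below (the real record has more fields).
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "key", "value"])


def describe(msg) -> str:
    """Format a consumed record, parsing its value as JSON bytes."""
    user = json.loads(msg.value)
    return "{}[{}]@{}: Registered User = {}".format(
        msg.topic, msg.partition, msg.offset, user
    )


record = FakeRecord("registered_user", 0, 7, None, b'{"name": "alice"}')
print(describe(record))  # registered_user[0]@7: Registered User = {'name': 'alice'}
```

In consumer.py, you could replace the print call in the loop with print(describe(msg)) to see which partition and offset each message came from.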
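Variable Explanation
- consumer: The Kafka consumer instance created from the KafkaConsumer class of the kafka-python library
- It accepts parameters such as the topic name, the bootstrap server address, the offset reset policy (auto_offset_reset), and the consumer group ID (group_id)
- Note: The IP address will be different for your machine
- auto_offset_reset is set to earliest, so when the consumer group has no committed offset yet (for example, the first time you start the consumer), it reads from the earliest message still present in the topic. When you restart it with the same group_id, it resumes from the group's last committed offset instead of rereading everything
- If you want a new consumer group to consume only messages published after it starts, set the value to latest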
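auto_offset_reset only matters when the consumer group has no usable committed offset for a partition. The decision can be modeled in a few lines of plain Python; this is a simplified sketch for one partition, not kafka-python's actual implementation, and the function name starting_offset and its arguments (committed, log_start, log_end) are hypothetical.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",  # kafka-python's default is "latest"
) -> int:
    """Simplified model of where a consumer starts reading one partition.

    committed: offset committed by the consumer group, or None if none exists.
    log_start: oldest offset still retained in the partition.
    log_end:   offset that the next produced message will receive.
    """
    # A committed offset that is still inside the retained log always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")
```

For a partition holding offsets 0 to 41, a brand-new group starts at 0 with earliest and at 42 (only new messages) with latest, while a group that has already committed offset 30 resumes at 30 under either setting.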
Run the consumer
python consumer.py
The consumer runs indefinitely and prints each new message to the console as soon as it is published. Because msg.value contains the raw bytes of the stringified JSON, we parse it with json.loads.
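Because the consumer runs indefinitely, you will usually stop it with Ctrl+C, which raises KeyboardInterrupt in Python. One way to exit cleanly is to catch that exception and call the consumer's close() method (which kafka-python's KafkaConsumer provides) in a finally block. The run helper and the ListConsumer stand-in below are hypothetical; ListConsumer only exists so the sketch can be tried without a broker.

```python
def run(consumer, handle) -> int:
    """Pass every message to handle() until interrupted, then close the consumer.

    Works with any iterable that has a close() method, such as a KafkaConsumer.
    Returns the number of messages handled.
    """
    handled = 0
    try:
        for msg in consumer:
            handle(msg)
            handled += 1
    except KeyboardInterrupt:
        # Ctrl+C: stop consuming instead of exiting with a traceback
        print("stopping the consumer")
    finally:
        # Always release the consumer's resources, even on errors
        consumer.close()
    return handled


class ListConsumer:
    """Hypothetical stand-in for KafkaConsumer: yields preset messages,
    then optionally simulates Ctrl+C."""

    def __init__(self, messages, interrupt=False):
        self.messages = list(messages)
        self.interrupt = interrupt
        self.closed = False

    def __iter__(self):
        yield from self.messages
        if self.interrupt:
            raise KeyboardInterrupt

    def close(self):
        self.closed = True


fake = ListConsumer([b'{"id": 1}', b'{"id": 2}'], interrupt=True)
seen = []
count = run(fake, seen.append)
```

With the real consumer, the loop in consumer.py could become run(consumer, lambda msg: print("Registered User = {}".format(json.loads(msg.value)))).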