We will create a Kafka producer in Python and publish some messages to the Kafka topic.
Prerequisites
- Kafka must be installed
- Python project must be created
Install Python dependencies
We need to install two Python modules that will help us produce messages to the Kafka topic:
- kafka-python: Python client for the Apache Kafka distributed stream processing system
pip install kafka-python
- Faker: Faker is a Python package that generates fake data for you.
pip install Faker
Configuration needed by Python Kafka Producer
The Kafka producer needs a few configuration values to connect to the Kafka bootstrap server and produce messages.
- bootstrap server: This is the address of the Kafka server you use. If you followed the Kafka installation prerequisite, the value will be ip-address:9092
- Here, ip-address is your machine’s IP address
- topic: This is the name of the topic where you will publish the messages. In our case, it will be registered_user
- value_serializer: Messages must be serialized to bytes before they can be sent to the Kafka topic over the network. The producer sends the serialized message and the consumer deserializes it. We will be using a JSON serializer
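As a minimal sketch, the three settings above could be gathered in one place so the IP address is not hard-coded. The helper name build_producer_config and the environment variable names KAFKA_BOOTSTRAP_SERVER and KAFKA_TOPIC are assumptions made for illustration; they are not part of kafka-python. The address 192.168.0.10:9092 is only a placeholder default.

```python
import json
import os


def build_producer_config(env=os.environ):
    """Collect the producer settings in one dictionary (hypothetical helper)."""
    # The variable names below are assumed, not defined by Kafka itself.
    server = env.get("KAFKA_BOOTSTRAP_SERVER", "192.168.0.10:9092")
    topic = env.get("KAFKA_TOPIC", "registered_user")
    return {
        "topic": topic,
        # These keys match keyword arguments of kafka-python's KafkaProducer.
        "producer_kwargs": {
            "bootstrap_servers": [server],
            # dict -> JSON string -> UTF-8 bytes
            "value_serializer": lambda data: json.dumps(data).encode("utf-8"),
        },
    }


config = build_producer_config({})
print(config["topic"], config["producer_kwargs"]["bootstrap_servers"])
```

With kafka-python installed and a broker running, the producer could then be created with KafkaProducer(**config["producer_kwargs"]).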
Writing the Final Python Kafka Producer
Create a new file producer.py
touch producer.py
Add the following Kafka producer code to it:
import json
import time

from faker import Faker
from kafka import KafkaProducer

fake = Faker()


def get_registered_user():
    # Build a dummy user record from Faker-generated values.
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }


def json_serializer(data):
    # Kafka transmits bytes, so encode the JSON string as UTF-8.
    return json.dumps(data).encode("utf-8")


# Replace the address with your own machine's IP address.
producer = KafkaProducer(bootstrap_servers=['192.168.0.10:9092'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while True:
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)
Methods
- get_registered_user
- It uses the Faker module and returns a dummy user dictionary every time. It generates fake names, fake addresses, and fake years.
- json_serializer
- It is used as the producer's value serializer.
- It accepts a dictionary, converts it to a JSON string, and encodes that string as UTF-8 bytes
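To see what json_serializer actually hands to Kafka, the sketch below runs it on a sample dictionary and reverses the step. The json_deserializer function is a hypothetical consumer-side counterpart added for illustration, and the sample user is made-up data rather than Faker output.

```python
import json


def json_serializer(data):
    # dict -> JSON string -> UTF-8 bytes
    return json.dumps(data).encode("utf-8")


def json_deserializer(payload):
    # Hypothetical counterpart: UTF-8 bytes -> JSON string -> dict
    return json.loads(payload.decode("utf-8"))


user = {"name": "Jane Doe", "address": "1 Example Street", "created_at": "2001"}
payload = json_serializer(user)

print(type(payload).__name__)              # bytes
print(json_deserializer(payload) == user)  # True
```

The round trip only works for values that JSON can represent, such as strings, numbers, lists, and dictionaries.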
Variable Instances
- producer
- It is the instance of the KafkaProducer class
- To create the producer instance, we provide the bootstrap server address and the serializer function
- The IP address will be different for your machine.
Run the Kafka Python Producer
Use the following command to run the producer:
python producer.py
- The application will run indefinitely until you stop it
- Every 4 seconds
- It will create a fake user
- print the fake user
- and finally, send the data to the registered_user topic using the producer's send method
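Because the loop never ends on its own, the producer is normally stopped with Ctrl+C. The sketch below is one possible way to wrap the same loop so that queued messages are flushed and the connection is closed on exit. The run_producer helper and its max_messages parameter are assumptions added for illustration; send, flush, and close are methods of kafka-python's KafkaProducer.

```python
import time


def run_producer(producer, topic, make_message, interval=4, max_messages=None):
    """Send make_message() to topic every `interval` seconds (hypothetical helper).

    `producer` is expected to offer send(), flush() and close(),
    as kafka-python's KafkaProducer does.
    """
    sent = 0
    try:
        # max_messages=None keeps the original endless behaviour.
        while max_messages is None or sent < max_messages:
            message = make_message()
            print(message)
            producer.send(topic, message)
            sent += 1
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C ends the loop without a traceback.
        pass
    finally:
        # send() is asynchronous, so wait for queued records before closing.
        producer.flush()
        producer.close()
    return sent
```

In producer.py, the body of the main block could then become run_producer(producer, "registered_user", get_registered_user).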