We will create a Kafka producer in Python and publish some messages to the Kafka topic.

Kafka producer in Python

Prerequisites

Install Python dependencies

We need to install two Python modules that will help us produce messages to the Kafka topic:

  • kafka-python: Python client for the Apache Kafka distributed stream processing system
pip install kafka-python
  • Faker: Faker is a Python package that generates fake data for you.
pip install Faker

Configuration needed by Python Kafka Producer

The Kafka producer needs a few configuration values to connect to the Kafka bootstrap server and produce messages.

  • bootstrap server: This is the address of the Kafka server you use. If you followed the Kafka installation prerequisite, the value will be ip-address:9092
    • Here, ip-address is your machine’s IP address
  • topic: This is the topic name where you will publish the message. In our case, it will be registered_user
  • value_serializer: Messages must be serialized to bytes before they can be sent to the Kafka topic over the network. The producer sends the serialized message, and the consumer deserializes it. We will use a JSON serializer
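To make the serialization step concrete, here is a minimal sketch of the round trip using only the standard library. The json_serializer function mirrors the one used in producer.py; json_deserializer is a hypothetical counterpart that a consumer might use, and the sample user is made-up data.

```python
import json


def json_serializer(data):
    # Producer side: dictionary -> JSON string -> UTF-8 bytes.
    return json.dumps(data).encode("utf-8")


def json_deserializer(raw_bytes):
    # Consumer side (hypothetical helper): UTF-8 bytes -> JSON string -> dictionary.
    return json.loads(raw_bytes.decode("utf-8"))


# Made-up sample record with the same keys as the fake users in this article.
user = {"name": "Jane Doe", "address": "1 Example Street", "created_at": "2020"}

payload = json_serializer(user)
restored = json_deserializer(payload)

print(type(payload).__name__)
print(restored == user)
```

The payload is a bytes object, which is what travels over the network, and deserializing it gives back a dictionary equal to the original.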

Writing the Final Python Kafka Producer

Create a new file producer.py

touch producer.py

Add the following Kafka producer code to the file:

import json
import time

from faker import Faker
from kafka import KafkaProducer

fake = Faker()


def get_registered_user():
    # Build a fake user record with a name, an address, and a year.
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }


def json_serializer(data):
    # Kafka transmits bytes, so encode the JSON string as UTF-8.
    return json.dumps(data).encode("utf-8")


# Replace the IP address with the address of your own Kafka server.
producer = KafkaProducer(bootstrap_servers=['192.168.0.10:9092'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    # Publish one fake user every 4 seconds until the process is stopped.
    while True:
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)

Methods

  • get_registered_user
    • It uses the Faker module and returns a dummy user dictionary every time. It generates fake names, fake addresses, and fake years.
  • json_serializer
    • It is used as the data serializer.
    • It accepts a dictionary, converts it to a JSON string, and encodes that string as UTF-8 bytes

Variable Instances

  • producer
    • It is an instance of the KafkaProducer class
    • To create the producer instance, we provide the bootstrap server address and the serializer function
    • The IP address will be different for your machine.
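Since the IP address differs from machine to machine, one option is to read it from the environment instead of hard-coding it. The sketch below is only an illustration: the KAFKA_BOOTSTRAP_SERVERS variable name, the get_bootstrap_servers helper, and the localhost:9092 default are all assumptions, not part of kafka-python.

```python
import os


def get_bootstrap_servers(default="localhost:9092"):
    # KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name chosen for this sketch.
    # It may hold one address or several separated by commas.
    raw = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", default)
    return [address.strip() for address in raw.split(",") if address.strip()]


if __name__ == "__main__":
    print(get_bootstrap_servers())
```

The returned list could then be passed as the bootstrap_servers argument when creating the KafkaProducer, in place of the hard-coded ['192.168.0.10:9092'].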

Run the Kafka Python Producer

Use the following command to run the producer:

python producer.py
  • The application runs indefinitely until you stop it
  • Every 4 seconds
    • It creates a fake user
    • It prints the fake user
    • Finally, it sends the data to Kafka using the producer's send method
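Because the loop never ends on its own, you will usually stop it with Ctrl+C. In kafka-python, send is asynchronous and flush blocks until buffered messages have been sent, so it can help to flush before exiting. The following is a rough sketch of that idea, not the code from producer.py: produce_users and its parameters are hypothetical names, and the function accepts any object with send and flush methods.

```python
import time


def produce_users(producer, topic, make_user, interval_seconds=4, max_messages=None):
    # Hypothetical helper: send one record per interval until interrupted
    # or until max_messages records have been sent (None means no limit).
    sent = 0
    try:
        while max_messages is None or sent < max_messages:
            producer.send(topic, make_user())
            sent += 1
            time.sleep(interval_seconds)
    except KeyboardInterrupt:
        # Ctrl+C stops the loop; fall through to the flush below.
        pass
    finally:
        # Block until any buffered messages have been delivered.
        producer.flush()
    return sent
```

In producer.py this could be called as produce_users(producer, "registered_user", get_registered_user), which keeps the same 4-second interval and runs until interrupted.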
