Managing Topics, Producing and Consuming Events with a Kafka Client

Prev Next

Prior to using Kafka APIs, ensure that the prerequisites described in Configuring VAST Event Broker are met.

The following examples use a Kafka client that can be installed with:  pip install kafka-python.

In the examples, you need to supply a list of Kafka bootstrap server IPs. These are the IPs that a Kafka client uses to connect to the cluster. It is recommended to specify three of the virtual IPs from the virtual IP pool assigned to the VAST Event Broker view.

Create a Topic

from kafka.admin import KafkaAdminClient, NewTopic
BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
topic_name = #
# Kafka Admin Configuration
admin_client = KafkaAdminClient(
    bootstrap_servers=[BOOSTRAP_SERVERS],
    client_id='python-admin-client'
)
topic_name = topic_name
num_partitions = 2
replication_factor = 1
topic = NewTopic(
    name=topic_name,
    num_partitions=num_partitions,
    replication_factor=replication_factor
)
try:
    # Create the topic
    admin_client.create_topics(new_topics=[topic], validate_only=False)
    print(f"Topic '{topic_name}' created successfully with {num_partitions} partitions and replication factor {replication_factor}.")
except Exception as e:
    print(f"Error creating topic: {e}")
finally:
    admin_client.close()

List Topics

BOOTSTRAP_SERVER = "<comma-separated list of IPs>"
from kafka.admin import KafkaAdminClient
# Define the Kafka broker (adjust the host/port as needed)
# Create an admin client
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
# Fetch the list of topics
topics = admin_client.list_topics()
# Print the list of topics
print("Kafka Topics:", topics)
# Close the admin client
admin_client.close()

Write to a Topic

from kafka import KafkaProducer
import json
BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
producer = KafkaProducer(bootstrap_servers=BOOSTRAP_SERVERS)
for i in range (100,2500):
    key = str(i)
    message = {"name": "vast", "id": i}
    producer.send(topic_name, key=key.encode("utf-8") , value=json.dumps(message).encode('utf-8'))
producer.flush()
print("Message sent to Kafka topic")

Read from a Topic

BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=[BOOSTRAP_SERVERS],
    auto_offset_reset='earliest',
    group_id='your-consumer-group-name'
)
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")

Delete a Topic

from kafka.admin import KafkaAdminClient, NewTopic
# Define the Kafka broker
BOOTSTRAP_SERVER = "<comma-separated list of IPs>"
TOPIC_TO_DELETE = "my_topic"  # Change this to the topic you want to delete
# Create an admin client
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
# Delete the topic
try:
    admin_client.delete_topics([TOPIC_TO_DELETE])
    print(f"Topic '{TOPIC_TO_DELETE}' deleted successfully.")
except Exception as e:
    print(f"Failed to delete topic '{TOPIC_TO_DELETE}': {e}")
# Close the admin client
admin_client.close()