Prior to using Kafka APIs, ensure that the prerequisites described in Configuring VAST Event Broker are met.
The following examples use a Kafka client that can be installed with: pip install kafka-python.
In the examples, you need to supply a list of Kafka bootstrap server IPs. These are the IPs that a Kafka client uses to connect to the cluster. It is recommended to specify three of the virtual IPs from the virtual IP pool assigned to the VAST Event Broker view.
Create a Topic
from kafka.admin import KafkaAdminClient, NewTopic
BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
topic_name = #
# Kafka Admin Configuration
admin_client = KafkaAdminClient(
bootstrap_servers=[BOOSTRAP_SERVERS],
client_id='python-admin-client'
)
topic_name = topic_name
num_partitions = 2
replication_factor = 1
topic = NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor
)
try:
# Create the topic
admin_client.create_topics(new_topics=[topic], validate_only=False)
print(f"Topic '{topic_name}' created successfully with {num_partitions} partitions and replication factor {replication_factor}.")
except Exception as e:
print(f"Error creating topic: {e}")
finally:
admin_client.close()
List Topics
BOOTSTRAP_SERVER = "<comma-separated list of IPs>"
from kafka.admin import KafkaAdminClient
# Define the Kafka broker (adjust the host/port as needed)
# Create an admin client
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
# Fetch the list of topics
topics = admin_client.list_topics()
# Print the list of topics
print("Kafka Topics:", topics)
# Close the admin client
admin_client.close()
Write to a Topic
from kafka import KafkaProducer
import json
BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
producer = KafkaProducer(bootstrap_servers=BOOSTRAP_SERVERS)
for i in range (100,2500):
key = str(i)
message = {"name": "vast", "id": i}
producer.send(topic_name, key=key.encode("utf-8") , value=json.dumps(message).encode('utf-8'))
producer.flush()
print("Message sent to Kafka topic")Read from a Topic
BOOSTRAP_SERVERS = "<comma-separated list of IPs>"
from kafka import KafkaConsumer
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=[BOOSTRAP_SERVERS],
auto_offset_reset='earliest',
group_id='your-consumer-group-name'
)
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")Delete a Topic
from kafka.admin import KafkaAdminClient, NewTopic
# Define the Kafka broker
BOOTSTRAP_SERVER = "<comma-separated list of IPs>"
TOPIC_TO_DELETE = "my_topic" # Change this to the topic you want to delete
# Create an admin client
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
# Delete the topic
try:
admin_client.delete_topics([TOPIC_TO_DELETE])
print(f"Topic '{TOPIC_TO_DELETE}' deleted successfully.")
except Exception as e:
print(f"Failed to delete topic '{TOPIC_TO_DELETE}': {e}")
# Close the admin client
admin_client.close()