About

Need to quickly delete all empty Kafka topics on a cluster that uses SASL authentication? This is one (admittedly rough) way to do it.

Steps

Extract the keystore

Look in the Kafka client config file for the connection details.

From the config file at FIXME PATH, note down every value shown in "<< >>" below; you will need them in later steps.

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<<SOMEUSERNAME>>" password="<<SOMEPASSWORD0>>";
sasl.mechanism=<<SCRAM-SHA-512>>
security.protocol=<<SASL_SSL>>
ssl.keystore.location=<<JKS PATH>>
ssl.keystore.password=<<SOMEPASSWORD1>>
ssl.key.password=<<SOMEPASSWORD2>>

Extract CA/Cert/Key From Keystore

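The keystore is located at <<JKS PATH>>, the `ssl.keystore.location` value from the config file. Extract the CA certificate, client certificate, and private key from it as PEM files: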

# Convert the JKS keystore to PKCS#12 so openssl can read it.
# NOTE(review): keytool prompts for passwords; the source keystore password is
# presumably ssl.keystore.password (<<SOMEPASSWORD1>>) - confirm.
keytool -importkeystore -srckeystore <<JKS PATH>> -destkeystore keystore.p12 -srcstoretype jks -deststoretype pkcs12
# Certificates only (-nokeys) -> ca.pem, which the script uses as its CA bundle.
openssl pkcs12 -in keystore.p12 -nokeys -out ca.pem
# The same certificate bundle is reused as the client certificate (cert.pem).
cp ca.pem cert.pem
# Private key only (-nocerts). -nodes writes the key unencrypted, so keep
# private_key.pem readable only by you and delete it when you are done.
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private_key.pem

Install script deps

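You'll need the `python3-confluent-kafka` and `python3-kafka` packages. The script also imports `certifi`, packaged as `python3-certifi`. Install them via apt, dnf, and/or pip3 (pip3 package names: `confluent-kafka`, `kafka-python`, `certifi`). With apt: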

# python3-certifi is needed too: the script does `import certifi`.
apt install python3-confluent-kafka python3-kafka python3-certifi

The script

Save the script below to a file called `delete_empty_topcs.py`, replacing every "<< >>" placeholder with your own values.

# Delete empty topics
# TODO: Stop using two different clients!
from kafka import KafkaConsumer
from kafka import KafkaAdminClient
from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor
import kafka
import ssl
import logging
import os,sys
import certifi

logging.basicConfig(level=logging.INFO)

# --- Connection settings: fill in every "<< >>" placeholder ---------------
# sasl_mechanism, username, password and security_protocol come from the
# client config file (sasl.mechanism, sasl.jaas.config, security.protocol).
topic = "<<your name or other identifier>>-test"  # also used as group.id below
sasl_mechanism = "<<SCRAM-SHA-512>>"
username = "<<SOMEUSERNAME>>"
password = "<<SOMEPASSWORD0>>"
security_protocol = "<<SASL_SSL>>"
broker = '<<BROKER HOST>>:<<BROKER PORT>>'

# confluent-kafka consumer: used to read topic metadata and partition
# watermark offsets when sizing topics. The PEM files are the ones extracted
# from the keystore and are opened by relative path, so run the script from
# the directory that holds them.
conf = {
    'bootstrap.servers': broker,
    'security.protocol': security_protocol,
    'sasl.mechanisms': sasl_mechanism,
    'sasl.username': username,
    'sasl.password': password,
    'ssl.key.location': 'private_key.pem',
    'ssl.ca.location': 'ca.pem',
    'ssl.certificate.location': 'cert.pem',
    "group.id": topic,
}

consumer = Consumer(conf)

# Settings shared by both kafka-python clients below, kept in one place so a
# change (e.g. enabling certificate verification) only has to be made once.
kafka_python_conf = dict(
    bootstrap_servers=broker,
    security_protocol=security_protocol,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=password,
    # NOTE(review): broker hostname verification is disabled here. Consider
    # enabling it (together with ssl_cafile='ca.pem') if the broker
    # certificates permit it.
    ssl_check_hostname=False,
    # NOTE(review): kafka-python appears to load ssl_keyfile only together
    # with ssl_certfile, which is not set -- confirm whether this client
    # presents a client certificate at all.
    ssl_keyfile='private_key.pem',
)

# kafka-python admin client: used to delete topics.
a = KafkaAdminClient(**kafka_python_conf)

# kafka-python consumer: used to list topic names.
c = KafkaConsumer(**kafka_python_conf)

def get_partition_size(topic_name: str, partition_key: int) -> int:
    """Return the offset span of one partition of *topic_name*.

    The span is the high watermark minus the low watermark reported by the
    module-level confluent-kafka ``consumer``, i.e. roughly how many messages
    the partition currently retains. 0 means the partition holds nothing.
    """
    low, high = consumer.get_watermark_offsets(
        TopicPartition(topic_name, partition_key)
    )
    return high - low

def get_topic_size(topic_name: str):
    """Return the summed offset span of all partitions of *topic_name*.

    Partition sizes come from ``get_partition_size`` and are fetched in
    parallel, one worker thread per partition. A result of 0 means every
    partition's low and high watermarks are equal, i.e. the topic currently
    retains no messages -- the caller treats that as "safe to delete".

    Raises:
        RuntimeError: if the broker reports an error in the topic metadata.
            Such a topic may come back with no partitions, and summing an
            empty partition map would wrongly report it as empty.
    """
    metadata = consumer.list_topics(topic=topic_name)
    topic_meta = metadata.topics[topic_name]
    # Never let a failed lookup masquerade as "0 messages": a size of 0 leads
    # straight to deleting the topic.
    if topic_meta.error is not None:
        raise RuntimeError(
            f"Could not read metadata for topic {topic_name!r}: {topic_meta.error}"
        )

    partition_ids = list(topic_meta.partitions)
    if not partition_ids:
        return 0

    with ThreadPoolExecutor(max_workers=len(partition_ids)) as pool:
        sizes = pool.map(lambda pid: get_partition_size(topic_name, pid), partition_ids)
        return sum(sizes)


# --- Main: delete every matching topic that currently holds no messages ----
# Set DRY_RUN = True to only report the empty topics without deleting them.
DRY_RUN = False
# Only topics whose name starts with this prefix are considered.
TOPIC_PREFIX = "<< SOME PREFIX>>"  # Update with topic check here

count_ttl = 0    # every topic listed by the cluster
count_empty = 0  # matching topics found empty (deleted unless DRY_RUN)
count_tp = 0     # topics whose name starts with TOPIC_PREFIX

try:
    # sorted() makes the processing (and printed) order deterministic.
    for topic in sorted(c.topics()):
        count_ttl += 1
        if not topic.startswith(TOPIC_PREFIX):
            continue
        count_tp += 1
        # NOTE: the size check and the delete are not atomic; a message
        # produced in between would be removed along with the topic.
        if get_topic_size(topic) != 0:
            continue
        count_empty += 1
        if DRY_RUN:
            print(f"Topic is empty {topic} (dry run, not deleted)")
            continue
        print(f"Deleting topic {topic}")
        a.delete_topics(topics=[topic])
        print(f"Deleted topic {topic}")
finally:
    # Release the broker connections held by all three clients, even if a
    # lookup or delete above raised.
    consumer.close()
    c.close()
    a.close()

print(f"Total={count_ttl} TP={count_tp} Empty={count_empty}")

Run The Script

# Run from the directory holding ca.pem, cert.pem and private_key.pem;
# the script opens them by relative path.
python3 delete_empty_topcs.py

Tags