Spring Cloud Data Flow topics creation for Kafka

12/14/2019

I have my own Spring Cloud Data Flow processor with Python inside; I used this sample as a guide: https://dataflow.spring.io/docs/recipes/polyglot/processor/. Then I want to scale out and run three of these processors, so using spring.cloud.deployer.myApp.count=3 I create 3 pods, each with Python inside. I slightly modified the code from the sample: when I create a Kafka consumer, I also pass a group id, so messages should be load-balanced across the instances.

consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])

The issue is that SCDF creates the Kafka topic with only 1 partition, so messages arrive at one pod only. So I am wondering:

  • Should I somehow configure SCDF to create a Kafka topic with 3 partitions?
  • Or should I not rely on SCDF and create topics on my own in Python? I suppose this will be redundant, since SCDF also creates this topic.
  • What component in SCDF is actually responsible for Kafka topic creation? And how can I influence the number of partitions it creates?
  • If I stop this stream and launch it again with 4 processor instances, should the topic be extended with a 4th partition? Currently, no new partitions get created.
-- Maks Shal
apache-kafka
kubernetes
python
spring
spring-cloud-dataflow

2 Answers

12/15/2019

Please take a moment to review Spring Cloud Data Flow's responsibilities. In case it is not clear, SCDF neither interacts with the backing messaging middleware (such as Kafka) nor uses it at runtime. In other words, SCDF doesn't create topics or the partitions associated with them; it merely automates the configuration of Spring Cloud Stream (SCSt) properties.

If you're using SCSt in a custom processor, however, the framework automates the binding of the desired channels to the underlying topics in the middleware. The framework also provides facilities to alter the partitioning behavior, and you can deploy the processor against an over-partitioned topic, too. There are several other configuration options for building the desired streaming data processing behavior, as sketched below.
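
For example, when the apps in the stream are SCSt applications, the partition count can be driven entirely through deployment properties. A minimal sketch (the app names source and processor, and the payload.id key expression, are placeholders for your own apps and payload):

app.source.spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
app.source.spring.cloud.stream.bindings.output.producer.partitionCount=3
app.processor.spring.cloud.stream.bindings.input.consumer.partitioned=true
deployer.processor.count=3

With properties along these lines, the Kafka binder provisions the topic with the requested number of partitions when the stream is deployed.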

The Python sample that you're looking at doesn't use any of the features that SCSt provides. The recipe is an example walkthrough of how someone can build a native processor-style application in Python, where the producer and consumer configurations are created manually within the Python code itself. Neither SCDF nor SCSt influences the application behavior in this recipe.
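
For contrast, here is a minimal sketch of what the recipe does, using the recipe's helper functions (get_input_channel(), get_output_channel(), get_kafka_binder_brokers()); the pass-through loop is simplified:

from kafka import KafkaConsumer, KafkaProducer

# Plain kafka-python wiring: topics, consumer group, and the message loop
# are all set up by hand; no SCSt binder is involved.
consumer = KafkaConsumer(get_input_channel(),
                         bootstrap_servers=[get_kafka_binder_brokers()])
producer = KafkaProducer(bootstrap_servers=[get_kafka_binder_brokers()])

for message in consumer:
    producer.send(get_output_channel(), message.value)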

Should I somehow configure SCDF to create a Kafka topic with 3 partitions?

As discussed previously, SCDF does not interact with Kafka.

Or should I not rely on SCDF and create topics on my own in Python? I suppose this will be redundant, since SCDF also creates this topic.

If your custom processor is not a Spring Cloud Stream application, yes, it is your responsibility to define the topics + partitions in your code explicitly.

What component in SCDF is actually responsible for Kafka topic creation? And how can I influence the number of partitions it creates?

Spring Cloud Stream. See the explanation above.

If I stop this stream and launch it again with 4 processor instances, should the topic be extended with a 4th partition? Currently, no new partitions get created.

You don't necessarily need to restart the streaming data pipeline. If your topic is over-partitioned upfront, any additional consumers added at runtime should automatically participate in the competing-consumer relationship. Keep an eye on spring-io/dataflow.spring.io#156; we are in the process of adding a recipe to demonstrate manual and autoscaling possibilities with SCSt + SCDF + Kafka.
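
For instance, creating the topic upfront with more partitions than the initial consumer count leaves headroom to grow the consumer group later. A sketch (the topic name, partition count of 12, and broker address are all placeholders):

from kafka.admin import KafkaAdminClient, NewTopic

# Over-partition upfront: extra consumers that later join the same group
# (e.g. scaling pods from 3 to 4) pick up partitions via rebalancing,
# without any topic alteration.
admin = KafkaAdminClient(bootstrap_servers=["localhost:9092"])
admin.create_topics(new_topics=[NewTopic(name="my-stream.processor",
                                         num_partitions=12,
                                         replication_factor=1)])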

-- Sabby Anandan
Source: StackOverflow

12/17/2019

I was able to get around this by adding the following code to the Python container startup script, improving on the code provided in https://dataflow.spring.io/docs/recipes/polyglot/processor/. It uses the arguments passed by the SCDF server to get the broker URL, topic name, and number of instances:

import logging
import sys

from kafka import KafkaAdminClient
from kafka.admin import NewPartitions, NewTopic
from kafka.errors import InvalidPartitionsError, TopicAlreadyExistsError

# get_kafka_binder_brokers(), get_input_channel(), and get_cmd_arg()
# come from the recipe's util script
admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])

# instanceCount arrives as a string argument, so cast it before use
partition_count = int(get_cmd_arg("spring.cloud.stream.instanceCount"))

# create the Kafka topic if it does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
    admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
    logging.info(f"Topic {get_input_channel()} was already created")

# add Kafka partitions to the existing topic, up to the instance count
new_partitions = NewPartitions(total_count=partition_count)
try:
    admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError:
    logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")
-- Maks Shal
Source: StackOverflow