Some events left unprocessed by a Kafka Streams application on bare-metal Kubernetes when a broker fails

11/30/2018

My Kafka Streams application has three instances running as a Deployment on a bare-metal Kubernetes cluster.
There are two input topics: a 'state' topic and an 'update-state' topic. The state topic is read as a GlobalKTable and the update-state topic is read as a KStream and joined to the GlobalKTable. When an update-state event arrives, the state is updated and written back to the state topic.
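In case it helps, here is a minimal sketch of that topology (String serdes and the applyUpdate merge are placeholders for illustration; the real app uses the byte-array serdes shown in the config dump below):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class UpdateStateTopology {

    // Placeholder merge logic: the real app applies the update to the current state
    static String applyUpdate(String update, String currentState) {
        return update;
    }

    static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // 'state' topic read as a GlobalKTable
        GlobalKTable<String, String> stateTable = builder.globalTable("state");

        // 'update-state' topic read as a KStream and joined to the GlobalKTable
        KStream<String, String> updates = builder.stream("update-state");

        updates
            .leftJoin(stateTable,
                      (key, update) -> key,                        // stream key -> table key
                      (update, state) -> applyUpdate(update, state))
            .to("state");                                          // write updated state back

        return builder;
    }
}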

The Streams config enables 'exactly_once' processing (full config below), with the expectation that 'update-state' events won't be lost or duplicated even in the case of failures. I have tested this under load and verified that it works when application instances fail (pod deleted). Some lag is introduced while consumers rebalance at the failure and again when the replacement instance comes up, but that's fine as long as events are eventually processed.
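The properties are set up roughly like this (a sketch reconstructed from the effective config dump below, not the exact application code):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpdateStateConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "update-state");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // exactly_once wraps each consume-process-produce cycle in a transaction
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);   // internal topics
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // warm standby per task
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        return props;
    }
}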

However, when a broker fails, some events on the update-state topic are left unprocessed. This happens at the time of the broker failure, and again when the leader imbalance check runs after the broker is restarted.

Two questions:

  1. Should 'exactly_once' cover the case of a broker failure? (My understanding is that it should.)
  2. If so, what configurations are necessary to guarantee that events cannot be left unprocessed on the input topic?

Application Kafka version: 1.0.2
Broker Kafka version: 1.0.0 (deployed using Helm chart https://github.com/helm/charts/tree/master/incubator/kafka )

Broker Config:

log.dirs=/opt/kafka/data/logs
transaction.abort.timed.out.transaction.cleanup.interval.ms=10000
min.insync.replicas=2
offsets.topic.replication.factor=3
broker.id=0
default.replication.factor=3
heap.opts=-Xmx1G -Xms1G
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
zookeeper.connection.timeout.ms=16000
listeners=PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:31090
request.timeout.ms=2000
jmx.port=5555
advertised.listeners=PLAINTEXT://10.244.3.91:9092,EXTERNAL://my.domain:31090
leader.imbalance.check.interval.seconds=60
zookeeper.connect=kafka-zookeeper:2181

Streams Config:

application.id = update-state
application.server =
bootstrap.servers = [kafka:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 2
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
upgrade.from = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

Consumer Config:

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id = update-state-9e713a1a-3ee1-4abc-8b42-36e203cdb468-StreamThread-1-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = update-state
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Producer Config:

acks = all
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.id = update-state-9e713a1a-3ee1-4abc-8b42-36e203cdb468-StreamThread-2-0_2-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 1500
transactional.id = update-state-0_2
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
-- Finbarr
apache-kafka
apache-kafka-streams
kubernetes

0 Answers