I have a Kafka Streams app that consumes a single source topic with 20 partitions. The traffic load is about 2K records/sec. I deployed the app to 63 instances and it's working fine, but I noticed that the partition assignment is always changing. I checked the KafkaStreams#localThreadsMetadata output for each instance, and the reported thread state is usually PARTITIONS_ASSIGNED, sometimes PARTITIONS_REVOKED.
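For reference, this is roughly how I read the thread states. A minimal sketch, assuming `streams` is the running KafkaStreams instance (method names from the 2.x Streams API):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.ThreadMetadata;

// Print the state of every stream thread on this instance; during a
// rebalance the state flips between PARTITIONS_REVOKED and
// PARTITIONS_ASSIGNED instead of settling into RUNNING.
for (ThreadMetadata thread : streams.localThreadsMetadata()) {
    System.out.printf("thread=%s state=%s activeTasks=%s%n",
            thread.threadName(), thread.threadState(), thread.activeTasks());
}
```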
From the log, I saw two different errors:
```
Offset commit failed on partition production-smoke-KSTREAM-REDUCE-STATE-STORE-0000000015-repartition-13 at offset 25010: The coordinator is not aware of this member.

org.apache.kafka.streams.errors.StreamsException: task [2_13] Abort sending since an error caught with a previous record (key 264344038933 value [[email protected] timestamp 1563412782970) to topic production-smoke-KTABLE-SUPPRESS-STATE-STORE-0000000021-changelog due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
```
The app is still running and sending messages to the downstream topic. My understanding is that since my app doesn't need 63 nodes, some nodes are idle, and once a node dies because of the errors above, a rebalance is triggered. Is that correct? (Some nodes return "KafkaStreams is not running. State is ERROR." when I call KafkaStreams#localThreadsMetadata.) Will the app die entirely once all nodes are dead?
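If it helps, this is how I watch for those ERROR transitions: a minimal sketch using the standard KafkaStreams#setStateListener hook, registered before streams.start(); the log messages are just illustrative:

```java
import org.apache.kafka.streams.KafkaStreams;

// Log every state transition so it is visible when an instance dies.
streams.setStateListener((newState, oldState) -> {
    System.out.printf("state change: %s -> %s%n", oldState, newState);
    if (newState == KafkaStreams.State.ERROR) {
        // ERROR is terminal: the instance will not rejoin the group on
        // its own and has to be closed and restarted.
        System.err.println("instance entered ERROR state");
    }
});
```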
Can anyone help me understand the right way to solve the above errors? Will increasing retry.backoff.ms mask some bigger issue?
Here is the config I have:
Streams config:

```
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 60000
retries = 20
retry.backoff.ms = 60000
rocksdb.config.setter = null
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
```

Consumer config:

```
auto.commit.interval.ms = 5000
auto.offset.reset = none
check.crcs = true
client.dns.lookup = default
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes =
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 60000
```
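For context, this is roughly how I would override the retry settings the error message mentions. A minimal sketch: the bootstrap address is a placeholder, the application id is inferred from the internal topic names above, and 500 ms is only an illustrative backoff, not a value I know to be right:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "production-smoke"); // inferred from topic prefix
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
// The knobs the TimeoutException message points at:
props.put(ProducerConfig.RETRIES_CONFIG, 20);
// Scope the backoff to the embedded producer only, so a change here does
// not also affect the consumer, which currently shares the global
// retry.backoff.ms = 60000 shown above:
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500);
```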
Thanks a lot!