Why use Kafka Streams in the first place?
If you’re running a Kafka cluster, Kafka Streams comes in handy mainly for three reasons: (1) it’s a high-level wrapper around consumers and producers on top of Kafka, (2) it supports stateful streams using RocksDB, and (3) it handles partition assignment across your processing nodes.
Plan the # of partitions in advance
In order to scale your processing you can either (1) move towards a stronger CPU, more memory, and faster disks, or (2) increase the number of processing instances. Always keep in mind that a single Kafka partition can only be processed by one consumer at a time - that’s why the number of partitions is important: if it is too low, a single consumer may not be able to handle your throughput. Make sure you plan the number of partitions in advance or your consumer lag will grow.
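As a rough illustration (a sketch only; the application id and broker address are hypothetical), the partition count caps the total parallelism no matter how many threads or instances you add:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// With e.g. 12 partitions on the source topic there are at most 12 stream tasks,
// so any threads beyond that (summed across all instances) simply sit idle.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")             // hypothetical
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // hypothetical
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")              // 3 instances x 4 threads = 12 tasks max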
Be careful with your persisted schemas
When processing and storing specific events in a state store you must be very careful with the event schema, especially if you rely on the JSON format. Making breaking changes to the event schema means that the processing layer will fail to parse the JSON when reading from the state store, which will probably lead to lost data if you ignore the event, or to a crash loop if you retry the processing.
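One defensive option (just a sketch; the Event case class and parseEvent parser are hypothetical placeholders for your own model and JSON library) is a deserializer that degrades gracefully instead of throwing on an incompatible schema:

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.Deserializer
import scala.util.{Failure, Success, Try}

// Hypothetical event model; substitute your own type.
case class Event(id: String, payload: String, createdAt: Long)

// Returns null instead of throwing when the stored JSON no longer matches the
// current schema, so the processor can skip, dead-letter, or migrate the record
// rather than crash-looping on every read from the state store.
class TolerantEventDeserializer(parseEvent: String => Event) extends Deserializer[Event] {
  override def deserialize(topic: String, data: Array[Byte]): Event =
    if (data == null) null
    else Try(parseEvent(new String(data, StandardCharsets.UTF_8))) match {
      case Success(event) => event
      case Failure(_)     => null  // schema mismatch or corrupt payload: the caller decides what to do
    }
}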
Don’t rely on internal changelogs for downstream processing
For each state store, Kafka Streams maintains a replicated changelog Kafka topic in which it tracks any state updates. Every time we insert a key-value pair into our state store, a Kafka message is sent to the corresponding topic. In case of a crash, our application will rebuild the state store from the corresponding changelog topic. If we want to apply another processing layer, either in the current application or in another one downstream, we should always use context.forward(k, v) (using the Processor API) to forward our processor output to a given sink topic.
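A sketch of that pattern, using the Processor API of this Kafka era; the store name "events-store" and the topology wiring (source, addStateStore, addSink) are assumed to exist elsewhere:

import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext}
import org.apache.kafka.streams.state.KeyValueStore

// Updates the local state store (replicated via its changelog) and explicitly
// forwards each record to whatever sink node is attached downstream, instead of
// letting other applications read the internal changelog topic.
class ForwardingProcessor extends AbstractProcessor[String, String] {
  private var store: KeyValueStore[String, String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    store = context.getStateStore("events-store").asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(key: String, value: String): Unit = {
    store.put(key, value)          // local state update, backed by the changelog
    context().forward(key, value)  // explicit output for downstream consumers
  }
}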
State Stores don’t have TTL
While the state store changelog topic uses log compaction so that old data can be purged to prevent the topic from growing indefinitely, the state store itself doesn’t have this kind of mechanism and yes, it will grow forever, unless the application crashes and the state store is rebuilt from a now shorter version of the changelog.
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance. Let’s dive into these use cases in more detail and then describe how compaction works.
[Edit 1] According to KIP-258 there is an ongoing effort to add TTL to state stores. Record timestamps were added to KTable, allowing this initiative to move forward. If you’re asking yourself why TTL state stores are not yet supported in Kafka Streams, it’s mainly because Kafka Streams relies on changelogs as the source of truth, not the state stores. The two must be in sync, otherwise, if we delete old state store records, it might happen that we restore all of them from the changelog, during a rebalance for example.
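Until that lands, a do-it-yourself TTL is possible if deletes go through the store itself, since those deletes also produce tombstones in the changelog and the two stay in sync across restores. A sketch, reusing the hypothetical Event case class from the earlier deserializer example (its createdAt field is epoch millis) and an assumed store named "events-store":

import java.time.Duration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType, Punctuator}
import org.apache.kafka.streams.state.KeyValueStore

// A wall-clock punctuator periodically scans the store and deletes stale entries.
class TtlProcessor(ttlMs: Long) extends AbstractProcessor[String, Event] {
  private var store: KeyValueStore[String, Event] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    store = context.getStateStore("events-store").asInstanceOf[KeyValueStore[String, Event]]
    context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(now: Long): Unit = {
        val it = store.all()
        try {
          while (it.hasNext) {
            val entry: KeyValue[String, Event] = it.next()
            // Deleting through the store writes a tombstone to the changelog as well.
            if (now - entry.value.createdAt > ttlMs) store.delete(entry.key)
          }
        } finally it.close()
      }
    })
  }

  override def process(key: String, value: Event): Unit = store.put(key, value)
}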
The restore process
Every time the application starts, or in the worst case restarts, the state stores will be restored using the corresponding changelogs. If we pay attention, we’ll notice that a processor that relies on a given state store doesn’t start processing while the state store is recovering. To gain more visibility, we might want to add a custom StateRestoreListener to track the state store restore process.
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener

class LoggingStateRestoreListener extends StateRestoreListener {

  val logger = Logger(classOf[LoggingStateRestoreListener])

  override def onRestoreStart(topicPartition: TopicPartition,
                              storeName: String,
                              startingOffset: Long,
                              endingOffset: Long): Unit = {
    logger.info(s"Restore started for $storeName and partition ${topicPartition.partition}...")
    logger.info(s"Total records to be restored ${endingOffset - startingOffset}.")
  }

  override def onBatchRestored(topicPartition: TopicPartition,
                               storeName: String,
                               batchEndOffset: Long,
                               numRestored: Long): Unit = {
    logger.info(s"Restored batch $numRestored for $storeName and partition ${topicPartition.partition}.")
  }

  override def onRestoreEnd(topicPartition: TopicPartition,
                            storeName: String,
                            totalRestored: Long): Unit = {
    logger.info(s"Restore completed for $storeName and partition ${topicPartition.partition}.")
  }
}

You’ll be amazed, or outraged, by the amount of time wasted on restoring large changelogs. Note to self: if you’re running Kafka Streams on top of Kubernetes, make sure you have persistent storage, otherwise these restore processes will kill your SLAs.
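For reference, a minimal sketch of wiring the listener in; buildKafkaStreams() stands in for your own factory that assembles the topology and configuration:

import org.apache.kafka.streams.KafkaStreams

// Register the listener before start() so restores triggered at startup are reported too.
val streams: KafkaStreams = buildKafkaStreams() // hypothetical factory
streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener)
streams.start()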
In a disaster scenario, when a particular instance crashes, configuring num.standby.replicas may minimize the restore process by introducing shadow copies of the local state stores.
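Setting it is a one-liner (a sketch; the right replica count depends on how much extra disk and replication traffic you can afford):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Keep one warm copy of each local state store on another instance, so a failover
// mostly reuses the standby instead of replaying the whole changelog.
val props = new Properties()
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")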
Oh, the memory overhead
Assigning large heaps to the JVM sounds reasonable at first, but Kafka Streams uses lots of off-heap memory when using RocksDB, which eventually leads to applications crashing for lack of free memory.
RocksDB stores data in at least four data structures: (1) memtable, (2) bloom filter, (3) index, and (4) block cache. Besides that, it has lots of configurable properties, which makes it difficult to tune properly.
Unfortunately, configuring RocksDB optimally is not trivial. Even we as RocksDB developers don’t fully understand the effect of each configuration change. If you want to fully optimize RocksDB for your workload, we recommend experiments and benchmarking, while keeping an eye on the three amplification factors.
(https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide)
On the other hand, if you don’t tune it, the memory usage of our application will grow, and grow, and grow. So we need to make sure we know how many source topics our application is consuming from, as well as the number of partitions and state stores.
If you have many stores in your topology, there is a fixed per-store memory cost. E.g., if RocksDB is your default store, it uses some off-heap memory per store. Either consider spreading your app instances on multiple machines or consider lowering RocksDB’s memory usage using the RocksDBConfigSetter class.
If you take the latter approach, note that RocksDB exposes several important memory configurations. In particular, these settings include block_cache_size (16 MB by default), write_buffer_size (32 MB by default) and write_buffer_count (3 by default). With those defaults, the estimate per RocksDB store (let’s call it estimate per store) is (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb (112 MB by default).
Then if you have 40 partitions and are using a windowed store (with a default of 3 segments per partition), the total memory consumption is 40 * 3 * estimate per store (in this example that would be 13440 MB).
(https://docs.confluent.io/current/streams/sizing.html)
Having configured ROCKSDB_BLOCK_CACHE_SIZE_MB, ROCKSDB_BLOCK_SIZE_KB, ROCKSDB_WRITE_BUFFER_SIZE_MB, and ROCKSDB_WRITE_BUFFER_COUNT to the best possible values, we’re able to estimate the cost of a single store. Obviously, if we have lots of streams with lots of stores, it will require lots of memory.
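A sketch of such a RocksDBConfigSetter; the numbers are illustrative, not recommendations, and they map to the settings mentioned above:

import java.util
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

// Shrinks the per-store defaults (16 MB block cache, 32 MB x 3 write buffers)
// so the per-store estimate, and therefore the total off-heap usage, goes down.
class BoundedMemoryRocksDBConfig extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit = {
    val tableConfig = new BlockBasedTableConfig()
    tableConfig.setBlockCacheSize(8 * 1024 * 1024L)  // block cache: 8 MB instead of 16 MB
    tableConfig.setBlockSize(4 * 1024L)              // block size: 4 KB
    options.setTableFormatConfig(tableConfig)
    options.setWriteBufferSize(8 * 1024 * 1024L)     // write buffer: 8 MB instead of 32 MB
    options.setMaxWriteBufferNumber(2)               // write buffer count: 2 instead of 3
  }
}

// Registered through the streams configuration, e.g.:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])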
[Edit 2] KAFKA-8323: Memory leak of BloomFilter Rocks object and KAFKA-8215: Limit memory usage of RocksDB, from Kafka 2.3.0, might help with or solve some memory issues.
Don’t forget the disk space
Consuming from large source topics and performing processing that requires storing n records in RocksDB for each message will lead to a fairly large amount of data stored on disk. Without proper monitoring, it is very easy to run out of space.
It’s ok to do external lookups
Well, most people will say to load all the needed data into a stream or store and perform the joins or local lookups while processing our messages. Sometimes it’s easier to perform external lookups, ideally against a fast database, and I must say that’s fine. Obviously, it depends on your load, how many external lookups are performed per message, and how fast your database can handle those lookups. Making this type of external call inside a stream may introduce extra latency, which could have an impact on the consumer lag of downstream systems, so please, use it carefully.
Data locality is critical for performance. Although key lookups are typically very fast, the latency introduced by using remote storage becomes a bottleneck when you’re working at scale.
The key point here isn’t the degree of latency per record retrieval, which may be minimal. The important factor is that you’ll potentially process millions or billions of records through a streaming application. When multiplied by a factor that large, even a small degree of network latency can have a huge impact.
The cool thing about looking up data from an external state store is that we can abstract our external state store as a simple StateStore and use it like the others, without changing any existing code.
import org.apache.kafka.streams.processor.{ProcessorContext, StateRestoreCallback, StateStore}

class LookupStore[T](storeName: String, connectionPool: Pool[T]) extends StateStore {

  override def init(context: ProcessorContext, root: StateStore): Unit = {
    context.register(root, new StateRestoreCallback() {
      // Nothing to restore: the data lives in the external database.
      override def restore(key: Array[Byte], value: Array[Byte]): Unit = {}
    })
  }

  override def name(): String = this.storeName

  override def flush(): Unit = ()

  override def close(): Unit = pool().close()

  override def persistent(): Boolean = true

  override def isOpen: Boolean = !pool().isClosed

  def pool(): Pool[T] = connectionPool
}

Note to self: every partition from a processor that’s using this custom store starts a new connection to the database, which is not sustainable for the number of processors and partitions that may exist. It’s advised to use a shared connection pool to reduce and control the available connections.
Streams may die, a lot
Within a Kafka cluster, there are leader elections among the available nodes. The bad thing about them is that sometimes they can have a negative impact on the currently running streams.
2019-07-08 14:27:54 [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2-consumer, groupId=my-stream] 266 partitions have leader brokers without a matching listener, including [my-topic-15, my-topic-9, my-topic-3, my-topic-10, my-topic-16, my-topic-4, __transaction_state-0, __transaction_state-30, __transaction_state-18, __transaction_state-6]
2019-07-08 14:29:51 [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2] WARN o.apache.kafka.streams.KafkaStreams - stream-client [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f] All stream threads have died. The instance will be in error state and should be closed.

The logs above show that 266 partitions lost their leader and eventually the corresponding stream just stopped. Without proper monitoring and alerting, mainly on consumer lag and the number of available streams, we can go without processing messages for a long time. Having some kind of retry mechanism may help as well. Just don’t trust that your stream will be up and running all the time; bad things happen.
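One cheap safety net (a sketch, assuming your logs already feed an alerting system; buildKafkaStreams() again stands in for your own setup code) is a state listener that screams when every thread has died:

import com.typesafe.scalalogging.Logger
import org.apache.kafka.streams.KafkaStreams

// The client transitions to ERROR once all stream threads have died; surface that
// loudly so an alert fires or a supervisor restarts the instance.
val logger = Logger("stream-state-monitor")
val streams: KafkaStreams = buildKafkaStreams() // hypothetical factory
streams.setStateListener((newState: KafkaStreams.State, oldState: KafkaStreams.State) =>
  if (newState == KafkaStreams.State.ERROR)
    logger.error(s"All stream threads died (previous state: $oldState); the instance must be restarted.")
)
streams.start()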
Timeouts and rebalances
From time to time, applications get stuck in a rebalancing state leading to severaltimeouts.
Often it’s caused by processing very large batches of messages that take more than the five-minute threshold (max.poll.interval.ms) to commit. When the processing finally finishes, the stream member is already considered dead and the message can’t be committed. On rebalance, the same message is fetched once again from Kafka and the same error occurs.
Reducing the max.poll.records value, often down to 1, would sometimes alleviate this specific issue ¯\_(ツ)_/¯.
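A sketch of both knobs (the values are illustrative; pick whatever your batch sizes actually need):

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

// Fetch fewer records per poll and/or allow more time between polls, so a slow
// batch doesn't get the consumer kicked out of the group mid-processing.
val props = new Properties()
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "100")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), "600000") // 10 minutes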
Still, …
Building real-time applications with Kafka Streams is quick, easy, powerful, and feels very natural once you grasp the non-trivial stuff that comes with the full package.
