Blog about software - ordep.dev
Kafka Streams Usage Guide

Why use Kafka Streams in the first place?

If you’re running a Kafka cluster, Kafka Streams comes in handy mainly for three reasons: (1) it’s a high-level wrapper around consumers/producers on top of Kafka, (2) it supports stateful streams using RocksDB, and (3) it supports partition assignment across your processing nodes.


Plan the # of partitions in advance

In order to scale your processing you can either (1) move towards a stronger CPU, more memory, and a faster disk, or (2) increase the number of processing instances. Always keep in mind, though, that a single Kafka partition can only be processed by one consumer - that’s why the number of partitions is important: if it is too low, a single consumer may not be able to handle your throughput. Make sure you plan the number of partitions in advance or your consumer lag will grow.
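
As a reminder of how the pieces fit together, here’s a minimal configuration sketch; the application id, bootstrap servers, and thread count are placeholders, not recommendations. Parallelism is ultimately capped by the number of input partitions, no matter how many threads or instances you add.

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object PartitionAwareConfig {
  // Hypothetical values; tune them for your own cluster.
  val props: Properties = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
  // With e.g. 12 input partitions there are at most 12 stream tasks in total,
  // so threads/instances beyond that number will simply sit idle.
  props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")
}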


Be careful with your persisted schemas

When processing and storing specific events in a state store you must be very careful with the event schema, especially if you rely on the JSON format. Making breaking changes to the event schema means that the processing layer will fail to parse the JSON when reading from the state store, which will probably lead to lost data if you ignore the event, or to a crash loop if you retry the processing.
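
One defensive option, sketched below under the assumption that you control the serde used for the store, is to wrap the real deserializer so that records written with an old or broken schema surface as null (to be logged and skipped) instead of throwing and crash-looping the stream. For source topics (rather than state stores), Kafka Streams also ships a LogAndContinueExceptionHandler that can be set via default.deserialization.exception.handler.

import org.apache.kafka.common.serialization.Deserializer

// Minimal sketch: `inner` is whatever JSON deserializer you already use.
// On schema-breaking records we return null instead of throwing, so the
// caller can log and skip them rather than enter a crash loop.
class TolerantDeserializer[T >: Null](inner: Deserializer[T]) extends Deserializer[T] {
  override def deserialize(topic: String, data: Array[Byte]): T =
    try inner.deserialize(topic, data)
    catch {
      case _: Exception => null // caller is responsible for handling the skipped record
    }
}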


Don’t rely on internal changelogs for downstream processing

For each state store, Kafka Streams maintains a replicated changelog Kafka topic in which it tracks any state updates. Every time we insert a key-value pair into our state store, a Kafka message is sent to the corresponding topic. In case of any crash, our application will rebuild the state store from the corresponding changelog topic. If we want to apply another processing layer, either in the current application or in another one downstream, we should always use context.forward(k, v) (using the Processor API) to forward our processor output to a given sink topic.
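
A minimal sketch of that pattern, using the older Processor[K, V] API that was current when this was written; the topic names, store name, and processing logic are placeholders.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Updates the state store and explicitly forwards the result to a sink topic,
// so downstream consumers never need to read the internal changelog.
class EnrichProcessor extends Processor[String, String] {
  private var context: ProcessorContext = _
  private var store: KeyValueStore[String, String] = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    store = context.getStateStore("events-store").asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(key: String, value: String): Unit = {
    store.put(key, value)       // state update, tracked by the internal changelog
    context.forward(key, value) // output, published to the sink topic wired below
  }

  override def close(): Unit = ()
}

object ForwardTopology {
  def build(): Topology = {
    val storeBuilder = Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore("events-store"), Serdes.String(), Serdes.String())

    new Topology()
      .addSource("source", "events")
      .addProcessor("enrich", new ProcessorSupplier[String, String] {
        override def get(): Processor[String, String] = new EnrichProcessor
      }, "source")
      .addStateStore(storeBuilder, "enrich")
      .addSink("sink", "events-enriched", "enrich")
  }
}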


State Stores don’t have TTL

While the state store changelog topic has log compaction so that old data can be purged to prevent the topics from growing indefinitely, the state store itself doesn’t have this kind of mechanism. And yes, it will grow forever, unless the application crashes and the state store is rebuilt from a now shorter version of the changelog.

(https://stackoverflow.com/questions/50622369/kafka-streams-is-it-possible-to-have-compact-delete-policy-on-state-stores)

Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance. Let’s dive into these use cases in more detail and then describe how compaction works.

[Edit 1] According to KIP-258 there is an ongoing effort to add TTL to state stores. Record timestamps were added to KTable, allowing this initiative to move forward. If you’re asking yourself why TTL state stores are not yet supported in Kafka Streams, it’s mainly because Kafka Streams relies on changelogs as the source of truth, not the state stores. The two must be in sync; otherwise, if we delete old state store records, it might happen that we restore all of them from the changelog during a rebalance, for example.
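
Until that lands, one common workaround (sketched below with arbitrary retention and window sizes) is to use a windowed store instead of a plain key-value store: segments that fall out of the retention period are dropped, which gives a coarse-grained TTL at the cost of having to query by key plus time range.

import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

object ExpiringStore {
  // Segments older than the retention period become eligible for deletion,
  // both locally in RocksDB and in the corresponding changelog topic.
  val builder = Stores.windowStoreBuilder(
    Stores.persistentWindowStore(
      "recent-events",        // store name (placeholder)
      Duration.ofHours(24),   // retention
      Duration.ofHours(1),    // window size
      false),                 // don't retain duplicates
    Serdes.String(),
    Serdes.String())
}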


The restore process

Every time the application starts, or in the worst case restarts, the state stores will be restored from the corresponding changelogs. If we pay attention, we’ll notice that a processor that relies on a given state store doesn’t start processing while that store is recovering. To get more visibility, we might want to add a custom StateRestoreListener to track the state store restore process.

import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener

class LoggingStateRestoreListener extends StateRestoreListener {

  val logger = Logger(classOf[LoggingStateRestoreListener])

  override def onRestoreStart(topicPartition: TopicPartition,
                              storeName: String,
                              startingOffset: Long,
                              endingOffset: Long): Unit = {
    logger.info(s"Restore started for $storeName and partition ${topicPartition.partition}...")
    logger.info(s"Total records to be restored ${endingOffset - startingOffset}.")
  }

  override def onBatchRestored(topicPartition: TopicPartition,
                               storeName: String,
                               batchEndOffset: Long,
                               numRestored: Long): Unit = {
    logger.info(
      s"Restored batch $numRestored for $storeName and partition ${topicPartition.partition}.")
  }

  override def onRestoreEnd(topicPartition: TopicPartition,
                            storeName: String,
                            totalRestored: Long): Unit = {
    logger.info(s"Restore completed for $storeName and partition ${topicPartition.partition}.")
  }
}
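
To actually hook it up, register the listener on the KafkaStreams instance before starting it. A minimal sketch, assuming the topology and properties are built elsewhere:

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, Topology}

object Main {
  def run(topology: Topology, props: Properties): Unit = {
    val streams = new KafkaStreams(topology, props)
    // Must be set before start(), otherwise early restores go unreported.
    streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener)
    streams.start()
  }
}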

You’ll be amazed, or outraged, with the amount of time wasted on restoring large changelogs. Note to self: if you’re running Kafka Streams on top of Kubernetes, make sure you have persistent storage, otherwise these restore processes will kill your SLAs.

In a disaster scenario, when a particular instance crashes, configuring num.standby.replicas may minimize the restore process by introducing shadow copies of the local state stores.
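
For reference, that is a single configuration entry; the value below is just an example.

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object StandbyConfig {
  val props = new Properties()
  // Keep one warm copy of each local state store on another instance, so a
  // failed task can move there without replaying the whole changelog.
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")
}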


Oh, the memory overhead

Assigning large heaps to the JVM sounds reasonable at first, but Kafka Streams uses a lot of off-heap memory when using RocksDB, which eventually leads to applications crashing for lack of free memory.

RocksDB stores data in at least four data structures: (1) the memtable, (2) bloom filters, (3) indexes, and (4) the block cache. Besides that, it has lots of configurable properties, which makes it a difficult job to tune properly.

Unfortunately, configuring RocksDB optimally is not trivial. Even we as RocksDB developers don’t fully understand the effect of each configuration change. If you want to fully optimize RocksDB for your workload, we recommend experiments and benchmarking, while keeping an eye on the three amplification factors.

(https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide)

On the other hand, if you don’t tune it, the memory usage of our application will grow, and grow, and grow. So we need to make sure we know the number of source topics our application is consuming from, as well as the number of partitions and state stores.

If you have many stores in your topology, there is a fixed per-store memory cost. E.g., if RocksDB is your default store, it uses some off-heap memory per store. Either consider spreading your app instances on multiple machines or consider lowering RocksDB’s memory usage using the RocksDBConfigSetter class.

If you take the latter approach, note that RocksDB exposes several important memory configurations. In particular, these settings include block_cache_size (16 MB by default), write_buffer_size (32 MB by default), and write_buffer_count (3 by default). With those defaults, the estimate per RocksDB store (let’s call it estimate per store) is (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb (112 MB by default).

Then if you have 40 partitions and are using a windowed store (with a default of 3 segments per partition), the total memory consumption is 40 * 3 * estimate per store (in this example that would be 13440 MB).

(https://docs.confluent.io/current/streams/sizing.html)

Having configured ROCKSDB_BLOCK_CACHE_SIZE_MB, ROCKSDB_BLOCK_SIZE_KB, ROCKSDB_WRITE_BUFFER_SIZE_MB, and ROCKSDB_WRITE_BUFFER_COUNT to the best possible values, we’re able to estimate the cost of a single store. Obviously, if we have lots of streams with lots of stores, it will require lots of memory.
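
A sketch of such a RocksDBConfigSetter, in the spirit of the Confluent docs example of that era; the class name and sizes are illustrative, not recommendations, and you register it through the rocksdb.config.setter property.

import java.util
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

// Registered via:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedRocksDBConfig])
class BoundedRocksDBConfig extends RocksDBConfigSetter {
  override def setConfig(storeName: String,
                         options: Options,
                         configs: util.Map[String, AnyRef]): Unit = {
    val tableConfig = new BlockBasedTableConfig()
    tableConfig.setBlockCacheSize(16 * 1024 * 1024L) // block cache per store: 16 MB
    tableConfig.setBlockSize(16 * 1024L)             // block size: 16 KB
    options.setTableFormatConfig(tableConfig)
    options.setWriteBufferSize(8 * 1024 * 1024L)     // memtable size: 8 MB
    options.setMaxWriteBufferNumber(2)               // at most 2 memtables per store
  }
}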

[Edit 2] KAFKA-8323: Memory leak of BloomFilter Rocks object and KAFKA-8215: Limit memory usage of RocksDB, both in Kafka 2.3.0, might help with or solve some memory issues.


Don’t forget the disk space

Consuming from large source topics and performing processing that requires storing n records in RocksDB for each message will lead to a fairly large amount of data stored on disk. Without proper monitoring, it is very easy to run out of space.


It’s ok to do external lookups

Well, most people will say to load all the needed data into a Stream or Store and perform the joins or local lookups while processing our messages. Sometimes it’s easier to perform external lookups, ideally against a fast database, and I must say that’s fine. Obviously, it depends on your load, how many external lookups are performed per message, and how fast your database can handle those lookups. Making this type of external call inside a stream may introduce extra latency, which could have an impact on the consumer lag of downstream systems, so please use it carefully.

Data locality is critical for performance. Although key lookups are typically very fast, the latency introduced by using remote storage becomes a bottleneck when you’re working at scale.

The key point here isn’t the degree of latency per record retrieval, which may be minimal. The important factor is that you’ll potentially process millions or billions of records through a streaming application. When multiplied by a factor that large, even a small degree of network latency can have a huge impact.

The cool thing about having to look up data from an external state store is that we can abstract our external state store as a simple StateStore and use it like the others, without changing any existing code.

import org.apache.kafka.streams.processor.{ProcessorContext, StateRestoreCallback, StateStore}

// `Pool[T]` is our connection-pool abstraction around the external database.
class LookupStore[T](storeName: String, connectionPool: Pool[T]) extends StateStore {

  override def init(context: ProcessorContext, root: StateStore): Unit = {
    // Nothing to restore: the data lives in the external database.
    context.register(root, new StateRestoreCallback() {
      override def restore(key: Array[Byte], value: Array[Byte]): Unit = {}
    })
  }

  override def name(): String = this.storeName

  override def flush(): Unit = ()

  override def close(): Unit = pool().close()

  override def persistent(): Boolean = true

  override def isOpen: Boolean = !pool().isClosed

  def pool(): Pool[T] = connectionPool
}

Note to self: every partition from a processor that’s using this custom store opens a new connection to the database, which is not sustainable for the number of processors and partitions that may exist. It’s advised to use a shared connection pool to reduce and control the available connections, as sketched below.
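
One hedged way to do that, assuming the Pool abstraction from the snippet above and a database of your choice, is to share a single lazily created pool per JVM and hand it to every LookupStore instance.

// Sketch only: `Pool` is the connection-pool abstraction used by LookupStore above,
// and the factory function is whatever creates it for your database.
object SharedPool {
  private var instance: Option[Pool[java.sql.Connection]] = None

  def getOrCreate(create: () => Pool[java.sql.Connection]): Pool[java.sql.Connection] =
    synchronized {
      instance.getOrElse {
        val pool = create()
        instance = Some(pool)
        pool
      }
    }
}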


Streams may die, a lot

Within a Kafka cluster, there are leader elections among the available nodes. The bad thing about them is that sometimes they can have a negative impact on the currently processing streams.

2019-07-08 14:27:54 [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2] WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2-consumer, groupId=my-stream] 266 partitions have leader brokers without a matching listener, including [my-topic-15, my-topic-9, my-topic-3, my-topic-10, my-topic-16, my-topic-4, __transaction_state-0, __transaction_state-30, __transaction_state-18,__transaction_state-6]
2019-07-08 14:29:51 [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f-StreamThread-2] WARN  o.apache.kafka.streams.KafkaStreams - stream-client [my-stream-d98754ff-6690-4040-ae3c-fbe51f9cf39f] All stream threads have died. The instance will be in error state and should be closed.

The logs above show that 266 partitions lost their leader and eventually the corresponding stream just stopped. Without proper monitoring and alerts configured, mainly on consumer lag and the number of live streams, we can go without processing messages for a long time. Having some kind of retry mechanism may help as well. Just don’t trust that your stream will be up and running all the time; bad things happen.
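
A small sketch of that kind of awareness: a state listener that fires when the client transitions into the ERROR state, so you can alert or schedule a restart. The reaction itself is left out, since it’s application-specific.

import org.apache.kafka.streams.KafkaStreams

object StreamWatcher {
  def watch(streams: KafkaStreams): Unit =
    streams.setStateListener(new KafkaStreams.StateListener {
      override def onChange(newState: KafkaStreams.State, oldState: KafkaStreams.State): Unit =
        if (newState == KafkaStreams.State.ERROR) {
          // e.g. emit a metric, page someone, or trigger a controlled restart
        }
    })
}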


Timeouts and rebalances

From time to time, applications get stuck in a rebalancing state, leading to several timeouts.

Often this is caused by processing very large batches of messages that take more than the five-minute threshold (the default max.poll.interval.ms) to commit. When the processing is finished, the stream is already considered dead and the offsets can’t be committed. On rebalance, the same messages are fetched once again from Kafka and the same error occurs.

Reducing the max.poll.records value, often down to 1, would sometimes alleviate this specific issue ¯\_(ツ)_/¯.
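
The relevant knobs are plain consumer configs, which Kafka Streams lets you scope with a consumer prefix; the values below are illustrative only.

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.StreamsConfig

object PollTuning {
  val props = new Properties()
  // Fewer records per poll means less work between commits...
  props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "100")
  // ...and a longer poll interval gives slow batches room before the consumer
  // is considered dead and a rebalance is triggered.
  props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), "600000")
}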


Still, …

Building real-time applications with Kafka Streams is quick, easy, powerful, and very natural after grasping the non-trivial stuff that comes with the full package.
