Unlocking Idempotency with Retroactive Tombstones

Nov 18, 2023
Richard Artoul

We first introduced WarpStream in our blog post: "Kafka is Dead, Long Live Kafka", but to summarize: WarpStream is an Apache Kafka protocol-compatible data streaming system built directly on top of object storage. It has zero local disks and incurs zero inter-zone bandwidth costs.

Most people think of Apache Kafka as a simple queue or durable log. However, many people aren’t aware that Kafka supports more advanced features like idempotent produce requests, which allow a Kafka client to produce the same batch of data multiple times while ensuring it’s only appended to the log once.

When we first set out to build WarpStream, we knew that the idempotent producer would be one of the most challenging features to support with our stateless architecture, and that we’d have to design for it from day one. In fact, it was one of the primary reasons why we created our own metadata store instead of reusing something off the shelf.

In this post, we’ll go over in detail how we added support for the idempotent producer functionality to WarpStream’s storage engine. However, before we can do that, we first need to provide some relevant context on the WarpStream write path.

Separating Data from Metadata

In traditional Apache Kafka, batches are ordered by the Kafka broker that is the leader for the topic-partition that is being written to. The ordering of the batches is stored implicitly in the Kafka log segments themselves. Concretely, the logical ordering of the batches is, by definition, the same as the physical ordering of the batches on disk. Consistency across replicas is maintained using a custom leader-follower replication protocol.

Kafka Leader-Follower Replication Protocol

WarpStream, however, takes a different approach. Requests begin the same way: Kafka clients connect to the WarpStream Agents and then begin writing batches of compressed data using the Produce() API. However, the two systems diverge quickly after that. The WarpStream Agents will buffer Produce() requests from many different clients in-memory for ~100ms and then combine them all together into a single segment file and upload it to object storage. This segment file will contain batches for many different topic-partitions and from many different clients.

WarpStream Write Path

After flushing the file to object storage, the Agent will commit the presence of the file to the WarpStream Metadata store, and then acknowledge all the Produce() requests successfully back to the clients.
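To make the write path concrete, here is a minimal sketch of that loop in Go. All of the type and method names below are hypothetical (this is not WarpStream's actual code): the Agent buffers incoming batches for ~100ms, uploads them as a single segment file, commits the file to the metadata store, and only then acknowledges the Produce() requests.

```go
package sketch

import (
	"context"
	"time"
)

// pendingBatch represents one batch from one client's Produce() request.
type pendingBatch struct {
	Topic     string
	Partition int32
	Data      []byte
	Ack       chan error // signaled once the segment file has been committed
}

// objectStore and metadataStore are illustrative stand-ins, not real APIs.
type objectStore interface {
	UploadSegment(ctx context.Context, batches []pendingBatch) (fileID string, err error)
}

type metadataStore interface {
	CommitFile(ctx context.Context, fileID string, batches []pendingBatch) error
}

type agent struct {
	incoming chan pendingBatch
	store    objectStore
	meta     metadataStore
}

func (a *agent) runWriteLoop(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var buffered []pendingBatch
	for {
		select {
		case b := <-a.incoming: // batches from many clients and many partitions
			buffered = append(buffered, b)
		case <-ticker.C:
			if len(buffered) == 0 {
				continue
			}
			// 1. Combine everything buffered so far into a single segment
			//    file and upload it to object storage.
			fileID, err := a.store.UploadSegment(ctx, buffered)
			if err == nil {
				// 2. Commit the file to the metadata store, which assigns
				//    the final ordering of its batches.
				err = a.meta.CommitFile(ctx, fileID, buffered)
			}
			// 3. Only now acknowledge every Produce() request that was
			//    folded into this segment file.
			for _, b := range buffered {
				b.Ack <- err
			}
			buffered = nil
		case <-ctx.Done():
			return
		}
	}
}
```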

The Agents in this architecture are completely stateless. There are no leaders, and any Agent can write data for any topic-partition. This makes the WarpStream Agents dramatically easier to operate than Kafka brokers, but it raises the question: if there are no topic-partition leaders, then how is the final order of the batches determined?

Concurrent writes to the same topic-partition from two different WarpStream Agents.

For example, in the diagram above two files were written concurrently by Agents 1 and 2, and both files contain batches for partition 1. Which batch comes first in the logical ordering of the partition, X or Y?

WarpStream solves this problem by determining the final order of each batch when it’s committed to the metadata store after it’s already been flushed to object storage. This decouples the physical layout of the WarpStream files from the logical ordering of the batches.

Batch ordering and record offsets are assigned after data has already been flushed to object storage.

This is what we’re referring to when we say that WarpStream separates data from metadata, in addition to separating compute from storage. Another way to think about this is that the metadata store acts as the leader for ordering all topic-partition batches.
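One way to picture the metadata store's role as the ordering leader is a commit path that assigns each batch its offsets at commit time, regardless of when or where its file was physically written. The sketch below is purely illustrative; the names and data structures are ours, not WarpStream's actual schema.

```go
package sketch

// batchMeta is the metadata tracked for one committed batch.
type batchMeta struct {
	FileID     string
	Topic      string
	Partition  int32
	NumRecords int64
	BaseOffset int64 // assigned at commit time, not at upload time
}

type partitionKey struct {
	Topic     string
	Partition int32
}

type metadataStore struct {
	nextOffset map[partitionKey]int64       // next offset to assign per partition
	log        map[partitionKey][]batchMeta // logical order of committed batches
}

// CommitFile is (logically) serialized, which is what preserves Kafka's
// per-partition ordering even though file uploads happen in parallel.
func (m *metadataStore) CommitFile(fileID string, batches []batchMeta) {
	for _, b := range batches {
		key := partitionKey{b.Topic, b.Partition}
		b.FileID = fileID
		b.BaseOffset = m.nextOffset[key]
		m.nextOffset[key] += b.NumRecords
		m.log[key] = append(m.log[key], b)
	}
}
```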

This decoupling of data from metadata is valuable for a number of reasons. First, it enables the Agents to flush data to object storage in a massively parallel way without having to synchronize or serialize their uploads. This is important because the higher latency nature of object storage demands a highly concurrent storage engine to achieve high throughput. 

Second, as we’ll see later in this post, decoupling data from metadata is also a key part of how WarpStream is able to implement the idempotent producer functionality, and in the future, Kafka transactions.

WarpStream Read Path

Of course, the decoupling of data from metadata has implications for WarpStream’s read path. Unlike Kafka, any Agent can serve any Fetch() request (although the details are a bit more nuanced, see our previous blog post about distributed mmap), and the Agents aren’t leaders for any individual topic-partitions. As a result, the Agent must first query the Metadata store to determine which batches (including from which files, and in which order) should be read next to satisfy the Fetch() request.

WarpStream Read Path

For example, in the diagram above a client issues a Fetch request for partition 1, asking to read 1000 records starting at offset 306. The Agent queries the Metadata store asking which set of batches logically contain the records 306 → 1306 for partition 1. The Metadata store replies that the Agent should read batch X from file 900 followed by batch Y from file 806.

This highlights an interesting insight: file 900 was flushed to object storage after file 806, but the Metadata store determined that file 900 was committed before file 806. This is normal behavior and one of the reasons that WarpStream can achieve high write throughput in a stateless manner, while still guaranteeing Kafka’s ordering semantics: flushing data to object storage is massively parallelized, but committing metadata is (logically) serialized.

That may seem like it would become a bottleneck, but in practice the metadata store can process millions of metadata operations/s per virtual cluster, so individual clusters can scale easily to multiple GiB/s of throughput without issue.
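Sketched in Go with hypothetical types and method names, the read path described above looks roughly like this: the Agent asks the metadata store which batches cover the requested offsets, then reads them from the named files in the order the metadata store dictates.

```go
package sketch

import "context"

// batchRef identifies one batch within one segment file.
type batchRef struct {
	FileID     string
	ByteOffset int64
	ByteLength int64
}

type metadataStore interface {
	// FindBatches returns the batches that logically contain records
	// [startOffset, startOffset+maxRecords) for the given topic-partition,
	// in the order they should be read.
	FindBatches(ctx context.Context, topic string, partition int32,
		startOffset, maxRecords int64) ([]batchRef, error)
}

type fileCache interface {
	ReadRange(ctx context.Context, fileID string, offset, length int64) ([]byte, error)
}

type agent struct {
	meta  metadataStore
	cache fileCache
}

func (a *agent) fetch(ctx context.Context, topic string, partition int32,
	startOffset, maxRecords int64) ([][]byte, error) {
	// e.g. partition=1, startOffset=306, maxRecords=1000 could return:
	// batch X in file 900 followed by batch Y in file 806.
	refs, err := a.meta.FindBatches(ctx, topic, partition, startOffset, maxRecords)
	if err != nil {
		return nil, err
	}
	batches := make([][]byte, 0, len(refs))
	for _, ref := range refs {
		// Reads go through the distributed file cache rather than straight
		// to object storage (see the earlier post on distributed mmap).
		data, err := a.cache.ReadRange(ctx, ref.FileID, ref.ByteOffset, ref.ByteLength)
		if err != nil {
			return nil, err
		}
		batches = append(batches, data)
	}
	return batches, nil
}
```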

Of course, even if the metadata store can handle an extremely high mutation rate, we don’t want the number of batches we have to track to grow forever; eventually the Metadata size would become impractical to manage. In addition, even if we could track all of the batches forever in the metadata store, a high number of small batches reduces compression ratios and makes replaying historical data expensive and slow due to the number of IO operations that have to be performed.

To mitigate these issues, the WarpStream Agents periodically run compaction jobs in the background that merge together different segment files. When the files are being merged together, the Agent will also take the opportunity to combine multiple small batches that belong to the same topic-partition into a single larger batch.

WarpStream compaction inputs and outputs.

This ensures that even if a client produces tens of millions of batches, the number of batches the metadata store has to track remains proportional to the overall write throughput, partition count, and retention of the topics in the cluster. It also improves compression ratios and the ability to perform historical replays efficiently and cost-effectively, as discussed earlier.
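A rough illustration of the merge step, with hypothetical types (the real compaction logic also has to preserve offsets, enforce retention, and so on):

```go
package sketch

type partitionKey struct {
	Topic     string
	Partition int32
}

type record struct {
	Key, Value []byte
}

type inputBatch struct {
	Key     partitionKey
	Records []record
}

// mergeBatches expects its input in the logical order assigned by the
// metadata store, so simple concatenation preserves per-partition ordering.
// Each map entry becomes one larger batch in the compacted output file,
// shrinking the number of batches the metadata store has to track.
func mergeBatches(inputs []inputBatch) map[partitionKey][]record {
	merged := make(map[partitionKey][]record)
	for _, b := range inputs {
		merged[b.Key] = append(merged[b.Key], b.Records...)
	}
	return merged
}
```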

Idempotent Producer and Retroactive Tombstones

Now that we’ve got all the relevant background information, let’s talk about the idempotent producer functionality. The idea with the idempotent producer feature is that every producer client will call the InitProducer() method to get a unique “producer ID”. Every time they produce data to Kafka, they will provide this producer ID, along with a monotonically increasing sequence number for each batch of data that they produce. The tuple of <producer_id, sequence_number> can then be used to deduplicate produce requests. For example, in the diagram below the first time the client tries to produce the batch with sequence number 1 it encounters a timeout. At that point in time, it does not know if the batch it wrote has been persisted into WarpStream's durable log. However, with the idempotent producer functionality enabled, the client can safely retry its request knowing that it won't introduce a duplicate batch into the system.

Example scenario that would result in duplicate batches without the idempotent producer functionality.

Of course, this doesn’t guarantee exactly once semantics end-to-end, but it is a helpful feature nonetheless. 

But how does WarpStream actually ensure idempotency of individual Produce() requests? Remember, unlike a traditional Kafka broker, the WarpStream Agent is not a topic-partition leader. Therefore when it flushes a segment file to object storage, it has no idea if the batches it included are duplicates or not.

One way to solve this problem would be for the Agent to consult the Metadata store before flushing the file to object storage to ask if any of the batches it’s about to write are duplicates. If so, it could just not include them in the output file. However, the Agent would also have to acquire a “lock” on all the batches that were not duplicates to ensure no other Agent wrote them either. This would result in an extra round-trip to the Metadata store, and reduce availability in the case of an Agent failure since the system would have to wait for the Agent’s lock to expire before the Producer could write that batch to a different Agent.

Instead, WarpStream takes a different approach that leverages the fact that the system already separates data from metadata. The metadata store tracks the sequence IDs of the last 5 batches that were successfully committed for each tuple of <producer_id, partition>. It only tracks the last 5 because the Kafka protocol specifies that an idempotent producer is never allowed to have more than 5 in-flight concurrent produce requests at any given moment.

When the Metadata store receives a request to commit a new file from the Agent, it can inspect the sequence ID of each batch that the Agent wrote into the file and “drop” any that are duplicates. Of course, at that point the duplicate batches have already been written to the immutable segment files in object storage. That’s fine though, because as we discussed earlier, every Fetch() request issued by a client will result in a call to the Metadata store to know which batches to read next.

WarpStream Read Path

The metadata store simply won’t return the metadata for the duplicate batches that were dropped, and so the Agent won’t read them, even if they’re sandwiched between two valid batches. We call this mechanism "retroactive tombstoning" because it enables the Metadata store to decide after the fact that a section of the Segment file is "dead" without having to actually modify it. Another way to think about it is that the separation of data from metadata enables WarpStream to create "mutable views" over object storage files that are physically immutable.
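Here is a minimal, standalone sketch of that commit-time duplicate check, with hypothetical names and data structures; the real bookkeeping is more involved (for example, it also has to deal with out-of-order sequence numbers and producer epochs).

```go
package sketch

type producerPartition struct {
	ProducerID int64
	Topic      string
	Partition  int32
}

type partitionKey struct {
	Topic     string
	Partition int32
}

type batchMeta struct {
	FileID     string
	Topic      string
	Partition  int32
	NumRecords int64
	BaseOffset int64
}

type metadataStore struct {
	nextOffset    map[partitionKey]int64
	log           map[partitionKey][]batchMeta
	lastSequences map[producerPartition][]int32 // at most 5 entries each
}

// commitBatch returns false if the batch is a duplicate. The duplicate bytes
// remain in the immutable segment file in object storage, but no metadata is
// recorded for them, so no Fetch will ever be directed at them.
func (m *metadataStore) commitBatch(b batchMeta, producerID int64, seq int32) bool {
	pp := producerPartition{producerID, b.Topic, b.Partition}
	for _, s := range m.lastSequences[pp] {
		if s == seq {
			return false // duplicate: retroactively tombstoned
		}
	}
	seqs := append(m.lastSequences[pp], seq)
	if len(seqs) > 5 {
		seqs = seqs[1:] // only the 5 most recent matter per the Kafka protocol
	}
	m.lastSequences[pp] = seqs

	// Not a duplicate: assign the batch its place in the partition's log.
	key := partitionKey{b.Topic, b.Partition}
	b.BaseOffset = m.nextOffset[key]
	m.nextOffset[key] += b.NumRecords
	m.log[key] = append(m.log[key], b)
	return true
}
```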

However, as always, the devil is in the details. And there are a lot of details involved in getting Kafka’s idempotent producer functionality working correctly, especially since the feature involves a coordinated dance between the Kafka client and the server exposing the Kafka protocol.

Luckily, WarpStream already had an aggressive test suite of property and fuzz tests that executed complex workloads with automatically injected faults and measured the percentage of duplicate records that were consumed on the other side. We created versions of these tests that enabled the idempotent producer functionality, and configured them to fail if any duplicates were observed. Finally, we cranked up the fault ratio to extremely high levels, added many new fault sites, and ran the tests with 4 different Kafka client implementations.

These property tests allowed us to identify and fix subtle bugs in the implementation that only occurred in extremely rare scenarios that would have been otherwise almost impossible to identify.

Reducing Space Amplification

At some point we do want to clean up the duplicate batches to save space. When a compaction is planned and performed, the metadata store includes in the compaction plan a list of all the batches that are “valid” in the input files. When the compaction merger runs, it consults this metadata for each batch it observes in the input. If it encounters an unexpected batch, it knows that it's a duplicate and drops it. 

This reduces space amplification, and also improves IO efficiency since we don’t waste time paging in “dead batches” from object storage that just happened to be sandwiched between two “live” batches that we need to read.
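A hypothetical sketch of that filter during the compaction merge:

```go
package sketch

// batchID identifies one batch within one input segment file.
type batchID struct {
	FileID     string
	ByteOffset int64
}

// filterToPlan keeps only the batches that appear in the compaction plan
// produced by the metadata store. Any batch the merger encounters that is
// not in the plan is a retroactively tombstoned duplicate and is dropped.
func filterToPlan(input []batchID, plan map[batchID]bool) []batchID {
	kept := make([]batchID, 0, len(input))
	for _, b := range input {
		if plan[b] {
			kept = append(kept, b)
		}
		// else: duplicate batch; not carried into the compacted output file
	}
	return kept
}
```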

When we had finished implementing all of this, we found that everything worked great from a correctness perspective. However, end-to-end latency degraded significantly when the idempotent producer functionality was enabled. The performance regression was highly correlated with a large increase in CPU usage, as well as in the overall number of requests to our distributed file cache system.

Distributed File Cache RPC/s
WarpStream Agent CPU Usage

After a little investigation, the issue became obvious. Our internal benchmarking workloads are intentionally configured to simulate poorly configured Kafka clients. As a result, the producers were writing a lot of data, but using extremely small batches, often just one record per batch.

Earlier we discussed how the WarpStream Agents merge batches of data for the same partitions together at compaction time. Well, whenever possible, they also merge batches of data together at ingestion time. However, when the idempotent producer feature is enabled the Agents can’t do this, because the Metadata store may determine that a batch is a duplicate after the Agents have already written it to object storage. For example, if a batch that the metadata store declared a duplicate had already been merged with other batches, there would be no way to drop only the records that originated from the duplicate batch. As a result, when a Kafka client enables the idempotent producer functionality, the WarpStream Agents must wait until a compaction is performed to begin merging smaller batches into larger ones.

This exposed a performance bottleneck in the interaction between WarpStream’s implementation of the Kafka protocol Fetch() method and the distributed file cache. When idempotency was disabled, the Agents would usually only have to read a single batch from any given file to serve a Fetch request for a single partition. However, with idempotency enabled, uncompacted files would often have 40+ individual batches for a single partition. Luckily, we caught this issue very early on because we do all of our testing and benchmarking with Kafka clients that are intentionally poorly tuned to generate a huge number of unique batches.

The WarpStream file format sorts all the batches in a given file first by topic, and then by partition. As a result, all the batches were already laid out right next to each other and being paged in from object storage together automatically via our distributed mmap implementation. However, the Agent serving the Fetch request was requesting each batch individually from the distributed file cache which resulted in a huge number of tiny RPCs. 

We modified the file cache interface to support reading a set of batches in a single RPC and then deployed the fix to our test environment.
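Roughly, the shape of that interface change (with hypothetical names) was the following: instead of one RPC per batch, the Agent asks the cache for all the batches it needs from a given file in one call.

```go
package sketch

import "context"

// Before: one RPC to the distributed file cache per batch.
type fileCacheBefore interface {
	ReadRange(ctx context.Context, fileID string, offset, length int64) ([]byte, error)
}

// After: one RPC for all of the batches a Fetch needs from a given file.
// Because the file format sorts batches by topic and then partition, the
// requested ranges are adjacent and are typically already paged in together
// by the distributed mmap layer.
type byteRange struct {
	Offset, Length int64
}

type fileCacheAfter interface {
	ReadRanges(ctx context.Context, fileID string, ranges []byteRange) ([][]byte, error)
}
```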

End-to-end latency (producer to consumer) dropped back down below 2s after deploying the fix.

Success! We’d made the idempotent producer feature work, then made it correct, and finally made it fast.

WarpStream’s BYOC product is currently in developer preview, and our fully hosted scale-to-zero serverless product will launch in Q1 of next year. If you’re interested in learning more about WarpStream, or in getting early access to the product, please contact us or join our Slack.
