Hacking the Kafka PRoTocOL

Sep 18, 2023
Richard Artoul

WarpStream is an Apache Kafka® protocol-compatible data streaming system built directly on-top of object storage. That means WarpStream is designed to “look and feel” exactly like a “real” Kafka cluster, but under the hood it has a completely different implementation and makes a completely different set of architectural decisions.

The most important difference between WarpStream and Kafka is that WarpStream offloads all storage to object storage and requires no local disks. This makes WarpStream 5-10x cheaper for high volume workloads (because it eliminates inter-zone networking costs entirely), and also makes it trivial to operate since WarpStream’s single binary (called an Agent) is completely stateless.

This dichotomy presents a conundrum: WarpStream Agents are stateless, but the Kafka protocol assumes an incredibly stateful system. Somehow we have to square that circle. The rest of this post will cover exactly how we did that, going so far as to make WarpStream work with off-the-shelf Kafka clients even when it’s deployed behind a traditional network load balancer like Nginx, something that’s impossible to do with traditional Apache Kafka.

Traditional Kafka Service Discovery

Let’s start by discussing how Kafka’s service discovery mechanism works from a client’s perspective. The client establishes a TCP connection to the provided “bootstrap” broker URL and sends an APIVersions request to negotiate what features of the Kafka protocol are supported by the broker. 

Simplified view of the initial part of the Kafka protocol.

Immediately after, the client will issue a Metadata request which returns several key pieces of information:

  1. A list of all the Kafka brokers in the cluster and their hostnames / ports.
  2. A list of all the Kafka topics in the cluster.
  3. The ID of the broker that is the leader for each topic-partition (i.e., where to send writes), as well as a list of the other brokers that are replicas for that topic-partition.

Here is a small snippet of this response from one of our test environments:

It shows that node 1729786125 with hostname 10.0.174.233 running in availability zone (Rack) us-east-1c is exposing the Kafka protocol on port 9092. It also shows that this particular broker is the leader for partition 0 of Topic 0, and there are no other replicas.
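
To make the shape of that response a little more concrete, here’s a rough sketch in Go of the information it carries, populated with the values from the snippet above. The field names are illustrative; the real Metadata response is a binary protocol message with more fields (controller ID, cluster ID, and so on).

package kafkametadata

// Illustrative shapes only, not the actual Kafka wire format.
type Broker struct {
    NodeID int32
    Host   string
    Port   int32
    Rack   string // availability zone
}

type Partition struct {
    Index    int32
    Leader   int32   // node ID of the broker that leads this partition
    Replicas []int32 // node IDs of all brokers holding a replica
}

type Topic struct {
    Name       string
    Partitions []Partition
}

type MetadataResponse struct {
    Brokers []Broker
    Topics  []Topic
}

// The test-environment snippet described above, expressed as one of these values.
var example = MetadataResponse{
    Brokers: []Broker{
        {NodeID: 1729786125, Host: "10.0.174.233", Port: 9092, Rack: "us-east-1c"},
    },
    Topics: []Topic{
        {
            Name: "Topic 0",
            Partitions: []Partition{
                {Index: 0, Leader: 1729786125, Replicas: []int32{1729786125}},
            },
        },
    },
}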

The client uses the “Brokers” part of the response to begin establishing connections to different brokers in the cluster, and it uses the topic-partition leader information to ensure that it’s routing Produce and Fetch requests to the brokers that are the leaders for the specific topic-partitions it’s interacting with.

All of this is extremely important to get right because Kafka brokers are stateful. For any given topic-partition, data must be written to the leader for correctness and durability reasons. No other broker besides the leader can process that request. Similarly, reads can only be processed by Kafka brokers that are replicas for a given topic-partition; the other brokers simply don’t have the data.

After the connection is established, the clients will continue to issue Metadata requests periodically in the background (as well as in response to specific error codes) so they can refresh their view of the cluster in case any leadership changes occur.

To summarize, Kafka’s service discovery system is designed to propagate cluster information and ensure that clients route requests to the appropriate replicas when writing/reading data for a given topic-partition.

WarpStream Service Discovery

That makes sense for Kafka, but what about for WarpStream? Unlike Apache Kafka, WarpStream doesn’t have stateful brokers. Instead, WarpStream has stateless Agents that offload all storage concerns to object storage.

In practice this means that any WarpStream Agent can serve writes or reads for any topic-partition, at any time. Of course, that doesn’t mean that WarpStream doesn’t care about which Agent requests are routed to at all. WarpStream has several goals with its routing strategy:

  1. Route requests between clients and Agents in a zone-local way. This means that clients should always read and write to Agents in the same availability zone as them whenever possible. This is how WarpStream reduces the cost of high volume streaming workloads by 5-10x compared to Apache Kafka, by driving inter-zone networking costs to 0. 
  2. Spread load as evenly as possible amongst all the different WarpStream Agents within an availability zone.

That seems pretty straightforward, but doing it within the confines of a pre-existing Kafka protocol that wasn’t designed for any of this is trickier than it appears.

Zone Aware Routing

Let’s start by talking about zone-aware routing. The first step in zone-aware routing is knowing which availability zone each Agent is running in to begin with. Every major cloud provider exposes APIs for applications to determine this information at runtime, so the WarpStream Agents do this automatically. Once they’ve determined their availability zone, they report that information as part of their regular “heartbeat” requests to the WarpStream service discovery system.

Simplified diagram of the WarpStream discovery system.

These regular heartbeats ensure that the WarpStream service discovery system always knows how many Agents are online and healthy, what availability zone they’re each running in, and what hostname / port they’re advertising the Kafka protocol on.
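
As a rough illustration of that flow, here’s a minimal Go sketch. The AWS instance metadata endpoint is real, but the heartbeat URL and payload are simplified placeholders rather than WarpStream’s actual API:

package agent

import (
    "bytes"
    "encoding/json"
    "io"
    "net/http"
    "time"
)

// fetchAvailabilityZone asks the AWS instance metadata service which
// availability zone this Agent is running in (IMDSv1 shown for brevity;
// other clouds expose equivalent endpoints).
func fetchAvailabilityZone() (string, error) {
    resp, err := http.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone")
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    az, err := io.ReadAll(resp.Body)
    return string(az), err
}

// heartbeat is a placeholder for the Agent's periodic report to the
// WarpStream service discovery system.
type heartbeat struct {
    AgentID  string `json:"agent_id"`
    Zone     string `json:"zone"`
    Hostname string `json:"hostname"`
    Port     int    `json:"port"`
}

// heartbeatLoop reports liveness, zone, and the advertised Kafka
// hostname/port to the discovery system every 10 seconds.
func heartbeatLoop(agentID, hostname string, port int, discoveryURL string) error {
    az, err := fetchAvailabilityZone()
    if err != nil {
        return err
    }
    for range time.Tick(10 * time.Second) {
        body, _ := json.Marshal(heartbeat{AgentID: agentID, Zone: az, Hostname: hostname, Port: port})
        resp, err := http.Post(discoveryURL+"/heartbeat", "application/json", bytes.NewReader(body))
        if err == nil {
            resp.Body.Close()
        }
    }
    return nil
}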

Now that we know about all the Agents and which availability zone they’re in, we can generate responses for the Kafka protocol Metadata requests so we can tell the Kafka clients which Agents they should establish connections to. This seems like it should be straightforward: just generate unique Metadata responses for each client that only contain Agents running in the same availability zone as the client. Easy, right?

Unfortunately, it’s not that simple. While the Apache Kafka protocol already has a concept of “racks” that can be used for zone-aware request routing with the “follower-fetch” feature, that only works for reads. The protocol has no concept of rack-aware Produce requests because that’s not possible with Apache Kafka. Data must be written to the leader of each topic-partition, regardless of which availability zone it is running in or how much that would incur in inter-zone networking fees.

We experimented with a variety of different approaches to solve this problem, including the use of zone-specific bootstrap URLs. The idea with zone-specific bootstrap URLs is to provide Kafka clients with a DNS entry for their bootstrap URL that will only ever return the IP addresses of Agents running in the same availability zone as the one specified in the URL. For example, a typical URL would look like this:

 api-11155fd1-30a3-41a5-9e2d-33ye5a71bfd9.us-east-1a.discovery.prod-z.us-east-1.warpstream.com:9092

We implemented this functionality by writing a custom zone-aware DNS server. These zone-specific URLs turned out to be a pretty decent solution, and we still use them for some things, but they make one big assumption: that a Kafka client connected to a WarpStream Agent is necessarily running in the same availability zone as the Agent. This is true in the general case, but we wanted the flexibility to temporarily route clients to Agents in different availability zones for, well, availability reasons!

For example, imagine a small HA WarpStream setup with three Agents running across three availability zones, one Agent per zone. When everything is going well, the flow of requests would look like this:

Three availability zone WarpStream deployment.

However, sometimes an availability zone may contain 0 Agents, say during a deployment of the WarpStream Agents themselves. In that scenario, we want to be able to route clients to Agents in one of the other two zones temporarily, and then shift them back to the correct availability zone later once the missing Agents are restored.

Three availability zone WarpStream deployment, with one AZ containing 0 Agents.

Unfortunately, this doesn’t work well with the approach of using zone-specific bootstrap URLs because they have a baked-in assumption that the client is running in the same availability zone as the Agent it’s connected to, and there is no way to detect “after the fact” that a client is communicating with an Agent in the wrong availability zone.

Our initial approach to solving this problem was to configure the Agents to periodically close their TCP connections after a fixed period of time (with jitter) to force periodic DNS resolution. This would eventually “shift” client connections back towards the correct availability zone over time.

This worked, but users hated it. Traditional Kafka brokers and their associated TCP connections tend to be extremely long lived, so many client libraries don’t handle connection failures very gracefully and users would have to manually implement retries and suppress error logs in their applications. Whoops.

After a lot of testing and experimentation, we concluded that we needed to have some mechanism for determining which availability zone a client was running in as part of every protocol message. That was the only possible way we could build a routing system that minimized inter-zone networking costs, but was also highly available and behaved the way that most Kafka clients expected.

Unfortunately, the Kafka protocol doesn’t have any built-in support for this, but it does have one field that is completely controlled by the user / application and is present as part of every protocol message: the client ID.

We settled on a new approach where users that want to leverage the zone-aware routing feature encode their client’s availability zone into the client ID like this:

warpstream_session_id=aa456def-8626-4544-92db-61fd3e57e4bb,warpstream_az=us-east-1
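
For example, with a Go Kafka client like franz-go, that’s just the standard client ID option; the bootstrap URL below is a placeholder, and the session ID and zone are the ones from the example above:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
    // The client ID is a standard Kafka client setting, so no WarpStream-specific
    // client library is required.
    client, err := kgo.NewClient(
        kgo.SeedBrokers("my-warpstream-bootstrap-url:9092"),
        kgo.ClientID("warpstream_session_id=aa456def-8626-4544-92db-61fd3e57e4bb,warpstream_az=us-east-1"),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()
    // ... produce and fetch as usual ...
}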

Once a client has configured its client ID such that it encodes the client’s availability zone, the WarpStream service discovery system can meet its goal of minimizing inter-zone networking fees while remaining highly available: every time a Kafka client sends a Metadata request to WarpStream to refresh its view of the cluster, WarpStream parses the client’s availability zone from the client ID.

If a healthy number of Agents are running in that availability zone, then WarpStream returns to that client a view of the cluster that only includes Agents in that zone. However, if that zone is “unhealthy”, then WarpStream returns a complete view of the Agents across all availability zones. This will temporarily incur inter-zone networking fees, but almost all Kafka client implementations periodically dispatch Metadata requests in the background to refresh their view of the cluster. Eventually the client will issue a new Metadata request and WarpStream will have an opportunity to “shift” the client’s view of the cluster back to an exclusive view of only the Agents in their availability zone. Win-win!
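
A minimal sketch of that server-side decision might look something like the following. The client ID parsing matches the format shown above, but the health heuristic and data structures are simplifying assumptions, not WarpStream’s actual control plane code:

package discovery

import "strings"

// Agent is the discovery system's view of a single WarpStream Agent.
type Agent struct {
    Hostname string
    Port     int32
    Zone     string
}

// zoneFromClientID extracts the warpstream_az value from a client ID like
// "warpstream_session_id=...,warpstream_az=us-east-1".
func zoneFromClientID(clientID string) string {
    for _, kv := range strings.Split(clientID, ",") {
        if key, value, ok := strings.Cut(kv, "="); ok && key == "warpstream_az" {
            return value
        }
    }
    return ""
}

// agentsForClient returns only the Agents in the client's zone when that zone
// has enough healthy Agents, and the full cluster view otherwise so the client
// always has something to connect to.
func agentsForClient(all []Agent, clientID string, minHealthyPerZone int) []Agent {
    zone := zoneFromClientID(clientID)
    var local []Agent
    for _, a := range all {
        if a.Zone == zone {
            local = append(local, a)
        }
    }
    if len(local) >= minHealthyPerZone {
        return local
    }
    // The zone is missing or unhealthy: temporarily return every Agent and
    // shift the client back to a zone-local view on a later Metadata refresh.
    return all
}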

Load Balancing

Now that we’ve covered how WarpStream does zone-aware routing, we need to talk about how WarpStream keeps load evenly balanced within an availability zone. The primary vehicle we have for accomplishing this is the same Metadata protocol response we discussed earlier. We can control how clients route their Produce and Fetch requests by controlling which Agents we specify as the leader for each topic-partition.

The most naive thing we could do is just evenly split all the topic-partitions amongst all the Agents. This would result in reasonable load balancing for most cases, although it would reintroduce a common operational problem that Apache Kafka suffers from: partition imbalances. That said, it would be straightforward to monitor the volume of each partition and rebalance them amongst the Agents so that load was spread relatively evenly. We could do this frequently and react quickly because all the data is stored in object storage anyway, so we don’t have to actually shift any data around.

However, we decided to take a different approach instead: we return Metadata responses in which a single Agent appears to be the leader for all topic-partitions. In practice, this means that every instance of a Kafka client producer/consumer in a given application is sending Produce requests to a single Agent at a time.

That sounds like it would balance poorly, but in practice it behaves extremely well in realistic scenarios where there is more than one client, as long as we tell different clients that different Agents are the leader.

Produce requests are almost perfectly balanced across all the WarpStream Agents.

One simple way to balance the different clients would be to respond with a random Agent as the leader for all the topic-partitions every time we received a Kafka protocol Metadata request. However, in practice, random load balancing performs terribly in almost all scenarios. Instead, we use a round-robin load balancing policy, which works much better. The image below shows the transition from a random load balancing policy to a round-robin one, and it’s quite dramatic!

Round-robin is much more effective than random!
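
Sketched out, the round-robin policy can be as simple as a per-zone counter (this reuses the Agent type from the earlier sketch, and WarpStream’s real policy may weigh other factors):

package discovery

import "sync/atomic"

// roundRobin hands out Agents within a zone in order, so that successive
// Metadata requests see different Agents advertised as the "leader" of
// every topic-partition.
type roundRobin struct {
    counter atomic.Uint64
}

// next picks the Agent that the next Metadata response will advertise as
// the leader for all topic-partitions in this zone.
func (r *roundRobin) next(agentsInZone []Agent) Agent {
    i := r.counter.Add(1)
    return agentsInZone[i%uint64(len(agentsInZone))]
}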

Of course the question remains: why go through all this trouble when we could have just divided the topic-partitions amongst the Agents evenly? A few reasons:

  1. The WarpStream approach guarantees that all Produce requests are fully atomic. Either all the data produced across all the topic-partitions in a single request will be committed atomically, or none of it will. We think that’s really nice.
  2. It dramatically simplifies the implementation of the Kafka idempotent producer and transactions features.
  3. It prevents the connections between clients and Agents from having to form a full mesh, which improves performance.
  4. It improves batching between the Kafka client and the WarpStream Agents. Instead of the Kafka client producer buffering one batch of data for every Agent in a cluster, producers only have to buffer a single batch at a time. This means that if the client is flushing based on request / batch size, it can flush much sooner than it would ordinarily be able to if the number of Agents is high. This improves performance and reduces latency for many workloads.

Balancing Behind Load Balancers

The previous section explains how WarpStream is able to load balance connections without a dedicated load balancer. However, sometimes it still makes sense to deploy the WarpStream Agents behind a traditional network load balancer.

For example, let’s say you want to expose the Agents over the internet. WarpStream has SASL authentication built in, but to leverage that feature safely over the internet, the WarpStream Agents still need something to perform SSL termination. The simplest and easiest way to accomplish that is to deploy a network load balancer in front of the Agents and let the load balancer handle SSL termination for you, just like you would with a traditional HTTP or gRPC application.

WarpStream Agents deployed behind a network load balancer.

Accomplishing this within the confines of the Apache Kafka protocol is actually straightforward: simply configure the WarpStream Agents to announce themselves to the discovery system using the load balancer’s hostname instead of their private IP address. This is extremely convenient because it means you can use whatever mechanism you already have for certificate management and SSL termination, instead of having to create a bespoke solution just for your Kafka clusters. For example, the diagram below demonstrates what this looks like with one of our WarpStream test clusters deployed to Fly.io. Note that we didn't have to do anything special; we're just using the stock Fly.io load balancer that's used for "regular" applications.

Concrete view of WarpStream Agents deployed behind Fly.io network load balancer.

We did this and everything just worked. 🎉. Well, almost.

The load balancing mechanism we discussed earlier depends on one crucial detail: Kafka clients periodically refresh their view of the cluster in the background by issuing Metadata requests. Every time they do this, the WarpStream control plane’s round-robin load balancing policy kicks in and the client potentially receives a response in which a different Agent is now marked as the leader for all topic-partitions. When this happens, the client will start routing requests to the new Agent instead of the previous one.

This works great and ensures that load remains evenly balanced and that clients quickly react to changes in cluster topology (i.e., when new Agents are added or removed, say, due to auto-scaling). However, some clients inspect both the node ID and the hostname of a broker to determine if a broker transition has actually occurred and they need to establish new connections / reroute requests.

WarpStream assigns each Agent a unique, but deterministic Kafka node ID by hashing the Agent’s UUID. Normally, each Agent has a unique hostname as well (its private IP address on the network it’s running on). However, when the Agents are running behind a load balancer each Agent has a unique Kafka node ID, but is announcing itself under the same hostname: the hostname of the load balancer.
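
As an illustration, that derivation can be as simple as hashing the UUID string down to a positive 31-bit integer (the specific hash function below is our own illustrative choice, not necessarily the one WarpStream uses):

package discovery

import "hash/fnv"

// kafkaNodeID deterministically maps an Agent's UUID to a stable, positive
// Kafka node ID: the same UUID always produces the same node ID.
func kafkaNodeID(agentUUID string) int32 {
    h := fnv.New32a()
    h.Write([]byte(agentUUID))
    // Mask off the sign bit so the result is always a valid, positive node ID.
    return int32(h.Sum32() & 0x7fffffff)
}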

This combination of distinct node IDs behind a single shared hostname confuses some client implementations and prevents them from re-establishing connections as quickly as they should. That’s problematic because it means it can take a long time for requests to balance amongst all the Agents after upscaling when they’re running behind a network load balancer:

It takes a *long* time for requests to balance after upscaling the WarpStream Agents.

Periodically closing connections in the Agents would solve this problem, but it introduces other issues like we mentioned previously. Instead, we settled on a much simpler solution that depends on one insight: Kafka client implementations all treat hostnames as case-sensitive, but DNS is case-insensitive.

That means we can generate a uniquely cased version of the load balancer hostname for each Agent. This ensures that each Agent looks distinct from the Kafka client’s perspective, but doesn’t interfere with the client’s ability to resolve DNS and establish a connection with the load balancer. This is how it ends up looking to the client:

This casing trick makes the load balancer scenario look almost exactly like the non-load-balancer scenario, and everything behaves nicely:

Load balancing actually isn't perfect here because Fly.io uses a random load balancing algorithm. Hopefully they'll offer a better one in the future!
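
For the curious, here’s a rough sketch of how a deterministic “unique casing” of a shared load balancer hostname can be derived per Agent. The exact scheme is an assumption on our part; any deterministic per-Agent variation works, since DNS ignores case:

package discovery

import "strings"

// uniquelyCasedHostname flips the case of letters in hostname based on the
// bits of the Agent's node ID. DNS is case-insensitive, so the result still
// resolves to the same load balancer, but Kafka clients treat it as a
// distinct broker hostname.
func uniquelyCasedHostname(hostname string, nodeID int32) string {
    var b strings.Builder
    bit := 0
    for _, r := range hostname {
        if r >= 'a' && r <= 'z' {
            if nodeID&(1<<(bit%31)) != 0 {
                r = r - 'a' + 'A'
            }
            bit++
        }
        b.WriteRune(r)
    }
    // The result is a mixed-case variant of the hostname whose exact pattern
    // depends on the node ID's bits, e.g. something like "mY-Lb.exAmple.com".
    return b.String()
}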

This final optimization isn’t strictly required for most well-behaved clients, but it helps a few client implementations and doesn’t hurt any of them, so we decided to keep it. Fun fact: we actually learned about this trick while writing our own DNS server and realizing that it sometimes didn’t work because Google’s DNS servers were randomizing the case of the requested hostname to improve security.

Wanna take WarpStream for a spin? You can run our demo in under 30s with this command:

curl https://console.warpstream.com/install.sh | sh

If you have any questions, hop into our community Slack!
