Fancy Stream Processing Made (even more) Operationally Mundane

Mar 26, 2024
Richard Artoul
WarpStream is proud to sponsor Benthos, and we encourage you to do the same. If you’re using Benthos, become a sponsor and support the project. Thank you to Ashley Jeffs and the Benthos community for creating and maintaining Benthos!

WarpStream + Benthos

WarpStream is a drop-in replacement for Apache Kafka®, but Kafka is just the storage layer. Real-world use cases require connecting Kafka to at least one external system, and writing custom code for each use case quickly becomes tedious and time-consuming.

Kafka Connect can ease this burden because it ships with connectors that link Kafka to a wide variety of external systems, but operating Kafka Connect is not a simple task. Like Apache Kafka, Kafka Connect is a stateful distributed system that must be managed alongside your Kafka cluster, which increases both your infrastructure footprint and your operational burden.

We designed WarpStream to run as a single, stateless binary directly in our customers’ cloud accounts. Asking our customers to also deploy and manage Kafka Connect didn’t sit right with us. That’s why, when we first learned about Benthos, a stateless stream processing framework written in Go, we decided to evaluate whether it could be embedded directly in the WarpStream Agents. To our delight, it could!

We’re excited to announce that WarpStream now natively embeds Benthos, a stateless stream processing framework that connects to many data sources and sinks. Benthos offers much of the functionality of Kafka Connect, as well as additional lightweight stream processing functionality like single message transforms, (lightweight) aggregations, multiplexing, enrichments, and more. It also has native support for WASM (WebAssembly) for more advanced processing (using the excellent wazero library).

Benthos describes itself as a stream processing framework that makes “fancy stream processing operationally mundane”. We like that idea, and together WarpStream and Benthos make stream processing more operationally mundane than ever before!

Don’t just take our word for it though, let’s look at an example.


Benthos has support for powerful data integration and processing features, but to demonstrate the basic functionality with WarpStream and learn a little bit about how it works, we’re going to start with something a little more trivial. Feel free to follow along at home using your own Terminal!

First, we’re going to install WarpStream locally. If you’re already a WarpStream user, make sure you update to the latest version that has Benthos support built in:

curl | sh

Next, we’ll create a YAML file that specifies the Benthos configuration. We’ll store this file locally so we can pass it to the WarpStream Agent:

# benthos.yaml
input:
  generate:
    count: 1000
    interval: 1s
    mapping: |
      root = if random_int() % 2 == 0 {
        {
          "type": "foo",
          "foo": "xyz"
        }
      } else {
        {
          "type": "bar",
          "bar": "abc"
        }
      }

output:
  broker:
    pattern: fan_out
    outputs:
      - type: file
        file:
          path: /tmp/data.txt
          codec: lines
      - type: kafka_franz
        kafka_franz:
          seed_brokers:
            - localhost:9092
          topic: benthos_test

This file instructs Benthos to generate 1000 messages at a rate of one message per second. For each message, the mapping draws a random integer: an even value produces {"type": "foo", "foo": "xyz"} and an odd value produces {"type": "bar", "bar": "abc"}. The fan_out broker then sends every message to both outputs: a local file called data.txt in /tmp/, with each message on its own line, and a topic called "benthos_test" in WarpStream (running locally in Playground mode).

In our Terminal, we’re going to start up WarpStream using the "playground" command, and tell the Agent to use the YAML file that we just created. The playground command creates a temporary (ephemeral) WarpStream account and cluster that we can use for testing and local development.

warpstream playground -benthosConfigPath ./benthos.yaml

The output of that command should look something like this:

WARNING, RUNNING IN PLAYGROUND MODE. All data and state is ephemeral. Server will shutdown automatically after: 4h0m0s
Signing up for temporary account...
Done signing up for temporary account
Starting local agent...
started agent, HTTP server on port: 8080 and kafka server on port: 9092
open the developer console at: https:/ &warpstream_session_key=sks_84a19b03eb51df6374ab17d3a62e2b804f7d530409a5af45cea10ef961e18ac1

Navigate to the console using the link provided in the Terminal, and you’ll see the benthos_test topic in WarpStream with messages being produced to it.

Next, open the file called data.txt that Benthos wrote to the /tmp/ directory:

cat /tmp/data.txt

You should see the messages generated by Benthos (running inside the WarpStream Agent) printed in your Terminal, one message per line.

Using Benthos embedded within the WarpStream Agents, we were able to write data to both a local file and a topic in WarpStream, without writing any application code or increasing our infrastructure footprint. All we had to do was provide a configuration file, and Benthos did the rest.

Next steps

This example was admittedly pretty trivial, but it shows how flexible and powerful Benthos can be. Benthos supports over 50 different inputs, outputs, and processors, which you can use to integrate WarpStream with your external systems and write single message transforms, aggregations, multiplexing, enrichments, and more, all without managing any additional infrastructure. Check out the official Benthos docs to learn more about what’s possible with Benthos.

For WarpStream BYOC deployments, the WarpStream Agents now ship with Benthos directly embedded, so you don’t have to manage anything extra. Benthos is available for BYOC starting with Agent release v541 as a beta feature. Review the WarpStream <> Benthos documentation to learn how to configure and use Benthos with WarpStream. And stay tuned, support for Benthos in Serverless clusters is coming soon!

Contact us or join our Slack if you want to learn more.
