Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
Kafka uses ZooKeeper, so you need to start a ZooKeeper server first if you don't already have one. You can download a stable ZooKeeper release from the ZooKeeper downloads page. Once you've downloaded it, unpack it and cd into the root directory. ZooKeeper needs a configuration file to start; here is a minimal sample, which you should create as conf/zoo.cfg:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
You may also copy conf/zoo_sample.cfg and update it to suit your needs.
The file can be called anything, but for the sake of this discussion call it conf/zoo.cfg. Change the value of dataDir to an existing (initially empty) directory. The fields have the following meanings:
tickTime : the basic time unit in milliseconds used by ZooKeeper. It is used for heartbeats, and the minimum session timeout will be twice the tickTime.
dataDir : the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database.
clientPort : the port to listen on for client connections.
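The session-timeout bounds ZooKeeper derives from tickTime can be sketched in plain Java. The minimum of two ticks comes from the text above; the maximum of twenty ticks is ZooKeeper's default (the class and method names here are ours, for illustration only):

```java
public class ZkSessionBounds {
    // ZooKeeper derives session-timeout bounds from tickTime:
    // minimum = 2 ticks (as noted above); default maximum = 20 ticks.
    static int minSessionTimeout(int tickTime) { return 2 * tickTime; }
    static int maxSessionTimeout(int tickTime) { return 20 * tickTime; }

    public static void main(String[] args) {
        int tickTime = 2000; // the value from conf/zoo.cfg above
        System.out.println(minSessionTimeout(tickTime) + " " + maxSessionTimeout(tickTime)); // 4000 40000
    }
}
```

So with the sample config, a client session may time out no sooner than 4 seconds and no later than 40 seconds.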
Now that you have created the configuration file, you can start ZooKeeper and connect to it with the following commands:
bin/zkServer.sh start
bin/zkCli.sh -server 127.0.0.1:2181
Step 1: Download the code
Download the 1.0.0 release and un-tar it.
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
Step 2: Start the server
With ZooKeeper already running, start the Kafka server:
bin/kafka-server-start.sh config/server.properties
Step 3: Create a topic
Let's create a topic named "test" with a single partition and only one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Step 4: Send some messages
Run the producer and then type a few messages into the console to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> This is a message
> This is another message
Step 5: Start a consumer
Kafka also has a command line consumer that will dump out messages to standard output.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
Now that both the ZooKeeper and Kafka servers are set up, and we have experimented with Kafka topics, producers, and consumers, we are ready to run our first Streams demo application.
Run Streams Demo Application
This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the WordCountDemo example code:
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys.
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
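The per-line transformation in that topology (lowercase, split on non-word characters, group, count) can be exercised in plain Java without a Kafka cluster, to see what is computed for each input line (the class and method names below are ours, not part of Kafka):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountLogic {
    // Mirrors the flatMapValues + groupBy + count steps of the topology,
    // applied to a single line of input text.
    static Map<String, Long> countWords(String line) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("all streams lead to kafka"));
        // {all=1, streams=1, lead=1, to=1, kafka=1}
    }
}
```

The real application maintains these counts continuously across all input messages, rather than per line; we will see that behavior in Step 4 below.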
Step 1: Implement the word count application
From the Kafka root directory, run the command below to create the path recursively.
mkdir -p streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount
We can implement the word count application by creating a Java file in that directory (e.g. WordCountDemo.java) and pasting in the code above.
Step 2: Prepare input topic and start Kafka producer
Next, we create the input topic named streams-plaintext-input and the output topic named streams-wordcount-output:
bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Note: we create the output topic with compaction enabled, because the output stream is a changelog stream.
bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
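Why compaction fits a changelog stream: the broker may eventually discard superseded records, keeping only the latest value per key, which is all a changelog needs to rebuild current state. A minimal plain-Java sketch of what compaction retains (not the broker's actual log cleaner; class and method names are ours):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // With cleanup.policy=compact, Kafka eventually keeps only the most
    // recent record per key; later records simply win.
    static Map<String, Long> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : log) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // A changelog of word counts: old counts for "kafka" and "streams"
        // are superseded by the later updates.
        System.out.println(compact(List.of(
            Map.entry("kafka", 1L), Map.entry("streams", 1L),
            Map.entry("kafka", 2L), Map.entry("streams", 2L))));
        // {kafka=2, streams=2}
    }
}
```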
The created topics can be described with the same kafka-topics tool:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
Step 3: Start the WordCount application
The following command starts the WordCount demo application:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
The demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on each of the read messages, and continuously write its current results to the output topic streams-wordcount-output. Hence there won't be any STDOUT output except log entries, as the results are written back into Kafka.
Now we can start the console producer in a separate terminal to write some input data to this topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Step 4: Process some data
Now let's write a message with the console producer into the input topic streams-plaintext-input by entering a single line of text and hitting <RETURN>. This sends a new message to the input topic, where the message key is null and the message value is the string-encoded text line you just entered (in practice, input data for applications will typically stream continuously into Kafka, rather than being entered manually as in this quickstart):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
> all streams lead to kafka
This message will be processed by the WordCount application, and the following output data will be written to the streams-wordcount-output topic and printed by the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all 1
streams 1
lead 1
to 1
kafka 1
To test the application further, type the following into the console producer:
> hello kafka streams
In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
Here the last printed lines kafka 2 and streams 2 indicate updates to the keys kafka and streams whose counts have been incremented from 1 to 2. Whenever you write further input messages to the input topic, you will observe new messages being added to the streams-wordcount-output topic, representing the most recent word counts as computed by the WordCount application.
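This changelog behavior can be replayed in plain Java: keep a running count per word, and emit one (word, newCount) update record per word occurrence, exactly as the console consumer shows above (class and method names here are ours, for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChangelogDemo {
    // Replays input lines against a running-count table, printing each
    // (word, newCount) update -- the changelog records seen above.
    static Map<String, Long> replay(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                System.out.println(word + " " + counts.merge(word, 1L, Long::sum));
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints the same eight updates as the tutorial, ending with
        // "kafka 2" and "streams 2".
        replay(new String[] {"all streams lead to kafka", "hello kafka streams"});
    }
}
```

Each update record supersedes the previous one for the same key, which is why the output topic is compacted rather than retained indefinitely.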
Step 5: Tear down the application
You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker, and the ZooKeeper server, in that order, each via Ctrl-C.