Basics of Kafka Setup

Background

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics (see the sketch after this list).
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
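
To make the Producer API concrete, here is a minimal Java sketch that publishes a single record. It is only a sketch: the class name, the topic "test", and the settings are illustrative assumptions.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        // Minimal producer configuration: where the cluster lives and how to serialize records.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, flushing any pending sends.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; the record is batched and transmitted in the background.
            producer.send(new ProducerRecord<>("test", "key", "This is a message"));
        }
    }
}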

Preparation

Kafka uses ZooKeeper, so you need to start a ZooKeeper server first if you don't already have one. You can download a stable ZooKeeper release from the Apache ZooKeeper website; once you've downloaded it, unpack it and cd to the root directory. To start ZooKeeper you need a configuration file. Here is a sample; create it as conf/zoo.cfg:

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

You may also copy conf/zoo_sample.cfg and adjust it to your needs.

This file can be called anything, but for the sake of this discussion call it conf/zoo.cfg. Change the value of dataDir to an existing (initially empty) directory. Here is the meaning of each field:

tickTime : the basic time unit in milliseconds used by ZooKeeper. It is used for heartbeats, and the minimum session timeout will be twice the tickTime.

dataDir : the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database.

clientPort : the port to listen on for client connections.

Now that you have created the configuration file, you can start ZooKeeper and connect to it with the following commands:

bin/zkServer.sh start
bin/zkCli.sh -server 127.0.0.1:2181

Step 1: Download the code

Download the 1.0.0 release and un-tar it.

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

Step 2: Start the server

With ZooKeeper running, start the Kafka broker. The default config/server.properties points at a ZooKeeper instance on localhost:2181.

bin/kafka-server-start.sh config/server.properties

Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
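
If you prefer to create topics from code instead of the CLI, the Java AdminClient can do the same. A minimal sketch, mirroring the topic name, partition count, and replication factor above (the class name is an illustrative assumption):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "test" with one partition and replication factor 1, as in the CLI command.
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}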

We can now see that topic if we run the list topic command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Step 4: Send some messages

Run the producer and then type a few messages into the console to send to the server.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> This is a message
> This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
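
The same consumption can be done programmatically with the Consumer API. A minimal sketch; the class name and group id are illustrative assumptions, and auto.offset.reset=earliest mirrors the --from-beginning flag:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // illustrative consumer group id
        props.put("auto.offset.reset", "earliest"); // start from the beginning of the topic
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // Poll for new records and dump their values to standard output.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}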

Now that we have both ZooKeeper and a Kafka server set up, and have played around with Kafka topics, producers, and consumers, we are ready to run our first Streams demo application.

Run Streams Demo Application

This quickstart example demonstrates how to run a streaming application written with this library. Here is the gist of the WordCountDemo example code:

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
 
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));
 
KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 
    // Group the text words as message keys
    .groupBy((key, value) -> value)
 
    // Count the occurrences of each word (message key).
    .count();
 
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Step 1: Implement the word count application

From the Kafka root directory, run the command below to create the path recursively.

mkdir -p streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount

We can implement the word count application by creating a Java file in that directory and pasting in the code above; the skeleton after the command shows the boilerplate the file also needs.

vim streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
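
For the pasted code to compile and run as a standalone class, it needs a package declaration, imports, and a main method that wires up the topology. A minimal skeleton, where the application id streams-wordcount is an illustrative assumption:

package org.apache.kafka.streams.examples.wordcount;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountDemo {
    public static void main(String[] args) {
        // Basic Streams configuration; every Streams app needs a unique application id.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // ... paste the topology code from the gist above here ...

        // Build the topology and start processing.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams instance cleanly on Ctrl-C.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}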

Step 2: Prepare input topic and start Kafka producer

Next, we create the input topic named streams-plaintext-input and the output topic named streams-wordcount-output:

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

Note: we create the output topic with compaction enabled because the output stream is a changelog stream: each output record carries the latest count for its word (the message key), so older records for the same key can safely be compacted away.

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact

The created topics can be described with the same kafka-topics tool:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe

Step 3: Start the Wordcount Application

The following command starts the WordCount demo application (the compiled WordCountDemo class also ships in the examples jar of the Kafka distribution, so the command works even before you compile your own copy):

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The demo application will read from the input topic streams-plaintext-input, perform the WordCount computation on each message it reads, and continuously write its current results to the output topic streams-wordcount-output. Hence there won't be any STDOUT output except log entries, as the results are written back into Kafka.

Now we can start the console producer in a separate terminal to write some input data to this topic:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Step 4: Process some data

Now let's write a message with the console producer into the input topic streams-plaintext-input by entering a single line of text and then hitting <RETURN>. This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered (in practice, input data for applications will typically stream continuously into Kafka, rather than being entered manually as we do in this quickstart):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
> all streams lead to kafka

This message will be processed by the WordCount application, and the following output data will be written to the streams-wordcount-output topic and printed by the console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1

To test the application further, we can type the following into the Kafka producer:

> hello kafka streams

In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

Here the last printed lines kafka 2 and streams 2 indicate updates to the keys kafka and streams whose counts have been incremented from 1 to 2. Whenever you write further input messages to the input topic, you will observe new messages being added to the streams-wordcount-output topic, representing the most recent word counts as computed by the WordCount application.

Step 5: Tear down the application

You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker, and the ZooKeeper server, in that order, via Ctrl-C.
