
Kafka Streams: An Introduction and Developer's Guide


Kafka Streams is a transformation library that ships with the Kafka binaries for analyzing and processing data stored in Kafka topics. It is used for data transformation, enrichment, fraud detection, monitoring, and alerting. Kafka Streams applications are standard Java applications that take data from Kafka topics, process and analyze it, and store the results back to Kafka topics. It is highly scalable, fault tolerant, requires no separate cluster, and leverages the full capabilities of Kafka. Moreover, it is the only library in the streaming world with exactly-once semantics, and it is a true streaming framework because it processes one record at a time rather than in micro-batches. It works for applications of any size, small or big.

Kafka Streams vs. Other Streaming Libraries

There are many streaming libraries in the market today, such as Spark Streaming, NiFi, and Flink. All of these libraries are constantly evolving, and which one to select depends entirely on your use case.

                      Kafka Streams   Spark Streaming   Flink           NiFi
Cluster Required      No              Yes               Yes             Yes
Real-time Streaming   Yes             No                Yes             Yes
Micro Batches         No              Yes               No              No
Semantics             Exactly Once    At Least Once     At Least Once   At Least Once
Implementation        Code based      Code based        Code based      Drag & drop

Who uses Kafka Streams?


Many companies are using Kafka Streams to build their real-time streaming pipelines.

The New York Times - is using Kafka Streams to store and distribute its content to readers in real time.

Pinterest - is using Kafka Streams to predict the budget of its advertising infrastructure.

Rabobank - one of the top banks in the Netherlands, is using Kafka Streams to alert customers in real time about financial events.

Kafka Stream Core Concepts


Processor topology: A Kafka Streams application is represented as a processor topology, which is simply a graph of stream processors connected by streams. It defines the computation logic of your application and how your input data is transformed into output data.

Source Processors: A source processor is a special processor that takes data directly from Kafka topics and forwards it to downstream processors. It does not do any transformations; it just reads data from a Kafka topic and creates a stream out of it to send to the other processors in the topology for transformation.

Sink Processors: A sink processor takes transformed or enriched data from upstream processors and sends it to Kafka topics.

Stream: A sequence of fully ordered, immutable data records that can be replayed in case of failures. These data records correspond to records in a Kafka topic.

Stream Processor: A node in the processor topology that transforms incoming streams record by record and creates a new stream from them.
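
To make these concepts concrete, here is a minimal sketch of a topology written with the high-level DSL (covered in the next section). The topic names "sample-input" and "sample-output" are placeholders.

StreamsBuilder builder = new StreamsBuilder();

// Source processor: reads records from a Kafka topic and creates a stream.
KStream<String, String> source = builder.stream("sample-input");

// Stream processor: transforms each record (here, uppercasing the value).
KStream<String, String> upperCased = source.mapValues(value -> value.toUpperCase());

// Sink processor: writes the transformed records back to a Kafka topic.
upperCased.to("sample-output");

Each DSL call adds a node to the processor topology; builder.build() returns the resulting graph.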

How to write Topologies?

We can write topologies in two ways:

High-Level DSL: A high-level, simple API that provides the most common transformation operations such as filter, map, and join.

Low-Level Processor API: A low-level API used to implement custom or complex logic, but rarely needed (see the sketch below).
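
For comparison, the same uppercase transformation expressed with the low-level Processor API might look like the sketch below, where each processor is added to the topology explicitly. Class, node, and topic names here are illustrative.

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class UppercaseTopology {

    // Illustrative stream processor: uppercases each value and forwards it downstream.
    static class UppercaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
        }
    }

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("Source", "sample-input");                          // source processor
        topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source"); // stream processor
        topology.addSink("Sink", "sample-output", "Uppercase");                // sink processor
        return topology;
    }
}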

Kafka Streams Mandatory Configurations

bootstrap.servers      Needed to connect to Kafka, typically at port 9092
auto.offset.reset      "earliest" to consume the topic from the start, "latest" to consume from the latest offset
application.id         ID given to the Kafka Streams application
default.key.serde      Default serialization/deserialization (Serde) class for keys
default.value.serde    Default serialization/deserialization (Serde) class for values

Kafka Streams Maven Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

Setting up the properties

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class SampleStreamApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SampleStreamApp");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    }
}

Building a KStream from a Kafka Topic

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("sample-input");
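
To actually run the topology, the builder and the Properties object configured earlier can be wired into a KafkaStreams instance, roughly as follows (topic names are placeholders):

KStream<String, String> upperCased = input.mapValues(value -> value.toUpperCase());
upperCased.to("sample-output");

// "config" is the Properties object built in the previous section.
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

// Close the application cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));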

KTables/KStreams

A KStream is an infinite, unbounded stream of immutable data records. Whenever a new record is added to a Kafka topic, that record is appended to the KStream.


KTables are similar to database tables. A KTable deletes a record when a null value arrives for its key, and updates the record if the same key appears again. It is used when you want to apply a unique constraint on the data, like a primary key in a database table. For example, if the record ("Mary", 23) is followed by ("Mary", 27), the KTable updates Mary's value from 23 to 27, and a record with key "john" and a null value deletes john from the table.

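A rough sketch of how this looks in code, assuming a placeholder topic named "user-topic":

// Reading the topic as a KTable: the table keeps only the latest value per key.
KTable<String, String> users = builder.table("user-topic");

// If ("Mary", "23") is followed by ("Mary", "27"), the table holds Mary -> 27.
// If ("john", null) arrives, john is removed from the table.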

MapValues

It applies an operation to the values of the records; for example, changing the stream data into uppercase. This function is supported on both KStreams and KTables.

stream.mapValues(value -> value.toUpperCase());

Filter/FilterNot

Filter applies a condition on a KStream/KTable to get a filtered stream, whereas FilterNot is the exact opposite of Filter. Each record produces zero or one output record, depending on whether it satisfies the condition. The statement below filters out all values that are less than or equal to zero and gives back a new KStream containing only values greater than zero.

stream.filter((key, value) -> value > 0);
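The same filtering written with FilterNot, which keeps only the records that do not satisfy the predicate, might look like this:

// Equivalent to the filter above: drops records whose value is less than or equal to zero.
stream.filterNot((key, value) -> value <= 0);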

FlatMapValues/FlatMap

FlatMap takes one record and produces zero, one, or more records. FlatMapValues does not change the keys and does not trigger a repartition, whereas FlatMap can change the keys and triggers a repartition. Both operations are available on KStreams only.

stream.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
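For comparison, a sketch of FlatMap, which can change the key as well as the value (and may therefore trigger a repartition). Here each word of the value becomes the key and value of its own output record; KeyValue comes from org.apache.kafka.streams.KeyValue.

stream.flatMap((key, value) -> {
    List<KeyValue<String, String>> records = new ArrayList<>();
    for (String word : value.split("\\s+")) {
        records.add(KeyValue.pair(word, word)); // new key and new value per word
    }
    return records;
});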

Branch

Branch splits a KStream into multiple KStreams based on one or more predicates.

KStream<String, Long>[] branches = stream.branch(
    (key, value) -> value > 100,
    (key, value) -> value > 10,
    (key, value) -> value > 0
);

In the above example, predicates are evaluated in order. If a record arrives whose value is greater than 100, it becomes part of the first KStream; otherwise it is evaluated against the next predicates. If a record does not match any predicate, it is dropped. The operation above creates three KStreams: the first carrying all values greater than 100, the second with values between 10 and 100, and the third with values between 0 and 10. All records with values less than or equal to zero are dropped.


SelectKey

selectKey assigns a new key to each record. The operation below assigns the first letter of the old key as the new key.

stream.selectKey((key, value) -> key.substring(0, 1));

Reading from Kafka

We can read data from Kafka either as a KStream or as a KTable. As discussed earlier, KTables and KStreams differ in how they deal with null values and with repeated keys.

//Reading as KStream
KStream<String, Long> wordCounts = builder.stream(
    "topic-name",                                  //input topic
    Consumed.with(Serdes.String(), Serdes.Long())  //key & value Serdes
);

//Reading as KTable
KTable<String, Long> wordCountsTable = builder.table(
    "topic-name",                                  //input topic
    Consumed.with(Serdes.String(), Serdes.Long())  //key & value Serdes
);

Writing to Kafka

There are two APIs for writing to a topic. The "to" method simply writes to a topic and returns nothing, whereas the "through" method writes to a Kafka topic and also returns a new stream that reads the newly written data back. A KTable must first be converted with toStream() before it can be written out.

//writing to a topic
stream.to("topic-name");
table.toStream().to("topic-name");

//writing & reading back the newly written data
KStream<String, Long> newStream = stream.through("topic-name");
KStream<String, Long> newTableStream = table.toStream().through("topic-name");

Conclusion

All real-time stream processing frameworks are still evolving at a constant pace, with new APIs arriving every day, but Kafka Streams undoubtedly has an edge over the others because of its tight integration with Kafka, its exactly-once semantics, and its real-time processing power.

