
Kafka Streams: An Introduction and Developer’s Guide to Core Concepts


Kafka Streams is a client library, shipped with the Kafka distribution, for processing and analyzing data stored in Kafka topics. Common uses include data transformation, enrichment, fraud detection, monitoring, and alerting.

Kafka Streams applications are standard Java applications that read data from Kafka topics, process and analyze it, and write the results back to Kafka topics. The library is highly scalable and fault tolerant, requires no separate processing cluster, and leverages the full capabilities of Kafka.

Moreover, it supports exactly-once processing semantics, and it is a true streaming framework because it processes one record at a time rather than in micro-batches. It works for applications of any size, small or large.

Kafka Streams vs. Other Streaming Libraries

There are many streaming libraries on the market today, such as Spark Streaming, NiFi, and Flink. All of them are constantly evolving, and the right choice depends entirely on your use case.

Feature               Kafka Streams   Spark Streaming   Flink           NiFi
Cluster required      No              Yes               Yes             Yes
Real-time streaming   Yes             No                Yes             Yes
Micro-batches         No              Yes               No              No
Semantics             Exactly once    At least once     At least once   At least once
Implementation        Code based      Code based        Code based      Drag and drop

Who Uses Kafka Streams For Real-Time Streaming?


Many companies use Kafka Streams to build their real-time streaming pipelines.

The New York Times uses Kafka Streams to store and distribute its content to readers in real time.

Pinterest uses Kafka Streams to predict the budget of its advertising infrastructure.

Rabobank, one of the top banks in the Netherlands, uses Kafka Streams to alert customers to financial events in real time.

Kafka Streams Core Concepts

  • Processor Topology

    A Kafka Streams application is represented as a processor topology, which is simply a graph of stream processors connected by streams. It defines the computation logic of your application: how input data is transformed into output data.

  • Source Processors

    This is a special processor that reads data directly from Kafka topics and forwards it to downstream processors. It does not perform any transformations; it simply reads records from a Kafka topic and creates a stream from them for the downstream processors in the topology to transform.

  • Sink Processors

    This processor takes transformed or enriched data from an upstream processor and sends it to Kafka topics.

  • Stream

    An ordered sequence of immutable data records that can be replayed in case of failure. These data records correspond to the records in a Kafka topic.

  • Stream Processor

    A node in the processor topology that transforms incoming streams record by record and creates a new stream from the results.
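To make the topology vocabulary concrete, here is a minimal plain-Java sketch that chains a source processor, one stream processor, and a sink processor. It is an analogy only and does not use the Kafka Streams API: the "topics" are in-memory lists, and `TopologySketch` and its methods are hypothetical names chosen for illustration. In a real application the same graph would be built with `StreamsBuilder` (DSL) or `Topology` (Processor API).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java analogy of a three-node processor topology (source -> processor -> sink).
// NOT the Kafka Streams API: "topics" are in-memory lists and every name here is hypothetical.
class TopologySketch {

    // Source processor: reads records from the input "topic" and forwards them unchanged.
    static void source(List<Map.Entry<String, String>> inputTopic,
                       Consumer<Map.Entry<String, String>> downstream) {
        for (Map.Entry<String, String> record : inputTopic) {
            downstream.accept(record);
        }
    }

    // Stream processor: transforms each record's value, one record at a time, and forwards it.
    static Consumer<Map.Entry<String, String>> processor(Function<String, String> transform,
                                                         Consumer<Map.Entry<String, String>> downstream) {
        return record -> downstream.accept(Map.entry(record.getKey(), transform.apply(record.getValue())));
    }

    // Sink processor: writes every record it receives to the output "topic".
    static Consumer<Map.Entry<String, String>> sink(List<Map.Entry<String, String>> outputTopic) {
        return outputTopic::add;
    }

    // Wire source -> processor -> sink and push the input records through the graph.
    static List<Map.Entry<String, String>> run(List<Map.Entry<String, String>> inputTopic) {
        List<Map.Entry<String, String>> outputTopic = new ArrayList<>();
        source(inputTopic, processor(String::toUpperCase, sink(outputTopic)));
        return outputTopic;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(Map.entry("k1", "hello"), Map.entry("k2", "world"))));
    }
}
```

Each node only knows its downstream neighbour, which mirrors how a processor forwards records to its children in the topology.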

How to Write Topologies

We can write topologies in two ways:

  • High-level DSL:

    A simple, high-level API that provides the most common transformation operations, such as filter, map, and join.

  • Low-level Processor API:

    A low-level API for implementing custom, complex logic; it is rarely needed.

Essential Configurations for Kafka Streams

Property              Description
bootstrap.servers     Address of the Kafka broker(s) to connect to, for example localhost:9092
auto.offset.reset     earliest: consume the topic from the beginning; latest: consume from the latest offset
application.id        ID given to the Kafka Streams application
default.key.serde     Default Serde (serializer/deserializer) for record keys
default.value.serde   Default Serde (serializer/deserializer) for record values

Only application.id and bootstrap.servers are strictly required; the remaining settings are commonly set alongside them.

Kafka Streams Maven Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

Setting up the Properties

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class SampleStreamApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Unique ID of this Streams application
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SampleStreamAPP");
        // Kafka broker(s) to connect to
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Start from the earliest offset when no committed offset exists
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Default Serdes for record keys and values
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    }
}

Build KStream from Kafka Topic

StreamsBuilder builder = new StreamsBuilder();
// Uses the default key/value Serdes from the configuration above
KStream<String, String> input = builder.stream("sample-input");

KTables/KStreams

A KStream is an infinite, unbounded stream of immutable data records. Whenever a new record is added to the Kafka topic, that record is appended to the KStream.


KTables are similar to database tables: a record with a null value deletes the existing entry for its key, and a record whose key already exists updates that entry. Use a KTable when each key should have only one current value, much like a primary key in a database table. In the example below, the KTable updates the "Mary" record from value 23 to 27 and deletes the record with key "john" because it arrived with a null value.

[Figure: Kafka topic records]
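The difference between the two views can be sketched in plain Java, assuming a topic is modelled as an ordered list of key/value records. This illustrates the semantics only and is not the Kafka Streams API; `StreamTableSketch` is a hypothetical name, and the value 25 for "john" is made up because the article does not give it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API) of how the same topic records
// are interpreted as a KStream versus a KTable.
class StreamTableSketch {

    // KStream view: every record is an independent event, so all of them are kept in order.
    static List<Map.Entry<String, Integer>> asStream(List<Map.Entry<String, Integer>> topic) {
        return new ArrayList<>(topic);
    }

    // KTable view: a record with an existing key overwrites the previous value (upsert),
    // and a record with a null value deletes that key (tombstone).
    static Map<String, Integer> asTable(List<Map.Entry<String, Integer>> topic) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : topic) {
            if (record.getValue() == null) {
                table.remove(record.getKey());
            } else {
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Sample records modelled on the article's example; john's first value (25) is made up.
        List<Map.Entry<String, Integer>> topic = List.of(
                new SimpleEntry<>("Mary", 23),
                new SimpleEntry<>("john", 25),
                new SimpleEntry<>("Mary", 27),
                new SimpleEntry<>("john", (Integer) null));
        System.out.println("KStream view: " + asStream(topic));
        System.out.println("KTable view:  " + asTable(topic));
    }
}
```

The stream view keeps all four records, while the table view ends with a single entry for "Mary" holding 27.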

MapValues

mapValues applies an operation to the value of each record and leaves the key unchanged. For example, the statement below converts the stream's values to uppercase. This operation is supported on both KStreams and KTables.

stream.mapValues(value -> value.toUpperCase());

Filter/FilterNot

filter keeps only the records of a KStream or KTable that satisfy a condition, whereas filterNot does the opposite and keeps only the records that do not. Each input record produces zero or one output record. The statement below drops all values less than or equal to zero and returns a new KStream containing only values greater than zero.

stream.filter((key, value) -> value > 0);
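A minimal plain-Java sketch (not the Kafka Streams API) shows that filter and filterNot split the same input into complementary results. `FilterSketch` is a hypothetical name, and the model looks only at values, whereas the real predicates receive both the key and the value.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model (not the Kafka Streams API) of filter/filterNot on record values.
class FilterSketch {

    // filter: keep a value only if the predicate is true (zero or one output per input).
    static List<Long> filter(List<Long> values, Predicate<Long> predicate) {
        return values.stream().filter(predicate).collect(Collectors.toList());
    }

    // filterNot: keep a value only if the predicate is false.
    static List<Long> filterNot(List<Long> values, Predicate<Long> predicate) {
        return values.stream().filter(predicate.negate()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> values = List.of(-5L, 0L, 3L, 42L);
        System.out.println("filter:    " + filter(values, v -> v > 0));
        System.out.println("filterNot: " + filterNot(values, v -> v > 0));
    }
}
```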

FlatMapValues / FlatMap

flatMap takes one record and produces zero, one, or more records. flatMapValues does not change the keys and therefore does not trigger repartitioning, whereas flatMap can change the keys, so its result is marked for repartitioning. Both operations are available on KStream only.

stream.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
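The key-handling difference between flatMapValues and flatMap can be modelled in plain Java, with records as key/value entries. This is an illustration, not the Kafka Streams API; `FlatMapSketch` and its methods are hypothetical, and making each word the new key is just one example of the key change that flatMap permits.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API) contrasting flatMapValues and flatMap.
class FlatMapSketch {

    // flatMapValues: one record in, zero or more records out; every output keeps the input key.
    static List<Map.Entry<String, String>> flatMapValues(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            for (String word : record.getValue().split("\\s+")) {
                out.add(new SimpleEntry<>(record.getKey(), word));
            }
        }
        return out;
    }

    // flatMap: outputs may carry new keys (here each word becomes the key and the old key
    // becomes the value), which is why the real flatMap result is marked for repartitioning.
    static List<Map.Entry<String, String>> flatMapWordAsKey(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            for (String word : record.getValue().split("\\s+")) {
                out.add(new SimpleEntry<>(word, record.getKey()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(Map.entry("s1", "hello kafka streams"), Map.entry("s2", "hi"));
        System.out.println("flatMapValues: " + flatMapValues(input));
        System.out.println("flatMap:       " + flatMapWordAsKey(input));
    }
}
```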

Branch

branch splits a KStream into multiple KStreams based on one or more predicates. (Since Kafka 2.8, branch() is deprecated in favor of split().)

KStream<String, Long>[] branches = stream.branch(
        (key, value) -> value > 100,
        (key, value) -> value > 10,
        (key, value) -> value > 0
);

In the example above, predicates are evaluated in order. A record whose value is greater than 100 becomes part of the first KStream; otherwise it is evaluated against the next predicate. A record that matches no predicate is dropped.

The operation therefore creates three KStreams: the first carries all values greater than 100, the second values greater than 10 and up to 100, and the third values greater than 0 and up to 10. All records with values of zero or less are dropped.
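The first-match-wins evaluation described above can be modelled in plain Java with an ordered list of predicates. `BranchSketch` is a hypothetical name and the model is not the Kafka Streams API; it only mirrors the evaluation order and the dropping of records that match no predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Plain-Java model (not the Kafka Streams API) of branch(): predicates are tried in order,
// a value goes to the first branch whose predicate matches, and unmatched values are dropped.
class BranchSketch {

    static List<List<Long>> branch(List<Long> values, LongPredicate... predicates) {
        List<List<Long>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (long value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value); // first match wins; later predicates are skipped
                    break;
                }
            }
            // A value that matched no predicate is simply not added anywhere (dropped).
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Long> values = List.of(150L, 100L, 50L, 10L, 5L, 0L, -3L);
        // prints [[150], [100, 50], [10, 5]]
        System.out.println(branch(values, v -> v > 100, v -> v > 10, v -> v > 0));
    }
}
```

The boundary values 100 and 10 show the strict comparisons at work: 100 lands in the second branch and 10 in the third.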


SelectKey

selectKey assigns a new key to each record. The operation below uses the first letter of the old key as the new key.

stream.selectKey((key, value) -> key.substring(0, 1));
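Because selectKey changes the key, Kafka Streams marks the resulting stream for repartitioning, and the repartition actually happens if a key-based operation such as groupByKey or a join follows. The plain-Java sketch below models that effect: after re-keying, records that share a new key are routed to the same partition. `SelectKeySketch` is a hypothetical name, and `String.hashCode()` is only a stand-in for Kafka's default partitioning, which hashes the serialized key bytes with murmur2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API) of selectKey followed by repartitioning.
class SelectKeySketch {

    // selectKey: replace each key with its first letter; values are untouched.
    static List<Map.Entry<String, String>> selectFirstLetter(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : records) {
            out.add(new SimpleEntry<>(r.getKey().substring(0, 1), r.getValue()));
        }
        return out;
    }

    // Simplified partitioner: String.hashCode() stands in for Kafka's murmur2-based hashing.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route re-keyed records so that all records with the same new key share a partition.
    static List<List<Map.Entry<String, String>>> repartition(List<Map.Entry<String, String>> records,
                                                             int numPartitions) {
        List<List<Map.Entry<String, String>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<String, String> r : records) {
            partitions.get(partitionFor(r.getKey(), numPartitions)).add(r);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> rekeyed = selectFirstLetter(
                List.of(Map.entry("alice", "v1"), Map.entry("adam", "v2"), Map.entry("bob", "v3")));
        System.out.println(repartition(rekeyed, 4));
    }
}
```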

Reading from Kafka

We can read data from Kafka as a KStream or as a KTable. As discussed earlier, KTables and KStreams differ in how they handle null values and records with the same key.

// Reading as a KStream
KStream<String, Long> wordCountStream = builder.stream(
        "topic-name",                                    // input topic
        Consumed.with(Serdes.String(), Serdes.Long()));  // key Serde, value Serde
// Alternative: reading the same topic as a KTable
KTable<String, Long> wordCountTable = builder.table(
        "topic-name",                                    // input topic
        Consumed.with(Serdes.String(), Serdes.Long()));  // key Serde, value Serde

Writing to Kafka

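There are two APIs for writing to a topic. The to() method simply writes to a topic and returns nothing, whereas the through() method writes to a Kafka topic and also returns a KStream that reads the written data back. A KTable is written by first converting it to a stream with toStream(). Note that through() has been deprecated since Kafka 2.6 in favor of repartition().

// Writing to a topic (Produced supplies Serdes that differ from the configured defaults)
stream.to("topic-name", Produced.with(Serdes.String(), Serdes.Long()));
// A KTable is written by converting it to a changelog stream first
table.toStream().to("topic-name", Produced.with(Serdes.String(), Serdes.Long()));
// Writing and then reading back the newly written data
KStream<String, Long> newStream = stream.through("topic-name", Produced.with(Serdes.String(), Serdes.Long()));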

Conclusion

All real-time stream processing frameworks are still evolving rapidly, with new APIs appearing all the time, but Kafka Streams has a clear edge for Kafka-based pipelines because of its native integration with Kafka, its exactly-once semantics, and its record-at-a-time real-time processing.
