Spring Boot Kafka Integration

Introduction

Kafka is an open source streaming platform that scales horizontally without compromising speed or efficiency.

The Kafka core is written in Scala, and Kafka Streams and KSQL are written in Java.

Features

Kafka is a service bus

To connect heterogeneous applications, we need a message publication mechanism to send and receive messages among them. A message router is known as a message broker. Kafka is a message broker: a solution for routing messages among clients quickly.

Kafka's architecture follows two directives

  1. The first is to not block the producers (to deal with back pressure).

  2. The second is to isolate producers and consumers. Producers should not know who their consumers are; hence Kafka follows the dumb broker and smart clients model.

Tip
Back pressure arises when a producer generates data at a higher rate than the consumer can consume it. Handling it makes applications robust against data surges.
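To make the problem concrete, here is a minimal sketch of a fast producer overrunning a small in-memory buffer. It uses only the JDK; the class name BackPressureSketch and the buffer size are assumptions chosen for illustration, and this is not how Kafka itself is implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative only: a bounded in-memory buffer, not Kafka code. */
final class BackPressureSketch {

    /** Offers messages to a bounded buffer and returns how many were rejected. */
    static int produce(BlockingQueue<String> buffer, int messageCount) {
        int rejected = 0;
        for (int i = 0; i < messageCount; i++) {
            // offer() returns false instead of blocking when the buffer is full
            if (!buffer.offer("message-" + i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Nobody consumes from the buffer, so it fills up after three messages
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(3);
        int rejected = produce(buffer, 5);
        System.out.println("buffered=" + buffer.size() + ", rejected=" + rejected);
    }
}
```

With no consumer draining the buffer, the producer must either block or drop data once the buffer is full. A broker that appends messages to a durable log lets the producer keep writing while consumers catch up at their own pace.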

Kafka is a real-time messaging system

Kafka is a software solution built on a publish-subscribe model, with the following features

  1. open source

  2. distributed

  3. partitioned

  4. replicated

  5. commit-log based

Kafka Terminology

Important Kafka Terminology
  • Broker: A Kafka server; the term also refers to the Kafka server process itself.

  • Cluster: Set of Kafka Brokers.

  • Zookeeper: Cluster coordinator.

  • Topic: A named stream of messages, comparable to a queue. A broker can host several topics.

  • Offset: An identifier for each message within a partition.

  • Partition: Immutable and ordered sequence of records continually appended to a structured commit log.

  • Producer: Program that publishes data to topics.

  • Consumer: Program that processes data from the topics.

  • Retention Period: Time to keep messages available for consumption
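The relationship between a partition and its offsets can be sketched with a plain list, where the index of a record is its offset. This is a simplified in-memory model; the class PartitionLogSketch and its methods are hypothetical names, not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of one partition: an append-only list whose index is the offset. */
final class PartitionLogSketch {

    private final List<String> records = new ArrayList<>();

    /** Appends a record to the end of the log and returns the offset assigned to it. */
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    /** Reads every record from the given offset to the end of the log. */
    List<String> readFrom(long offset) {
        return new ArrayList<>(records.subList((int) offset, records.size()));
    }
}
```

A consumer only needs to remember the last offset it handled; calling readFrom with the next offset resumes exactly where it left off.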

Kafka Architecture

Role of Zookeeper

Most contemporary distributed orchestration systems, such as Kubernetes and Swarm, rely on a distributed key/value store for maintaining the global state of the cluster.

Consul, etcd, and even Redis are used for service discovery and cluster state management.

Apache Kafka was designed well before these lightweight services were built.

ZooKeeper

Kafka uses Apache ZooKeeper as the distributed configuration store.

It forms the backbone of the Kafka cluster and continuously monitors the health of the brokers.

When new brokers are added to the cluster, they register with ZooKeeper so that newly created topics and partitions can be placed on them.

Apart from cluster management, initial versions of Kafka used ZooKeeper for storing the partition and offset information for each consumer.

With the new consumer API introduced in 0.9, that information moved to an internal Kafka topic (__consumer_offsets).

Cluster Types

Kafka supports three types of clusters

  1. Single node - single broker

  2. Single node - multiple broker

  3. Multiple node - multiple broker

Message Delivery Modes

In Kafka, there are three ways to deliver messages

  1. Never redelivered (at-most-once): Messages may be lost because, once delivered, they are not sent again.

  2. May be redelivered (at-least-once): Messages are never lost; if a message is not received, it can be sent again, so duplicates are possible.

  3. Delivered once (exactly-once): Each message is delivered exactly once. This is the most difficult form of delivery.
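The difference between the first two modes comes down to when the consumer commits its position relative to processing a message. The following sketch simulates a consumer that crashes once and restarts from its last committed offset; the class DeliverySemanticsSketch and the crash model are assumptions made for illustration, not Kafka client code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative simulation of commit ordering; no Kafka classes are involved. */
final class DeliverySemanticsSketch {

    /**
     * Consumes the log with one simulated crash while handling the record at crashIndex.
     * After the crash the consumer resumes from the last committed offset.
     */
    static List<String> consume(List<String> log, int crashIndex, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;        // offset to resume from after a restart
        boolean crashed = false;
        int position = 0;
        while (position < log.size()) {
            if (commitBeforeProcessing) {
                committed = position + 1;              // commit first: never redelivered
            }
            if (!crashed && position == crashIndex) {
                crashed = true;
                if (!commitBeforeProcessing) {
                    processed.add(log.get(position));  // processed, but crashed before the commit
                }
                position = committed;                  // restart from the last committed offset
                continue;
            }
            processed.add(log.get(position));
            if (!commitBeforeProcessing) {
                committed = position + 1;              // commit last: may be redelivered
            }
            position++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        System.out.println(consume(log, 1, true));   // m1 is lost
        System.out.println(consume(log, 1, false));  // m1 is processed twice
    }
}
```

Committing before processing loses the message that was in flight during the crash, while committing after processing replays it. Exactly-once delivery requires making the processing and the commit atomic, which is why it is the hardest of the three.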

Logs

TBD

The message log can be compacted in two ways

  • Coarse-grained: Log compacted by time

  • Fine-grained: Log compacted by message
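The two strategies can be contrasted with a small in-memory sketch. It assumes that "by message" means keeping the latest value for each message key, which is how Kafka's log compaction behaves; the class LogCleanupSketch and its Entry type are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: models log cleanup on an in-memory list. */
final class LogCleanupSketch {

    /** A record with a key, a value and the time it was written. */
    static final class Entry {
        final String key;
        final String value;
        final long timestampMs;

        Entry(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Coarse-grained: drop every record older than the retention period. */
    static List<Entry> retainByTime(List<Entry> log, long nowMs, long retentionMs) {
        List<Entry> kept = new ArrayList<>();
        for (Entry entry : log) {
            if (nowMs - entry.timestampMs <= retentionMs) {
                kept.add(entry);
            }
        }
        return kept;
    }

    /** Fine-grained: keep only the most recent value written for each key. */
    static Map<String, String> compactByKey(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry entry : log) {
            latest.put(entry.key, entry.value); // later records overwrite earlier ones
        }
        return latest;
    }
}
```

Time-based cleanup bounds how far back a consumer can read, whereas key-based compaction keeps the log useful as a snapshot of the latest state per key.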

Kafka Installation

There are three ways to install a Kafka environment

  1. Download executable files

  2. Homebrew or yum package managers

  3. Installing Confluent Platform

Note
JDK 8 and Scala are prerequisites for installing Kafka.

Warning
Ensure you have at least 4GB of RAM. The installation directory will be /usr/local/kafka/ for macOS and /opt/kafka/ for Linux users.

Installing on macOS

  • Install sbt (Scala build tool) with brew by executing the following

sbt.sh
$brew install sbt # install sbt

$brew upgrade sbt # upgrade an existing installation

  • Install Scala with brew

scala.sh
$brew install scala # install Scala

$brew upgrade scala # upgrade an existing installation

Installing on Windows

This section will explain the installation of Kafka on a Windows machine.

Installing on Linux

TBD

Running Kafka

There are two ways to run Kafka, depending on whether we install it directly or through Confluent Platform

Tip
If you installed Kafka with brew, the configuration files are located in /usr/local/etc/kafka

There are two types of configurations possible with Kafka

  • Standalone

  • Cluster

The real power of Kafka is unlocked when it runs in cluster mode with replication and all topics are correctly partitioned.

Cluster mode has two main advantages

  • Parallelism: it is the capacity to run tasks simultaneously among the cluster members.

  • Redundancy: guarantees that, when a Kafka node goes down, the cluster remains safe and accessible from the other running nodes.

Running Zookeeper

zookeeper.sh
$zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

Running Kafka Server (Standalone Mode)

kafka.sh
$kafka-server-start /usr/local/etc/kafka/server.properties &
Tip
Append & at the end to run the command as a background process.

Refer to the Kafka documentation for a detailed description of the supported server properties.

Caution
Zookeeper must be running on the machine before starting Kafka.

Running Kafka Server (Cluster Mode)

Refer to the server.properties file and create two broker configurations.

server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

#port on which kafka server(broker) will run
port=9093

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs


# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

Create two broker configurations as mentioned below

  • mark-1.properties

mark-1.properties
broker.id=1
port=9093
zookeeper.connect=localhost:2181
log.dirs=/tmp/mark-1-logs
  • mark-2.properties

mark-2.properties
broker.id=2
port=9094
zookeeper.connect=localhost:2181
log.dirs=/tmp/mark-2-logs
  • Run first broker

broker-1.sh
kafka-server-start mark-1.properties &
  • Run second broker

broker-2.sh
kafka-server-start mark-2.properties &
Tip
You can check the status of a Kafka broker with the command below
status.sh
tvajjala$lsof -i:9093
COMMAND   PID     USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43439 tvajjala  106u  IPv6 0x672b0fb3db715d83      0t0  TCP *:9093 (LISTEN)
java    43439 tvajjala  122u  IPv6 0x672b0fb3db717483      0t0  TCP tvajjala.home:57770->tvajjala.home:9093 (ESTABLISHED)
java    43439 tvajjala  123u  IPv6 0x672b0fb3db714683      0t0  TCP tvajjala.home:9093->tvajjala.home:57770 (ESTABLISHED)
tvajjala$lsof -i:9094
COMMAND   PID     USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    43439 tvajjala  127u  IPv6 0x672b0fb3e101e203      0t0  TCP tvajjala.home:57774->tvajjala.home:9094 (ESTABLISHED)
java    43708 tvajjala  106u  IPv6 0x672b0fb3db716903      0t0  TCP *:9094 (LISTEN)
java    43708 tvajjala  116u  IPv6 0x672b0fb3e101d683      0t0  TCP tvajjala.home:9094->tvajjala.home:57774 (ESTABLISHED)

Dealing With Topics (CLI)

Kafka, like almost all modern infrastructure projects, has three ways of building things:

  • Through the command line,

  • Through programming,

  • Through a web console (in this case the Confluent Control Center).

Kafka has pre-built utilities to manage brokers, as we already saw, and to manage topics.

Create Topic

create_topic.sh
$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BarCap
--partitions

parameter controls the parallelism

--replication-factor

parameter controls the redundancy

--topic

name of topic to be created

--zookeeper

zookeeper cluster host

Input Parameters
  • --replication-factor parameter is fundamental, as it specifies on how many servers of the cluster the topic is going to be replicated. Each broker can hold only one replica of a given partition, so requesting more replicas than there are brokers fails with the following error:

org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1

  • --partitions parameter, as its name implies, specifies how many partitions the topic will have. The number determines the parallelism that can be achieved on the consumer side.

  • --zookeeper parameter indicates where the Zookeeper cluster is running.
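How these two parameters interact can be sketched as follows: replicas of each partition are spread over the brokers, and the replication factor cannot exceed the broker count. The round-robin placement and the hashCode-based key mapping below are simplifying assumptions; Kafka's actual assignment and its default partitioner (which uses a different hash function) differ in detail, and TopicLayoutSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a simplified model of partition and replica placement. */
final class TopicLayoutSketch {

    /** Returns, for each partition, the ids of the brokers holding its replicas. */
    static List<List<Integer>> assignReplicas(List<Integer> brokerIds, int partitions, int replicationFactor) {
        if (replicationFactor > brokerIds.size()) {
            // A broker holds at most one replica of a partition
            throw new IllegalArgumentException("Replication factor: " + replicationFactor
                    + " larger than available brokers: " + brokerIds.size());
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int partition = 0; partition < partitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            for (int replica = 0; replica < replicationFactor; replica++) {
                // Round-robin placement, starting one broker further for each partition
                replicas.add(brokerIds.get((partition + replica) % brokerIds.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    /** Maps a message key to a partition; floorMod keeps the result non-negative. */
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }
}
```

Because every message with the same key lands in the same partition, ordering is preserved per key, while different partitions can be consumed in parallel.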

List Topics

list_topics.sh
$kafka-topics --list --zookeeper localhost:2181

Create Producer

producer.sh
$kafka-console-producer --broker-list localhost:9092 --topic BarCap
>This is first message to Barclay
>Second message to FraudTC
--broker-list

This specifies the Kafka brokers to connect to, as a comma-separated list in the form hostname:port.

--topic

This parameter is followed by the name of the target topic.

--sync

This specifies whether the messages should be sent synchronously.

--compression-codec

This specifies the compression codec used to produce the messages. The possible options are: none, gzip, snappy, or lz4. If the option is given without a value, gzip is used.

--batch-size

If the messages are not sent synchronously, this sets the size of a single batch. The value is specified in bytes.

--message-send-max-retries

As the brokers can fail receiving messages, this parameter specifies the number of retries before a producer gives up and drops the message. This number must be a positive integer.

--retry-backoff-ms

In case of failure, the node leader election might take some time. This parameter is the time to wait before producer retries after this election. The number is the time in milliseconds.

--timeout

If the producer is running in asynchronous mode and this parameter is set, it indicates the maximum amount of time a message will wait in the queue for a sufficient batch size. This value is expressed in milliseconds.

--queue-size

If the producer is running in asynchronous mode and this parameter is set, it gives the maximum number of messages that will wait in the queue for a sufficient batch size.

Create Consumer

consumer.sh
$kafka-console-consumer --bootstrap-server localhost:9092 --topic BarCap --from-beginning
--from-beginning

This parameter indicates that messages should be consumed from the beginning of the log instead of only the latest messages.

--fetch-size

This is the amount of data to be fetched in a single request. The size in bytes follows as argument. The default value is 1,024 x 1,024.

--socket-buffer-size

This is the size of the TCP receive buffer. The size in bytes follows this parameter. The default value is 2 x 1,024 x 1,024.

--formatter

This is the name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter.

--autocommit.interval.ms

This is the time interval at which to save the current offset in milliseconds. The time in milliseconds follows as argument. The default value is 10,000.

--max-messages

This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument.

--skip-message-on-error

If there is an error while processing a message, the system should skip it instead of halting.

Describe Topic

describe.sh
$kafka-topics --describe --zookeeper localhost:2181 --topic BarCapTC
Topic:BarCapTC PartitionCount:1 ReplicationFactor:2 Configs:
 Topic: BarCapTC Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
PartitionCount

Number of partitions on the topic (parallelism)

ReplicationFactor

Number of replicas on the topic (redundancy)

Leader

Node responsible for reading and writing operations of a given partition

Replicas

List of brokers replicating this topic data; some of these might even be dead

Isr

List of nodes that are currently in-sync replicas

Delete Topic

delete.sh
$kafka-topics --delete  --zookeeper localhost:2181 --topic=BarCap

Kafkacat

Kafkacat is a generic command-line non-JVM utility used to test and debug Apache Kafka deployments.

Kafkacat can be used to produce, consume, and list topic and partition information for Kafka.

Kafkacat is netcat for Kafka, and it is a tool for inspecting and creating data in Kafka.

Tip
kafkacat is similar to kafka-console-producer and kafka-console-consumer but more powerful

Kafkacat is an open source utility and it is available at https://github.com/edenhill/kafkacat

install_kafkacat.sh
brew install kafkacat      # macOS
apt-get install kafkacat   # Linux

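Subscribe to the topics BarCap and BarCapTC. Subscribing to more than one topic requires kafkacat's balanced consumer mode (-G), which takes a consumer group name; the group name used below is only an example.

subscribe.sh
kafkacat -b localhost:9093 -G example-group BarCap BarCapTC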

Message Processing (Java API)

Message processing involves the following:

  • Message structure validation against a message schema

  • Given an event stream, filtering the messages from the stream

  • Message enrichment with additional data

  • Message aggregation (composition) from two or more messages to produce a new message
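The four steps above can be sketched on a plain Java list standing in for the event stream. The Payment type, the validation rules and the default currency are assumptions invented for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative only: plain Java collections stand in for a Kafka event stream. */
final class MessagePipelineSketch {

    /** Hypothetical message type used only for this sketch. */
    static final class Payment {
        final String account;
        final double amount;
        final String currency; // may be null before enrichment

        Payment(String account, double amount, String currency) {
            this.account = account;
            this.amount = amount;
            this.currency = currency;
        }
    }

    /** Validation: the structural checks a message schema would enforce. */
    static boolean isValid(Payment payment) {
        return payment.account != null && !payment.account.isEmpty() && payment.amount > 0;
    }

    /** Enrichment: fill in a default currency when the message carries none. */
    static Payment enrich(Payment payment) {
        return payment.currency != null
                ? payment
                : new Payment(payment.account, payment.amount, "USD");
    }

    /** Validates, filters by minimum amount and enriches the incoming messages. */
    static List<Payment> process(List<Payment> incoming, double minAmount) {
        return incoming.stream()
                .filter(MessagePipelineSketch::isValid)
                .filter(payment -> payment.amount >= minAmount)
                .map(MessagePipelineSketch::enrich)
                .collect(Collectors.toList());
    }

    /** Aggregation: composes several messages into one summary message. */
    static Payment aggregate(String account, List<Payment> payments) {
        double total = payments.stream().mapToDouble(payment -> payment.amount).sum();
        return new Payment(account, total, "USD");
    }
}
```

In a real deployment each step would typically read from one topic and write to another, so that the stages can be scaled and replaced independently.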

Message Formats

Messages can be represented in several formats

  • JSON Notation

  • Apache Avro

  • Apache Thrift

  • Protocol Buffers

Tip
JSON is easily read and written by both humans and machines; in contrast, binary representations are very fast and lightweight to process.
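The size difference is easy to see by encoding the same two fields both ways. This sketch uses only the JDK; the fixed binary layout is an assumption made for illustration and is not the Avro, Thrift or Protocol Buffers wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustrative only: compares a textual and a fixed-width binary encoding. */
final class EncodingSizeSketch {

    /** Encodes the fields as a small hand-written JSON document. */
    static byte[] asJson(long id, double amount) {
        String json = "{\"id\":" + id + ",\"amount\":" + amount + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Encodes the fields as 8 bytes for the long followed by 8 bytes for the double. */
    static byte[] asBinary(long id, double amount) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);
            out.writeDouble(amount);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("json bytes:   " + asJson(12345L, 1234.0).length);
        System.out.println("binary bytes: " + asBinary(12345L, 1234.0).length);
    }
}
```

The binary form omits field names and needs no text parsing, but both sides must agree on the layout in advance, which is the role a schema plays in formats such as Avro.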

Apache Avro

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.

Read more about Avro spec at https://avro.apache.org/docs/1.8.2/spec.html

Message Processing with Spring Boot

This Spring Boot application produces and consumes payment details as JSON messages

Update the configuration under resources/application.yml

application.yml
kafka:
  broker:
    address: localhost:9092 #(1)

message:
  topic:
    payments: payments  #(2)
  1. Kafka Server(Broker) address

  2. Topic name where we send messages

Create KafkaTopicConfig to create the topic in the Kafka broker

KafkaTopicConfig.java
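import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

/**
 * This configuration creates a topic on the specified broker.
 *
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTopicConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress; // (1)

    /**
     * An admin that delegates to an {@link org.apache.kafka.clients.admin.AdminClient} to create topics defined
     * in the application context.
     *
     * @return KafkaAdmin {@link KafkaAdmin}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress)); // (2)
    }

    /**
     * Creates the topic on the broker specified above.
     *
     * @param topicName topic name from configuration
     * @return topic definition with one partition and a replication factor of one
     */
    @Bean
    public NewTopic paymentDetailsTopic(@Value("${message.topic.payments}") String topicName) {
        return new NewTopic(topicName, 1, (short) 1); // (3)
    }
}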
  1. Kafka Broker address which is referred from application.yml

  2. KafkaAdmin is an admin that delegates to an AdminClient to create topics defined in the application context.

  3. Creates Topic in the Kafka Broker with given name while bootstrapping the application

Create KafkaTemplate to send(produce) messages into Topic

KafkaTemplateConfig.java
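import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * This class creates the {@link KafkaTemplate} instance,
 * which is used to send messages to a topic.
 * <p>
 * {@link KafkaTemplate} requires a {@link ProducerFactory} instance.
 * <p>
 * {@link org.apache.kafka.clients.producer.Producer} instance and {@link KafkaTemplate} are thread-safe.
 *
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTemplateConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress;

    @Bean
    public ProducerFactory<String, PaymentDetails> producerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // (1)
        // Wait for all in-sync replicas to acknowledge, and do not retry failed sends
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
        return new DefaultKafkaProducerFactory<>(configProperties); // (2)
    }

    /**
     * KafkaTemplate used to produce messages into topics
     *
     * @param producerFactory producerFactory instance
     * @return kafkaTemplate {@link KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<String, PaymentDetails> kafkaTemplate(
            ProducerFactory<String, PaymentDetails> producerFactory) {
        return new KafkaTemplate<>(producerFactory); // (3)
    }
}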
  1. Custom Message Serializer

  2. ProducerFactory is used to specify producer configuration and type of messages we are sending

  3. KafkaTemplate is a wrapper around ProducerFactory that eases sending messages programmatically

Create Consumer Configuration to receive messages

KafkaConsumerConfig.java
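import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
 * For consuming messages, we need to configure {@link ConsumerFactory}
 * <p>
 * and {@link org.springframework.kafka.config.KafkaListenerContainerFactory}. Once these beans are available, POJO based consumers
 * can be configured using the @{@link org.springframework.kafka.annotation.KafkaListener} annotation
 */
//@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress;

    private static final String GROUP_ID = "FRAUD";

    @Bean
    public ConsumerFactory<String, PaymentDetails> consumerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        return new DefaultKafkaConsumerFactory<>(configProperties,
                new StringDeserializer(),
                new JsonDeserializer<>(PaymentDetails.class)); // (1)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> factory
                = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory()); // (2)
        return factory;
    }
}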
  1. JsonDeserializer converts the JSON payload back into a PaymentDetails object

  2. Listener container factory that @KafkaListener methods use to read messages

Note
We are using PaymentDetails object as Kafka message.
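The PaymentDetails class itself is not shown in this post. A minimal sketch that matches the four-argument constructor call used by the producer could look like the following; the field names are assumptions, and the no-argument constructor plus getters and setters are there because JSON mapping libraries such as Jackson generally rely on them.

```java
/**
 * Hypothetical shape of the PaymentDetails message; the field names are assumptions.
 * Declared package-private here; make it public when it lives in its own file.
 */
class PaymentDetails {

    private long timestamp;
    private String accountNumber;
    private String customerName;
    private double amount;

    /** No-argument constructor, needed when the object is rebuilt from JSON. */
    PaymentDetails() {
    }

    PaymentDetails(long timestamp, String accountNumber, String customerName, double amount) {
        this.timestamp = timestamp;
        this.accountNumber = accountNumber;
        this.customerName = customerName;
        this.amount = amount;
    }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getAccountNumber() { return accountNumber; }
    public void setAccountNumber(String accountNumber) { this.accountNumber = accountNumber; }

    public String getCustomerName() { return customerName; }
    public void setCustomerName(String customerName) { this.customerName = customerName; }

    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }

    @Override
    public String toString() {
        return "PaymentDetails{timestamp=" + timestamp + ", accountNumber='" + accountNumber
                + "', customerName='" + customerName + "', amount=" + amount + "}";
    }
}
```

Overriding toString keeps the log statements in the producer and consumer readable, since both print the message object directly.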

Create Producer and send message to the Topic

MessageProducer.java
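import static java.lang.System.currentTimeMillis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * This Spring component sends a message to the given topic every 5 seconds.
 * send() returns a future, to which we attach a callback that reports the status of each message.
 * ListenableFuture is the return type in Spring Kafka 2.x; newer versions return a CompletableFuture instead.
 */
@Component
public class MessageProducer implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class);

    // Typed template, so that send() returns a typed future without unchecked warnings
    @Autowired
    KafkaTemplate<String, PaymentDetails> kafkaTemplate;

    @Value("${message.topic.payments}")
    String paymentMessageTopic;

    @Override
    public void run(String... args) throws InterruptedException {
        // Runs until the application is stopped
        do {
            sendMessage(new PaymentDetails(currentTimeMillis(), "12345", "Thiru", 1234.0));
            Thread.sleep(5000);
        } while (true);
    }

    public void sendMessage(PaymentDetails message) {

        ListenableFuture<SendResult<String, PaymentDetails>> future = kafkaTemplate.send(paymentMessageTopic, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, PaymentDetails>>() {

            @Override
            public void onFailure(Throwable ex) {
                // Pass the exception as the last argument so that the stack trace is logged
                LOGGER.warn("Message sending failed", ex);
            }

            @Override
            public void onSuccess(SendResult<String, PaymentDetails> result) {
                LOGGER.info("Message [{}] sent to topic [{}]", result.getProducerRecord().value(), result.getRecordMetadata().topic());
            }
        });
    }
}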

From the terminal, use the command below to listen to that topic

listen_topic.sh
$kafka-console-consumer --bootstrap-server localhost:9092 --topic payments --from-beginning

Alternatively, we can create a Java client, as shown below, to read the messages

MessageConsumer.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = "payments", groupId = "FRAUD") // (1)
    public void receivePaymentDetails(PaymentDetails message) {
        LOGGER.info("Received Message {} ", message);
    }
}
  1. The KafkaListener annotation can be placed on a method of any Spring bean to listen to a topic
