Spring Boot Kafka Integration
Introduction
Kafka is an open-source streaming platform that scales horizontally very well without compromising speed or efficiency.
The Kafka core is written in Scala; Kafka Streams and KSQL are written in Java.
Features
Kafka is a service bus
To connect heterogeneous applications, we need a message publication mechanism to send and receive messages among them.
A message router is known as a message broker. Kafka is a message broker: a solution that routes messages among clients quickly.
Kafka's architecture has two directives:
- The first is to not block the producers (to deal with back pressure).
- The second is to isolate producers and consumers. The producers should not know who their consumers are; hence, Kafka follows the dumb broker and smart clients model.
Tip
|
Back pressure makes applications robust against data surges, which occur when a producer generates data at a higher rate than the consumer can consume it. |
Kafka is a real-time messaging system
Kafka is a software solution with a publish-subscribe model that is:
- open source
- distributed
- partitioned
- replicated
- commit-log based
Kafka Terminology
Kafka Architecture
Role of ZooKeeper
Most contemporary distributed orchestration systems, such as Kubernetes and Swarm, rely on a distributed key/value store for maintaining the global state of the cluster. Consul, etcd, and even Redis are used for service discovery and cluster state management.
Apache Kafka was designed well before these lightweight services were built. Kafka uses Apache ZooKeeper as its distributed configuration store.
ZooKeeper forms the backbone of the Kafka cluster and continuously monitors the health of the brokers.
When new brokers are added to the cluster, ZooKeeper starts utilizing them by creating topics and partitions on them.
Apart from cluster management, initial versions of Kafka used ZooKeeper to store the partition and offset information for each consumer. Starting from version 0.10, that information has moved to an internal Kafka topic (__consumer_offsets).
Cluster Types
Kafka supports three types of clusters:
- Single node, single broker
- Single node, multiple brokers
- Multiple nodes, multiple brokers
Message Delivery Modes
In Kafka, there are three ways to deliver messages:
- Never redelivered: the messages may be lost because, once delivered, they are not sent again.
- May be redelivered: the messages are never lost; if a message is not received, it can be sent again.
- Delivered once: the message is delivered exactly once (the most difficult form of delivery).
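These semantics are driven largely by producer configuration. As a minimal, hedged sketch (one illustration, not the only way to achieve each mode), the acks, retries, and enable.idempotence producer settings approximate the three modes:
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryModes {
    public static void main(String[] args) {
        // Never redelivered (at most once): fire-and-forget, no retries
        Properties atMostOnce = new Properties();
        atMostOnce.put(ProducerConfig.ACKS_CONFIG, "0");
        atMostOnce.put(ProducerConfig.RETRIES_CONFIG, "0");

        // May be redelivered (at least once): wait for broker acks, retry on failure
        Properties atLeastOnce = new Properties();
        atLeastOnce.put(ProducerConfig.ACKS_CONFIG, "all");
        atLeastOnce.put(ProducerConfig.RETRIES_CONFIG, "3");

        // Delivered once (exactly once): the idempotent producer deduplicates retries
        Properties exactlyOnce = new Properties();
        exactlyOnce.put(ProducerConfig.ACKS_CONFIG, "all");
        exactlyOnce.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    }
}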
Logs
TBD
Message logs can be compacted in two ways:
- Coarse-grained: the log is compacted by time
- Fine-grained: the log is compacted by message
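As a hedged illustration (the topic names and the 7-day retention value here are assumptions, not from the original), the two styles map onto per-topic settings: cleanup.policy=delete with retention.ms for coarse-grained, time-based cleanup, and cleanup.policy=compact for fine-grained, per-key compaction:
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class LogCleanupExamples {

    // Coarse-grained: old log segments are deleted purely by time
    public static NewTopic timeCompacted() {
        NewTopic topic = new NewTopic("payments-by-time", 1, (short) 1);
        topic.configs(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE,
                TopicConfig.RETENTION_MS_CONFIG, "604800000")); // keep 7 days
        return topic;
    }

    // Fine-grained: only the latest record per message key is retained
    public static NewTopic keyCompacted() {
        NewTopic topic = new NewTopic("payments-latest", 1, (short) 1);
        topic.configs(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
        return topic;
    }
}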
Kafka Installation
There are three ways to install a Kafka environment:
- Downloading the executable files
- Using the Homebrew or yum package managers
- Installing the Confluent Platform
Note
|
JDK 8 and Scala are prerequisite software for installing Kafka. |
Warning
|
Ensure you have at least 4 GB of RAM. The installation directory will be /usr/local/kafka/ for macOS and /opt/kafka/ for Linux users. |
Installing on macOS
- Install sbt (the Scala build tool) with brew by executing the following:
$brew install sbt # (1)
$brew upgrade sbt # (2)
- Install Scala with brew:
$brew install scala # (1)
$brew upgrade scala # (2)
Installing on Windows
This section will explain the installation of Kafka on a Windows machine.
Installing on Linux
TBD
Running Kafka
There are two ways to run Kafka, depending on whether we installed it directly or through the Confluent Platform.
Tip
|
If you installed Kafka with brew, the configuration files are located under /usr/local/etc/kafka.
|
There are two types of configurations possible with Kafka:
- Standalone
- Cluster
The real power of Kafka is unlocked when it runs with replication in cluster mode and all topics are correctly partitioned.
Cluster mode has two main advantages:
- Parallelism: the capacity to run tasks simultaneously among the cluster members.
- Redundancy: guarantees that, when a Kafka node goes down, the cluster remains safe and accessible from the other running nodes.
Running ZooKeeper
$zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
Running Kafka Server (Standalone Mode)
$kafka-server-start /usr/local/etc/kafka/server.properties &
Tip
|
Append & at the end to run it as a separate daemon.
|
Refer to the link below for a detailed description of the supported server properties.
Caution
|
ZooKeeper must be running on the machine before starting Kafka.
|
Running Kafka Server (Cluster Mode)
Refer to the server.properties file and create two broker configurations.
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# port on which the Kafka server (broker) will run
port=9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
Create the two broker configurations as mentioned below. Note that each broker needs a unique id, port, and log directory:
- mark-1.properties
broker.id=1
port=9093
zookeeper.connect=localhost:2181
log.dirs=/tmp/mark-1-logs
- mark-2.properties
broker.id=2
port=9094
zookeeper.connect=localhost:2181
log.dirs=/tmp/mark-2-logs
- Run the first broker:
kafka-server-start mark-1.properties &
- Run the second broker:
kafka-server-start mark-2.properties &
Tip
|
You can check the status of a Kafka broker with the command below. |
tvajjala$lsof -i:9093
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 43439 tvajjala 106u IPv6 0x672b0fb3db715d83 0t0 TCP *:9093 (LISTEN)
java 43439 tvajjala 122u IPv6 0x672b0fb3db717483 0t0 TCP tvajjala.home:57770->tvajjala.home:9093 (ESTABLISHED)
java 43439 tvajjala 123u IPv6 0x672b0fb3db714683 0t0 TCP tvajjala.home:9093->tvajjala.home:57770 (ESTABLISHED)
tvajjala$lsof -i:9094
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 43439 tvajjala 127u IPv6 0x672b0fb3e101e203 0t0 TCP tvajjala.home:57774->tvajjala.home:9094 (ESTABLISHED)
java 43708 tvajjala 106u IPv6 0x672b0fb3db716903 0t0 TCP *:9094 (LISTEN)
java 43708 tvajjala 116u IPv6 0x672b0fb3e101d683 0t0 TCP tvajjala.home:9094->tvajjala.home:57774 (ESTABLISHED)
Dealing with Topics (CLI)
Kafka, like almost all modern infrastructure projects, has three ways of building things:
- through the command line
- through programming
- through a web console (in this case, the Confluent Control Center)
Kafka has pre-built utilities to manage brokers, as we already saw, and to manage topics.
Create Topic
$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BarCap
--partitions | controls the parallelism |
--replication-factor | controls the redundancy |
--topic | the name of the topic to be created |
--zookeeper | the ZooKeeper cluster host |
List Topics
$kafka-topics --list --zookeeper localhost:2181
Create Producer
$kafka-console-producer --broker-list localhost:9092 --topic BarCap
>This is first message to Barclay
>Second message to FraudTC
--broker-list | This specifies the Kafka brokers to connect to, as a comma-separated list in the form hostname:port. |
--topic | This parameter is followed by the name of the target topic. |
--sync | This specifies whether the messages should be sent synchronously. |
--compression-codec | This specifies the compression codec used to produce the messages. The possible options are: none, gzip, snappy, or lz4. If not specified, the default is gzip. |
--batch-size | If the messages are not sent synchronously, but the message size is sent in a single batch, this value is specified in bytes. |
--message-send-max-retries | As the brokers can fail to receive messages, this parameter specifies the number of retries before a producer gives up and drops the message. This number must be a positive integer. |
--retry-backoff-ms | In case of failure, the node leader election might take some time. This parameter is the time to wait before the producer retries after this election. The number is the time in milliseconds. |
--timeout | If the producer is running in asynchronous mode and this parameter is set, it indicates the maximum amount of time a message will queue awaiting the sufficient batch size. This value is expressed in milliseconds. |
--queue-size | If the producer is running in asynchronous mode and this parameter is set, it gives the maximum number of messages that will queue awaiting the sufficient batch size. |
Create Consumer
$kafka-console-consumer --bootstrap-server localhost:9092 --topic BarCap --from-beginning
--from-beginning | indicates that messages should be consumed from the beginning instead of from the last messages in the log |
--fetch-size | This is the amount of data to be fetched in a single request. The size in bytes follows as the argument. The default value is 1,024 x 1,024. |
--socket-buffer-size | This is the size of the TCP RECV buffer. The size in bytes follows this parameter. The default value is 2 x 1,024 x 1,024. |
--formatter | This is the name of the class to use for formatting messages for display. The default value is NewlineMessageFormatter. |
--autocommit.interval.ms | This is the time interval at which to save the current offset. The time in milliseconds follows as the argument. The default value is 10,000. |
--max-messages | This is the maximum number of messages to consume before exiting. If not set, the consumption is continuous. The number of messages follows as the argument. |
--skip-message-on-error | If there is an error while processing a message, the system should skip it instead of halting. |
Describe Topic
$kafka-topics --describe --zookeeper localhost:2181 --topic BarCapTC
Topic:BarCapTC PartitionCount:1 ReplicationFactor:2 Configs:
Topic: BarCapTC Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
PartitionCount | Number of partitions on the topic (parallelism) |
ReplicationFactor | Number of replicas on the topic (redundancy) |
Leader | Node responsible for the reading and writing operations of a given partition |
Replicas | List of brokers replicating this topic's data; some of these might even be dead |
Isr | List of nodes that are currently in-sync replicas |
Delete Topic
$kafka-topics --delete --zookeeper localhost:2181 --topic=BarCap
Kafkacat
Kafkacat is a generic, non-JVM command-line utility used to test and debug Apache Kafka deployments. It can be used to produce, consume, and list topic and partition information for Kafka. Kafkacat is netcat for Kafka: a tool for inspecting and creating data in Kafka.
Tip
|
kafkacat is similar to kafka-console-producer and kafka-console-consumer but more powerful |
Kafkacat is an open source utility and it is available at https://github.com/edenhill/kafkacat
brew install kafkacat # macOS
apt-get install kafkacat # Linux
Subscribe to the topics BarCap and BarCapTC:
kafkacat -b localhost:9093 -t BarCap BarCapTC
Message Processing (Java API)
Message processing involves the following:
- Message structure validation against a message schema
- Given an event stream, filtering the messages from the stream
- Message enrichment with additional data
- Message aggregation (composition) of two or more messages to produce a new message
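As a hedged illustration of the filtering and enrichment steps (the topic names, value types, and fee logic here are assumptions, and this uses the Kafka Streams API, which the original does not show), a minimal topology might look like this:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class PaymentProcessingSketch {
    public static void build(StreamsBuilder builder) {
        KStream<String, Double> payments = builder.stream("payments-in");
        payments.filter((key, amount) -> amount != null && amount > 1000.0) // filtering: keep high-value payments
                .mapValues(amount -> amount * 1.02)                         // enrichment: apply a hypothetical 2% fee
                .to("payments-out");
    }
}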
Message Formats
Messages can be represented in several formats:
- JSON notation
- Apache Avro
- Apache Thrift
- Protocol Buffers
Tip
|
JSON is easily read and written by both humans and machines; as a counterweight, a binary representation is very fast and lightweight to process. |
Apache Avro
Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.
Read more about Avro spec at https://avro.apache.org/docs/1.8.2/spec.html
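For illustration, here is a minimal sketch of parsing an Avro schema with the Java API; the PaymentDetails record and its field names are assumptions chosen to match the example used later, not something defined by the Avro spec:
import org.apache.avro.Schema;

public class PaymentSchemaExample {

    // A hypothetical Avro schema, defined in JSON as the spec describes
    private static final String SCHEMA_JSON = "{"
            + " \"type\": \"record\","
            + " \"name\": \"PaymentDetails\","
            + " \"fields\": ["
            + "   {\"name\": \"timestamp\", \"type\": \"long\"},"
            + "   {\"name\": \"accountId\", \"type\": \"string\"},"
            + "   {\"name\": \"payee\", \"type\": \"string\"},"
            + "   {\"name\": \"amount\", \"type\": \"double\"}"
            + " ]}";

    public static Schema paymentDetailsSchema() {
        return new Schema.Parser().parse(SCHEMA_JSON);
    }
}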
Message Processing with Spring Boot
- Create a Spring Boot Gradle project at https://start.spring.io/
This Spring Boot application will produce and consume payment details as JSON messages.
Update the configuration under resources/application.yml:
kafka:
  broker:
    address: localhost:9092 #(1)
message:
  topic:
    payments: payments #(2)
- Kafka server (broker) address
- Topic name to which we send messages
Create a KafkaTopicConfig to create the Topic in the Kafka broker
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

/**
 * This configuration lets you create a Topic in the specified broker.
 *
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTopicConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress; // (1)

    /**
     * An admin that delegates to an {@link org.apache.kafka.clients.admin.AdminClient} to create topics defined
     * in the application context.
     *
     * @return KafkaAdmin {@link KafkaAdmin}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress)); // (2)
    }

    /**
     * Create the Topic on the above specified broker instance.
     *
     * @param topicName topicName from configuration
     * @return topicInstance
     */
    @Bean
    public NewTopic paymentDetailsTopic(@Value("${message.topic.payments}") String topicName) {
        return new NewTopic(topicName, 1, (short) 1); // (3)
    }
}
- The Kafka broker address, referred from application.yml
- KafkaAdmin is an admin that delegates to an AdminClient to create topics defined in the application context.
- Creates the Topic in the Kafka broker with the given name while bootstrapping the application
Create a KafkaTemplate to send (produce) messages to the Topic
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * This class lets you create a {@link KafkaTemplate} instance,
 * which is used to send messages to the Topic.
 * <p>
 * {@link KafkaTemplate} requires a {@link ProducerFactory} instance.
 * <p>
 * The {@link org.apache.kafka.clients.producer.Producer} instance and {@link KafkaTemplate} are thread-safe.
 *
 * @author ThirupathiReddy Vajjala
 */
@Configuration
public class KafkaTemplateConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress;

    @Bean
    public ProducerFactory<String, PaymentDetails> producerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // (1)
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
        return new DefaultKafkaProducerFactory<>(configProperties); // (2)
    }

    /**
     * KafkaTemplate used to produce messages into topics.
     *
     * @param producerFactory producerFactory instance
     * @return kafkaTemplate {@link KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<String, PaymentDetails> kafkaTemplate(ProducerFactory<String, PaymentDetails> producerFactory) {
        return new KafkaTemplate<>(producerFactory); // (3)
    }
}
- Custom message serializer
- The ProducerFactory is used to specify the producer configuration and the type of messages we are sending
- KafkaTemplate is a wrapper around the ProducerFactory that eases sending messages programmatically
Create Consumer Configuration to receive messages
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/**
 * For consuming messages, we need to configure a {@link ConsumerFactory}
 * <p>
 * and a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}. Once these beans are available, POJO-based consumers
 * can be configured using the @{@link org.springframework.kafka.annotation.KafkaListener} annotation.
 */
@EnableKafka // optional with Spring Boot, which auto-configures the listener infrastructure
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress;

    private static final String GROUP_ID = "FRAUD";

    @Bean
    public ConsumerFactory<String, PaymentDetails> consumerFactory() {
        Map<String, Object> configProperties = new HashMap<>();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return new DefaultKafkaConsumerFactory<>(configProperties,
                new StringDeserializer(),
                new JsonDeserializer<>(PaymentDetails.class)); // (1)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PaymentDetails> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // (2)
        return factory;
    }
}
- Custom message deserializer
- The listener container factory that backs @KafkaListener consumers
Note
|
We are using the PaymentDetails object as the Kafka message payload.
|
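The PaymentDetails class itself is not shown here; a minimal sketch consistent with the constructor call used below (the field names are assumptions) could look like this:
public class PaymentDetails {

    private long timestamp;
    private String accountId;
    private String payee;
    private Double amount;

    public PaymentDetails() {
        // no-arg constructor required by the JSON deserializer
    }

    public PaymentDetails(long timestamp, String accountId, String payee, Double amount) {
        this.timestamp = timestamp;
        this.accountId = accountId;
        this.payee = payee;
        this.amount = amount;
    }

    // getters and setters, needed by the JSON (de)serializer, omitted for brevity

    @Override
    public String toString() {
        return "PaymentDetails{timestamp=" + timestamp + ", accountId='" + accountId
                + "', payee='" + payee + "', amount=" + amount + "}";
    }
}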
Create a Producer and send messages to the Topic
import static java.lang.System.currentTimeMillis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * This Spring component sends a message to the given Topic every 5 seconds.
 * send() returns a {@link ListenableFuture} to which we add a callback function
 * to read the status of our message.
 */
@Component
public class MessageProducer implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    KafkaTemplate<String, PaymentDetails> kafkaTemplate;

    @Value("${message.topic.payments}")
    String paymentMessageTopic;

    @Override
    public void run(String... args) throws InterruptedException {
        do {
            sendMessage(new PaymentDetails(currentTimeMillis(), "12345", "Thiru", 1234.0));
            Thread.sleep(5000);
        } while (true);
    }

    public void sendMessage(PaymentDetails message) {
        ListenableFuture<SendResult<String, PaymentDetails>> future = kafkaTemplate.send(paymentMessageTopic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, PaymentDetails>>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.warn("Message sending failed", ex);
            }

            @Override
            public void onSuccess(SendResult<String, PaymentDetails> result) {
                LOGGER.info("Message [{}] sent to topic [{}]", result.getProducerRecord().value(), result.getRecordMetadata().topic());
            }
        });
    }
}
From the terminal, use the command below to listen to that Topic:
$kafka-console-consumer --bootstrap-server localhost:9092 --topic payments --from-beginning
Alternatively, we can create a Java client, as shown below, to read messages:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = "payments", groupId = "FRAUD") // (1)
    public void receivePaymentDetails(PaymentDetails message) {
        LOGGER.info("Received Message {} ", message);
    }
}
- The @KafkaListener annotation can be used on any Spring bean method to listen to the Topic