In this article, we will see how to take a deep dive into the messages flowing on a Kafka bus. For example, you may have a large enterprise-grade solution and want to capture a copy of the messages going back and forth between systems without any additional development.
To achieve this, we will use Fluentd to hook onto Kafka, read the messages, and insert them into an Elasticsearch database. You can then visualize these messages in Kibana, and build dashboards of bar charts, donut charts, and pie charts from the collected data.
First, install the Fluentd Kafka plugin:
sudo fluent-gem install fluent-plugin-kafka
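To confirm the plugin was installed, you can list Fluentd's gems (on a td-agent install, the command is td-agent-gem instead of fluent-gem):
fluent-gem list | grep fluent-plugin-kafka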
Fluentd Kafka source configuration:
<source>
  @type kafka
  brokers kafkahost:9092
  topics MyTopic
  format json
  # message_key is only used when format is text; with json it is ignored
  message_key message
  add_prefix test.kafka
  #add_suffix <tag suffix (Optional)>

  # Optionally, you can manage the topic offset by using Zookeeper
  #offset_zookeeper <zookeeper node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
  #offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'

  # ruby-kafka consumer options
  #max_bytes (integer) :default => nil (Use default of ruby-kafka)
  #max_wait_time (integer) :default => nil (Use default of ruby-kafka)
  #min_bytes (integer) :default => nil (Use default of ruby-kafka)
</source>
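The Elasticsearch output used in the next block is shipped as a separate plugin as well. If your Fluentd installation does not already bundle it, install it the same way:
sudo fluent-gem install fluent-plugin-elasticsearch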
Fluentd match configuration to ingest the messages into Elasticsearch:
<match test.**>
  @type copy
  <store>
    @type elasticsearch
    include_tag_key true
    host localhost
    port 9200
    logstash_format true
    logstash_prefix kafkamessages
  </store>
</match>
With this configuration, Fluentd will read the messages exchanged on “MyTopic” and ingest them into Elasticsearch tagged “test.kafka.MyTopic” (the add_prefix value followed by the topic name). Because logstash_format is enabled, the documents are written to daily indices named kafkamessages-YYYY.MM.DD.
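To verify the whole pipeline, you can publish a test message and then query Elasticsearch. The commands below are only a sketch: kafka-console-producer.sh ships with the Kafka distribution (its location and flags vary by version; newer releases use --bootstrap-server instead of --broker-list), and the query assumes Elasticsearch is reachable on localhost:9200 as configured above.
# Publish a JSON test message onto the topic Fluentd is listening on
echo '{"service":"orders","status":"OK"}' | kafka-console-producer.sh --broker-list kafkahost:9092 --topic MyTopic
# Shortly after, the document should appear in the daily kafkamessages-* index
curl -s 'http://localhost:9200/kafkamessages-*/_search?pretty'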