Setting Up a Zookeeper Ensemble
A Zookeeper cluster maintains coordination metadata for distributed systems. For production, deploy an odd number of nodes (minimum three) to achieve quorum.
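The odd-number recommendation follows from majority arithmetic: an ensemble of N nodes needs floor(N/2) + 1 of them to form a quorum. A minimal sketch of that calculation follows; the function names are illustrative and not part of Zookeeper.

```shell
#!/bin/sh
# Majority needed for an ensemble of N nodes: floor(N/2) + 1.
quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Nodes that may fail while a majority is still reachable.
tolerated_failures() {
  echo $(( $1 - ($1 / 2 + 1) ))
}

for n in 3 4 5; do
  printf '%s nodes: quorum=%s, tolerated failures=%s\n' \
    "$n" "$(quorum_size "$n")" "$(tolerated_failures "$n")"
done
```

Four nodes tolerate the same single failure as three, so an even-sized ensemble adds cost without adding fault tolerance.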
Configuration Essentials
On each node, prepare the configuration file conf/zoo.cfg:
tickTime=2000
dataDir=/var/lib/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
- server.X lists the nodes, where X matches the content of the myid file inside dataDir.
- Port 2888 handles peer communication; 3888 is for leader election.
Create the myid file with a unique numeric identifier on each host:
echo "1" > /var/lib/zookeeper/myid # on zoo1
echo "2" > /var/lib/zookeeper/myid # on zoo2
echo "3" > /var/lib/zookeeper/myid # on zoo3
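A node whose myid does not correspond to any server.X entry cannot join the ensemble, so it can be worth checking the two files against each other before starting the service. The sketch below is one possible check; `check_myid` is a hypothetical helper, not a Zookeeper tool.

```shell
#!/bin/sh
# check_myid CFG MYID_FILE
# Succeeds only if CFG contains a server.<id> line for the id in MYID_FILE.
check_myid() {
  cfg=$1
  myid_file=$2
  # Read the id and drop surrounding whitespace/newlines.
  zk_id=$(tr -d '[:space:]' < "$myid_file") || return 1
  case $zk_id in
    ''|*[!0-9]*)
      echo "myid is not a number: '$zk_id'" >&2
      return 1
      ;;
  esac
  if grep -q "^server\.$zk_id=" "$cfg"; then
    echo "ok: server.$zk_id is defined in $cfg"
  else
    echo "no server.$zk_id entry in $cfg" >&2
    return 1
  fi
}

# Usage on a node (paths assume ZOO_HOME=/opt/zookeeper):
#   check_myid /opt/zookeeper/conf/zoo.cfg /var/lib/zookeeper/myid
```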
Systemd Unit for Zookeeper
Instead of legacy init scripts, define a systemd service /etc/systemd/system/zookeeper.service:
[Unit]
Description=Apache Zookeeper
After=network.target
[Service]
Type=forking
User=zookeeper
Environment="ZOO_HOME=/opt/zookeeper"
# systemd expects the executable as an absolute path, not a variable expansion
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
Restart=on-failure
[Install]
WantedBy=multi-user.target
Enable and start the ensemble on all nodes:
systemctl daemon-reload
systemctl enable zookeeper --now
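Once the service is running, `zkServer.sh status` reports the role of the local node. The helper below is a small sketch for scripting that check; it assumes the status output contains a line of the form `Mode: <role>`, and the function names are hypothetical.

```shell
#!/bin/sh
# Extract the role from `zkServer.sh status` output read on stdin.
zk_mode() {
  sed -n 's/^Mode: *//p'
}

# Succeed only when the reported role is one that serves requests.
zk_is_serving() {
  case $(zk_mode) in
    leader|follower|observer|standalone) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage on a node (path assumes ZOO_HOME=/opt/zookeeper):
#   /opt/zookeeper/bin/zkServer.sh status 2>/dev/null | zk_mode
```

In a healthy three-node ensemble, one node should report leader and the other two follower.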
Core Kafka Concepts
Apache Kafka is a distributed streaming platform built around the publish‑subscribe pattern. It decouples producers and consumers, enables high throughput, and stores messages durably.
Key Entities
- Broker – a single Kafka server. A cluster consists of several brokers.
- Topic – a logical channel to which producers send messages and from which consumers read.
- Partition – a topic is split into one or more ordered, immutable sequences. Partitions allow horizontal scaling and parallelism.
- Replica – each partition can have multiple copies for fault tolerance. One replica acts as the leader (handles reads/writes); others are followers that replicate data.
- Producer – pushes records to a topic. Partition assignment can rely on a key hash or round‑robin.
- Consumer – pulls records from topics. Consumers belong to a consumer group; within a group, each partition is consumed by exactly one member.
- Offset – a monotonically increasing identifier of a record within a partition. Consumers commit their current offset so they can resume from the correct position after a restart.
- Zookeeper – stores cluster metadata (broker list, topic configuration, controller election). Kafka required it through the 2.x releases; newer versions can run without Zookeeper (KRaft mode).
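Two of the rules above can be illustrated with plain modulo arithmetic: records with the same key land in the same partition, and each partition is owned by exactly one member of a consumer group. This is a simplified sketch, not Kafka's implementation: the CRC from `cksum` stands in for Kafka's own hash function, the round-robin assignment stands in for its pluggable assignors, and the keys and function names are made up for illustration.

```shell
#!/bin/sh
# partition_for_key KEY PARTITIONS
# Stand-in hash: CRC from cksum. Kafka's partitioner uses a different hash,
# so real partition numbers will differ.
partition_for_key() {
  key_hash=$(printf '%s' "$1" | cksum | cut -d ' ' -f 1)
  echo $(( key_hash % $2 ))
}

# owner_of_partition PARTITION MEMBERS
# Simplified round-robin assignment of partitions to group members.
owner_of_partition() {
  echo $(( $1 % $2 ))
}

# The repeated key maps to the same partition both times.
for key in user-1 user-2 user-1; do
  printf 'key %s -> partition %s\n' "$key" "$(partition_for_key "$key" 3)"
done

# Three partitions shared by a group of two consumers.
p=0
while [ "$p" -lt 3 ]; do
  printf 'partition %s -> consumer %s\n' "$p" "$(owner_of_partition "$p" 2)"
  p=$(( p + 1 ))
done
```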
Data Retention and Semantics
- Messages are persisted on disk in segment files. Segments older than `log.retention.hours` (default 168 hours, i.e. 7 days) are deleted.
- Kafka guarantees order within a partition, not across different partitions. Use a single partition when strict ordering is required (e.g., flash sales).
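The time-based retention rule reduces to a comparison between a segment's age and the configured limit. The sketch below assumes age is measured from the newest record in the segment; it ignores the active segment and size-based limits, and `segment_expired` is a hypothetical helper.

```shell
#!/bin/sh
# segment_expired NEWEST_RECORD_EPOCH NOW_EPOCH [RETENTION_HOURS]
# Succeeds when the segment is older than the retention window
# (default 168 hours, matching log.retention.hours).
segment_expired() {
  retention_hours=${3:-168}
  [ $(( $2 - $1 )) -gt $(( retention_hours * 3600 )) ]
}

now=$(date +%s)
eight_days_ago=$(( now - 8 * 24 * 3600 ))
one_day_ago=$(( now - 24 * 3600 ))

for ts in "$eight_days_ago" "$one_day_ago"; do
  if segment_expired "$ts" "$now"; then
    echo "segment with newest record at $ts: eligible for deletion"
  else
    echo "segment with newest record at $ts: retained"
  fi
done
```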
Deploying a Kafka Cluster
Installation and Configuration
Download and extract Kafka on each node:
wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz -C /opt
ln -s /opt/kafka_2.13-3.6.0 /opt/kafka
Create the broker configuration file /opt/kafka/config/server.properties using the template below. Replace placeholders with actual values per node.
# Unique broker ID (0, 1, 2, ...)
broker.id=__BROKER_ID__
listeners=PLAINTEXT://__NODE_IP__:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=2
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
Note: log.dirs should point to dedicated fast storage (e.g., multiple JBOD disks) for optimal performance.
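Filling in the `__BROKER_ID__` and `__NODE_IP__` placeholders by hand on every node is error-prone, so the substitution can be scripted. A minimal sketch with `sed` follows; the helper name, the template file name, and the address in the usage comment are hypothetical.

```shell
#!/bin/sh
# render_broker_config TEMPLATE BROKER_ID NODE_IP
# Prints the template with both placeholders filled in.
render_broker_config() {
  sed -e "s/__BROKER_ID__/$2/g" \
      -e "s/__NODE_IP__/$3/g" \
      "$1"
}

# Example (template name and address are placeholders for your own values):
#   render_broker_config server.properties.tmpl 0 192.168.10.11 \
#     > /opt/kafka/config/server.properties
```

Printing to stdout rather than editing in place keeps the template reusable for every broker.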
Set the KAFKA_HOME environment variable and add it to PATH:
echo "export KAFKA_HOME=/opt/kafka" >> /etc/profile
echo "export PATH=\$PATH:\$KAFKA_HOME/bin" >> /etc/profile
source /etc/profile
Systemd Service for Kafka
Create /etc/systemd/system/kafka.service:
[Unit]
Description=Apache Kafka
After=network.target zookeeper.service
Requires=zookeeper.service
[Service]
Type=simple
User=kafka
Environment="KAFKA_HOME=/opt/kafka"
# systemd expects the executable as an absolute path, not a variable expansion
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Reload systemd and start Kafka on all nodes:
systemctl daemon-reload
systemctl enable kafka --now
Administrative Operations
Create a topic with 3 partitions and a replication factor of 2:
kafka-topics.sh --create --bootstrap-server kafka1:9092 \
--replication-factor 2 --partitions 3 --topic app-logs
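Before creating a topic it helps to sanity-check the numbers: the replication factor cannot exceed the number of brokers, and the cluster stores partitions × replication factor replicas in total. The sketch below assumes a three-broker cluster (kafka1 to kafka3); `check_topic_plan` is a hypothetical helper.

```shell
#!/bin/sh
# check_topic_plan BROKERS PARTITIONS REPLICATION_FACTOR
# Fails when the replication factor cannot be satisfied; otherwise prints
# the total number of partition replicas the cluster will store.
check_topic_plan() {
  if [ "$3" -gt "$1" ]; then
    echo "replication factor $3 exceeds broker count $1" >&2
    return 1
  fi
  echo "total replicas: $(( $2 * $3 ))"
}

# 3 brokers, 3 partitions, replication factor 2.
check_topic_plan 3 3 2
```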
List topics:
kafka-topics.sh --list --bootstrap-server kafka1:9092
Produce test messages:
echo "Hello Kafka" | kafka-console-producer.sh \
--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic app-logs
Consume all messages (from beginning):
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
--topic app-logs --from-beginning
Increase the partition count (it can only be increased, never decreased):
kafka-topics.sh --alter --bootstrap-server kafka1:9092 \
--topic app-logs --partitions 6
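Growing a topic from 3 to 6 partitions changes where hash-partitioned keys land, so per-key ordering across the change is not preserved. The sketch below illustrates this with plain modulo arithmetic; the hash values are arbitrary sample numbers and the function names are hypothetical.

```shell
#!/bin/sh
# can_increase_partitions CURRENT NEW
# The partition count of an existing topic can only grow.
can_increase_partitions() {
  [ "$2" -gt "$1" ]
}

# partition_of HASH PARTITIONS
partition_of() {
  echo $(( $1 % $2 ))
}

if can_increase_partitions 3 6; then
  for h in 7 10 12; do
    printf 'hash %s: partition %s -> %s\n' \
      "$h" "$(partition_of "$h" 3)" "$(partition_of "$h" 6)"
  done
fi
```

With these sample values, hash 10 moves from partition 1 to partition 4 while hashes 7 and 12 stay where they were.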
Integrating with the ELK Stack
When Kafka is used as a buffering layer, logs collected by Filebeat can be forwarded to Kafka topics. Logstash then consumes those topics and indexes the data into Elasticsearch.
Filebeat Configuration
Edit filebeat.yml to define log inputs and the Kafka output:
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      log_type: access
    fields_under_root: true
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    fields:
      log_type: error
    fields_under_root: true
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: nginx-logs
  # The Kafka output option is named "partition", not "partitioning"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
Start Filebeat:
systemctl restart filebeat
Logstash Pipeline
Create /etc/logstash/conf.d/kafka-to-es.conf:
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["nginx-logs"]
    codec => json
    auto_offset_reset => "latest"
  }
}
filter {
  # Optional: further parsing or enrichment
}
output {
  if [log_type] == "access" {
    elasticsearch {
      hosts => ["es-node:9200"]
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  } else if [log_type] == "error" {
    elasticsearch {
      hosts => ["es-node:9200"]
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
  stdout { codec => rubydebug }
}
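The output section routes each event by its log_type field and writes to one index per day. The sketch below mirrors that routing to show which index names to expect; it uses today's UTC date as a stand-in for the event's @timestamp, and both `index_for` and the `audit` type are hypothetical.

```shell
#!/bin/sh
# index_for LOG_TYPE DATE
# Mirrors the pipeline's conditionals: access and error events get a daily
# index; any other log_type matches neither branch and is not indexed.
index_for() {
  case $1 in
    access|error) printf 'nginx-%s-%s\n' "$1" "$2" ;;
    *) return 1 ;;
  esac
}

today=$(date -u +%Y.%m.%d)
index_for access "$today"
index_for error "$today"
index_for audit "$today" || echo "audit: no Elasticsearch index (stdout only)"
```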
Finally, start Logstash:
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf
Once the Elasticsearch indices are created, they can be visualised in Kibana by defining index patterns.