Building a Scalable Data Pipeline with Zookeeper, Kafka, and the ELK Stack

Setting Up a Zookeeper Ensemble

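A Zookeeper ensemble maintains coordination metadata for distributed systems. It stays available only while a majority (quorum) of its servers is running. For production, deploy an odd number of nodes (minimum three). A three-node ensemble survives one failure, and adding a fourth node raises the quorum size without tolerating any additional failure.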
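The majority rule behind the odd-number recommendation can be checked with a little shell arithmetic. This is a sketch. The function names quorum_size and tolerated_failures are hypothetical helpers and not part of Zookeeper.

```shell
#!/usr/bin/env bash
# Hypothetical helpers illustrating Zookeeper's majority (quorum) rule.

# Smallest number of servers that forms a majority of an n-server ensemble.
quorum_size() {
  local n=$1
  echo $(( n / 2 + 1 ))
}

# How many servers can fail while a majority is still available.
tolerated_failures() {
  local n=$1
  echo $(( n - $(quorum_size "$n") ))
}

for n in 3 4 5; do
  echo "ensemble=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
```

For 3, 4 and 5 servers this prints tolerated failures of 1, 1 and 2. The fourth server adds cost but no extra resilience.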

Configuration Essentials

On each node, prepare the configuration file conf/zoo.cfg:

tickTime=2000
dataDir=/var/lib/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
  • server.X lists the nodes, where X matches the content of the myid file inside dataDir.
  • Port 2888 handles peer communication; 3888 is for leader election.
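The server.X lines can be generated from one ordered host list, so every node gets an identical copy. The sketch below assumes the host names zoo1 to zoo3 used above. zk_server_lines is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the server.X lines of zoo.cfg for an ordered host list.
# X is the 1-based position of the host, so it must match that host's myid file.
zk_server_lines() {
  local id=1 host
  for host in "$@"; do
    echo "server.${id}=${host}:2888:3888"
    id=$(( id + 1 ))
  done
}

zk_server_lines zoo1 zoo2 zoo3
```

Appending the output to a shared base file (for example `zk_server_lines zoo1 zoo2 zoo3 >> conf/zoo.cfg`) keeps the ensemble definition consistent across nodes.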

Create the myid file with a unique numeric identifier on each host:

echo "1" > /var/lib/zookeeper/myid   # on zoo1
echo "2" > /var/lib/zookeeper/myid   # on zoo2
echo "3" > /var/lib/zookeeper/myid   # on zoo3
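To avoid typing a different number on each host, the ID can be derived from the host's position in the same ordered list used for the server.X lines. This is a minimal sketch. write_myid is hypothetical, and it assumes each node's short hostname matches its entry in the list.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write the myid file for a host, using its 1-based
# position in the same ordered host list that produced the server.X lines.
write_myid() {
  local data_dir=$1 self=$2
  shift 2
  local id=1 host
  for host in "$@"; do
    if [ "$host" = "$self" ]; then
      mkdir -p "$data_dir"
      echo "$id" > "$data_dir/myid"
      return 0
    fi
    id=$(( id + 1 ))
  done
  echo "host $self not found in ensemble list" >&2
  return 1
}

# Example run against a temporary directory instead of /var/lib/zookeeper.
demo_dir=$(mktemp -d)
write_myid "$demo_dir" zoo2 zoo1 zoo2 zoo3
cat "$demo_dir/myid"   # prints 2
rm -r "$demo_dir"
```

On a real node the call would look like `write_myid /var/lib/zookeeper "$(hostname -s)" zoo1 zoo2 zoo3`.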

Systemd Unit for Zookeeper

Instead of legacy init scripts, define a systemd service /etc/systemd/system/zookeeper.service:

[Unit]
Description=Apache Zookeeper
After=network.target

[Service]
Type=forking
User=zookeeper
Environment="ZOO_HOME=/opt/zookeeper"
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
Restart=on-failure

[Install]
WantedBy=multi-user.target

Enable and start the ensemble on all nodes:

systemctl daemon-reload
systemctl enable zookeeper --now

Core Kafka Concepts

Apache Kafka is a distributed streaming platform built around the publish‑subscribe pattern. It decouples producers and consumers, enables high throughput, and stores messages durably.

Key Entities

  • Broker – a single Kafka server. A cluster consists of several brokers.
  • Topic – a logical channel to which producers send messages and from which consumers read.
  • Partition – a topic is split into one or more ordered, immutable sequences. Partitions allow horizontal scaling and parallelism.
  • Replica – each partition can have multiple copies for fault tolerance. One replica acts as the leader (handles reads/writes); others are followers that replicate data.
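  • Producer – pushes records to a topic. A record with a key is assigned a partition by hashing the key, so the same key always lands in the same partition as long as the partition count stays the same. Records without a key are spread across partitions, either round‑robin or (the default since Kafka 2.4) with sticky batching.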
  • Consumer – pulls records from topics. Consumers belong to a consumer group; within a group, each partition is consumed by exactly one member.
  • Offset – a monotonically increasing identifier of a record within a partition. Consumers commit their current offset so they can resume from the correct position after a restart.
  • Zookeeper – in Zookeeper-based deployments (the only option before Kafka 2.8), Zookeeper stores cluster metadata such as the broker list, topic configuration and the controller election. Newer Kafka versions can instead run without Zookeeper in KRaft mode, which is production-ready since Kafka 3.3; Kafka 4.0 removes Zookeeper support entirely.
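The consumer-group rule (one owner per partition, surplus consumers idle) can be modelled in a few lines. This is a simplified round-robin sketch, not the code of any of Kafka's assignors. assign_partitions and the consumer names c1 to c3 are hypothetical.

```shell
#!/usr/bin/env bash
# Simplified round-robin model of how a consumer group spreads partitions:
# every partition gets exactly one owner; surplus consumers stay idle.
# This is an illustration, not Kafka's actual assignor code.
assign_partitions() {
  local num_partitions=$1
  shift
  local consumers=("$@")
  local n=${#consumers[@]} i p
  for (( i = 0; i < n; i++ )); do
    local owned=""
    for (( p = i; p < num_partitions; p += n )); do
      owned+=" $p"
    done
    echo "${consumers[i]}:${owned:- (idle)}"
  done
}

assign_partitions 3 c1 c2      # c1: 0 2 / c2: 1
assign_partitions 2 c1 c2 c3   # c3: (idle)
```

Running more consumers than partitions therefore adds no read parallelism. The extra members only act as standbys until a rebalance.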

Data Retention and Semantics

  • Messages are persisted on disk in segment files. A closed (rolled) segment is deleted once its records are older than `log.retention.hours` (default 168 hours, i.e. 7 days).
  • Kafka guarantees ordering within a partition, not across different partitions. When strict ordering is required (e.g., flash-sale orders), either use a single partition or give related records the same key so they land in the same partition.
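Time-based retention translates directly into disk usage. The sketch below is a rough estimate only. It assumes a steady ingest rate and ignores compression, index files and the active segment. retention_disk_bytes is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Back-of-the-envelope disk sizing for time-based retention (hypothetical helper).
# Assumes a constant ingest rate; ignores compression, index files and the active segment.
retention_disk_bytes() {
  local bytes_per_sec=$1 retention_hours=$2 replication_factor=$3
  echo $(( bytes_per_sec * retention_hours * 3600 * replication_factor ))
}

# log.retention.hours=168 expressed in milliseconds (the unit of log.retention.ms).
echo $(( 168 * 3600 * 1000 ))    # 604800000
retention_disk_bytes 1000 168 2  # 1209600000
```

At 1000 bytes/s with a replication factor of 2, seven days of retention needs roughly 1.2 GB across the cluster before overhead.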

Deploying a Kafka Cluster

Installation and Configuration

Download and extract Kafka on each node:

wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz -C /opt
ln -s /opt/kafka_2.13-3.6.0 /opt/kafka

Create the broker configuration file /opt/kafka/config/server.properties using the template below. Replace placeholders with actual values per node.

# Unique broker ID (0, 1, 2, ...)
broker.id=__BROKER_ID__
listeners=PLAINTEXT://__NODE_IP__:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=2
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

Note: log.dirs should point to dedicated fast storage (e.g., multiple JBOD disks) for optimal performance.
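Filling in the placeholders per node is a simple substitution. A minimal sketch with sed follows. render_server_properties is hypothetical, and 192.0.2.11 is a documentation-range placeholder IP, not a real node address.

```shell
#!/usr/bin/env bash
# Hypothetical helper: render server.properties from the template above by
# substituting the __BROKER_ID__ and __NODE_IP__ placeholders.
render_server_properties() {
  local template=$1 broker_id=$2 node_ip=$3
  sed -e "s/__BROKER_ID__/${broker_id}/g" \
      -e "s/__NODE_IP__/${node_ip}/g" \
      "$template"
}

# Example with a two-line stand-in template; the IP is a placeholder value.
tmpl=$(mktemp)
printf 'broker.id=__BROKER_ID__\nlisteners=PLAINTEXT://__NODE_IP__:9092\n' > "$tmpl"
render_server_properties "$tmpl" 1 192.0.2.11
rm "$tmpl"
```

On each node the output would be redirected to /opt/kafka/config/server.properties, with a broker ID that is unique in the cluster.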

Set the KAFKA_HOME environment variable and add $KAFKA_HOME/bin to PATH:

echo "export KAFKA_HOME=/opt/kafka" >> /etc/profile
echo "export PATH=\$PATH:\$KAFKA_HOME/bin" >> /etc/profile
source /etc/profile

Systemd Service for Kafka

Create /etc/systemd/system/kafka.service:

[Unit]
Description=Apache Kafka
After=network.target zookeeper.service
Requires=zookeeper.service

[Service]
Type=simple
User=kafka
Environment="KAFKA_HOME=/opt/kafka"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Reload systemd and start Kafka on all nodes:

systemctl daemon-reload
systemctl enable kafka --now

Administrative Operations

Create a topic with 3 partitions and a replication factor of 2:

kafka-topics.sh --create --bootstrap-server kafka1:9092 \
  --replication-factor 2 --partitions 3 --topic app-logs
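To see how 3 partitions with a replication factor of 2 spread over three brokers, here is a simplified placement sketch. It is an assumption-laden model, not Kafka's algorithm. Kafka additionally randomises the starting broker and can be rack-aware. place_replicas is a hypothetical helper and the broker IDs 0 to 2 are assumed.

```shell
#!/usr/bin/env bash
# Simplified replica placement: partition p's replicas go to brokers
# p, p+1, ... (mod broker count); the first listed is the preferred leader.
# Kafka's real assignment also randomises the start broker and may use racks.
place_replicas() {
  local partitions=$1 rf=$2 brokers=$3 p r
  if (( rf > brokers )); then
    echo "replication factor $rf exceeds broker count $brokers" >&2
    return 1
  fi
  for (( p = 0; p < partitions; p++ )); do
    local line="p$p:"
    for (( r = 0; r < rf; r++ )); do
      line+=" $(( (p + r) % brokers ))"
    done
    echo "$line"
  done
}

place_replicas 3 2 3   # p0: 0 1 / p1: 1 2 / p2: 2 0
```

Each broker ends up holding two of the six replicas and leading one partition. A replication factor larger than the number of brokers cannot be satisfied, and topic creation fails in that case.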

List topics:

kafka-topics.sh --list --bootstrap-server kafka1:9092

Produce test messages:

echo "Hello Kafka" | kafka-console-producer.sh \
  --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic app-logs

Consume all messages (from beginning):

kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic app-logs --from-beginning
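--from-beginning corresponds to the earliest reset policy, and a reset policy only applies when the consumer group has no committed offset. Without --group, the console consumer joins a freshly generated group, which is why it honours the flag on every run. The model below is a hypothetical sketch (start_offset is not a Kafka API). log_start may be above 0 once retention has deleted old segments.

```shell
#!/usr/bin/env bash
# Hypothetical model of where a consumer starts reading one partition.
# A committed offset always wins; the reset policy (earliest/latest) only
# applies when the group has no committed offset yet.
start_offset() {
  local policy=$1 committed=$2 log_start=$3 log_end=$4
  if [ -n "$committed" ]; then
    echo "$committed"
  elif [ "$policy" = "earliest" ]; then
    echo "$log_start"   # what --from-beginning requests
  else
    echo "$log_end"     # "latest": only records produced from now on
  fi
}

start_offset earliest "" 0 42   # 0
start_offset latest   "" 0 42   # 42
start_offset latest   17 0 42   # 17
```

The same logic explains the Logstash setting auto_offset_reset => "latest": it only affects a consumer group's first start.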

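Increase the number of partitions (partitions can be added but never removed; existing records are not redistributed, so keys may map to different partitions afterwards):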

kafka-topics.sh --alter --bootstrap-server kafka1:9092 \
  --topic app-logs --partitions 6
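Why keys can move after the partition count changes is easiest to see with hash-modulo arithmetic. This sketch uses the POSIX cksum CRC as a stand-in for the murmur2 hash used by Kafka's default partitioner. The concrete partition numbers will not match a real cluster, and partition_for_key and the user-N keys are hypothetical.

```shell
#!/usr/bin/env bash
# Illustration of why adding partitions remaps keys. cksum's CRC stands in for
# the murmur2 hash of Kafka's default partitioner, so the concrete numbers
# will NOT match a real cluster; only the modulo behaviour is the point.
partition_for_key() {
  local key=$1 num_partitions=$2 crc
  crc=$(printf '%s' "$key" | cksum | awk '{print $1}')
  echo $(( crc % num_partitions ))
}

for key in user-1 user-2 user-3 user-4; do
  echo "$key: 3 partitions -> $(partition_for_key "$key" 3), 6 partitions -> $(partition_for_key "$key" 6)"
done
```

Going from 3 to 6 partitions, a key on partition p either stays there or moves to p + 3, because 3 divides 6. Consumers relying on per-key ordering may see records for the same key on two partitions around the change.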

Integrating with the ELK Stack

When Kafka is used as a buffering layer, logs collected by Filebeat can be forwarded to Kafka topics. Logstash then consumes those topics and indexes the data into Elasticsearch.

Filebeat Configuration

Edit filebeat.yml to define log inputs and the Kafka output:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    log_type: access
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  fields:
    log_type: error
  fields_under_root: true

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: nginx-logs
  partition.round_robin:
    reachable_only: false
  required_acks: 1
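Round-robin partitioning deals events to partitions in turn. The reachable_only flag decides whether partitions whose leader is unavailable stay in the rotation. The model below is a simplified sketch, not Filebeat's implementation. round_robin is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Simplified model of round-robin partitioning: events are dealt to the given
# partitions in turn. With reachable_only: false every partition is in the
# rotation; with reachable_only: true only partitions with a reachable leader
# would be passed in. Not Filebeat's actual implementation.
round_robin() {
  local count=$1
  shift
  local partitions=("$@")
  local n=${#partitions[@]} i out=""
  (( n > 0 )) || return 1
  for (( i = 0; i < count; i++ )); do
    out+="${partitions[i % n]} "
  done
  echo "${out% }"
}

round_robin 5 0 1 2   # all partitions: 0 1 2 0 1
round_robin 5 0 2     # partition 1 unreachable: 0 2 0 2 0
```

With reachable_only: false, events bound for an unreachable partition wait rather than being redirected, which keeps the distribution even at the cost of possible blocking.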

Restart Filebeat to load the new configuration:

systemctl restart filebeat

Logstash Pipeline

Create /etc/logstash/conf.d/kafka-to-es.conf:

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["nginx-logs"]
    codec => json
    auto_offset_reset => "latest"
  }
}

filter {
  # Optional: further parsing or enrichment
}

output {
  if [log_type] == "access" {
    elasticsearch {
      hosts => ["es-node:9200"]
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  }
  else if [log_type] == "error" {
    elasticsearch {
      hosts => ["es-node:9200"]
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
  stdout { codec => rubydebug }
}
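The output block routes each event by its log_type field. The date suffix comes from the event's @timestamp, formatted in UTC, so events logged near local midnight may land in the neighbouring day's index. The shell mirror below is a hypothetical sketch (index_for_event is not a Logstash function). Events with any other log_type only reach stdout.

```shell
#!/usr/bin/env bash
# Hypothetical mirror of the output block's routing: which index an event lands in,
# given its log_type and its @timestamp date (UTC, YYYY-MM-DD).
index_for_event() {
  local log_type=$1 utc_date=$2
  local day=${utc_date//-/.}     # 2026-06-22 -> 2026.06.22, like %{+YYYY.MM.dd}
  case "$log_type" in
    access) echo "nginx-access-$day" ;;
    error)  echo "nginx-error-$day" ;;
    *)      echo "" ;;           # no elasticsearch output; stdout only
  esac
}

index_for_event access 2026-06-22   # nginx-access-2026.06.22
index_for_event error  2026-06-22   # nginx-error-2026.06.22
```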

Finally, start Logstash:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf

Once the Elasticsearch indices are created, they can be visualised in Kibana by defining index patterns (called data views since Kibana 8.0), e.g. nginx-access-* and nginx-error-*.

Tags: Kafka ZooKeeper ELK filebeat Logstash

Posted on Mon, 22 Jun 2026 17:18:48 +0000 by bios