Automated Data Ingestion Pipeline Using Kubernetes and Docker Containers

Infrastructure Setup

Prerequisites

Ensure the cluster infrastructure is active on CentOS 7 hosts. The node topology includes:

Role            IP Address
Master Node     192.168.138.110
Slave Node 1    192.168.138.111
Slave Node 2    192.168.138.112

1. Database Provisioning

1.1 Resource Isolation

Define a dedicated namespace to isolate database resources from other workloads.

kubectl create namespace data-pipeline

1.2 Persistent Storage Configuration

Create a PersistentVolume (PV) backed by NFS for data durability.

File: storage-pv.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: vol-db-storage
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-class
  nfs:
    path: /data/exports/db
    server: 192.168.138.110

Apply the configuration:

kubectl apply -f storage-pv.yaml
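
Note that the PV only references the NFS share; the export itself must already exist on the master node, and nfs-utils must be installed on every node that will mount it. A minimal setup on 192.168.138.110 could look like the following (the /24 client range is an assumption; adjust it to your network):

mkdir -p /data/exports/db
echo "/data/exports/db 192.168.138.0/24(rw,sync,no_root_squash)" >> /etc/exports
exportfs -ra
showmount -e 192.168.138.110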

1.3 Claim Volume

Bind the volume to a claim for the application to consume.

File: claim-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-db-data
  namespace: data-pipeline
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  storageClassName: nfs-class

Apply the claim:

kubectl apply -f claim-pvc.yaml
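
Because the claim's storage class, access mode, and requested size match the PV defined above, it should bind automatically. Confirm the binding:

kubectl get pv,pvc -n data-pipeline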

1.4 Secrets Management

Store credentials securely using Kubernetes Secrets instead of plain text.

kubectl create secret generic db-auth-cred \
  --from-literal=password=SecurePass123! \
  -n data-pipeline

Verify creation:

kubectl get secret db-auth-cred -n data-pipeline -o yaml
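
Secret values are stored base64-encoded; to confirm the stored password decodes to the expected value:

kubectl get secret db-auth-cred -n data-pipeline -o jsonpath='{.data.password}' | base64 --decode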

1.5 Service Exposure

Deploy the database container, mount the persistent claim created above for data durability, and expose it via a NodePort service for external access.

File: db-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: data-store
  name: data-store
  namespace: data-pipeline
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-store
  template:
    metadata:
      labels:
        app: data-store
    spec:
      containers:
      - image: mysql:8.0
        name: db-engine
        env:
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-auth-cred
              key: password
        ports:
        - containerPort: 3306
        volumeMounts:
        - name: db-data
          mountPath: /var/lib/mysql
      volumes:
      - name: db-data
        persistentVolumeClaim:
          claimName: pvc-db-data
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: db-svc
  name: svc-data-store
  namespace: data-pipeline
spec:
  selector:
    app: data-store
  type: NodePort
  ports:
  - port: 3306
    protocol: TCP
    targetPort: 3306
    nodePort: 31233

Apply the configuration and verify the resources:

kubectl apply -f db-deployment.yaml
kubectl get pods,svc -n data-pipeline
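
MySQL needs a short startup period, so waiting for the rollout before configuring access avoids spurious connection errors:

kubectl rollout status deployment/data-store -n data-pipeline
kubectl logs deployment/data-store -c db-engine -n data-pipeline --tail=20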

Connectivity Configuration

2. Remote Access & Schema

2.1 Network Permissions

Access the running container (substitute the actual pod name reported by kubectl get pods) to configure remote root privileges.

kubectl exec -it data-store-xxxx -c db-engine -n data-pipeline -- bash
mysql -u root -p

Run internal SQL commands:

ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'SecurePass123!';
FLUSH PRIVILEGES;

CREATE DATABASE warehouse_db;
GRANT ALL PRIVILEGES ON warehouse_db.* TO 'root'@'%';
FLUSH PRIVILEGES;

To connect externally via tools like Navicat, use the IP of any cluster node (for example 192.168.138.112) together with the mapped NodePort 31233.
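
The same connection can be verified from a shell on any machine that can reach the nodes, assuming a MySQL client is installed:

mysql -h 192.168.138.112 -P 31233 -u root -p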

Note: If DNS resolution fails (common in some K8s setups), update /etc/resolv.conf on the host machines or restart CoreDNS pods in kube-system.

2.2 Table Definition

Initialize the schema within the database.

USE warehouse_db;

CREATE TABLE product_records (
  record_id INT AUTO_INCREMENT PRIMARY KEY,
  item_name VARCHAR(1000),
  vendor_name VARCHAR(1000),
  vendor_site_url VARCHAR(1000),
  description_text VARCHAR(1000),
  image_link VARCHAR(1000),
  price_tag VARCHAR(1000),
  location VARCHAR(1000),
  shipping_policy VARCHAR(1000)
);
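
As a quick sanity check, an illustrative row (not part of the pipeline) confirms the schema accepts the fields the scraper writes:

INSERT INTO product_records (item_name, vendor_name, vendor_site_url, description_text, price_tag)
VALUES ('test item', 'test vendor', 'https://example.com', 'placeholder description', '0.00');
SELECT record_id, item_name, price_tag FROM product_records;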

ETL Implementation

3. Scraping Logic

3.1 Python Worker Script

Develop the scraper worker using urllib from the standard library together with BeautifulSoup and PyMySQL.

File: fetcher.py

import urllib.request
import time
from bs4 import BeautifulSoup as bs
import pymysql
import os

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}

def extract_data(page_num):
    url = f'https://www.example.com/category/page/{page_num}/'
    req = urllib.request.Request(url, headers=HEADERS)
    
    try:
        response = urllib.request.urlopen(req)
        html_content = response.read()
        soup = bs(html_content, 'html.parser')
        
        items = []
        rows = soup.find_all('div', class_='show-ctn')
        for row in rows:
            items.append({
                'name': row.find('h2').get_text(),
                'vendor': row.find("a").get_text(),
                'site': row.find("a").attrs.get('href'),
                'desc': row.find('div', class_='shop-image').img.attrs.get('alt'),
                'price': row.find('div', class_='shops-price').get_text().strip()
            })
        return items
    except Exception as e:
        print(f"Error fetching page {page_num}: {e}")
        return []

def persist_to_db(batch_records):
    conn = pymysql.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        port=int(os.getenv('DB_PORT', '3306')),
        user=os.getenv('DB_USER', 'root'),
        password=os.getenv('DB_PASS', ''),
        database='warehouse_db',
        charset='utf8mb4'
    )
    cursor = conn.cursor()
    sql = """
      INSERT INTO product_records 
      (item_name, vendor_name, vendor_site_url, description_text, price_tag) 
      VALUES (%s, %s, %s, %s, %s)
    """
    
    for item in batch_records:
        data_tuple = tuple(item.values())
        cursor.execute(sql, data_tuple)
    
    conn.commit()
    conn.close()

if __name__ == '__main__':
    # Process first 10 pages for demonstration
    for idx in range(1, 11):
        record_set = extract_data(idx)
        if record_set:
            persist_to_db(record_set)
            time.sleep(5)
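
For a one-off manual test before containerizing, the connection details can be supplied via environment variables and the script run against the NodePort service (the values below are the assumptions used throughout this setup):

DB_HOST=192.168.138.112 DB_PORT=31233 DB_USER=root DB_PASS='SecurePass123!' python3 fetcher.py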

3.2 Cron Automation Wrapper

Schedule the script execution via system cron inside the container.

File: scheduler.sh

#!/bin/bash
python3 /opt/scripts/fetcher.py >> /var/log/worker.log 2>&1

File: Dockerfile

FROM centos:7

LABEL maintainer="automation-team"

# cronie provides the crond daemon; python3-pip provides pip3
RUN yum install -y epel-release && \
    yum install -y python3 python3-pip cronie && \
    yum clean all

WORKDIR /opt/scripts

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY fetcher.py scheduler.sh ./

# Register the job with cron (example schedule: every 6 hours; adjust as needed)
RUN chmod +x scheduler.sh && \
    echo "0 */6 * * * root /opt/scripts/scheduler.sh" > /etc/cron.d/ingest-worker && \
    chmod 0644 /etc/cron.d/ingest-worker

CMD ["/usr/sbin/crond", "-n"]

Setup Note: Ensure requirements.txt lists beautifulsoup4 and pymysql; the script relies on urllib from the standard library, so requests is not required.

Container Lifecycle Management

4. Image Distribution

4.1 Build and Tag

Construct the final image containing the worker logic.

docker build -t data-ingestion-worker:v1 .
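
Before pushing, the ingestion job can be exercised once directly, bypassing cron (connection values are the same assumptions used earlier):

docker run --rm \
  -e DB_HOST=192.168.138.112 -e DB_PORT=31233 \
  -e DB_USER=root -e DB_PASS='SecurePass123!' \
  data-ingestion-worker:v1 python3 /opt/scripts/fetcher.py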

4.2 Registry Interaction

Push the artifact to a centralized repository for cluster distribution.

# Tag the local image for the registry
docker tag data-ingestion-worker:v1 <registry-url>/team/data-ingestion-worker:v1

# Login to registry
docker login -u <username> <registry-url>

# Push image
docker push <registry-url>/team/data-ingestion-worker:v1

Confirm visibility by listing the available repositories in your management console.
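
To actually run the scheduled worker on the cluster, a minimal Deployment sketch is shown below. The registry path is the placeholder used above, and DB_HOST points at the in-cluster service name from section 1.5; note that crond starts jobs with a reduced environment, so scheduler.sh may need to re-export these variables (for example by writing them to a file at container start).

File: worker-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ingestion-worker
  namespace: data-pipeline
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ingestion-worker
  template:
    metadata:
      labels:
        app: ingestion-worker
    spec:
      containers:
      - name: worker
        image: <registry-url>/team/data-ingestion-worker:v1
        env:
        - name: DB_HOST
          value: svc-data-store
        - name: DB_USER
          value: root
        - name: DB_PASS
          valueFrom:
            secretKeyRef:
              name: db-auth-cred
              key: password

Apply the manifest:

kubectl apply -f worker-deployment.yaml

With the worker pod running and the database persisted on NFS, the automated ingestion pipeline is complete.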

Tags: kubernetes docker python MySQL Data Engineering

Posted on Sat, 16 May 2026 21:18:39 +0000 by Gregg