Elasticsearch Bulk Write Optimization Through Gateway Implementation

Background: Bulk Operations Optimization

In Elasticsearch, bulk operations are widely used for batch data processing. Bulk operations allow users to submit multiple data operations—such as indexing, updating, and deleting—in a single request, thereby enhancing data processing efficiency. The implementation principle of bulk operations involves packaging data operation requests into HTTP requests and submitting them in batches to the Elasticsearch server. This enables the Elasticsearch server to process multiple data operations simultaneously, improving overall efficiency.

The core value of this optimization lies in reducing network round trips and connection establishment overhead. Each individual write operation requires a complete request-response cycle, whereas batch writing combines multiple operations into a single communication, accomplishing what would otherwise require multiple interactions. This not only saves time but also frees up system resources, allowing servers to focus on actual data processing rather than frequent protocol handshakes and state maintenance.

While such batch requests can optimize write request efficiency and allow ES clusters to allocate more resources to centralized write processing, other intermediate processes can also be optimized.

Gateway Optimization Points

The optimization concept behind bulk operations is to centralize scattered daily write demands, minimizing request overhead and maximizing resource utilization. In essence, this approach ensures resources are used where they matter most.

However, after receiving bulk write requests, Elasticsearch must still coordinate nodes to calculate the appropriate shard for each document based on its ID to distribute data to corresponding data nodes. This process incurs some overhead. If the data in a bulk request is widely distributed, with each shard requiring writes, the advantages of centralized bulk writing are not fully realized.

Gateway write acceleration complements the bulk optimization concept to its fullest extent. A gateway can locally calculate the target storage location in the backend Elasticsearch cluster for each indexed document, enabling precise write request targeting.

Within a bulk request, data may be destined for multiple backend nodes. The bulk_reshuffle filter is used to break down normal bulk requests, reorganizing them by target node or shard. This prevents Elasticsearch nodes from needing to redistribute requests after receipt, reducing inter-cluster traffic and load in Elasticsearch. It also prevents individual nodes from becoming hotspots and ensures balanced processing across data nodes, thereby enhancing the cluster's overall indexing throughput.

Optimization Implementation

Let's implement this approach to measure how much gateway can improve write performance. We'll test two scenarios:

  1. Basic centralized write testing without document IDs, directly performing batch writes. This scenario more closely resembles log or monitoring data collection.
  2. Write testing with document IDs, more aligned with search scenarios or large-scale batch synchronization.

Both scenarios compare efficiency between direct writes to ES and gateway-forwarded writes to ES.

Besides requiring a gateway and an ES cluster, the test setup includes the following materials. The test index mappings are identical, with names distinguished as follows:

PUT gateway_bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

PUT bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

The gateway configuration file is as follows:

path.data: data
path.logs: log

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          fix_null_id: true
      - elasticsearch:
          elasticsearch: prod
          max_connection_per_node: 1000
          max_response_size: -1
          balancer: weight
          refresh:
            enabled: true
            interval: 60s
          filter:
            roles:
              exclude:
                - master

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://127.0.0.1:9221
      - https://127.0.0.1:9222
      - https://127.0.0.1:9223
    basic_auth:
      username: admin
      password: admin

pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 100
          num_of_slices: 3
          max_worker_size: 30
          idle_timeout_in_seconds: 10
          bulk:
            compress: false
            batch_size_in_mb: 10
            batch_size_in_docs: 10000
          consumer:
            fetch_max_messages: 100
          queue_selector:
            labels:
              type: bulk_reshuffle

The test script is as follows:

#!/usr/bin/env python3
"""
Elasticsearch Bulk Write Performance Test Script
"""

import hashlib
import 
import random
import string
import time
from typing import List, Dict, Any

import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import urllib3

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class ESBulkTester:
    def __init__(self):
        # Configuration variables - modifiable
        self.es_configs = [
            {
                "name": "Direct ES Connection",
                "url": "https://127.0.0.1:9221",
                "index": "bulk_test",
                "username": "admin",
                "password": "admin",
                "verify_ssl": False
            },
            {
                "name": "Gateway Proxy",
                "url": "http://localhost:8000",
                "index": "gateway_bulk_test",
                "username": None,
                "password": None,
                "verify_ssl": False
            }
        ]
        self.batch_size = 10000  # Documents per bulk operation
        self.log_interval = 100000  # Log output every N bulk operations

        # ID generation rules - prefix 2 digits, suffix 5 digits
        self.id_prefix_start = 1
        self.id_prefix_end = 999
        self.id_suffix_start = 1
        self.id_suffix_end = 9999

        # Current ID counter
        self.current_prefix = self.id_prefix_start
        self.current_suffix = self.id_suffix_start

    def generate_id(self) -> str:
        """Generate ID with fixed format - 2 digits prefix, 5 digits suffix"""
        id_str = f"{self.current_prefix:02d}{self.current_suffix:05d}"

        # Update counter
        self.current_suffix += 1
        if self.current_suffix > self.id_suffix_end:
            self.current_suffix = self.id_suffix_start
            self.current_prefix += 1
            if self.current_prefix > self.id_prefix_end:
                self.current_prefix = self.id_prefix_start

        return id_str

    def generate_random_hash(self, length: int = 32) -> str:
        """Generate random hash value"""
        random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return hashlib.md5(random_string.encode()).hexdigest()

    def generate_document(self) -> Dict[str, Any]:
        """Generate random document content"""
        return {
            "timestamp": datetime.now().isoformat(),
            "field1": self.generate_random_hash(),
            "field2": self.generate_random_hash(),
            "field3": self.generate_random_hash(),
            "field4": random.randint(1, 1000),
            "field5": random.choice(["A", "B", "C", "D"]),
            "field6": random.uniform(0.1, 100.0)
        }

    def create_bulk_payload(self, index_name: str) -> str:
        """Create bulk write payload"""
        bulk_data = []

        for _ in range(self.batch_size):
            doc = self.generate_document()

            # Add index operation
            bulk_data.append(.dumps({
                "index": {
                    "_index": index_name
                }
            }))
            bulk_data.append(.dumps(doc))

        return "\n".join(bulk_data) + "\n"

    def bulk_index(self, config: Dict[str, Any], payload: str) -> bool:
        """Execute bulk write"""
        url = f"{config['url']}/_bulk"
        headers = {
            "Content-Type": "application/x-nd"
        }

        # Set authentication information
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                data=payload,
                headers=headers,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=30
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Bulk write failed: {e}")
            return False

    def refresh_index(self, config: Dict[str, Any]) -> bool:
        """Refresh index"""
        url = f"{config['url']}/{config['index']}/_refresh"

        # Set authentication information
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=10
            )
            success = response.status_code == 200
            print(f"Index refresh {'successful' if success else 'failed'}: {config['index']}")
            return success
        except Exception as e:
            print(f"Index refresh failed: {e}")
            return False

    def run_test(self, config: Dict[str, Any], round_num: int, total_iterations: int = 100000):
        """Run performance test"""
        test_name = f"{config['name']}-Round {round_num}"
        print(f"\nStarting test: {test_name}")
        print(f"ES address: {config['url']}")
        print(f"Index name: {config['index']}")
        print(f"Authentication: {'Enabled' if config.get('username') else 'Disabled'}")
        print(f"Bulk size: {self.batch_size} documents")
        print(f"Total planned writes: {total_iterations * self.batch_size} documents")
        print("-" * 50)

        start_time = time.time()
        success_count = 0
        error_count = 0

        for i in range(1, total_iterations + 1):
            payload = self.create_bulk_payload(config['index'])

            if self.bulk_index(config, payload):
                success_count += 1
            else:
                error_count += 1

            # Log output every N operations
            if i % self.log_interval == 0:
                elapsed_time = time.time() - start_time
                rate = i / elapsed_time if elapsed_time > 0 else 0
                print(f"Completed {i:,} bulk operations, time elapsed: {elapsed_time:.2f}s, rate: {rate:.2f} bulk/s")

        end_time = time.time()
        total_time = end_time - start_time
        total_docs = total_iterations * self.batch_size

        print(f"\n{test_name} test completed!")
        print(f"Total time: {total_time:.2f}s")
        print(f"Successful bulk operations: {success_count:,}")
        print(f"Failed bulk operations: {error_count:,}")
        print(f"Total documents: {total_docs:,}")
        print(f"Average bulk rate: {success_count/total_time:.2f} bulk/s")
        print(f"Document write rate: {total_docs/total_time:.2f} docs/s")
        print("=" * 60)

        return {
            "test_name": test_name,
            "config_name": config['name'],
            "round": round_num,
            "es_url": config['url'],
            "index": config['index'],
            "total_time": total_time,
            "success_count": success_count,
            "error_count": error_count,
            "total_docs": total_docs,
            "bulk_rate": success_count/total_time,
            "doc_rate": total_docs/total_time
        }

    def run_comparison_test(self, total_iterations: int = 10000):
        """Run dual-address comparison test"""
        print("Elasticsearch Bulk Write Performance Test Started")
        print(f"Test time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        results = []
        rounds = 2  # Test each configuration 2 rounds

        # Test all configurations in loop
        for config in self.es_configs:
            print(f"\nStarting test for: {config['name']}")
            print("*" * 40)

            for round_num in range(1, rounds + 1):
                # Run test
                result = self.run_test(config, round_num, total_iterations)
                results.append(result)

                # Refresh index after each round
                print(f"\nRound {round_num} completed, refreshing index...")
                self.refresh_index(config)

                # Reset ID counter
                if round_num == 1:
                    # Round 1: Use initial ID range (new data)
                    print("Round 1: New data mode")
                else:
                    # Round 2: Reuse same IDs (update data mode)
                    print("Round 2: Data update mode, reusing Round 1 IDs")
                    self.current_prefix = self.id_prefix_start
                    self.current_suffix = self.id_suffix_start

                print(f"{config['name']} Round {round_num} test completed\n")

        # Output comparison results
        print("\nPerformance Comparison Results:")
        print("=" * 80)

        # Group results by configuration
        config_results = {}
        for result in results:
            config_name = result['config_name']
            if config_name not in config_results:
                config_results[config_name] = []
            config_results[config_name].append(result)

        for config_name, rounds_data in config_results.items():
            print(f"\n{config_name}:")
            total_time = 0
            total_bulk_rate = 0
            total_doc_rate = 0

            for round_data in rounds_data:
                print(f"  Round {round_data['round']}:")
                print(f"    Time: {round_data['total_time']:.2f}s")
                print(f"    Bulk rate: {round_data['bulk_rate']:.2f} bulk/s")
                print(f"    Document rate: {round_data['doc_rate']:.2f} docs/s")
                print(f"    Success rate: {round_data['success_count']/(round_data['success_count']+round_data['error_count'])*100:.2f}%")

                total_time += round_data['total_time']
                total_bulk_rate += round_data['bulk_rate']
                total_doc_rate += round_data['doc_rate']

            avg_bulk_rate = total_bulk_rate / len(rounds_data)
            avg_doc_rate = total_doc_rate / len(rounds_data)

            print(f"  Average performance:")
            print(f"    Total time: {total_time:.2f}s")
            print(f"    Average bulk rate: {avg_bulk_rate:.2f} bulk/s")
            print(f"    Average document rate: {avg_doc_rate:.2f} docs/s")

        # Overall comparison
        if len(config_results) >= 2:
            config_names = list(config_results.keys())
            config1_avg = sum([r['bulk_rate'] for r in config_results[config_names[0]]]) / len(config_results[config_names[0]])
            config2_avg = sum([r['bulk_rate'] for r in config_results[config_names[1]]]) / len(config_results[config_names[1]])

            if config1_avg > config2_avg:
                faster = config_names[0]
                rate_diff = config1_avg - config2_avg
            else:
                faster = config_names[1]
                rate_diff = config2_avg - config1_avg

            print(f"\nOverall performance comparison:")
            print(f"{faster} has better average performance, {rate_diff:.2f} bulk/s higher")
            print(f"Performance improvement: {(rate_diff/min(config1_avg, config2_avg)*100):.1f}%")


def main():
    """Main function"""
    tester = ESBulkTester()

    # Run test (10,000 docs per bulk, 30 bulks = 300,000 documents)
    tester.run_comparison_test(total_iterations=30)


if __name__ == "__main__":
    main()

1. Log Scenario: Writes Without IDs

Test conditions:

  1. Bulk write data without document IDs
  2. 10,000 documents per bulk batch, totaling 300,000 documents

Results:

Performance Comparison Results:
================================================================================

Direct ES Connection:
  Round 1:
    Time: 152.29s
    Bulk rate: 1.97 bulk/s
    Document rate: 19699.59 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 152.29s
    Average bulk rate: 1.97 bulk/s
    Average document rate: 19699.59 docs/s

Gateway Proxy:
  Round 1:
    Time: 115.63s
    Bulk rate: 2.59 bulk/s
    Document rate: 25944.35 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 115.63s
    Average bulk rate: 2.59 bulk/s
    Average document rate: 25944.35 docs/s

Overall performance comparison:
Gateway Proxy has better average performance, 0.62 bulk/s higher
Performance improvement: 31.7%

2. Business Scenario: Writes With Document IDs

Test conditions:

  1. Bulk write data with document IDs, with consistent and duplicate ID generation rules for both tests
  2. 10,000 documents per bulk batch, totaling 300,000 documents

Results:

Performance Comparison Results:
================================================================================

Direct ES Connection:
  Round 1:
    Time: 155.30s
    Bulk rate: 1.93 bulk/s
    Document rate: 19317.39 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 155.30s
    Average bulk rate: 1.93 bulk/s
    Average document rate: 19317.39 docs/s

Gateway Proxy:
  Round 1:
    Time: 116.73s
    Bulk rate: 2.57 bulk/s
    Document rate: 25700.06 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 116.73s
    Average bulk rate: 2.57 bulk/s
    Average document rate: 25700.06 docs/s

Overall performance comparison:
Gateway Proxy has better average performance, 0.64 bulk/s higher
Performance improvement: 33.0%

Summary

Whether in log scenarios or business scenarios where data synchronization is more valuable, gateway write acceleration can consistently save 25-30% of write time.

Tags: elasticsearch Bulk Operations Gateway Optimization Performance Tuning Data Processing

Posted on Wed, 13 May 2026 20:50:17 +0000 by PhillNeedsHelp