Background: Bulk Operations Optimization
In Elasticsearch, bulk operations are the standard tool for batch data processing. A bulk request packages multiple data operations (indexing, updating, deleting) into a single HTTP request, so the Elasticsearch server can process many operations in one pass instead of handling one request per operation, improving overall write efficiency.
The core value of this optimization lies in reducing network round trips and connection establishment overhead. Each individual write operation requires a complete request-response cycle, whereas batch writing combines multiple operations into a single communication, accomplishing what would otherwise require multiple interactions. This not only saves time but also frees up system resources, allowing servers to focus on actual data processing rather than frequent protocol handshakes and state maintenance.
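Concretely, a `_bulk` request body is newline-delimited JSON (NDJSON): each operation is an action line, optionally followed by a source line, and many operations travel in one HTTP request. A minimal sketch of building such a body (the index name here is illustrative):

```python
import json

# Build an NDJSON bulk body: one action line, then one source line per document.
docs = [{"field1": "a"}, {"field1": "b"}]
lines = []
for doc in docs:
    lines.append(json.dumps({"index": {"_index": "bulk_test"}}))
    lines.append(json.dumps(doc))
body = "\n".join(lines) + "\n"  # the Bulk API requires a trailing newline

# Two documents become four NDJSON lines, sent as a single request:
#   POST /_bulk  with  Content-Type: application/x-ndjson
print(body)
```

One request-response cycle now carries every operation in `docs`, which is exactly the round-trip saving described above.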
While such batch requests can optimize write request efficiency and allow ES clusters to allocate more resources to centralized write processing, other intermediate processes can also be optimized.
Gateway Optimization Points
The optimization concept behind bulk operations is to consolidate scattered, day-to-day write demands, minimizing per-request overhead and maximizing resource utilization. In essence, it concentrates resources on the work that matters most: actually writing the data.
However, after receiving bulk write requests, Elasticsearch must still coordinate nodes to calculate the appropriate shard for each document based on its ID to distribute data to corresponding data nodes. This process incurs some overhead. If the data in a bulk request is widely distributed, with each shard requiring writes, the advantages of centralized bulk writing are not fully realized.
Gateway write acceleration carries the bulk optimization idea to its logical conclusion. The gateway can locally compute, for each indexed document, its target storage location in the backend Elasticsearch cluster, enabling precisely targeted write requests.
Within a bulk request, data may be destined for multiple backend nodes. The bulk_reshuffle filter breaks an ordinary bulk request apart and regroups it by target node or shard. This spares Elasticsearch nodes from redistributing requests after receipt, reducing forwarding traffic and load inside the Elasticsearch cluster. It also prevents individual nodes from becoming hotspots and keeps processing balanced across data nodes, thereby raising the cluster's overall indexing throughput.
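The regrouping step can be sketched as follows. Note that real Elasticsearch routing uses `murmur3(_routing) % number_of_shards` (with `_routing` defaulting to the document `_id`); Python's built-in `hash` below is only a stand-in, and `reshuffle` is a hypothetical helper for illustration, not the gateway's actual implementation:

```python
from collections import defaultdict

def target_shard(doc_id: str, number_of_shards: int) -> int:
    # Elasticsearch picks a shard via murmur3(_routing) % number_of_shards,
    # where _routing defaults to _id. hash() is a stand-in for murmur3 here.
    return hash(doc_id) % number_of_shards

def reshuffle(bulk_items, number_of_shards=6):
    """Regroup (doc_id, doc) pairs into per-shard sub-requests."""
    groups = defaultdict(list)
    for doc_id, doc in bulk_items:
        groups[target_shard(doc_id, number_of_shards)].append((doc_id, doc))
    return groups

# A mixed bulk request becomes at most number_of_shards targeted
# sub-requests, each of which can go straight to the node holding that shard.
items = [(f"{i:07d}", {"n": i}) for i in range(1000)]
groups = reshuffle(items)
print(len(groups), sum(len(v) for v in groups.values()))
```

Because every sub-request lands directly on the node that owns the shard, the receiving node no longer has to act as a coordinator and re-forward documents to its peers.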
Optimization Implementation
Let's implement this approach to measure how much the gateway can improve write performance. We'll test two scenarios:
- Basic centralized write testing without document IDs, directly performing batch writes. This scenario more closely resembles log or monitoring data collection.
- Write testing with document IDs, more aligned with search scenarios or large-scale batch synchronization.
Both scenarios compare efficiency between direct writes to ES and gateway-forwarded writes to ES.
Besides a gateway and an ES cluster, the test requires the materials below. The two test indices have identical mappings and are distinguished only by name:
PUT gateway_bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}
PUT bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}
The gateway configuration file is as follows:
path.data: data
path.logs: log

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          fix_null_id: true
      - elasticsearch:
          elasticsearch: prod
          max_connection_per_node: 1000
          max_response_size: -1
          balancer: weight
          refresh:
            enabled: true
            interval: 60s
          filter:
            roles:
              exclude:
                - master

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://127.0.0.1:9221
      - https://127.0.0.1:9222
      - https://127.0.0.1:9223
    basic_auth:
      username: admin
      password: admin

pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 100
          num_of_slices: 3
          max_worker_size: 30
          idle_timeout_in_seconds: 10
          bulk:
            compress: false
            batch_size_in_mb: 10
            batch_size_in_docs: 10000
          consumer:
            fetch_max_messages: 100
          queue_selector:
            labels:
              type: bulk_reshuffle
The test script is as follows:
#!/usr/bin/env python3
"""
Elasticsearch Bulk Write Performance Test Script
"""
import hashlib
import json
import random
import string
import time
from typing import Dict, Any
from datetime import datetime

import requests
import urllib3

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class ESBulkTester:
    def __init__(self):
        # Configuration variables - modifiable
        self.es_configs = [
            {
                "name": "Direct ES Connection",
                "url": "https://127.0.0.1:9221",
                "index": "bulk_test",
                "username": "admin",
                "password": "admin",
                "verify_ssl": False
            },
            {
                "name": "Gateway Proxy",
                "url": "http://localhost:8000",
                "index": "gateway_bulk_test",
                "username": None,
                "password": None,
                "verify_ssl": False
            }
        ]
        self.batch_size = 10000  # Documents per bulk operation
        self.log_interval = 100000  # Log output every N bulk operations

        # ID generation rules - zero-padded prefix and suffix counters
        self.id_prefix_start = 1
        self.id_prefix_end = 999
        self.id_suffix_start = 1
        self.id_suffix_end = 9999

        # Current ID counter
        self.current_prefix = self.id_prefix_start
        self.current_suffix = self.id_suffix_start

    def generate_id(self) -> str:
        """Generate a sequential ID - zero-padded prefix plus suffix"""
        id_str = f"{self.current_prefix:02d}{self.current_suffix:05d}"
        # Update counter
        self.current_suffix += 1
        if self.current_suffix > self.id_suffix_end:
            self.current_suffix = self.id_suffix_start
            self.current_prefix += 1
            if self.current_prefix > self.id_prefix_end:
                self.current_prefix = self.id_prefix_start
        return id_str
    def generate_random_hash(self, length: int = 32) -> str:
        """Generate random hash value"""
        random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return hashlib.md5(random_string.encode()).hexdigest()

    def generate_document(self) -> Dict[str, Any]:
        """Generate random document content"""
        return {
            "timestamp": datetime.now().isoformat(),
            "field1": self.generate_random_hash(),
            "field2": self.generate_random_hash(),
            "field3": self.generate_random_hash(),
            "field4": random.randint(1, 1000),
            "field5": random.choice(["A", "B", "C", "D"]),
            "field6": random.uniform(0.1, 100.0)
        }

    def create_bulk_payload(self, index_name: str) -> str:
        """Create bulk write payload"""
        bulk_data = []
        for _ in range(self.batch_size):
            doc = self.generate_document()
            # Add index operation (for the with-ID scenario, also set
            # "_id": self.generate_id() in the action metadata)
            bulk_data.append(json.dumps({
                "index": {
                    "_index": index_name
                }
            }))
            bulk_data.append(json.dumps(doc))
        return "\n".join(bulk_data) + "\n"
    def bulk_index(self, config: Dict[str, Any], payload: str) -> bool:
        """Execute bulk write"""
        url = f"{config['url']}/_bulk"
        headers = {
            "Content-Type": "application/x-ndjson"
        }
        # Set authentication information
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])
        try:
            response = requests.post(
                url,
                data=payload,
                headers=headers,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=30
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Bulk write failed: {e}")
            return False

    def refresh_index(self, config: Dict[str, Any]) -> bool:
        """Refresh index"""
        url = f"{config['url']}/{config['index']}/_refresh"
        # Set authentication information
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])
        try:
            response = requests.post(
                url,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=10
            )
            success = response.status_code == 200
            print(f"Index refresh {'successful' if success else 'failed'}: {config['index']}")
            return success
        except Exception as e:
            print(f"Index refresh failed: {e}")
            return False
    def run_test(self, config: Dict[str, Any], round_num: int, total_iterations: int = 100000):
        """Run performance test"""
        test_name = f"{config['name']}-Round {round_num}"
        print(f"\nStarting test: {test_name}")
        print(f"ES address: {config['url']}")
        print(f"Index name: {config['index']}")
        print(f"Authentication: {'Enabled' if config.get('username') else 'Disabled'}")
        print(f"Bulk size: {self.batch_size} documents")
        print(f"Total planned writes: {total_iterations * self.batch_size} documents")
        print("-" * 50)

        start_time = time.time()
        success_count = 0
        error_count = 0

        for i in range(1, total_iterations + 1):
            payload = self.create_bulk_payload(config['index'])
            if self.bulk_index(config, payload):
                success_count += 1
            else:
                error_count += 1

            # Log output every N operations
            if i % self.log_interval == 0:
                elapsed_time = time.time() - start_time
                rate = i / elapsed_time if elapsed_time > 0 else 0
                print(f"Completed {i:,} bulk operations, time elapsed: {elapsed_time:.2f}s, rate: {rate:.2f} bulk/s")

        end_time = time.time()
        total_time = end_time - start_time
        total_docs = total_iterations * self.batch_size

        print(f"\n{test_name} test completed!")
        print(f"Total time: {total_time:.2f}s")
        print(f"Successful bulk operations: {success_count:,}")
        print(f"Failed bulk operations: {error_count:,}")
        print(f"Total documents: {total_docs:,}")
        print(f"Average bulk rate: {success_count/total_time:.2f} bulk/s")
        print(f"Document write rate: {total_docs/total_time:.2f} docs/s")
        print("=" * 60)

        return {
            "test_name": test_name,
            "config_name": config['name'],
            "round": round_num,
            "es_url": config['url'],
            "index": config['index'],
            "total_time": total_time,
            "success_count": success_count,
            "error_count": error_count,
            "total_docs": total_docs,
            "bulk_rate": success_count/total_time,
            "doc_rate": total_docs/total_time
        }
    def run_comparison_test(self, total_iterations: int = 10000):
        """Run dual-address comparison test"""
        print("Elasticsearch Bulk Write Performance Test Started")
        print(f"Test time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        results = []
        rounds = 2  # Test each configuration 2 rounds

        # Test all configurations in loop
        for config in self.es_configs:
            print(f"\nStarting test for: {config['name']}")
            print("*" * 40)
            for round_num in range(1, rounds + 1):
                # Run test
                result = self.run_test(config, round_num, total_iterations)
                results.append(result)

                # Refresh index after each round
                print(f"\nRound {round_num} completed, refreshing index...")
                self.refresh_index(config)

                # Reset ID counter
                if round_num == 1:
                    # Round 1: Use initial ID range (new data)
                    print("Round 1: New data mode")
                else:
                    # Round 2: Reuse same IDs (update data mode)
                    print("Round 2: Data update mode, reusing Round 1 IDs")
                self.current_prefix = self.id_prefix_start
                self.current_suffix = self.id_suffix_start

                print(f"{config['name']} Round {round_num} test completed\n")

        # Output comparison results
        print("\nPerformance Comparison Results:")
        print("=" * 80)

        # Group results by configuration
        config_results = {}
        for result in results:
            config_name = result['config_name']
            if config_name not in config_results:
                config_results[config_name] = []
            config_results[config_name].append(result)

        for config_name, rounds_data in config_results.items():
            print(f"\n{config_name}:")
            total_time = 0
            total_bulk_rate = 0
            total_doc_rate = 0
            for round_data in rounds_data:
                print(f"  Round {round_data['round']}:")
                print(f"    Time: {round_data['total_time']:.2f}s")
                print(f"    Bulk rate: {round_data['bulk_rate']:.2f} bulk/s")
                print(f"    Document rate: {round_data['doc_rate']:.2f} docs/s")
                print(f"    Success rate: {round_data['success_count']/(round_data['success_count']+round_data['error_count'])*100:.2f}%")
                total_time += round_data['total_time']
                total_bulk_rate += round_data['bulk_rate']
                total_doc_rate += round_data['doc_rate']
            avg_bulk_rate = total_bulk_rate / len(rounds_data)
            avg_doc_rate = total_doc_rate / len(rounds_data)
            print(f"  Average performance:")
            print(f"    Total time: {total_time:.2f}s")
            print(f"    Average bulk rate: {avg_bulk_rate:.2f} bulk/s")
            print(f"    Average document rate: {avg_doc_rate:.2f} docs/s")

        # Overall comparison
        if len(config_results) >= 2:
            config_names = list(config_results.keys())
            config1_avg = sum(r['bulk_rate'] for r in config_results[config_names[0]]) / len(config_results[config_names[0]])
            config2_avg = sum(r['bulk_rate'] for r in config_results[config_names[1]]) / len(config_results[config_names[1]])
            if config1_avg > config2_avg:
                faster = config_names[0]
                rate_diff = config1_avg - config2_avg
            else:
                faster = config_names[1]
                rate_diff = config2_avg - config1_avg
            print(f"\nOverall performance comparison:")
            print(f"{faster} has better average performance, {rate_diff:.2f} bulk/s higher")
            print(f"Performance improvement: {(rate_diff/min(config1_avg, config2_avg)*100):.1f}%")
def main():
    """Main function"""
    tester = ESBulkTester()
    # Run test (10,000 docs per bulk, 30 bulks = 300,000 documents)
    tester.run_comparison_test(total_iterations=30)


if __name__ == "__main__":
    main()
1. Log Scenario: Writes Without IDs
Test conditions:
- Bulk write data without document IDs
- 10,000 documents per bulk batch, totaling 300,000 documents
Results:
Performance Comparison Results:
================================================================================

Direct ES Connection:
  Round 1:
    Time: 152.29s
    Bulk rate: 1.97 bulk/s
    Document rate: 19699.59 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 152.29s
    Average bulk rate: 1.97 bulk/s
    Average document rate: 19699.59 docs/s

Gateway Proxy:
  Round 1:
    Time: 115.63s
    Bulk rate: 2.59 bulk/s
    Document rate: 25944.35 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 115.63s
    Average bulk rate: 2.59 bulk/s
    Average document rate: 25944.35 docs/s

Overall performance comparison:
Gateway Proxy has better average performance, 0.62 bulk/s higher
Performance improvement: 31.7%
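As a sanity check, the headline figures follow directly from the document rates and wall-clock times reported above:

```python
# Document rates and total times taken from the run above
direct_rate, gateway_rate = 19699.59, 25944.35   # docs/s
direct_time, gateway_time = 152.29, 115.63       # seconds

# Relative throughput gain of the gateway path over the direct path
improvement = (gateway_rate - direct_rate) / direct_rate * 100
print(f"throughput gain: {improvement:.1f}%")   # matches the reported 31.7%

# Equivalent reduction in total write time
time_saved = (1 - gateway_time / direct_time) * 100
print(f"time saved: {time_saved:.1f}%")
```

A ~32% throughput gain and a ~24% reduction in elapsed time are two views of the same result.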
2. Business Scenario: Writes With Document IDs
Test conditions:
- Bulk write data with explicit document IDs; both tests use the same ID generation rules, producing identical (and deliberately repeating) IDs
- 10,000 documents per bulk batch, totaling 300,000 documents
Results:
Performance Comparison Results:
================================================================================

Direct ES Connection:
  Round 1:
    Time: 155.30s
    Bulk rate: 1.93 bulk/s
    Document rate: 19317.39 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 155.30s
    Average bulk rate: 1.93 bulk/s
    Average document rate: 19317.39 docs/s

Gateway Proxy:
  Round 1:
    Time: 116.73s
    Bulk rate: 2.57 bulk/s
    Document rate: 25700.06 docs/s
    Success rate: 100.00%
  Average performance:
    Total time: 116.73s
    Average bulk rate: 2.57 bulk/s
    Average document rate: 25700.06 docs/s

Overall performance comparison:
Gateway Proxy has better average performance, 0.64 bulk/s higher
Performance improvement: 33.0%
Summary
Whether for log-style ingestion or business scenarios such as large-scale data synchronization, gateway write acceleration consistently reduced total write time by roughly 25% in these tests, which corresponds to a throughput gain of just over 30%.