Aligning Timestamps Across Time Zones
When shipping data from Logstash to Elasticsearch, the @timestamp field is recorded in UTC, causing a mismatch with the local time zone. A permanent fix is to adjust the timestamp in the filter stage.
ruby {
  # The Logstash ruby filter exposes the current event as `event`;
  # shift the UTC timestamp forward by 28800 seconds (8 hours).
  code => "event.set('local_ts', event.get('@timestamp').time.localtime + 28800)"
}
ruby {
  code => "event.set('@timestamp', event.get('local_ts'))"
}
mutate {
  remove_field => ["local_ts"]
}
The above shifts the stored timestamp forward by eight hours (28,800 seconds) to align with a UTC+8 locale, eliminating manual adjustments downstream.
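The offset arithmetic can be verified in plain Ruby outside Logstash. This is only a sketch of the adjustment; the 28,800-second value is the UTC+8 assumption from above and should be changed for other zones.

```ruby
# Illustrative check of the offset applied in the ruby filter:
# 28800 seconds = 8 hours, i.e. the UTC+8 adjustment.
utc = Time.utc(2021, 3, 15, 1, 25, 12)  # what @timestamp holds (UTC)
shifted = utc + 28_800                  # what the filter stores

puts shifted.strftime('%Y-%m-%d %H:%M:%S')  # 2021-03-15 09:25:12
```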
Splitting Log Lines into Structured Fields
Raw logs often need to be decomposed into discrete attributes such as timestamp, thread ID, severity level, class name, and message body. Grok patterns within Logstash filters handle this structured parsing.
Assume Logback emits logs in this layout:
|2021-03-15 09:25:12.123|[Worker-7]|DEBUG|org.sample.Service|-Initialization complete
Define a matching Grok expression:
\|%{DATA:event_time}\|\[%{DATA:thread_id}\]\|%{DATA:severity}\|%{DATA:source_class}\|-%{GREEDYDATA:message_body}
Configure the filter:
grok {
  match => {
    "message" => "\|%{DATA:event_time}\|\[%{DATA:thread_id}\]\|%{DATA:severity}\|%{DATA:source_class}\|-%{GREEDYDATA:message_body}"
  }
}
This extracts individual components, enabling precise querying and filtering in Elasticsearch.
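The Grok expression can be sanity-checked with an equivalent Ruby regular expression, since %{DATA} and %{GREEDYDATA} compile to the lazy and greedy wildcards .*? and .* respectively. The named captures below mirror the field names in the filter:

```ruby
# Plain-Ruby equivalent of the Grok pattern, for testing outside Logstash.
PATTERN = /\|(?<event_time>.*?)\|\[(?<thread_id>.*?)\]\|(?<severity>.*?)\|(?<source_class>.*?)\|-(?<message_body>.*)/

line = '|2021-03-15 09:25:12.123|[Worker-7]|DEBUG|org.sample.Service|-Initialization complete'
m = PATTERN.match(line)
puts m[:severity]      # DEBUG
puts m[:thread_id]     # Worker-7
puts m[:message_body]  # Initialization complete
```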
Defining Custom Index Templates
Without an explicit template, Logstash applies the default logstash mapping in Elasticsearch. To control field types and indexing behavior, create a dedicated template.
Register a template named custom_log_template:
PUT _template/custom_log_template
{
  "index_patterns": ["app-logs-*"],
  "order": 20,
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "severity": { "type": "keyword" },
      "thread_id": { "type": "keyword" },
      "source_class": { "type": "keyword" },
      "message_body": { "type": "text" },
      "event_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS" }
    }
  }
}
The index_patterns setting ensures that indices whose names start with app-logs- adopt this mapping; order resolves conflicts when multiple templates match (higher values take precedence).
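The event_time date format in the mapping corresponds, in Ruby strptime terms, to %Y-%m-%d %H:%M:%S.%L. A quick sanity check (not part of the pipeline) that the timestamp extracted from the sample log parses under that format:

```ruby
require 'time'

# Parse the sample event_time with the strptime equivalent of the
# mapping format yyyy-MM-dd HH:mm:ss.SSS.
t = Time.strptime('2021-03-15 09:25:12.123', '%Y-%m-%d %H:%M:%S.%L')
puts t.usec  # 123000 (the .123 milliseconds)
```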
In Logstash output, reference the pattern via index naming:
elasticsearch {
  hosts => ["127.0.0.1:9200"]
  index => "app-logs-%{+YYYY.MM.dd}"
}
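The %{+YYYY.MM.dd} token is a Joda-style date pattern expanded from each event's @timestamp; in strftime terms it corresponds to %Y.%m.%d. A small Ruby sketch of the resulting daily index name, which falls under the app-logs-* pattern the template registers:

```ruby
# Sketch: how the daily index name in the output above is derived.
# Joda pattern YYYY.MM.dd ~ strftime '%Y.%m.%d'.
event_time = Time.utc(2021, 3, 15, 9, 25, 12)
index = "app-logs-#{event_time.strftime('%Y.%m.%d')}"
puts index  # app-logs-2021.03.15
```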
Routing Logs to Multiple Indices
Different log sources may require segregation into distinct indices. Use a custom type field during ingestion and branch output accordingly.
Input definitions:
file {
  path => ["/var/data/app1/*.log"]
  type => "app1_log"
  start_position => "beginning"
  sincedb_path => "/dev/null"
}
file {
  path => ["/var/data/app2/*.log"]
  type => "app2_log"
  start_position => "beginning"
  sincedb_path => "/dev/null"
}
Conditional outputs:
if [type] == "app1_log" {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "app1-index-%{+YYYY.MM.dd}"
  }
}
if [type] == "app2_log" {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "app2-index-%{+YYYY.MM}"
  }
}
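The branching above amounts to a lookup from event type to an index-name pattern. A minimal Ruby sketch of that mapping (the helper name is illustrative, not a Logstash API):

```ruby
# Illustrative stand-in for the conditional outputs:
# pick an index name based on the event's type field.
def index_for(type, time)
  case type
  when 'app1_log' then "app1-index-#{time.strftime('%Y.%m.%d')}"  # daily indices
  when 'app2_log' then "app2-index-#{time.strftime('%Y.%m')}"     # monthly indices
  end
end

t = Time.utc(2021, 3, 15)
puts index_for('app1_log', t)  # app1-index-2021.03.15
puts index_for('app2_log', t)  # app2-index-2021.03
```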
Merging Multiline Exceptions
Stack traces span several lines, breaking readability if indexed line-by-line. Configure multiline handling in the input section to concatenate related lines.
Example configuration:
codec => multiline {
  pattern => "^\s*\|"
  negate => true
  what => "previous"
}
Lines that do not start with optional whitespace followed by | (the character that opens every event in the sample layout) are appended to the preceding line, preserving exception context.
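The merge rule can be mimicked in a few lines of Ruby: with negate => true and what => "previous", a line matching the start-of-event pattern opens a new event, and every other line is folded into the one before it. The sketch below uses /^\s*\|/ as the start-of-event pattern, matching the sample layout above in which each log line begins with |; it is an illustration, not the plugin's implementation:

```ruby
# Sketch of the multiline merge: pattern-matching lines start a new event,
# everything else is appended to the previous event.
def merge_multiline(lines, pattern)
  lines.each_with_object([]) do |line, events|
    if events.empty? || line.match?(pattern)
      events << line
    else
      events[-1] << "\n" << line
    end
  end
end

logs = [
  '|2021-03-15 09:25:13.001|[Worker-7]|ERROR|org.sample.Service|-Request failed',
  'java.lang.IllegalStateException: boom',
  "\tat org.sample.Service.handle(Service.java:42)"
]
merged = merge_multiline(logs, /^\s*\|/)
puts merged.length  # 1 (the stack trace is folded into the ERROR event)
```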
Consolidated Configuration Example
input {
  file {
    path => ["/var/data/app1/log.txt"]
    type => "app1_log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
  file {
    path => ["/var/data/app2/*.log"]
    type => "app2_log"
    codec => multiline {
      pattern => "^\s*\|"
      negate => true
      what => "previous"
    }
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => {
      "message" => "\|%{DATA:event_time}\|\[%{DATA:thread_id}\]\|%{DATA:severity}\|%{DATA:source_class}\|-%{GREEDYDATA:message_body}"
    }
  }
  ruby {
    code => "event.set('local_ts', event.get('@timestamp').time.localtime + 28800)"
  }
  ruby {
    code => "event.set('@timestamp', event.get('local_ts'))"
  }
  mutate {
    remove_field => ["local_ts"]
  }
}
output {
  stdout { codec => rubydebug }
  if [type] == "app1_log" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "app1-index-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "app2_log" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "app2-index-%{+YYYY.MM}"
    }
  }
}
Common Issues and Remedies
- Pipeline creation failure: make sure path values use forward slashes (/) rather than backslashes, even on Windows.
- Mapping conflict on the host field: rename the conflicting field in the filter stage:
mutate {
  rename => { "host" => "host.name" }
}