In Apache Storm, a Stream represents an unbounded sequence of tuples processed in parallel across a distributed topology. Each stream carries tuples—ordered collections of typed values—between spouts and bolts. By default, Storm supports primitive types including integers, longs, shorts, bytes, booleans, doubles, floats, strings, and byte arrays, alongside custom serializable objects when properly registered.
Tuple Structure and Access Patterns
The Tuple interface serves as the primary data container within Storm, extending the ITuple interface, with TupleImpl as the standard implementation. Unlike statically typed systems, tuples are dynamically typed: accessor methods such as getInteger(), getString(), and getLong() retrieve values by position (with variants like getStringByField() for named access) and perform the cast internally, so callers avoid manual casting; type mismatches surface only at runtime.
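The cast-on-access behavior behind these getters can be sketched with a minimal stand-in. SimpleTuple below is a hypothetical illustration, not Storm's TupleImpl: values are held untyped and cast when read, so a type mismatch surfaces as a ClassCastException at runtime.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mimicking Storm's typed accessors:
// values are stored as Object and cast at access time.
class SimpleTuple {
    private final List<String> fields;
    private final List<Object> values;

    SimpleTuple(List<String> fields, List<Object> values) {
        this.fields = fields;
        this.values = values;
    }

    // Positional access; the accessor performs the cast for the caller
    Integer getInteger(int i) { return (Integer) values.get(i); }
    String getString(int i)   { return (String) values.get(i); }

    // Named access, mirroring getStringByField()
    String getStringByField(String field) {
        return (String) values.get(fields.indexOf(field));
    }
}

public class TupleDemo {
    public static void main(String[] args) {
        SimpleTuple t = new SimpleTuple(
                Arrays.asList("term", "count"),
                Arrays.asList("STORM", 42));
        System.out.println(t.getString(0));             // STORM
        System.out.println(t.getInteger(1));            // 42
        System.out.println(t.getStringByField("term")); // STORM
    }
}
```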
Storm delegates serialization to Kryo, a high-performance binary serialization framework. While primitives and common collections (ArrayList, HashMap, HashSet) serialize automatically, complex domain objects require explicit registration to ensure proper transmission between worker nodes.
Schema Declaration with OutputFieldsDeclarer
Streams enforce contracts through field declarations. The OutputFieldsDeclarer interface enables bolts to define output schemas, establishing key-value semantics where field names map to positional values in emitted tuples.
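The pairing of declared names with emitted values amounts to zipping the field list against each value list; a stand-in sketch follows, where bind is a hypothetical helper, not Storm API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaDemo {
    // Pair declared field names with one emitted value list, the way a
    // declared schema gives names to positional values in a tuple.
    static Map<String, Object> bind(List<String> declared, List<Object> emitted) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        for (int i = 0; i < declared.size(); i++) {
            tuple.put(declared.get(i), emitted.get(i));
        }
        return tuple;
    }

    public static void main(String[] args) {
        // Analogous to declare(new Fields("term", "length"))
        // plus emit(new Values("STORM", 5))
        Map<String, Object> t = bind(Arrays.asList("term", "length"),
                                     Arrays.asList("STORM", 5));
        System.out.println(t); // {term=STORM, length=5}
    }
}
```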
Consider the following bolt implementation that processes text lines:
package processors;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MessageTransformer implements IRichBolt {

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String textLine = input.getStringByField("line");
        for (String rawToken : textLine.split("\\s+")) {
            String cleanToken = rawToken.trim();
            if (!cleanToken.isEmpty()) {
                // Anchor the emitted tuple to its input so failures replay upstream
                outputCollector.emit(input, new Values(cleanToken.toUpperCase()));
            }
        }
        outputCollector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("term"));
    }

    @Override
    public void cleanup() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
This bolt establishes a schema containing the field "term", corresponding to the uppercase string values emitted during execution. Downstream components reference this field name to extract values from received tuples.
Dynamic Typing Architecture
Storm employs dynamic typing rather than static type constraints found in systems like Hadoop MapReduce. This design decision stems from several architectural considerations:
First, static typing would significantly complicate the API when bolts subscribe to multiple input streams with divergent schemas. A bolt processing heterogeneous streams would require overloaded execution methods or complex generic signatures for each possible tuple variation. Dynamic typing allows a single execute(Tuple input) signature to handle polymorphic inputs gracefully.
Second, Storm's polyglot nature—supporting JVM languages like Clojure and JRuby—benefits from runtime type flexibility, as these languages employ dynamic type systems themselves.
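The single-signature dispatch described above can be sketched without Storm dependencies. In the stand-in below, handle plays the role of execute(Tuple), and the component names are hypothetical; a real bolt would branch on input.getSourceComponent() or input.getSourceStreamId() in the same way.

```java
import java.util.Arrays;
import java.util.List;

public class MultiStreamDemo {
    // One handler for two differently-shaped inputs, dispatching at
    // runtime on the source component, as a multi-stream bolt would.
    static String handle(String sourceComponent, List<Object> values) {
        if (sourceComponent.equals("word-spout")) {
            // schema: (word)
            return "word=" + (String) values.get(0);
        } else {
            // schema: (id, count)
            return "id=" + values.get(0) + " count=" + (Integer) values.get(1);
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("word-spout", Arrays.asList("storm")));
        System.out.println(handle("count-bolt", Arrays.asList(7, 3)));
    }
}
```

A statically typed API would instead need a distinct method signature per schema; the runtime check keeps the entry point uniform.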
Serialization Configuration
Storm leverages Kryo for efficient object graph serialization. To register custom types, configure the topology.kryo.register property in your topology configuration, supporting two registration patterns:
- Class-only registration: Storm applies Kryo's default FieldSerializer
- Custom serializer mapping: associates a class with a specific com.esotericsoftware.kryo.Serializer implementation
Configuration example:
topology.kryo.register:
- org.example.data.EventPayload
- org.example.data.MetricEvent: org.example.serialization.MetricSerializer
- org.example.data.LogEntry
Here, EventPayload and LogEntry utilize default field-based serialization, while MetricEvent employs the custom MetricSerializer.
The Config.registerSerialization() method provides programmatic registration. Additionally, setting Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS to true enables topology deployment despite missing serializer classes on the classpath—useful when managing diverse topologies through shared storm.yaml configurations.
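How programmatic registration composes can be sketched with a stand-in. The registerSerialization method below mimics, but is not, the Storm API; it relies on the fact that Storm's Config is itself a Map and that class-only registrations accumulate under the topology.kryo.register key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistrationDemo {
    // Stand-in for Config.registerSerialization(conf, klass): append the
    // class name to the list stored under "topology.kryo.register".
    @SuppressWarnings("unchecked")
    static void registerSerialization(Map<String, Object> conf, Class<?> klass) {
        List<Object> registrations = (List<Object>) conf.computeIfAbsent(
                "topology.kryo.register", k -> new ArrayList<Object>());
        registrations.add(klass.getName());
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        registerSerialization(conf, java.util.UUID.class);
        System.out.println(conf.get("topology.kryo.register")); // [java.util.UUID]
    }
}
```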
Java Serialization Fallback
When encountering unregistered types, Storm optionally delegates to standard Java serialization. However, this mechanism incurs substantial performance penalties regarding CPU utilization and output size. For production deployments, explicit Kryo registration remains strongly recommended.
Disable Java serialization fallback by setting Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION to false, forcing strict serializer requirements.
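The size penalty is easy to observe with plain java.io serialization, no Storm required; the exact byte count depends on the JVM, so treat the figure as illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class FallbackCostDemo {
    // Size of one object on the wire under standard Java serialization.
    static int serializedSize(Object payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(payload);
            }
            return buffer.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A 4-byte int picks up a stream header plus full class metadata,
        // overhead that an explicit Kryo registration avoids per tuple.
        System.out.println(serializedSize(Integer.valueOf(42)) + " bytes");
    }
}
```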
Component-Specific Registrations
Storm 0.7.0+ supports serializer registrations at the component level through component-specific configurations. During topology submission, Storm merges component-specific registrations with topology-wide settings to determine the complete serialization context.
When multiple serializers target identical classes, Storm selects arbitrarily unless overridden. To resolve conflicts, specify the desired serializer in the component configuration, which takes precedence over general topology registrations. Note that serializers must remain consistent across communicating components; otherwise, receiving bolts cannot deserialize emitted messages.
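The precedence rule can be sketched as a map merge; effective is a hypothetical helper, and the class and serializer names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeDemo {
    // Sketch of precedence: component-specific serializer registrations
    // override topology-wide ones for the same class.
    static Map<String, String> effective(Map<String, String> topologyWide,
                                         Map<String, String> componentSpecific) {
        Map<String, String> merged = new HashMap<>(topologyWide);
        merged.putAll(componentSpecific); // component entries win on conflict
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> topo = new HashMap<>();
        topo.put("org.example.data.MetricEvent", "SerializerA");
        Map<String, String> comp = new HashMap<>();
        comp.put("org.example.data.MetricEvent", "SerializerB");
        // The component-level choice takes precedence
        System.out.println(effective(topo, comp).get("org.example.data.MetricEvent"));
    }
}
```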