This article provides a comprehensive explanation of Storm topologies. It assumes you have a basic understanding of Storm concepts. The topics covered include:
- TopologyBuilder API usage
- Submitting topologies to production clusters
- Running topologies locally for development and testing
Core Concepts
Building Topologies with TopologyBuilder
TopologyBuilder exposes the Java API for declaring Storm topologies. This class simplifies the creation of topologies by abstracting the underlying Thrift structures. While Storm topologies are ultimately represented as Thrift objects, TopologyBuilder provides a much more approachable interface for developers.
The API uses component identifiers to establish connections between spouts and bolts. When you declare a spout or bolt, you assign it a unique identifier that downstream components reference when declaring their inputs. This identifier-based approach makes topology definitions clear and maintainable.
Local Development Example
When developing Storm applications, running topologies locally is essential for rapid iteration and debugging. The LocalCluster class simulates a Storm cluster inside a single JVM process, so you can run and debug a topology from your IDE or a test without deploying a full cluster.
// Imports use the org.apache.storm package names (Storm 1.0 and later).
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
// RandomSentenceSpout, WordCountBolt, and TotalCountBolt are application classes.

// Initialize the topology builder
TopologyBuilder builder = new TopologyBuilder();
// Configure spouts with unique identifiers and parallelism hints
builder.setSpout("sentence-emitter", new RandomSentenceSpout(true), 5);
builder.setSpout("reference-source", new RandomSentenceSpout(true), 3);
// Define a bolt that consumes both spouts, grouped by the "word" field
builder.setBolt("word-processor", new WordCountBolt(), 3)
    .fieldsGrouping("sentence-emitter", new Fields("word"))
    .fieldsGrouping("reference-source", new Fields("word"));
// Global grouping sends every tuple of the stream to a single task
builder.setBolt("aggregator", new TotalCountBolt())
    .globalGrouping("sentence-emitter");
// Configure topology settings
Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_WORKERS, 4);
config.put(Config.TOPOLOGY_DEBUG, true);
// Submit to local cluster for testing
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count-topology", config, builder.createTopology());
// Let the topology run for 10 seconds, then shut the local cluster down
Thread.sleep(10000);
cluster.shutdown();
The parallelism hint (the third parameter in setSpout and setBolt) controls how many executor threads Storm creates for that component. When it is omitted, as for the aggregator bolt above, the component gets a single executor. Higher values spread the workload across more threads, which can improve throughput for computationally intensive components.
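To make the interaction between a parallelism hint and a fields grouping more concrete, the following is a minimal sketch in plain Java. It is a simplified model, not Storm's internal implementation: the class FieldsGroupingSketch and the method chooseTask are hypothetical names, and the sketch assumes one task per executor. It only illustrates the property that tuples with equal grouping-field values always land on the same task.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of a fields grouping; NOT Storm's actual routing code.
class FieldsGroupingSketch {

    // Picks a task index from the hash of the grouping-field values.
    // Equal values always produce the same index for a fixed numTasks.
    static int chooseTask(List<?> groupingValues, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        // floorMod keeps the result in [0, numTasks) even for negative hash codes
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // mirrors the parallelism hint of "word-processor"
        for (String word : Arrays.asList("storm", "bolt", "storm", "spout", "bolt")) {
            System.out.println(word + " -> task " + chooseTask(Arrays.asList(word), numTasks));
        }
    }
}
```

Running main shows repeated words mapping to the same task index, which is what lets a counting bolt keep a consistent per-word total without coordinating across tasks.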
Production Deployment
Deployment Process
Deploying topologies to a production Storm cluster involves several steps. First, define your topology using TopologyBuilder in your Java code. The definition process remains identical to local development—the same API applies regardless of deployment target.
Next, package your application as a JAR file containing all dependencies except Storm itself. Storm libraries are already available on the cluster classpath, so including them would cause conflicts. Maven users can achieve this using the Maven Assembly Plugin with the following configuration:
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.example.StormApplication</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
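Run mvn assembly:assembly to generate the JAR with dependencies (version 3.x of the Assembly Plugin removed that goal; there, run mvn package assembly:single instead). Before submitting, verify that the Storm libraries are not bundled, for example by declaring the Storm dependency with provided scope. Bundling them causes classpath conflicts with the cluster's existing Storm installation.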
Finally, submit the topology using the Storm CLI:
storm jar path/to/application.jar com.example.StormApplication arg1 arg2 arg3
The storm jar command runs your main class with the specified arguments and configures StormSubmitter to talk to the cluster named in your Storm configuration. When your main method calls StormSubmitter.submitTopology, StormSubmitter uploads the JAR and registers the topology with the cluster.
Production Submission Code
// Imports use the org.apache.storm package names (Storm 1.0 and later).
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
// RandomSentenceSpout and ProcessingBolt are application classes.

// Define topology using TopologyBuilder
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("data-source", new RandomSentenceSpout(true), 5);
// Configure bolts and their connections
builder.setBolt("processor", new ProcessingBolt(), 8)
    .shuffleGrouping("data-source");
// Set up cluster configuration
Config config = new Config();
config.setNumWorkers(20);
config.setMaxSpoutPending(5000);
// Submit to production cluster
StormSubmitter.submitTopology("production-topology", config, builder.createTopology());
Common Configuration Parameters
Storm provides extensive configuration options for tuning topology behavior. Properties prefixed with TOPOLOGY can be overridden on a per-topology basis, while cluster-level configurations apply globally and cannot be modified by individual topologies.
TOPOLOGY_WORKERS controls the number of worker processes allocated to your topology. Each worker runs as a separate JVM process and hosts multiple executor threads. For example, setting this to 25 for a topology whose components have a combined parallelism of 150 puts 6 tasks (150 / 25) in each worker, running as threads. Adjust this based on your cluster capacity and the computational requirements of your topology.
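The arithmetic above can be sketched as a small helper. This is an illustrative calculation only, assuming tasks are spread as evenly as possible across workers; WorkerLoadSketch and maxTasksPerWorker are hypothetical names, not part of Storm.

```java
// Illustrative arithmetic only; Storm's scheduler decides the real placement.
class WorkerLoadSketch {

    // Largest number of tasks any one worker hosts under an even spread.
    static int maxTasksPerWorker(int totalTasks, int numWorkers) {
        if (numWorkers <= 0) {
            throw new IllegalArgumentException("numWorkers must be positive");
        }
        // Ceiling division: any remainder adds one task to some workers
        return (totalTasks + numWorkers - 1) / numWorkers;
    }

    public static void main(String[] args) {
        // 150 tasks over 25 workers divides evenly
        System.out.println(maxTasksPerWorker(150, 25)); // prints 6
        // One extra task pushes a single worker to 7
        System.out.println(maxTasksPerWorker(151, 25)); // prints 7
    }
}
```

A quick check like this helps when choosing a worker count: if the per-worker figure grows large, each JVM carries more threads and more heap pressure.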
TOPOLOGY_ACKER_EXECUTORS configures the number of acker executors that track tuples through the processing pipeline. Ackers ensure reliable message processing by tracking tuple trees until spout tuples are fully processed. When unspecified, Storm defaults this value to match the number of workers. Setting it to 0 disables reliability guarantees entirely, which may improve performance for scenarios where tuple loss is acceptable.
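Tuple-tree tracking can be illustrated with the XOR bookkeeping that Storm's ackers are documented to use: every tuple id is XORed into a per-spout-tuple value once when the tuple is created and once when it is acked, so the value returns to zero exactly when the tree is complete. The sketch below is a simplified single-threaded model under that assumption. AckerSketch and its methods are hypothetical names, and the small fixed ids stand in for the random 64-bit ids a real topology would use.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of acker bookkeeping; not Storm's actual acker bolt.
class AckerSketch {

    // Spout tuple id -> running XOR of ids created in, or acked from, its tree
    private final Map<Long, Long> ackVals = new HashMap<>();

    // Record that a tuple was created in the tree rooted at rootId.
    // Children must be recorded before their parent is acked.
    void anchored(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // Record an ack; returns true once the whole tree has been processed.
    boolean acked(long rootId, long tupleId) {
        long value = ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
        if (value == 0L) {
            ackVals.remove(rootId); // nothing left to track for this spout tuple
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 1L;
        acker.anchored(root, 0xA1L); // spout emits tuple A
        acker.anchored(root, 0xB2L); // bolt emits B anchored to A
        acker.anchored(root, 0xC3L); // bolt emits C anchored to A
        System.out.println(acker.acked(root, 0xA1L)); // prints false
        System.out.println(acker.acked(root, 0xB2L)); // prints false
        System.out.println(acker.acked(root, 0xC3L)); // prints true
    }
}
```

The appeal of this design is constant memory per spout tuple: the acker stores one value regardless of how large the tuple tree grows.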
TOPOLOGY_MAX_SPOUT_PENDING limits the number of unacknowledged tuples a single spout task can have in flight at any time. It applies only to tuples emitted with a message ID, since unanchored tuples are never tracked. This throttling prevents downstream queues from growing without bound and controls memory consumption. A well-tuned value depends on your tuple processing latency and acceptable memory usage.
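The effect of a pending cap can be sketched in plain Java. This is a minimal model of the idea rather than Storm's spout executor: SpoutPendingSketch, tryEmit, and ack are hypothetical names, and the sketch treats an ack as the only way a tuple leaves the pending set (in a real topology, a failure or timeout frees the slot as well).

```java
import java.util.HashSet;
import java.util.Set;

// Minimal model of a max-pending cap; not Storm's spout executor.
class SpoutPendingSketch {

    private final int maxPending;
    private final Set<Long> pending = new HashSet<>();
    private long nextId = 0;

    SpoutPendingSketch(int maxPending) {
        if (maxPending <= 0) {
            throw new IllegalArgumentException("maxPending must be positive");
        }
        this.maxPending = maxPending;
    }

    // Emits a tuple if the cap allows it; returns its message id, or -1 when throttled.
    long tryEmit() {
        if (pending.size() >= maxPending) {
            return -1; // cap reached: the spout is not asked for more tuples
        }
        long id = nextId++;
        pending.add(id);
        return id;
    }

    // Marks a tuple as fully processed, freeing one slot.
    void ack(long id) {
        pending.remove(id);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        SpoutPendingSketch spout = new SpoutPendingSketch(2);
        System.out.println(spout.tryEmit()); // prints 0
        System.out.println(spout.tryEmit()); // prints 1
        System.out.println(spout.tryEmit()); // prints -1 (throttled)
        spout.ack(0);
        System.out.println(spout.tryEmit()); // prints 2
    }
}
```

The sketch shows why the setting interacts with latency: the slower tuples are acked, the longer the spout sits at the cap, so a cap that is too low throttles throughput while one that is too high lets memory use grow.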
TOPOLOGY_MESSAGE_TIMEOUT_SECS defines how long Storm waits for a spout tuple to be fully processed before considering it failed. The default of 30 seconds suits most topologies, but adjust it based on your processing requirements. Shorter timeouts surface failures sooner but may time out legitimate tuples that are merely slow to process.
TOPOLOGY_SERIALIZATIONS registers custom serializers for complex data types that Storm will transmit between workers. This improves performance when working with non-primitive types by providing optimized serialization logic.
Managing Running Topologies
Stopping Topologies
To stop a running topology, use the kill command with your topology's name:
storm kill topology-name
Topology termination is not instantaneous. Storm first deactivates the spouts, preventing new tuples from entering the system. It then waits for the message timeout duration configured by TOPOLOGY_MESSAGE_TIMEOUT_SECS, giving in-flight tuples time to complete processing. Finally, worker processes are terminated. You can override the wait time with the -w flag of storm kill. This graceful shutdown gives tuples that were already being processed a chance to finish rather than being dropped mid-flight.
Updating Topologies
Currently, Storm requires you to stop and resubmit a topology to deploy changes. The stop-then-restart approach ensures clean state transitions but causes a brief service interruption. A storm swap command has been planned that would replace a running topology with a new version with minimal downtime, but until it is available, kill and resubmit is the way to update.
Monitoring
The Storm UI provides comprehensive visibility into topology performance. Access it through your cluster's web interface to view real-time metrics including error rates, throughput statistics, and latency measurements for each component. The UI also displays executor-level details, helping identify bottlenecks or stragglers within your topology.
Additionally, examine cluster logs for detailed error traces and operational insights. Configure log aggregation to streamline this process in production environments.