Stream Processing Fundamentals
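Java 8 introduced the Stream API, enabling declarative data processing similar to SQL queries: you describe what result you want rather than how to loop over the data. Streams provide a higher-level abstraction over collection operations that makes code clearer and lets a pipeline run in parallel without explicit multithreaded code.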
<div style="text-align:center">
+--------------------+     +------+    +------+    +---+    +-------+
| stream of elements +---->|filter+--->|sorted+--->|map+--->|collect|
+--------------------+     +------+    +------+    +---+    +-------+
</div>
// Pipeline implementation: weights of the blue items, lightest first
List<Integer> productWeights =
    inventory.getItems().stream()
        .filter(item -> item.getColor() == BLUE)
        .sorted(Comparator.comparingInt(Item::getWeight))
        .map(Item::getWeight)
        .collect(Collectors.toList());
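As a self-contained sketch of the same pipeline, the class below assumes a hypothetical `Item` record and `Color` enum (neither is defined in the original, and records need Java 16+). It places the stream version next to an equivalent hand-written loop to make the declarative style concrete:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineDemo {
    // Hypothetical domain types, for illustration only
    enum Color { BLUE, RED }
    record Item(Color color, int weight) {}

    // Declarative: filter -> sorted -> map -> collect, mirroring the diagram
    static List<Integer> blueWeights(List<Item> items) {
        return items.stream()
                .filter(item -> item.color() == Color.BLUE)
                .sorted(Comparator.comparingInt(Item::weight))
                .map(Item::weight)
                .collect(Collectors.toList());
    }

    // Imperative equivalent: explicit loops, a temporary list, an explicit sort
    static List<Integer> blueWeightsImperative(List<Item> items) {
        List<Item> blue = new ArrayList<>();
        for (Item item : items) {
            if (item.color() == Color.BLUE) {
                blue.add(item);
            }
        }
        blue.sort(Comparator.comparingInt(Item::weight));
        List<Integer> weights = new ArrayList<>();
        for (Item item : blue) {
            weights.add(item.weight());
        }
        return weights;
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item(Color.BLUE, 30), new Item(Color.RED, 5),
                new Item(Color.BLUE, 10), new Item(Color.BLUE, 20));
        System.out.println(blueWeights(items));           // [10, 20, 30]
        System.out.println(blueWeightsImperative(items)); // [10, 20, 30]
    }
}
```

Both methods return the same list; the stream version states the steps, while the loop version also has to manage an intermediate list.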
Stream Characteristics
A Stream represents a sequence of elements from a data source (collections, arrays, I/O) supporting aggregate operations:
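- Element sequence: A sequenced set of values of a specific element type; elements are computed on demand rather than stored
- Data source: Elements are consumed from a source such as a collection, array, or I/O resource; a stream built from an ordered collection preserves that order
- Aggregate operations: Database-like operations such as filter, map, reduce, find, match, and sort, executed sequentially or in parallel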
Key features distinguishing Streams from collections:
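- Pipelining: Most stream operations return a stream, so they chain fluently into a pipeline that is evaluated lazily and can short-circuit
- Internal iteration: The stream iterates over its elements internally, whereas collections require explicit (external) iteration with a loop or an Iterator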
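To make lazy evaluation and internal iteration concrete, here is a small sketch that logs each step as it actually runs (the `trace` helpers and the sample words are illustrative, not from the original):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    // Runs a pipeline and records, in order, every step that actually executed
    static List<String> trace(List<String> words) {
        List<String> log = new ArrayList<>();
        List<String> result = words.stream()
                .filter(w -> { log.add("filter " + w); return w.length() > 3; })
                .map(w -> { log.add("map " + w); return w.toUpperCase(); })
                .limit(2)                       // short-circuits after two results
                .collect(Collectors.toList()); // terminal operation starts the work
        log.add("result " + result);
        return log;
    }

    // Builds a similar pipeline but never calls a terminal operation
    static List<String> traceWithoutTerminal(List<String> words) {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = words.stream()
                .filter(w -> { log.add("filter " + w); return w.length() > 3; })
                .map(String::toUpperCase);
        return log; // still empty: the intermediate operations have not run
    }

    public static void main(String[] args) {
        trace(List.of("java", "is", "fun", "and", "fast", "code"))
                .forEach(System.out::println);
    }
}
```

`traceWithoutTerminal` returns an empty log because no terminal operation was called. In `trace`, `filter` and `map` run element by element in a single pass, and `code` is never examined: `limit(2)` is satisfied after `fast`, so the stream stops iterating.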
Stream Creation Methods
// Collection-based streams
List<String> words = Arrays.asList("hello", "", "world", "java");
// Stream.toList() requires Java 16+; on Java 8 use .collect(Collectors.toList())
List<String> nonEmpty = words.stream().filter(s -> !s.isEmpty()).toList();
// Static factory methods
Stream<Integer> numberStream = Stream.of(10, 20, 30);
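// iterate and generate produce infinite streams; bound them with limit() before a terminal operation
Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);
Stream<Double> randomValues = Stream.generate(Math::random);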
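The creation methods above can be exercised in one sketch. It also uses `Arrays.stream` for arrays, a standard source not shown above, and bounds the two infinite streams with `limit`, since collecting or counting an unbounded `iterate` or `generate` stream never terminates. The method names are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamCreationDemo {
    // Collection source
    static List<String> nonEmpty(List<String> words) {
        return words.stream().filter(s -> !s.isEmpty()).collect(Collectors.toList());
    }

    // Explicit values via Stream.of
    static int sumOfValues() {
        return Stream.of(10, 20, 30).mapToInt(Integer::intValue).sum();
    }

    // Array source via Arrays.stream (yields an IntStream for int[])
    static int sumOfArray(int[] values) {
        return Arrays.stream(values).sum();
    }

    // Infinite source, made finite with a short-circuiting limit
    static List<Integer> firstEvens(int count) {
        return Stream.iterate(0, n -> n + 2).limit(count).collect(Collectors.toList());
    }

    // Generated source, also bounded with limit
    static long randomCount(int count) {
        return Stream.generate(Math::random).limit(count).count();
    }

    public static void main(String[] args) {
        System.out.println(nonEmpty(Arrays.asList("hello", "", "world", "java"))); // [hello, world, java]
        System.out.println(sumOfValues());                                         // 60
        System.out.println(firstEvens(5));                                         // [0, 2, 4, 6, 8]
    }
}
```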
Intermediate Operations
Operations that return a new stream and are evaluated lazily; nothing runs until a terminal operation is invoked:
Filtering and Slicing
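employees.stream()
    .filter(emp -> emp.getYearsExperience() > 5) // keep employees with more than 5 years
    .limit(5)                                    // take at most the first 5 matches
    .distinct()                                  // drop duplicates among those (uses equals/hashCode)
    .skip(2)                                     // discard the first 2 that remain
    .forEach(System.out::println);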
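The order of slicing operations matters. A minimal sketch, using plain integers as stand-ins for years of experience (an illustrative assumption, since the `Employee` type is not defined here), runs the same four operations in the original order and in a reordered one:

```java
import java.util.List;
import java.util.stream.Collectors;

public class SlicingDemo {
    // Same order as above: filter, limit, distinct, skip
    static List<Integer> limitThenDistinct(List<Integer> values) {
        return values.stream()
                .filter(v -> v > 5)
                .limit(5)
                .distinct()
                .skip(2)
                .collect(Collectors.toList());
    }

    // Deduplicate first, then skip and limit
    static List<Integer> distinctThenLimit(List<Integer> values) {
        return values.stream()
                .filter(v -> v > 5)
                .distinct()
                .skip(2)
                .limit(5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> years = List.of(7, 7, 3, 8, 9, 9, 10, 12, 15, 6, 20);
        System.out.println(limitThenDistinct(years)); // [9]
        System.out.println(distinctThenLimit(years)); // [9, 10, 12, 15, 6]
    }
}
```

In the original order, `limit(5)` keeps `7, 7, 8, 9, 9` before duplicates are removed, so only `[9]` survives the `skip(2)`.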
Transformation Operations
// Simple mapping
List<String> names = employees.stream()
.map(Employee::getFullName)
.toList();
// Flattening nested structures: each word becomes a stream of its characters
List<Character> letters = words.stream()
    .flatMap(word -> word.chars().mapToObj(c -> (char) c))
    .toList();
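The difference between `map` (one output per input) and `flatMap` (each input expands into a stream, and the streams are concatenated) can be seen in a short sketch; the method names and sample words are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MappingDemo {
    // map: exactly one output element per input element
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap: each word becomes a Stream<Character>, and the streams are merged
    static List<Character> uniqueLetters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "world");
        System.out.println(lengths(words));       // [5, 5]
        System.out.println(uniqueLetters(words)); // [h, e, l, o, w, r, d]
    }
}
```

With `map` instead of `flatMap`, the result would be a stream of per-word streams rather than a single stream of characters.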
Sorting Mechanisms
// Natural ordering
numbers.stream().sorted().forEach(System.out::println);
// Custom ordering: by age, then by name for employees of the same age
employees.stream()
    .sorted(Comparator.comparing(Employee::getAge)
        .thenComparing(Employee::getName))
    .forEach(System.out::println);
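A runnable sketch of both orderings, assuming a hypothetical `Employee` record with `name` and `age` components (records need Java 16+; the accessors are `age()` and `name()` rather than the getters used above):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortingDemo {
    // Hypothetical employee type, for illustration only
    record Employee(String name, int age) {}

    // Natural ordering of Comparable elements
    static List<Integer> naturalOrder(List<Integer> numbers) {
        return numbers.stream().sorted().collect(Collectors.toList());
    }

    // Age ascending, then name alphabetically when ages are equal
    static List<String> byAgeThenName(List<Employee> employees) {
        return employees.stream()
                .sorted(Comparator.comparingInt(Employee::age)
                        .thenComparing(Employee::name))
                .map(Employee::name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder(List.of(5, 3, 9, 1))); // [1, 3, 5, 9]
        List<Employee> staff = List.of(
                new Employee("Carol", 30), new Employee("Alice", 25),
                new Employee("Bob", 30), new Employee("Dave", 25));
        System.out.println(byAgeThenName(staff)); // [Alice, Dave, Bob, Carol]
    }
}
```

Composing comparators with `thenComparing` avoids hand-written tie-breaking logic and reads in the same order as the sort keys.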
Terminal Operations
Operations that trigger execution of the whole pipeline and produce a non-stream result, such as a value, a collection, or a side effect like printing:
Matching and Finding
boolean allMatch = statusList.stream().allMatch(Status.ACTIVE::equals);
boolean anyMatch = statusList.stream().anyMatch(Status.INACTIVE::equals);
Optional<Status> first = statusList.stream().findFirst();
long totalCount = statusList.stream().count();
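A self-contained sketch of the matching and finding operations, assuming a hypothetical `Status` enum. `allMatch`, `anyMatch`, and `noneMatch` short-circuit as soon as the answer is known, and `findFirst` returns an `Optional` because the stream may be empty:

```java
import java.util.List;
import java.util.Optional;

public class MatchingDemo {
    // Hypothetical status type, for illustration only
    enum Status { ACTIVE, INACTIVE }

    static boolean allActive(List<Status> statuses) {
        return statuses.stream().allMatch(Status.ACTIVE::equals);
    }

    static boolean anyInactive(List<Status> statuses) {
        return statuses.stream().anyMatch(Status.INACTIVE::equals);
    }

    static boolean noneInactive(List<Status> statuses) {
        return statuses.stream().noneMatch(Status.INACTIVE::equals);
    }

    static Optional<Status> first(List<Status> statuses) {
        return statuses.stream().findFirst();
    }

    public static void main(String[] args) {
        List<Status> statuses = List.of(Status.ACTIVE, Status.INACTIVE, Status.ACTIVE);
        System.out.println(allActive(statuses));    // false
        System.out.println(anyInactive(statuses));  // true
        System.out.println(first(List.of()));       // Optional.empty
    }
}
```

Note that `allMatch` on an empty stream returns `true`, since no element fails the predicate.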
Reduction and Collection
// Summation
int total = numbers.stream().reduce(0, Integer::sum);
// Aggregation
DoubleSummaryStatistics stats = employees.stream()
.collect(Collectors.summarizingDouble(Employee::getSalary));
// Grouping
Map<Department, List<Employee>> byDept = employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment));
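A runnable sketch of the three reductions, assuming a hypothetical `Employee` record with `name`, `department`, and `salary` components (Java 16+ for records). The grouping here adds a `Collectors.mapping` downstream collector so each group holds names instead of whole records:

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReductionDemo {
    // Hypothetical employee type, for illustration only
    record Employee(String name, String department, double salary) {}

    // reduce: identity 0 combined with every element via Integer::sum
    static int total(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // count, sum, min, max, and average in a single pass
    static DoubleSummaryStatistics salaryStats(List<Employee> employees) {
        return employees.stream().collect(Collectors.summarizingDouble(Employee::salary));
    }

    // groupingBy with a downstream collector that keeps only the names
    static Map<String, List<String>> namesByDepartment(List<Employee> employees) {
        return employees.stream().collect(Collectors.groupingBy(
                Employee::department,
                Collectors.mapping(Employee::name, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Employee> staff = List.of(
                new Employee("Ann", "Eng", 100.0),
                new Employee("Bob", "Eng", 80.0),
                new Employee("Cid", "Ops", 60.0));
        System.out.println(total(List.of(1, 2, 3, 4)));         // 10
        System.out.println(salaryStats(staff).getAverage());    // 80.0
        System.out.println(namesByDepartment(staff).get("Eng")); // [Ann, Bob]
    }
}
```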
Parallel Stream Processing
// Parallel computation
long result = LongStream.rangeClosed(1, 100_000_000)
.parallel()
.reduce(0, Long::sum);
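A sketch comparing the sequential and parallel versions of the same reduction. Parallel streams run on the common `ForkJoinPool`, and the results match only because `0` is a true identity for addition and `Long::sum` is associative, which `reduce` requires when the stream is split into chunks:

```java
import java.util.stream.LongStream;

public class ParallelSumDemo {
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(0, Long::sum);
    }

    // parallel() splits the range into chunks processed by the common ForkJoinPool
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        System.out.println(parallelSum(n));                    // 5000000050000000
        System.out.println(sequentialSum(n) == parallelSum(n)); // true
        System.out.println(n * (n + 1) / 2);                   // closed form, for comparison
    }
}
```

Whether the parallel version is actually faster depends on the machine and the workload, so it is worth measuring before assuming a speedup.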
Fork/Join Framework
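import java.util.concurrent.RecursiveTask;

// Sums all integers in the inclusive range [start, end] by divide and conquer
public class SumCalculator extends RecursiveTask<Long> {
    private final long start;
    private final long end;
    // Ranges no longer than this are summed directly instead of being split
    private static final long THRESHOLD = 10_000;

    public SumCalculator(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;
        if (length <= THRESHOLD) {
            return calculateSequentially();
        }
        // Split into [start, mid] and [mid + 1, end]
        long mid = start + length / 2;
        SumCalculator left = new SumCalculator(start, mid);
        SumCalculator right = new SumCalculator(mid + 1, end);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then wait for the left
    }

    private long calculateSequentially() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }
}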
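A task like `SumCalculator` is normally submitted to a `ForkJoinPool`. As a companion sketch, the hypothetical `ArraySumTask` below applies the same split, fork, compute, join pattern to a `long[]` using half-open index ranges, and shows the submission step through `ForkJoinPool.commonPool().invoke(...)`. The threshold of 1,000 is an arbitrary assumption:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cutoff; tune by measurement
    private final long[] numbers;
    private final int start; // inclusive
    private final int end;   // exclusive

    public ArraySumTask(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ArraySumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = start + length / 2;
        ArraySumTask left = new ArraySumTask(numbers, start, mid);
        ArraySumTask right = new ArraySumTask(numbers, mid, end);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // wait for the left half, then combine
    }

    // Submits the root task to the common pool and waits for the result
    public static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(numbers));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println(sum(numbers)); // 500000500000
    }
}
```

Calling `compute()` on one half instead of forking both keeps the current worker thread busy rather than idle while it waits.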