Java Stream API: Functional Data Processing

Stream Processing Fundamentals

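Java 8 introduced the Stream API, enabling declarative data processing similar to SQL queries: you state what result you want (filter, sort, transform) rather than writing the loop that computes it. Streams provide a higher-level abstraction over collection operations, which improves readability and makes optimizations such as lazy evaluation and parallel execution available without restructuring the code.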

+--------------------+     +--------+     +--------+     +-----+     +---------+
| stream of elements | --> | filter | --> | sorted | --> | map | --> | collect |
+--------------------+     +--------+     +--------+     +-----+     +---------+
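// Pipeline implementation: filter -> sorted -> map -> collect
// (assumes an Item type with getColor()/getWeight() and a Color constant BLUE)
List<Integer> productWeights =
    inventory.getItems().stream()
             .filter(item -> item.getColor() == BLUE)            // keep blue items
             .sorted(Comparator.comparingInt(Item::getWeight))   // lightest first
             .map(Item::getWeight)                               // Item -> Integer
             .collect(Collectors.toList());                      // gather into a List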
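To make the pipeline runnable end to end, the sketch below swaps in a hypothetical `Item` record and `Color` enum (assumptions, not types from the original; records need Java 16+). It follows the same filter, sorted, map, collect stages as the diagram.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineDemo {
    // Hypothetical domain types standing in for the article's Item and BLUE
    enum Color { RED, BLUE }

    record Item(String name, Color color, int weight) {}

    // filter -> sorted -> map -> collect, mirroring the diagram
    static List<Integer> blueWeights(List<Item> items) {
        return items.stream()
                    .filter(item -> item.color() == Color.BLUE)     // keep blue items only
                    .sorted(Comparator.comparingInt(Item::weight))  // lightest first
                    .map(Item::weight)                              // Item -> Integer
                    .collect(Collectors.toList());                  // gather into a List
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
            new Item("anvil", Color.BLUE, 50),
            new Item("feather", Color.RED, 1),
            new Item("brick", Color.BLUE, 3));
        System.out.println(blueWeights(items)); // prints [3, 50]
    }
}
```

If a total weight is wanted instead of a list, the last two stages can be replaced with `.mapToInt(Item::weight).sum()`, which returns an `int`.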

Stream Characteristics

A Stream represents a sequence of elements from a data source (collections, arrays, I/O) supporting aggregate operations:

  • Element sequence: A stream does not store its elements; it computes them on demand from its source
  • Data source: Origin of stream elements
  • Aggregate operations: Filter, map, reduce, match, sort
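The three kinds of data source listed above can each be turned into a stream. In this minimal sketch a `StringReader` stands in for a real file (an assumption for self-containment); with an actual file you would close the reader, typically with try-with-resources.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamSources {
    // Collection source: every Collection exposes stream()
    static long countFromCollection() {
        return List.of("a", "b", "c").stream().count();
    }

    // Array source: Arrays.stream turns an int[] into an IntStream
    static int sumFromArray() {
        return Arrays.stream(new int[] {1, 2, 3, 4}).sum();
    }

    // I/O source: BufferedReader.lines() streams lines lazily
    static List<String> linesFromReader() {
        BufferedReader reader = new BufferedReader(new StringReader("alpha\nbeta\ngamma"));
        return reader.lines().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countFromCollection()); // 3
        System.out.println(sumFromArray());        // 10
        System.out.println(linesFromReader());     // [alpha, beta, gamma]
    }
}
```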

Key features distinguishing Streams from collections:

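  • Pipelining: Intermediate operations return a new stream, so calls chain fluently; evaluation is lazy and runs only when a terminal operation is invoked
  • Internal iteration: The stream library traverses the elements for you, instead of the explicit external iteration (for-each loops, Iterator) used with collections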
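Both features can be observed directly. The sketch below records which elements a `filter` predicate examines (the side effect is there purely for illustration): nothing runs while the pipeline is built, and the short-circuiting `findFirst()` stops pulling elements after the first match. It also contrasts an explicit loop with the stream's internal traversal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazinessDemo {
    // Records which elements the filter predicate actually examined
    static final List<String> visited = new ArrayList<>();
    static int visitedBeforeTerminal;

    static String firstLongWord() {
        visited.clear();
        Stream<String> pipeline = Stream.of("a", "bb", "ccc", "dddd")
                .filter(w -> {
                    visited.add(w);          // side effect used only to observe evaluation
                    return w.length() > 1;
                });
        // Building the pipeline runs nothing, so this is 0
        visitedBeforeTerminal = visited.size();
        // findFirst() is the terminal operation: it pulls elements one at a time
        // and stops once the first match is found
        return pipeline.findFirst().orElse("none");
    }

    // External iteration: the caller writes the loop
    static int countLongExternal(List<String> words) {
        int count = 0;
        for (String w : words) {
            if (w.length() > 1) {
                count++;
            }
        }
        return count;
    }

    // Internal iteration: the stream performs the traversal
    static long countLongInternal(List<String> words) {
        return words.stream().filter(w -> w.length() > 1).count();
    }

    public static void main(String[] args) {
        System.out.println(firstLongWord());       // bb
        System.out.println(visitedBeforeTerminal); // 0
        System.out.println(visited);               // [a, bb]
        List<String> words = List.of("a", "bb", "ccc");
        System.out.println(countLongExternal(words) + " " + countLongInternal(words)); // 2 2
    }
}
```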

Stream Creation Methods

// Collection-based streams
List<String> words = Arrays.asList("hello", "", "world", "java");
// Stream.toList() requires Java 16+; on Java 8, use collect(Collectors.toList())
List<String> nonEmpty = words.stream().filter(s -> !s.isEmpty()).toList();

// Static factory methods
Stream<Integer> numberStream = Stream.of(10, 20, 30);
// iterate() and generate() produce infinite streams; bound them with limit()
Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);
Stream<Double> randomValues = Stream.generate(Math::random);
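Because `Stream.iterate(seed, f)` and `Stream.generate(s)` never end on their own, a terminal operation on them must be preceded by a bound. This sketch shows `limit()`, the Java 9+ three-argument `iterate` overload that carries its own stop condition, and the primitive `IntStream.range` factory (half-open, so the end value is excluded).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamFactories {
    // Infinite iterate() stream made finite with limit()
    static List<Integer> firstEvens(int n) {
        return Stream.iterate(0, x -> x + 2).limit(n).collect(Collectors.toList());
    }

    // Java 9+ overload: iterate(seed, hasNext, next) stops when hasNext fails
    static List<Integer> evensBelow(int bound) {
        return Stream.iterate(0, x -> x < bound, x -> x + 2).collect(Collectors.toList());
    }

    // generate() is infinite too; limit() caps it at n elements
    static long countRandoms(int n) {
        return Stream.generate(Math::random).limit(n).count();
    }

    // IntStream.range is half-open: [start, end)
    static int sumRange(int start, int end) {
        return IntStream.range(start, end).sum();
    }

    public static void main(String[] args) {
        System.out.println(firstEvens(5));   // [0, 2, 4, 6, 8]
        System.out.println(evensBelow(10));  // [0, 2, 4, 6, 8]
        System.out.println(countRandoms(3)); // 3
        System.out.println(sumRange(1, 5));  // 10 (1 + 2 + 3 + 4)
    }
}
```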

Intermediate Operations

Operations that return a new stream; they are lazy and do nothing until a terminal operation is invoked:

Filtering and Slicing

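employees.stream()
         .filter(emp -> emp.getYearsExperience() > 5)  // experienced staff only
         .distinct()                                   // drop duplicates (uses equals/hashCode)
         .skip(2)                                      // skip the first two matches
         .limit(5)                                     // then take at most five
         .forEach(System.out::println);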
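The order of slicing operations changes the result, since each stage sees only what the previous one emitted. This sketch runs the same four operations over plain integers in two orders: deduplicating before slicing, and applying `limit()` before `distinct()`.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SlicingDemo {
    // filter -> distinct -> skip -> limit
    static List<Integer> page(List<Integer> source) {
        return source.stream()
                     .filter(n -> n > 1)   // drop values <= 1
                     .distinct()           // remove duplicates
                     .skip(2)              // discard the first two survivors
                     .limit(3)             // keep at most three
                     .collect(Collectors.toList());
    }

    // filter -> limit -> distinct -> skip: the cap applies before deduplication
    static List<Integer> limitFirst(List<Integer> source) {
        return source.stream()
                     .filter(n -> n > 1)
                     .limit(5)
                     .distinct()
                     .skip(2)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 2, 3, 4, 4, 5, 6, 7, 8);
        System.out.println(page(data));       // [4, 5, 6]
        System.out.println(limitFirst(data)); // [4]
    }
}
```

In `limitFirst`, `limit(5)` keeps `2, 2, 3, 4, 4`, which `distinct()` collapses to three values, so only one survives `skip(2)`.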

Transformation Operations

// Simple mapping
List<String> names = employees.stream()
                              .map(Employee::getFullName)
                              .toList();

// Flattening nested structures: each word becomes a stream of its characters
List<Character> letters = words.stream()
                               .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                               .toList();
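The difference between the two transformations: `map` produces exactly one output per input, while `flatMap` maps each input to a stream and concatenates those streams. This sketch shows both, including flattening a list of lists.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MappingDemo {
    // map: one output element per input element
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap: each word becomes a stream of characters, and those streams are concatenated
    static List<Character> letters(List<String> words) {
        return words.stream()
                    .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                    .collect(Collectors.toList());
    }

    // flatMap also flattens nested collections
    static List<Integer> flatten(List<List<Integer>> nested) {
        return nested.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of("hi", "java")));  // [2, 4]
        System.out.println(letters(List.of("ab", "c")));     // [a, b, c]
        System.out.println(flatten(List.of(List.of(1, 2), List.of(3), List.of(4, 5)))); // [1, 2, 3, 4, 5]
    }
}
```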

Sorting Mechanisms

// Natural ordering
numbers.stream().sorted().forEach(System.out::println);

// Custom ordering: by age, then by name for equal ages
employees.stream()
         .sorted(Comparator.comparing(Employee::getAge)
                           .thenComparing(Employee::getName))
         .forEach(System.out::println);
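Comparator factory methods can express multi-key and reversed orderings without hand-written compare logic. This sketch uses a hypothetical `Employee` record (an assumption; Java 16+) with `comparingInt` plus `thenComparing` for the tie-breaker, and `Comparator.reverseOrder()` for descending natural order.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortingDemo {
    // Hypothetical employee type for illustration
    record Employee(String name, int age) {}

    // Age ascending, then name ascending for ties
    static List<String> byAgeThenName(List<Employee> staff) {
        return staff.stream()
                    .sorted(Comparator.comparingInt(Employee::age)
                                      .thenComparing(Employee::name))
                    .map(Employee::name)
                    .collect(Collectors.toList());
    }

    // Natural order reversed
    static List<Integer> descending(List<Integer> numbers) {
        return numbers.stream()
                      .sorted(Comparator.reverseOrder())
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Employee> staff = List.of(
            new Employee("Cara", 30), new Employee("Abe", 41), new Employee("Ben", 30));
        System.out.println(byAgeThenName(staff));        // [Ben, Cara, Abe]
        System.out.println(descending(List.of(3, 1, 2))); // [3, 2, 1]
    }
}
```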

Terminal Operations

Operations that trigger pipeline execution and produce results:

Matching and Finding

boolean allMatch = statusList.stream().allMatch(Status.ACTIVE::equals);
boolean anyMatch = statusList.stream().anyMatch(Status.INACTIVE::equals);
Optional<Status> first = statusList.stream().findFirst();
long totalCount = statusList.stream().count();
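The matching operations short-circuit: `allMatch` stops at the first non-matching element and `anyMatch` at the first match. Their behavior on an empty stream is worth knowing, as is handling the `Optional` returned by `findFirst()`. In this sketch the `Status` enum is a stand-in assumed for illustration.

```java
import java.util.List;
import java.util.Optional;

public class MatchingDemo {
    enum Status { ACTIVE, INACTIVE }

    static boolean allActive(List<Status> statuses) {
        return statuses.stream().allMatch(Status.ACTIVE::equals);
    }

    static boolean anyInactive(List<Status> statuses) {
        return statuses.stream().anyMatch(Status.INACTIVE::equals);
    }

    // findFirst() returns an Optional because the stream may be empty
    static Status firstOrDefault(List<Status> statuses) {
        Optional<Status> first = statuses.stream().findFirst();
        return first.orElse(Status.INACTIVE);
    }

    public static void main(String[] args) {
        List<Status> mixed = List.of(Status.ACTIVE, Status.INACTIVE);
        List<Status> none = List.of();
        System.out.println(allActive(mixed));     // false
        System.out.println(anyInactive(mixed));   // true
        System.out.println(allActive(none));      // true (vacuously true on an empty stream)
        System.out.println(anyInactive(none));    // false
        System.out.println(firstOrDefault(none)); // INACTIVE
    }
}
```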

Reduction and Collection

// Summation
int total = numbers.stream().reduce(0, Integer::sum);

// Aggregation
DoubleSummaryStatistics stats = employees.stream()
                                         .collect(Collectors.summarizingDouble(Employee::getSalary));

// Grouping
Map<Department, List<Employee>> byDept = employees.stream()
                                                   .collect(Collectors.groupingBy(Employee::getDepartment));
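The three reduction styles can be combined in one runnable sketch. The `Employee` record here, with a `String` department, is a simplified assumption rather than the article's type. `groupingBy` also accepts a map factory and a downstream collector; a `TreeMap` is used only so the printed key order is predictable.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ReductionDemo {
    // Hypothetical employee type; department is a plain String here
    record Employee(String name, String department, double salary) {}

    static final List<Employee> STAFF = List.of(
        new Employee("Ana", "Eng", 100.0),
        new Employee("Bo", "Eng", 80.0),
        new Employee("Cy", "Ops", 60.0));

    // reduce(identity, accumulator): 0 is the identity for addition
    static int total(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // Count, sum, min, average and max gathered in a single pass
    static DoubleSummaryStatistics salaryStats() {
        return STAFF.stream().collect(Collectors.summarizingDouble(Employee::salary));
    }

    // Group by department and count each group
    static Map<String, Long> headcount() {
        return STAFF.stream().collect(
            Collectors.groupingBy(Employee::department, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(total(List.of(1, 2, 3, 4))); // 10
        DoubleSummaryStatistics stats = salaryStats();
        System.out.println(stats.getAverage());         // 80.0
        System.out.println(stats.getMax());             // 100.0
        System.out.println(headcount());                // {Eng=2, Ops=1}
    }
}
```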

Parallel Stream Processing

// Parallel computation
long result = LongStream.rangeClosed(1, 100_000_000)
                        .parallel()
                        .reduce(0, Long::sum);
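A parallel `reduce` gives the same answer as a sequential one only when the accumulator is associative and the first argument is a true identity; `Long::sum` with `0` satisfies both, whereas something like `(a, b) -> a - b` can produce different results in parallel. This sketch cross-checks the parallel sum against the closed form n(n + 1) / 2. In the standard JDK, parallel streams execute on the common `ForkJoinPool`, whose parallelism the last line prints (the value depends on the machine).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class ParallelDemo {
    // Long::sum is associative and 0 is its identity, so splitting the range is safe
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);
    }

    // Closed form n(n + 1) / 2 as a cross-check
    static long closedForm(long n) {
        return n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        long n = 100_000_000L;
        System.out.println(parallelSum(n)); // 5000000050000000
        System.out.println(closedForm(n));  // 5000000050000000
        // Worker count of the common pool (machine-dependent)
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```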

Fork/Join Framework

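import java.util.concurrent.RecursiveTask;

// Sums the integers in the inclusive range [start, end] by divide and conquer
public class SumCalculator extends RecursiveTask<Long> {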
    private final long start;
    private final long end;
    private static final long THRESHOLD = 10_000;

    public SumCalculator(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;
        if (length <= THRESHOLD) {
            return calculateSequentially();
        }
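        // Split [start, end] into [start, mid] and [mid + 1, end]
        long mid = start + length / 2;
        SumCalculator left = new SumCalculator(start, mid);
        SumCalculator right = new SumCalculator(mid + 1, end);
        left.fork();                          // schedule the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then combine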
    }

    private long calculateSequentially() {
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }
}
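A `RecursiveTask` only runs once it is submitted to a pool. The sketch below applies the same fork/compute/join split to a slice of a `long[]` (a hypothetical `ArraySumTask`, using half-open index ranges) and starts it with `ForkJoinPool.commonPool().invoke(...)`, which blocks until the root task completes. The threshold of 1,000 is an arbitrary choice for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // slices this small are summed directly
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    ArraySumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        ArraySumTask left = new ArraySumTask(data, from, mid);
        ArraySumTask right = new ArraySumTask(data, mid, to);
        left.fork();                          // schedule the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then wait for the left
    }

    // invoke() submits the root task and waits for its result
    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println(sum(data));                            // 500000500000
        System.out.println(LongStream.of(data).parallel().sum()); // 500000500000
    }
}
```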

Tags: java StreamAPI FunctionalProgramming DataProcessing ParallelComputing

Posted on Tue, 19 May 2026 01:12:06 +0000 by factoring2117