3
votes

I am trying to concatenate strings using parallel streams.

StringBuffer concat = Arrays.stream(grades)
        .parallel()
        .reduce(
                new StringBuffer(),
                (sb, s) -> sb.append(s),
                (sb1, sb2) -> sb1.append(sb2)
        );

Even though using a collector (mutable reduction) would be a better approach, I would like to know why this is not returning the correct result.

For example, String[] grades = {"A", "B"};

While the non-parallel version of this pipeline works fine, the result I am seeing from the parallel version is BABA, when it should be just AB.

I am already using StringBuffer, which is thread-safe, instead of StringBuilder.
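To make the difference concrete, here is a self-contained version of what I am running (the wrapper class and the printing are mine; the pipelines are the ones above):

import java.util.Arrays;

public class ReduceDemo {
    public static void main(String[] args) {
        String[] grades = {"A", "B"};

        // Sequential: works as expected.
        StringBuffer sequential = Arrays.stream(grades)
                .reduce(new StringBuffer(), StringBuffer::append, StringBuffer::append);
        System.out.println(sequential); // AB

        // Parallel: identical pipeline, wrong result.
        StringBuffer parallel = Arrays.stream(grades)
                .parallel()
                .reduce(new StringBuffer(), StringBuffer::append, StringBuffer::append);
        System.out.println(parallel); // BABA on my machine (exact output may vary)
    }
}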

I am also finding the same issue with the following code.

List<Integer> ages = people.stream()
        .parallel()
        .reduce(
                Collections.synchronizedList(new ArrayList<>()),
                (list, p) -> { list.add(p.getAge()); return list; },
                (list1, list2) -> { list1.addAll(list2); return list1; }
        );

Here too, I am using a synchronized collection, so all the methods I call on it are thread-safe.
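(For reference, Person here is just a simple bean along these lines; only getAge() matters:)

// Simplified stand-in for the Person type used above (assumed shape).
class Person {
    private final int age;

    Person(int age) {
        this.age = age;
    }

    int getAge() {
        return age;
    }
}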

And I see this in the Java docs:

However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction "more abstract" -- it operates on the stream as a whole rather than individual elements -- but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless. For example, given a stream of numbers for which we want to find the sum, we can write:

int sum = numbers.stream().reduce(0, (x,y) -> x+y);

or:

int sum = numbers.stream().reduce(0, Integer::sum);

These reduction operations can run safely in parallel with almost no modification:

int sum = numbers.parallelStream().reduce(0, Integer::sum);

Reduction parallelizes well because the implementation can operate on subsets of the data in parallel, and then combine the intermediate results to get the final correct answer. (Even if the language had a "parallel for-each" construct, the mutative accumulation approach would still require the developer to provide thread-safe updates to the shared accumulating variable sum, and the required synchronization would then likely eliminate any performance gain from parallelism.) Using reduce() instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization required.

From this, I understand that it is very well possible to have a parallel reduction.

Am I missing something here? Is using thread-safe data structures not enough?

1
Just continue to read that section, including the subsequent "Mutable reduction" topic. Your code using a synchronized list is still broken, by the way. – Holger

1 Answer

4
votes

When you call new StringBuffer(), you create one buffer, and that single reference is handed to every subtask of the parallel stream as the identity value, so they all mutate the same buffer. reduce() requires the identity to be a true identity and the functions to be non-interfering and stateless; an accumulator that mutates a shared buffer violates that contract. In your run, the shared buffer first gets "B" appended, then "A", and the combiner then appends the buffer to itself, resulting in "BABA".
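You can observe the sharing directly by keeping a reference to the identity buffer (a diagnostic sketch, using the same grades array as in the question):

StringBuffer identity = new StringBuffer();

StringBuffer concat = Arrays.stream(grades)
        .parallel()
        .reduce(identity,
                (sb, s) -> sb.append(s),
                (sb1, sb2) -> sb1.append(sb2));

// The "result" is the identity buffer itself, mutated by every subtask.
System.out.println(concat == identity); // true
System.out.println(identity);           // BABA (exact order may vary between runs)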

For doing something like this with mutable structures such as StringBuffers, try .collect() instead:

StringBuffer concat = Arrays.stream(grades).parallel().collect(
    () -> new StringBuffer(),
    (sb, s) -> sb.append(s),
    (sb1, sb2) -> sb1.append(sb2));
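Because collect() calls the supplier once per parallel subtask, each subtask fills its own buffer and no extra synchronization is needed, so a plain StringBuilder would work just as well. As a side note, the idiomatic forms would be Collectors.joining() for the strings and a toList() collector for the ages snippet from the question (a sketch, assuming the Person type from the question):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Strings: the collector manages one StringBuilder per subtask and merges them.
String concat = Arrays.stream(grades).parallel()
        .collect(Collectors.joining());

// Ages: each subtask fills its own ArrayList; no synchronizedList needed.
List<Integer> ages = people.parallelStream()
        .map(Person::getAge)
        .collect(Collectors.toList());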