0
votes

I am trying to understand elixir stream.
First I have a list that iterate through a range and multiple with 2.

iex(5)>   stream = 1..3 |>
...(5)>   Enum.map(&IO.inspect(&1)) |>
...(5)>   Enum.map(&(&1 * 2)) |>
...(5)>   Enum.map(&IO.inspect(&1)) |>
...(5)>   Enum.reduce(4, &+/2)
1
2
3
2
4
6
16

Here everything is clear, every enum return a list for further processing the list. It iterates four times for each element in the list.
Lets build it with stream:

iex(4)>   stream = 1..3 |>
...(4)>   Stream.map(&IO.inspect(&1)) |>
...(4)>   Stream.map(&(&1 * 2)) |>
...(4)>   Stream.map(&IO.inspect(&1)) |>
...(4)>   Enum.reduce(4, &+/2)
1
2
2
4
3
6
16

Here it iterates only at once for each number in the range not like the example above with enum. What I do not understand is, how the accumulator of Enum.reduce keep the value and make a recursive call by them self, when it iterates only once?
When I iterate an enum as follow:

iex(25)> f = fn(x, y) ->
...(25)> IO.puts(y)
...(25)> x + y
...(25)> end
#Function<12.54118792/2 in :erl_eval.expr/5>
iex(26)> iter = 1..5 |> Enum.reduce(4, f)
4
5
7
10
14
19

then I can imaging, how enum.reduce pass the accumulator to the next recursive call.

But how the stream pass the accumulator for the next recursion call, if the iteration by stream execute only once?

1
Your question is very confused - it's not that complicated take a look at elixir-lang.org/getting-started/enumerables-and-streams.html - GavinBrelstaff
You can go to Elixir language in GitHub and take a look at the source code although it may be a mapping from Erlang. - Sasha Fonseca

1 Answers

4
votes

Not sure what it is that you're missing/asking, but let me give explaining this a try. First, you have to know how streams work and are represented in Elixir: streams are just a set of functions to operate on an enumerable. When you compose streams (e.g., 1..3 |> Stream.map(...) |> Stream.map(...)), you're not doing anything "concrete" with the stream, you're just adding functions to the list of functions that will be used to process the initial enumerable. When call Enum.reduce/3 with a stream, that's when you start to iterate over the enumerable (one element at a time).

Your example could maybe be understood better by inspecting the progress of Enum.reduce/3 too:

iex> printer1 = fn el -> IO.puts "1st iteration, el: #{inspect el}"; el end
iex> printer2 = fn el -> IO.puts "2nd iteration, el: #{inspect el}"; el end
iex> reducer = fn el, acc ->
...>   IO.puts "reducing, el: #{inspect el}, acc: #{inspect acc}\n"
...>   el + acc
...> end
iex> 1..3 |> Stream.map(printer1) |> Stream.map(&(&1 + 10)) |> Stream.map(printer2) |> Enum.reduce(0, reducer)
1st iteration, el: 1
2nd iteration, el: 11
reducing, el: 11, acc: 0

1st iteration, el: 2
2nd iteration, el: 12
reducing, el: 12, acc: 11

1st iteration, el: 3
2nd iteration, el: 13
reducing, el: 13, acc: 23

36

As you can see, there's little magic happening: when we call Enum.reduce/3, the stream starts to get unfolded and we go through each element mapping the three functions and calling reducer on it and the accumulator.

It may help you to study the docs for the Enumerable protocol as that protocol is used to get elements out of streams (usually one at a time). When you chain Enum.map/2 operations (as you do in your first example), Enum.reduce/3 will receive a list as its first argument, so the Enumerable protocol for lists will be used: its implementation just iterates over the list and "yields" elements of the list to Enumerable.reduce, one at a time. When you chain Stream.map/2 operations instead, Enum.reduce/3 will receive a stream and thus the Enumerable protocol for streams will be used: its implementation takes each element out of the original enumerable (1..3), applies the stream operations (the chain of Stream.map/2 in this case), and yields their results one at a time.

Hope I've made things clearer for you, please ask anything if you still have doubts :).