Following https://hexdocs.pm/gen_stage/GenStage.html#module-init-and-subscribe_to, I defined the GenStage modules with the subscribe_to option:

defmodule A do
  use GenStage

  def start_link(number) do
    GenStage.start_link(A, number)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)
    {:noreply, events, counter + demand}
  end
end

defmodule B do
  use GenStage

  def start_link(number) do
    GenStage.start_link(B, number)
  end

  def init(number) do
    {:producer_consumer, number, subscribe_to: [{A, max_demand: 10}]}
  end

  def handle_events(events, _from, number) do
    events = Enum.map(events, & &1 * number)
    {:noreply, events, number}
  end
end

defmodule C do
  use GenStage

  def start_link() do
    GenStage.start_link(C, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [B]}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    Process.sleep(1000)

    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

If I run them manually it works

iex(1)> GenStage.start_link(A, 0, name: A)
{:ok, #PID<0.195.0>}
iex(2)> GenStage.start_link(B, 2, name: B)
{:ok, #PID<0.197.0>}
iex(3)> GenStage.start_link(C, :ok)
{:ok, #PID<0.199.0>}
[0, 2, 4, 6, 8]
[10, 12, 14, 16, 18]
[20, 22, 24, 26, 28]
[30, 32, 34, 36, 38]
'(*,.0'

The documentation then suggests that the stages can be added to a supervision tree:

In a supervision tree, this is often done by starting multiple workers:

defmodule TestDep.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      worker(A, [0]),
      worker(B, [2]),
      worker(C, []),
    ]
    opts = [strategy: :rest_for_one]
    Supervisor.start_link(children, opts)
  end
end

That is my supervision tree, but when I run the app with iex -S mix I get:

** (Mix) Could not start application testdep: TestDep.Application.start(:normal, []) returned an error: shutdown: failed to start child: B ** (EXIT) no process: the process is not alive or there’s no process currently associated with the given name, possibly because its application isn’t started

My app is defined in mix.exs as:

def application do
  [
    extra_applications: [:logger],
    mod: {TestDep.Application, []}
  ]
end

Is there something I am missing?

1 Answer

If I run them manually it works

It does not :( What worked is not what you meant to test, and it is not equivalent to what the supervision tree starts. The supervisor calls the start_link functions of your modules, which is equivalent to this:

A.start_link(0)
B.start_link(2)
C.start_link()
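
The difference between the two runs is the name: option. As a minimal sketch of what it changes, here is a plain GenServer from the standard library rather than a GenStage (NameDemo is a hypothetical module made up for this illustration):

```elixir
# NameDemo is a hypothetical module used only for this illustration.
defmodule NameDemo do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}
end

# Started without a name: the process exists, but nothing is registered
# under the atom NameDemo, so a lookup by name finds nothing.
{:ok, _unnamed} = GenServer.start_link(NameDemo, :ok)
lookup_before = Process.whereis(NameDemo)

# Started with name: NameDemo, the same lookup now returns the pid.
{:ok, named} = GenServer.start_link(NameDemo, :ok, name: NameDemo)
lookup_after = Process.whereis(NameDemo)

IO.inspect(lookup_before)          # nil
IO.inspect(lookup_after == named)  # true
```

B's subscribe_to: [{A, max_demand: 10}] relies on the same kind of lookup by name, which is why starting A without a name ends in the "no process" error.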

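That said, B subscribes to A by name (subscribe_to: [{A, max_demand: 10}]), so A has to be registered under that name. You likely want to pass names to the wrapped GenStage.start_link calls: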

defmodule A do
  use GenStage

  def start_link(number) do
    #                            ⇓⇓⇓⇓⇓⇓⇓⇓⇓ THIS
    GenStage.start_link(A, number, name: A)
  end

  # init/1 and handle_demand/2 stay as in the question.
end

And the same for the rest.
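
Spelled out for the other two modules, this could look like the sketch below. It assumes gen_stage is a dependency of the project and that A is defined and started under the name A as shown above. Only the start_link functions differ from the question. Registering C under a name is optional here, since nothing subscribes to it.

```elixir
defmodule B do
  use GenStage

  def start_link(number) do
    # Registered as B so that C's subscribe_to: [B] can resolve it.
    GenStage.start_link(B, number, name: B)
  end

  def init(number) do
    {:producer_consumer, number, subscribe_to: [{A, max_demand: 10}]}
  end

  def handle_events(events, _from, number) do
    events = Enum.map(events, &(&1 * number))
    {:noreply, events, number}
  end
end

defmodule C do
  use GenStage

  def start_link() do
    # Nothing subscribes to C, so this name is optional.
    GenStage.start_link(C, :ok, name: C)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [B]}
  end

  def handle_events(events, _from, state) do
    # Wait for a second, then print the batch; consumers emit nothing.
    Process.sleep(1000)
    IO.inspect(events)
    {:noreply, [], state}
  end
end
```

With all three registered, the worker(A, [0]), worker(B, [2]), worker(C, []) children from the question start in order and each subscription finds its producer.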