0
votes

Currently I have something like this:

ref = Process.monitor(worker)

receive do
  {:DOWN, ^ref, :process, ^worker, :normal} ->
    IO.puts("Normal exit from #{inspect(worker)}")

  {:DOWN, ^ref, :process, ^worker, msg} ->
    IO.puts("Received :DOWN from #{inspect(worker)}")
end

That worker have a start_link which send a stream of data into a consumer, all this happens inside a mix task, so if I dont add the receive do, as soon as the mix dies, it also kills the children process, same way as if I run it from a iex -S mix shell.

flow
... some producer consumer in the middle
|> Flow.into_specs(consumer)

The thing is, when the flow is empty (is a finite flow), the worker doesnt seem to die, the receive do doesnt get triggered, is there any other way to achieve this?

Edit:

Testing with a dummy process I dont even need receive do

defmodule Mix.Tasks.Stuff do


  def run([]) do
    {:ok, worker} = Worker.start_link([])
  end
end

defmodule Worker do

  def start_link([]) do
    Enum.map([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], fn number ->
      IO.puts "COUNTING"
      Process.sleep(1000)
      IO.puts "-----------"
    end)
  end
end

But when the worker launches a Flow, if I dont wait with receive do it instantly dies as soon as I execute the task, so maybe that's the issue.

1
When the flow is empty it should return immediately, faster than your process goes to receive. Could you please |> IO.inspect of what Process.monitor(worker) returns?Aleksei Matiushkin
ref is just a pidlapinkoira
The worker is started like this; {:ok, worker} = Worker.start_link([]) then ref = Process.monitor(worker)lapinkoira

1 Answers

1
votes

According to the documentation on Process.monitor/1:

If the process is already dead when calling Process.monitor/1, a :DOWN message is delivered immediately.

That said when the Flow is empty the receive do sets handlers for :DOWN message after it was delivered. To handle the case one should swap the calls and prepare message handlers before starting to monitor the process.

receive do
  {:DOWN, ref, :process, ^worker, msg} ->
    case [retrieve_ref(), msg] do
      [^ref, :normal] ->
        IO.puts("Normal exit from #{inspect(worker)}")
      [^ref, msg] ->
        IO.puts("Received :DOWN from #{inspect(worker)}")
    end
end

ref = Process.monitor(worker)
store_ref_somewhere_ets_or_agent_or_whatever(ref)

I assume store_ref_somewhere_ets_or_agent_or_whatever/1 and respective retrieve_ref/0 would be easy to implement.