0
votes

I am using GenStage and starting a few processes through it.

The code bit which is starting the process:

  defp start_snapshot_extractor(config, id) do
    config = Map.put(config, :id, id)
    case Process.whereis(:snapshot_extractor) do
      nil ->
        {:ok, pid} = GenStage.start_link(EvercamMedia.SnapshotExtractor.CloudExtractor, {}, name: :snapshot_extractor)
        pid
      pid -> pid
    end
    |> GenStage.cast({:snapshot_extractor, config})
  end

and in this module EvercamMedia.SnapshotExtractor.CloudExtractor

defmodule EvercamMedia.SnapshotExtractor.CloudExtractor do
  use GenStage
  require Logger
  import Commons
  import EvercamMedia.Snapshot.Storage

  @root_dir Application.get_env(:evercam_media, :storage_dir)

  def init(args) do
    {:producer, args}
  end

  def handle_cast({:snapshot_extractor, config}, state) do
    IO.inspect "here "
    _start_extractor(config)
    {:noreply, [], state}
  end
end

Now the issue is. I am using an endpoint to start this process with different configurations, sometimes with the same configuration.

When I first start the process it prints "here " and then after the completion of this process. it outputs "here " again, why is that the case? Instead of running both processes parallel why it's waiting for the first one to complete and then run?

Update:

this is how first method is being called

  extraction_pid = spawn(fn ->
    EvercamMedia.UserMailer.snapshot_extraction_started(full_snapshot_extractor, "Cloud")
    start_snapshot_extractor(config)
  end)
  :ets.insert(:extractions, {exid <> "-cloud-#{full_snapshot_extractor.id}", extraction_pid})
1
Are you sure you need a GenStage in the first place? The issue here is that the GenServer behind your producer is receiving messages from the process mailbox only upon the completion of previous handle_cast/2. You need to start multiple instances, probably managed by DynamicSupervisor`.Aleksei Matiushkin
Yes you are right I dont need GenStage, may be, I am actually following already written code, and in that one, it starts multiple processes in the same way.Junaid Farooq
I also updated that how I am staring the first method anyhow.Junaid Farooq
Try to remove name: :snapshot_extractor from the call to GenStage.start_link/3 only anonymous processes can be started that way; the named one would return :already_started.Aleksei Matiushkin
if I change snapshot_extractor it to snapshot_extractor_anynumber ?Junaid Farooq

1 Answers

0
votes

There is the only one message from the mailbox processed by each process at any time, even despite handle_cast/2 returns immediately. The process would ultimately finish processing the previous message before accessing the one waiting in the mailbox queue.

The easiest solution requiring a minimal number of changes would be probably to add id to the process name so that new processes for new id could be spawned. Also, you don’t need a lookup by name, GenStage.start_link/3 is smart enough to do that for you.

defp start_snapshot_extractor(config, id) do
  config = Map.put(config, :id, id)
  name = :"snapshot_extractor_#{id}"

  CloudExtractor
  |> GenStage.start_link({}, name: name)
  |> case do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end
  |> GenStage.cast({:snapshot_extractor, config})
end