0
votes

I am simply doing few operations against a list of servers from db as

list_servers()
|> Enum.map(fn(server) ->
  go_through_and_email(server)
end)

my only question is that, is there any possibility to start each go_through_email(server) process for each server, separately? Right now, It gets completed for the first one and then goes to the 2nd one. Can we run them parallelly? as list_servers could have 100 or 100+ objects.

I don't want to use Task.await or Task.async, are there any other options to explore?

4
Why don't you want to use those Task functions?Dogbert
I have a very bad experience from this in past, Each and every time, I put things in Task, they only get timed out. and FAILED..Junaid Farooq
You probably just needed to specify a larger timeout to Task.await. The default is 5 seconds. A simple Task.async + await will spawn all the tasks at the same time though. You probably want to spawn in batches of e.g. 10 I guess (especially if the list is large)?Dogbert
Okay, what you are saying sounds good, But I have no idea and I haven't done anything with it in past as well. the maximum, I experienced is to Remove Task.await, in our actual system, Do you have any example to that?Junaid Farooq

4 Answers

2
votes

You can use Task.async_stream/3. It allows you to pass an enumerable and run a function on them, with an option to limit the maximum number of tasks to run in parallel using the max_concurrency option. The default timeout is 5000ms, you'll probably want to specify a larger one.

Here's a small example:

1..20
|> Task.async_stream(fn x ->
  :timer.sleep(1000)
  x
end, max_concurrency: 4, timeout: 10_000)
|> Enum.each(&IO.inspect/1)

If you run this, you should see {:ok, 1} to {:ok, 20} printed in batches of 4 every second. The whole thing should finish in 5 seconds, as expected when running a 1 second sleep in batches of 4 for 20 items.

1
votes

You can use GenServer as well. If list_servers() and go_through_and_email() will be defined as helper functions you should be able to do something like this:

defmodule ExampleGenServer do
  use GenServer

  ## Client API
  def start_link(name) do
    GenServer.start_link(__MODULE__, [], name: name)
  end

  def go_through(name) do
    GenServer.cast(name, {:go_through, name})
  end

  ## Server API / Callbacks
  def init(initial_state) do
    {:ok, initial_state}
  end

  def handle_cast({:go_through, name}, state) do
    state = go_through_and_email(server)
    {:noreply, state}
  end

  def handle_info({:some_info}, state) do
    # stuff
    {:noreply, state}
  end

  ## Helper functions
  defp list_servers() do
    # stuff
  end

  defp go_through_and_email() do
    # stuff
  end
end

Then you could create new instance of GenServer in a loop:

list_servers()
|> Enum.map(fn(server) ->
  ExampleGenserver.start_link(server)
end)

I've made it quickly during a break so I could miss / mess up something ;) If you will need some additional details about GenServers you may refer to my Gist. It describes GenServer client / server API, messaging and arg passing ;) And one more thing, remember about GenServer teardown! ;)

1
votes

One thing you can do is to use Task.start, but you don't want to use Task module.

Another thing is to get rid of go_through_email(server) in the Enum.map. Assuming that server is DB Server, you should create a GenServers that will communicate with DB servers and in the map you would send a message to each of the GenServers.

Enum.map(list_of_gen_servers, &send_message_to_go_through_email(&1))

And then every GenServer would handle the code in the parallel. Of course you would need to have the list of GenServers prepared. It's definitely better approach than using send / receive explicitly.

Edit:

Task module would be great if you want to fire and forget. GenServers would be better choice, if you would frequently access DB servers and they allow to have full control how the communication model should looks like.

0
votes

You can also defer the real task to another supervisor. I can't decide whether a simple function or a gen server is needed in your case, but have a look at simple_one_for_one supervisor.

This kind of supervisor is a perfect fit for this kind of needs (sending emails, if I got it correctly).

PS. You said no Task, but playing with Task.async_stream and timeouts is another fit option too.