9
votes

I'm currently having an issue processing lists in Elixir in parallel. The reason for the parallelism is that I'm saving the results to an API, and if I blast them all at once it gets DDoS'd and shuts down.

The code below is supposed to split the result of a SQL query into chunks, process each chunk in a separate task, and terminate once all the tasks have completed.

What actually happens is that the script terminates as soon as the first task sends a message. I've seen answers that put the receive in a function that calls itself over and over, but I feel there has to be a better way to handle this.

results = Enum.chunk(results, 500)

# Give this process a name
Process.register(self(), :core)

# Loop over the chunks making a process for each
Enum.each results, fn(result) ->
  task = Task.async(fn -> Person.App.process(result, "Test", "1") end)
end

# And listen for messages
receive do
  {:hello, msg} -> IO.inspect msg
  {:world, _} -> "won't match"
end
1
This code no longer compiles. (AG1)

1 Answer

12
votes

When using Task.async it is most convenient to obtain the result with Task.await:

results
|> Enum.map(fn result -> Task.async(fn -> Person.App.process(result, "Test", "1") end) end)
|> Enum.map(&Task.await/1)
|> Enum.each(&IO.inspect/1)
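
The pipeline above starts every task at once. If the aim is to keep the API from being flooded, one possible variation is Task.async_stream, which caps how many tasks run at the same time. This is only a sketch: process_chunk is a hypothetical stand-in for Person.App.process/3 (not shown in the question), and the chunk size, max_concurrency and timeout values are assumptions to tune for the real API.

```elixir
# Hypothetical stand-in for Person.App.process/3, which the question does not show.
process_chunk = fn chunk -> Enum.sum(chunk) end

# Placeholder data standing in for the rows returned by the SQL query.
results = Enum.to_list(1..10)

sums =
  results
  # Enum.chunk_every/2 keeps a trailing partial chunk.
  |> Enum.chunk_every(3)
  # At most two chunks are processed concurrently; each task may take up to 30 s.
  |> Task.async_stream(process_chunk, max_concurrency: 2, timeout: 30_000)
  # With the default options, results arrive in input order as {:ok, value}.
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(sums)
```

Because the stream is consumed by Enum.map, the calling process only continues once every chunk has been handled, so no hand-written receive loop is needed.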

In fact, if you don't await the result of Task.async, it will still be sent to the process that called async and stored in its mailbox, potentially causing a memory leak! If you want to create a task whose result you don't care about, use Task.start or Task.start_link instead.
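
To make the mailbox behaviour concrete, here is a small sketch, assuming it runs as a plain script. It reads the reply of an un-awaited Task.async by hand (the Task documentation describes the reply as a {ref, result} tuple) and then starts a fire-and-forget task with Task.start. The values :done and :ignored are placeholders.

```elixir
# Task.async without Task.await: the reply is still sent to the caller.
task = Task.async(fn -> :done end)
ref = task.ref

# Read the reply manually to show it is sitting in this process's mailbox.
# (A :DOWN monitor message for the task follows as well; Task.await would
# normally take care of both.)
leaked =
  receive do
    {^ref, value} -> value
  after
    1_000 -> :no_message
  end

IO.inspect(leaked)

# Task.start: no reply is sent back, so nothing accumulates in the mailbox.
{:ok, pid} = Task.start(fn -> :ignored end)
IO.inspect(is_pid(pid))
```

Task.start_link behaves like Task.start but also links the task to the caller, so a crash in the task propagates to the calling process.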