
I have this simple GenServer:

def handle_info(:tick, items) do
  items2 = do_job(items)
  tick()
  {:noreply, items2}
end

In "do_job" I need to a) iterate through items, b) make an HTTP request, which can take a long time, and c) depending on the response, either update the database and finish with the current item by removing it from "items", or merely update the database:

def do_job(items) do
  Enum.each(items, fn(a) -> # or Enum.map
    Task.start fn ->

      case external_request(a) do
        {:terminate, data} ->
          Repo.update(....)
          remove(a) # send a message to this GenServer to remove itself

        {:continue, data} ->
          Repo.update(....)
      end

    end
  end)
end

1) Do I have to return a new value -- updated list/state -- from "do_job" to allow my GenServer to work properly?

2) If so, how? I can't return an updated state from "do_job" because I create a Task for each item, since each one requires sending an HTTP request and a CRUD operation in a database. That's why I use an asynchronous task.

3) And in general, GenServer manages the state variable by itself, in this case it's "items". What allows GenServer to understand how to update it? Let's consider this code:

def add(a) do
  GenServer.cast(__MODULE__, {:add, a}) # what/who utilizes this return value?
end

def remove(....) do
  # ....
end

def handle_cast({:add, a}, items) do
  {:noreply, [a | items]}  # what/who utilizes this return value?
end

A client doesn't use a returned value from "add" or "remove", thus it gets thrown away. Nonetheless, when a client calls "add" 3 times, this GenServer will have 3 items in a list. But why? How does GenServer handle that?

What happens if you get a second tick message before the first one's HTTP requests have finished? With the method you're proposing, you may end up processing the same item multiple times even if it returns {:terminate, _} every time. You probably want to make handle_info wait for all requests but run the HTTP requests in parallel (use Task.async and Task.await). - Dogbert
@Dogbert, suppose that won't happen, my questions aren't about that. - user7905648
I could not quite understand the question. Are you asking how GenServer keeps the new state after cast is called on it (which activates some custom data transformation in it)? - elpddev
@elpddev, yes.. - user7905648
If I remember correctly, GenServer is just an abstraction over a behavior in which a pure function calls itself recursively, passing the transformed state to the new invocation of the function. The function stops and listens for messages on each iteration. If the message is 'do something with the data', it does that and passes the new state to the next function call. The next invocation, which again stops and listens for a new message, will have the new state in its parameters. - elpddev
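
To make the loop described in the comment above concrete, here is a minimal sketch of a process that keeps its state in the argument of a recursive function. LoopSketch and its message shapes are made up for illustration; the real GenServer does much more than this, so treat it as a rough model rather than its actual implementation.

```elixir
defmodule LoopSketch do
  # Hypothetical module: a hand-rolled stand-in for the loop GenServer runs for you.

  def start(initial_items) do
    spawn(fn -> loop(initial_items) end)
  end

  # The "state" is just the argument of a function that calls itself.
  defp loop(items) do
    receive do
      {:add, a} ->
        # Analogous to returning {:noreply, [a | items]} from handle_cast/2:
        # the new list becomes the argument of the next iteration.
        loop([a | items])

      {:get, caller} ->
        send(caller, {:items, items})
        loop(items)
    end
  end
end

pid = LoopSketch.start([])
send(pid, {:add, 1})
send(pid, {:add, 2})
send(pid, {:add, 3})
send(pid, {:get, self()})

receive do
  {:items, items} -> IO.inspect(items)
end

# prints [3, 2, 1]
```

In these terms, the :ok returned by GenServer.cast only tells the client that the message was sent, which is why the client can ignore it. The {:noreply, new_state} tuple is read by the loop inside the server process, which is how three calls to "add" end up as three items in the list.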

1 Answer


1) Do I have to return a new value -- updated list/state -- from "do_job" to allow my GenServer to work properly?

You don't have to return an updated list from handle_info. But if you return the same list and instead remove finished items later, by sending a message to this GenServer from do_job, the next :tick message can trigger a new set of requests for the same items, even for items whose first run would eventually return {:terminate, _}.

2) If so, how? I can't return an updated state from "do_job" because I create a Task for each item, since each one requires sending an HTTP request and a CRUD operation in a database. That's why I use an asynchronous task.

You can return the new list while still running the jobs in parallel. This can be done by spawning all the tasks first and then waiting for all of them. I would definitely suggest this approach, as it's simple and prevents duplicate jobs from being run.

Here's an implementation of this method (untested):

def do_job(items) do
  items
  |> Enum.map(fn(a) ->
    Task.async(fn ->
      case external_request(a) do
        {:terminate, data} ->
          Repo.update(....)
          [] # remove this item from list
        {:continue, data} ->
          Repo.update(....)
          [a] # keep this item in list
      end
    end)
  end)
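  # Task.await/1 uses the default 5000 ms timeout; pass a larger timeout
  # as the second argument to Task.await/2 if requests can take longer.
  |> Enum.flat_map(&Task.await/1)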
end

do_job will now take a list of items, pass all of them to external_request in parallel, wait until all of them have returned, and drop from the list every item that went to the {:terminate, _} case.
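
To see the filtering behave as described, here is a runnable sketch with the HTTP call and the database update stubbed out. DoJobSketch and its external_request/1 are hypothetical: the stub just sleeps briefly and treats even numbers as finished, which is an assumption made purely for the demo.

```elixir
defmodule DoJobSketch do
  # Hypothetical stand-in for the HTTP request:
  # even items are finished, odd items continue.
  def external_request(a) do
    Process.sleep(100)
    if rem(a, 2) == 0, do: {:terminate, a}, else: {:continue, a}
  end

  def do_job(items) do
    items
    # Start one task per item; all stubbed requests run concurrently.
    |> Enum.map(fn a ->
      Task.async(fn ->
        case external_request(a) do
          # Finished: contribute nothing to the new list.
          {:terminate, _data} -> []
          # Not finished: keep the item for the next tick.
          {:continue, _data} -> [a]
        end
      end)
    end)
    # Wait for each task in order and concatenate the results.
    |> Enum.flat_map(&Task.await/1)
  end
end

IO.inspect(DoJobSketch.do_job([1, 2, 3, 4, 5]))
# prints [1, 3, 5]
```

Because the tasks are awaited in the order they were started, the surviving items keep their original order.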

You can keep the handle_info implementation the same as before.
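
For completeness, here is a sketch of how that handle_info could sit in a whole module. Several things are assumptions, since the question doesn't show them: the module name TickSketch, the 1000 ms interval, the items/1 helper, and tick/0 being implemented with Process.send_after. The do_job/1 here is only a placeholder that drops even numbers, standing in for the version above.

```elixir
defmodule TickSketch do
  use GenServer

  # Hypothetical interval; the question doesn't say how often :tick fires.
  @interval 1_000

  def start_link(items), do: GenServer.start_link(__MODULE__, items)

  # Hypothetical helper to read the current state.
  def items(pid), do: GenServer.call(pid, :items)

  @impl true
  def init(items) do
    tick()
    {:ok, items}
  end

  @impl true
  def handle_info(:tick, items) do
    # do_job/1 returns only after all its work is done, so the next :tick
    # is not handled while the current batch is still in progress.
    items2 = do_job(items)
    tick()
    {:noreply, items2}
  end

  @impl true
  def handle_call(:items, _from, items), do: {:reply, items, items}

  # Assumed implementation of tick/0: schedule the next :tick message.
  defp tick, do: Process.send_after(self(), :tick, @interval)

  # Placeholder for the real do_job/1: no HTTP or database work here.
  defp do_job(items), do: Enum.reject(items, &(rem(&1, 2) == 0))
end

{:ok, pid} = TickSketch.start_link([1, 2, 3, 4])
send(pid, :tick)
IO.inspect(TickSketch.items(pid))
# prints [1, 3]
```

Since handle_info returns the list produced by do_job, no separate "remove" message is needed to drop finished items.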