The application is simple. When it receives a request, it spawns thread_listener, loops 10 times, and sends it the index (i) on each iteration. thread_listener takes this data (i) and sends it back to core. Core loops, waiting for messages from the thread; when it receives one, it sends a chunk containing the message returned by the thread.

The problem is that as soon as the 1..10 loop has run, the function sends the "End" chunk and closes the connection. So the output from curl http://localhost:4000 is "StartEnd", while the desired result is "Start12345678910End".

Is there any way to keep the connection open and wait until a custom timeout, or until the spawned processes have finished?

defmodule AlivePlug do
  import Plug.Conn

  def init(opts) do
    opts
  end

  def call(conn, _opts) do
    conn = send_chunked(conn, 200)
    chunk(conn, "Start")

    core_pid = spawn_link(fn -> core_listener(conn) end)
    thread = spawn_link(fn -> thread_listener end)
    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)

    chunk(conn, "End")
    conn
  end

  defp core_listener(conn) do
    receive do
      {status, i} ->
        _conn = chunk(conn, Integer.to_string(i))
        core_listener(conn)
    end
  end

  defp thread_listener do
    receive do
      {core_pid, i} ->
        send core_pid, {:ok, i}
        thread_listener
      _ ->
        thread_listener
    end
  end
end

This is a working application; just run it and test it with Postman or curl http://localhost:4000. Source: https://github.com/programisti/keep-alive-elixir

@Atomic_alarm was there a meta-decision on the Elixir/Erlang tags not being paired? I only know Erlang, but the code/knowledge needed for this question is trivially interchangeable here, so it seems legitimate to tag both. - Nathaniel Waisbrot
@NathanielWaisbrot, I'm not sure. I don't know Elixir and was not able to understand the essence of the question. Since the question mentioned only Elixir and not Elixir/Erlang, I assumed it was not related to Erlang. If you want, we can continue the conversation in chat (chat.stackoverflow.com/rooms/75358/erlang-otp), or you can restore the tags as they were ;-) - user4651282

1 Answer

Well, the problem is that you spawn the thread process and send it some data, but never wait for it to get a chance to work, and then your function that's holding the connection ends.

Let's start with a very naive solution:

    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)
    Process.sleep 5000
    chunk(conn, "End")

If we sleep for 5 seconds, that should be enough time for thread to get all its work done. This is a bad solution because if you loop to 10000 then 5 seconds might be too short and for 1..10 5 seconds is probably way too long. Moreover, you shouldn't trust blind timing. In theory the system might wait 30 seconds one time and 10 ms another time to schedule thread on the CPU.

Now let's do a real solution, based on the fact that process mailboxes are a FIFO.

First, we'll add another type of message that thread_listener understands:

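  defp thread_listener do
    receive do
      # Must come first: {core_pid, i} below matches any 2-tuple,
      # so it would otherwise swallow this message.
      {:please_ack, pid} ->
        send pid, :ack
        thread_listener()
      {core_pid, i} ->
        send core_pid, {:ok, i}
        thread_listener()
      _ ->
        thread_listener()
    end
  end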

Next, let's replace that Process.sleep with something smarter:

    1..10 |> Enum.each(fn i -> send thread, {core_pid, i} end)
    send thread, {:please_ack, self()}
    receive do
      :ack ->
        :ok
    end
    chunk(conn, "End")

Now we're putting an 11th message into thread's mailbox, and then waiting for an ack. Once all 11 of our messages have been processed, we'll get a message back and be able to continue. It doesn't matter if thread handles each message the moment it's sent or if there's a 10 second pause before it even starts. (These numbers are hyperbolic, but I'm trying to emphasize that in multithreaded programming you can't rely on timing.)
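
One caveat: the ack comes from thread, so it only shows that thread has handled its messages, not that core_listener has written every chunk yet. Below is a minimal sketch of one way to close that gap and to add the custom timeout the question asks about. It is a standalone script rather than a plug, and several things in it are assumptions: AckDemo, collector and worker are hypothetical names, collector stands in for core_listener and gathers values in a list instead of writing chunks, and the 5000 ms default timeout is arbitrary. The ack request travels through the worker to the collector, so it is queued behind all ten results in the collector's mailbox.

```elixir
defmodule AckDemo do
  # Hypothetical stand-in for core_listener: accumulates values instead of
  # writing chunks, and replies once it reaches the ack request.
  def collector(acc) do
    receive do
      {:ok, i} ->
        collector([i | acc])

      {:please_ack, reply_to} ->
        send(reply_to, {:ack, Enum.reverse(acc)})
    end
  end

  # Hypothetical stand-in for thread_listener. The ack request is a 3-tuple,
  # so it cannot be confused with the {collector_pid, i} work messages.
  def worker do
    receive do
      {:please_ack, collector_pid, reply_to} ->
        # Forward the request; it lands behind every result already sent.
        send(collector_pid, {:please_ack, reply_to})

      {collector_pid, i} ->
        send(collector_pid, {:ok, i})
        worker()
    end
  end

  # Sends ten items, then waits for the collector's ack or gives up
  # after `timeout` milliseconds (the default is an assumption).
  def run(timeout \\ 5_000) do
    caller = self()
    collector_pid = spawn_link(fn -> collector([]) end)
    worker_pid = spawn_link(fn -> worker() end)

    Enum.each(1..10, fn i -> send(worker_pid, {collector_pid, i}) end)
    send(worker_pid, {:please_ack, collector_pid, caller})

    receive do
      {:ack, values} -> {:ok, values}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

In the plug, the equivalent receive would sit between the 1..10 loop and the final chunk(conn, "End") call; the `after` clause is what bounds the wait.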

You may now notice that what we've implemented does not actually use processes in a useful way. You spawn two threads off your main thread and they both sit idle until your main thread sends them data whereupon your main thread sits idle until the spawned threads are done. It would have been clearer to just do the work in the main thread. This example is too trivial to actually need threads and I'm not able to spot how you plan to extend it, but I hope this has given you some general help on programming with threads.
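
To illustrate doing the work in the main process, here is a minimal sketch. InlineStream and its write argument are hypothetical names; write stands in for Plug.Conn.chunk/2, which takes the conn and a binary and returns {:ok, conn} on success. Passing the writer in as a function keeps the example runnable without Plug installed.

```elixir
defmodule InlineStream do
  # `write` is a hypothetical stand-in for Plug.Conn.chunk/2:
  # it takes the current state and a binary and returns {:ok, new_state}.
  def run(state, write) do
    pieces = ["Start"] ++ Enum.map(1..10, &Integer.to_string/1) ++ ["End"]

    # Write each piece in order, threading the updated state through.
    Enum.reduce(pieces, state, fn piece, acc ->
      {:ok, acc} = write.(acc, piece)
      acc
    end)
  end
end

# Usage with an in-memory writer that appends to a string:
result = InlineStream.run("", fn buf, piece -> {:ok, buf <> piece} end)
IO.puts(result)
```

Inside call/2 this would presumably become InlineStream.run(send_chunked(conn, 200), &Plug.Conn.chunk/2), with no extra processes and no ordering to worry about.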