2
votes

What would be the most elegant implementation of a (cyclic) barrier in Elixir? The algorithm I want to implement (vertex coloring) has a loop with a waiting phase for the spawned processes ("execute ... synchronously in parallel", then check the termination condition using all processes' results). It is Algorithm 5, "6-color", from Principles of Distributed Computing, Ch. 1.

Most references I found are for .NET, pthreads, and other thread-based computation, so I am not sure a barrier is the pattern I am after. Maybe there is a more "Elixirish" way.

I do not have any code yet (I am still looking for the right pattern), but here is code implementing a "slow" version of the same problem: https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir

My idea is to have the top-level process (the one that spawns one process per graph node) send and receive messages that synchronize the node processes. Note that the node processes also communicate with each other: parents send messages to their children during each top-level loop iteration. The complication is this: once the top-level process has received a node's message, that node must not continue until every node has finished the current iteration (I will most likely implement the loop with tail recursion). This is why I thought of a barrier.

2

2 Answers

3
votes

I'm not sure if this is exactly what you're looking for, but here is a cyclic barrier based on the java.util.concurrent.CyclicBarrier class in Java and the Concurrent::CyclicBarrier class in Ruby.

defmodule CyclicBarrier do
  @moduledoc """
  A reusable (cyclic) barrier, modelled on Java's
  `java.util.concurrent.CyclicBarrier` and Ruby's `Concurrent::CyclicBarrier`.

  `start/2` spawns a barrier process. Each call to `wait/1` or `wait/2` blocks
  until `parties` processes are waiting. The optional `action` is then run in
  the barrier process, every waiter is released with `true`, and the barrier
  re-arms itself for the next round.

  The barrier becomes *broken* when any of these happens:

    * `reset/1` is called while processes are waiting;
    * a timed `wait/2` expires;
    * a waiting process dies.

  All current waiters are then released with `false`. Later waits return
  `false` immediately until `reset/1` re-arms the barrier.

  The handle is the last argument of every function, matching the tuple-call
  style used in older examples (`barrier.wait(100)`). On current Elixir/OTP,
  call the functions explicitly, e.g. `CyclicBarrier.wait(100, barrier)`.
  """

  require Record
  Record.defrecordp(:barrier, __MODULE__, pid: nil)

  @typedoc "Handle returned by `start/2`: the tuple `{CyclicBarrier, pid}`."
  @type t :: {module(), pid()}

  @doc """
  Spawns a barrier process for `parties` processes and returns its handle.

  `action`, if given, is a zero-arity function. It runs inside the barrier
  process each time the barrier trips, before the waiters are released.
  """
  @spec start(pos_integer(), (-> any()) | nil) :: t
  def start(parties, action \\ nil)
      when is_integer(parties) and parties > 0 and (action === nil or is_function(action, 0)),
      do: barrier(pid: spawn(__MODULE__.Server, :init, [parties, action]))

  @doc """
  Stops the barrier process.

  Processes still waiting are released with `false`. Always returns `true`.
  """
  @spec stop(t) :: true
  def stop(barrier(pid: pid)) do
    call(pid, :stop)
    true
  end

  @doc "Returns whether the barrier process is still running."
  @spec alive?(t) :: boolean()
  def alive?(barrier(pid: pid)), do: Process.alive?(pid)

  @doc """
  Returns `true` if the barrier is broken or its process is gone.

  Returns `false` while it is accepting waiters.
  """
  @spec broken?(t) :: boolean()
  def broken?(barrier(pid: pid)), do: call(pid, :status) != :waiting

  @doc """
  Returns how many processes are currently blocked in `wait/1,2`.

  Returns `false` if the barrier process is gone.
  """
  @spec number_waiting(t) :: non_neg_integer() | false
  def number_waiting(barrier(pid: pid)), do: integer_or_false(call(pid, :number_waiting))

  @doc """
  Returns the number of parties needed to trip the barrier.

  Returns `false` if the barrier process is gone.
  """
  @spec parties(t) :: pos_integer() | false
  def parties(barrier(pid: pid)), do: integer_or_false(call(pid, :parties))

  @doc """
  Re-arms the barrier.

  If processes are waiting, they are released with `false` and the barrier is
  left broken; a second `reset/1` then re-arms it. Returns `false` only when
  the barrier process is gone.
  """
  @spec reset(t) :: boolean()
  def reset(barrier(pid: pid)), do: call(pid, :reset) in [:reset, :broken]

  @doc "Waits on the barrier without a timeout. See `wait/2`."
  @spec wait(t) :: boolean()
  def wait(barrier = barrier()),
    do: wait(nil, barrier)

  @doc """
  Blocks until the barrier trips, then returns `true`.

  Returns `false` if the barrier is or becomes broken, or if its process is
  gone. With an integer `timeout` (milliseconds), an expired wait breaks the
  barrier and returns `false`. The exception is when the barrier tripped just
  before the wait was withdrawn; in that case the wait succeeded and `true`
  is returned.
  """
  @spec wait(non_neg_integer() | nil, t) :: boolean()
  def wait(timeout, barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
    case call(pid, :wait, timeout) do
      :fulfilled -> true
      {:timeout, ref} -> cancel_wait(pid, ref)
      _ -> false
    end
  end

  defp integer_or_false(n) when is_integer(n), do: n
  defp integer_or_false(_other), do: false

  # Sends `request` to the barrier process and waits for the tagged reply.
  #
  # A monitor, rather than a link plus `trap_exit`, detects the server's
  # death. This leaves the caller's exit handling untouched, and a crashing
  # caller cannot take the barrier process down with it.
  #
  # Returns one of:
  #   * the server's reply;
  #   * `{:EXIT, pid, reason}` if the server is (or becomes) dead;
  #   * `{:timeout, ref}` once `timeout` ms elapse. The server may still send
  #     a late reply tagged `ref`.
  defp call(pid, request, timeout \\ nil) do
    wait_ms = if timeout === nil, do: :infinity, else: timeout
    mref = Process.monitor(pid)
    send(pid, {mref, self(), request})

    receive do
      {^mref, reply} ->
        Process.demonitor(mref, [:flush])
        reply

      {:DOWN, ^mref, :process, ^pid, reason} ->
        {:EXIT, pid, reason}
    after
      wait_ms ->
        Process.demonitor(mref, [:flush])
        {:timeout, mref}
    end
  end

  # Withdraws a wait that timed out. The server breaks the barrier only if
  # this caller is still queued, so a newer generation is never disturbed.
  #
  # The server answers the original wait (tagged `ref`) before it answers the
  # cancel request, and messages between two processes arrive in order. So
  # any such answer is already in the mailbox here and is consumed instead of
  # leaking.
  defp cancel_wait(pid, ref) do
    call(pid, {:cancel, ref})

    receive do
      {^ref, :fulfilled} -> true
      {^ref, _other} -> false
    after
      0 -> false
    end
  end

  defmodule Server do
    @moduledoc false

    require Record

    # `q` holds `{{reply_ref, pid}, monitor_ref}` for each queued waiter.
    Record.defrecordp(:state_data,
      waiting: 0,
      parties: nil,
      action: nil,
      q: :queue.new()
    )

    @doc false
    def init(parties, action),
      do: loop(:waiting, state_data(parties: parties, action: action))

    # The barrier trips as soon as the last party has been queued.
    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)) do
      run(action)
      release(q, :fulfilled)
      loop(:waiting, state_data(sd, waiting: 0, q: :queue.new()))
    end

    defp loop(state_name, sd) do
      receive do
        {ref, pid, request} when is_reference(ref) and is_pid(pid) ->
          handle(state_name, request, {ref, pid}, sd)

        {:DOWN, mref, :process, _pid, _reason} ->
          handle_down(state_name, mref, sd)

        _unexpected ->
          # Drop stray messages so they cannot pile up in the mailbox.
          loop(state_name, sd)
      end
    end

    # Monitor each new party so that its death breaks the barrier.
    defp handle(:waiting, :wait, {_ref, pid} = from, sd = state_data(waiting: w, q: q)) do
      mref = Process.monitor(pid)
      loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in({from, mref}, q)))
    end

    defp handle(:broken, :wait, from, sd) do
      reply(from, :broken)
      loop(:broken, sd)
    end

    # Nobody is waiting (always true once broken): simply re-arm.
    defp handle(_state_name, :reset, from, sd = state_data(waiting: 0)) do
      reply(from, :reset)
      loop(:waiting, sd)
    end

    defp handle(:waiting, :reset, from, sd) do
      sd = break_barrier(sd)
      reply(from, :broken)
      loop(:broken, sd)
    end

    # A timed-out party withdraws. If it is still queued, the barrier breaks,
    # which also delivers :broken to that party's original wait request
    # before the cancel is answered.
    defp handle(:waiting, {:cancel, ref}, from, sd = state_data(q: q)) do
      if Enum.any?(:queue.to_list(q), fn {{queued_ref, _pid}, _mref} -> queued_ref == ref end) do
        sd = break_barrier(sd)
        reply(from, :broken)
        loop(:broken, sd)
      else
        reply(from, :ok)
        loop(:waiting, sd)
      end
    end

    defp handle(state_name, {:cancel, _ref}, from, sd) do
      reply(from, :ok)
      loop(state_name, sd)
    end

    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
      reply(from, number_waiting)
      loop(state_name, sd)
    end

    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
      reply(from, parties)
      loop(state_name, sd)
    end

    defp handle(state_name, :status, from, sd) do
      reply(from, state_name)
      loop(state_name, sd)
    end

    defp handle(_state_name, :stop, _from, _sd) do
      exit(:normal)
    end

    # Answer unknown requests instead of crashing or leaving the caller blocked.
    defp handle(state_name, _unknown, from, sd) do
      reply(from, {:error, :unknown_request})
      loop(state_name, sd)
    end

    # A queued party died before the barrier tripped. Break the barrier so the
    # remaining parties are not left waiting for it.
    defp handle_down(:waiting, mref, sd = state_data(q: q)) do
      if Enum.any?(:queue.to_list(q), fn {_from, queued_mref} -> queued_mref == mref end) do
        loop(:broken, break_barrier(sd))
      else
        loop(:waiting, sd)
      end
    end

    defp handle_down(state_name, _mref, sd), do: loop(state_name, sd)

    # Releases every queued party with :broken and empties the queue.
    defp break_barrier(sd = state_data(q: q)) do
      release(q, :broken)
      state_data(sd, waiting: 0, q: :queue.new())
    end

    # Stops monitoring each queued party, then sends it `message`.
    defp release(q, message) do
      Enum.each(:queue.to_list(q), fn {from, mref} ->
        Process.demonitor(mref, [:flush])
        reply(from, message)
      end)
    end

    defp reply({ref, pid}, message),
      do: send(pid, {ref, message})

    defp run(nil),
      do: nil

    defp run(action),
      do: action.()
  end
end

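Here's an example using CyclicBarrier in an IEx shell for Elixir. The `barrier.wait` style below relies on tuple-module calls, which Erlang/OTP 21+ no longer supports by default. There, call the functions explicitly with the barrier as the last argument, e.g. `CyclicBarrier.wait(barrier)` or `CyclicBarrier.wait(100, barrier)`: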

iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]

The exact order of process execution is non-deterministic.

Other examples of the functions on CyclicBarrier are below:

iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false
0
votes

I've published a library on Hex, sync_primitives, that does exactly this! The source code and extensive unit tests (100% test coverage) are on GitHub.

Take a look and let me know if I can improve it in any way!