I'm not sure if this is exactly what you're looking for, but here is a cyclic barrier based on the java.util.concurrent.CyclicBarrier class in java and the Concurrent::CyclicBarrier class in ruby.
defmodule CyclicBarrier do
require Record
Record.defrecordp :barrier, CyclicBarrier,
pid: nil
def start(parties, action \\ nil)
when (is_integer(parties) and parties > 0)
and (action === nil or is_function(action, 0)),
do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))
def stop(barrier(pid: pid)) do
call(pid, :stop)
true
end
def alive?(barrier(pid: pid)) do
Process.alive?(pid)
end
def broken?(barrier(pid: pid)) do
case call(pid, :status) do
:waiting ->
false
_ ->
true
end
end
def number_waiting(barrier(pid: pid)) do
case call(pid, :number_waiting) do
n when is_integer(n) ->
n
_ ->
false
end
end
def parties(barrier(pid: pid)) do
case call(pid, :parties) do
n when is_integer(n) ->
n
_ ->
false
end
end
def reset(barrier(pid: pid)) do
case call(pid, :reset) do
:reset ->
true
:broken ->
true
_ ->
false
end
end
def wait(barrier = barrier()),
do: wait(nil, barrier)
def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
case call(pid, :wait, timeout) do
:fulfilled ->
true
:broken ->
false
:timeout ->
reset(barrier)
false
_ ->
false
end
end
defp call(pid, request, timeout \\ nil) do
case Process.alive?(pid) do
false ->
{:EXIT, pid, :normal}
true ->
trap_exit = Process.flag(:trap_exit, true)
Process.link(pid)
ref = make_ref()
send(pid, {ref, self(), request})
case timeout do
nil ->
receive do
{^ref, reply} ->
Process.unlink(pid)
Process.flag(:trap_exit, trap_exit)
reply
exited = {:EXIT, ^pid, _} ->
Process.flag(:trap_exit, trap_exit)
exited
end
_ ->
receive do
{^ref, reply} ->
Process.unlink(pid)
Process.flag(:trap_exit, trap_exit)
reply
exited = {:EXIT, ^pid, _} ->
Process.flag(:trap_exit, trap_exit)
exited
after
timeout ->
Process.unlink(pid)
Process.flag(:trap_exit, trap_exit)
:timeout
end
end
end
end
defmodule Server do
require Record
Record.defrecordp :state_data,
waiting: 0,
parties: nil,
action: nil,
q: :queue.new()
def init(parties, action),
do: loop(:waiting, state_data(parties: parties, action: action))
defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))
defp loop(state_name, sd) do
receive do
{ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
handle(state_name, request, {ref, pid}, sd)
end
end
defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))
defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
do: loop(done(:reset, nil, :queue.in(from, q)), sd)
defp handle(:waiting, :reset, from, sd = state_data(q: q)),
do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new()))
defp handle(:broken, :reset, from, sd = state_data(q: q)),
do: loop(done(:reset, nil, :queue.in(from, q)), sd)
defp handle(:broken, :wait, from, sd) do
cast(from, :broken)
loop(:broken, sd)
end
defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
cast(from, number_waiting)
loop(state_name, sd)
end
defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
cast(from, parties)
loop(state_name, sd)
end
defp handle(state_name, :status, from, sd) do
cast(from, state_name)
loop(state_name, sd)
end
defp handle(_state_name, :stop, _from, _sd) do
exit(:normal)
end
defp broadcast(q, message),
do: for from <- :queue.to_list(q),
do: cast(from, message)
defp cast({ref, pid}, message),
do: send(pid, {ref, message})
defp done(state, action, q, continue \\ true) do
run(action)
broadcast(q, state)
case continue do
true ->
:waiting
false ->
state
end
end
defp run(nil),
do: nil
defp run(action),
do: action.()
end
end
Here's an example using CyclicBarrier
in an IEx shell for Elixir:
iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]
The exact order of process execution is non-deterministic.
Other examples of the functions on CyclicBarrier
are below:
iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false