39
votes

Long version:

I'm new to Erlang and am considering using it for a scalable architecture. I've found many proponents of the platform touting its reliability and fault tolerance.

However, I'm struggling to understand exactly how fault-tolerance is achieved in this system where messages are queued in transient memory. I understand that a supervisor hierarchy can be arranged to respawn deceased processes, but I've been unable to find much discussion of the implications of respawning on works-in-progress. What happens to in-flight messages and the artifacts of partially-completed work that were lost on a dying node?

Will all producers automatically retransmit messages that are not ack'd when consumer processes die? If not, how can this be considered fault-tolerant? And if so, what prevents a message that was processed -- but not quite acknowledged -- from being retransmitted, and hence reprocessed inappropriately?

(I recognize that these concerns are not unique to Erlang; similar concerns will arise in any distributed processing system. But Erlang enthusiasts seem to claim that the platform makes this all "easy"?)

Assuming messages are retransmitted, I can easily envision a scenario where the downstream effects of a complex messaging chain could become very muddled after a fault. Without some sort of heavy distributed transaction system, I don't understand how consistency and correctness can be maintained without addressing duplication in every process. Must my application code always enforce constraints to prevent transactions from being executed more than once?

Short version:

Are distributed Erlang processes subject to duplicated messages? If so, is duplicate protection (i.e., idempotency) an application responsibility, or does Erlang/OTP somehow help us with this?

3
My understanding is that when a process dies, and is respawned, anything it was doing that wasn't passed back needs to be redone. - James Black
It's obvious that unfinished work needs to be redone... I think you're suggesting that it's the application's responsibility to restart unfinished tasks (presumably by tracking completion and resending failed messages). Is that what you mean? Can someone confirm this from experience? - joshng

3 Answers

117
votes

I'll separate this into points I hope will make sense. I might be re-hashing a bit of what I have written in The Hitchhiker's Guide to Concurrency. You might want to read that one to get details on the rationale behind the way message passing is done in Erlang.


1. Message transmission

Message passing in Erlang is done through asynchronous messages sent into mailboxes (a kind of queue for storing data). There is absolutely no assumption as to whether a message was received or not, or even that it was sent to a valid process. This is because it is plausible to assume [at a language level] that a process might not want to handle a message until 4 days from now, and won't even acknowledge its existence until the process has reached a certain state.

A random example of this could be to imagine a long-running process that crunches data for 4 hours. Should it really acknowledge it received a message if it's unable to treat it? Maybe it should, maybe not. It really depends on your application. As such, no assumption is made. You can have half your messages asynchronous and only one that isn't.

Erlang expects you to send an acknowledgement message (and wait on it with a timeout) if you ever need it. The rules having to do with timing out and the format of the reply are left to the programmer to specify -- Erlang can't assume you want the acknowledgement on message reception, when a task is completed, whether it matches or not (the message could match in 4 hours when a new version of the code is hot-loaded), etc.
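As a minimal sketch of such an explicit acknowledgement, assuming a made-up message format (`{job, From, Ref, Payload}` answered by `{ack, Ref}`) and a hypothetical module name `ack_demo`, it could look something like this:

```erlang
-module(ack_demo).
-export([start_worker/0, send_with_ack/3]).

%% Hypothetical worker: acknowledges each job only after handling it.
%% Acknowledging on reception instead would be an equally valid protocol.
start_worker() ->
    spawn(fun worker_loop/0).

worker_loop() ->
    receive
        {job, From, Ref, Payload} ->
            _ = handle(Payload),
            From ! {ack, Ref},
            worker_loop()
    end.

%% Placeholder for the real work.
handle(Payload) -> Payload.

%% Send a job and wait up to Timeout milliseconds for the matching ack.
send_with_ack(Pid, Payload, Timeout) ->
    Ref = make_ref(),                 % unique tag so acks can't be mixed up
    Pid ! {job, self(), Ref, Payload},
    receive
        {ack, Ref} -> ok
    after Timeout ->
        {error, timeout}
    end.
```

Note that `{error, timeout}` only tells the sender that no ack arrived in time. The job may still have been handled (or may be handled later), which is exactly the ambiguity the question is about.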

To make it short, whether a message is never read, fails to be received, or is interrupted by someone pulling the plug while it is in transit doesn't matter unless you want it to. If you want it to matter, you need to design that logic into the protocol between your processes.

The burden of implementing a high-level message protocol between Erlang processes is given to the programmer.


2. Message protocols

As you said, these messages are stored in transient memory: if a process dies, all the messages it hadn't read yet are lost. If you want more, there are various strategies. A few of them are:

  • Read the message as fast as possible and write it to disk if needed, send an acknowledgement back and process it later. Compare this to queue software such as RabbitMQ and ActiveMQ with persistent queues.
  • Use process groups to duplicate messages across a group of processes on multiple nodes. At this point you might enter transactional semantics. The mnesia database uses this approach for its transaction commits.
  • Don't assume anything has worked until you receive either an acknowledgement that everything went fine or a failure message.
  • A combination of process groups and failure messages. If a first process fails to handle a task (because the node goes down), a notification is automatically sent by the VM to a fail-over process which handles it instead. This method is sometimes used with full applications to handle hardware failures.

Depending on the task at hand, you might use one or many of these. They're all possible to implement in Erlang and in many cases modules are already written to do the heavy lifting for you.
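To illustrate the "acknowledgement or failure message" strategy, here is a rough sketch that combines a reply with a process monitor. The module name `call_demo` and the `{request, From, Ref, Request}` / `{reply, Ref, Result}` message shapes are assumptions made for this example, not a standard protocol (although `gen_server:call` works in a broadly similar spirit):

```erlang
-module(call_demo).
-export([call/3]).

%% Send Request to Pid and wait for either a reply, a notification that
%% Pid died, or a timeout. The monitor reference doubles as the request tag.
call(Pid, Request, Timeout) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {request, self(), MRef, Request},
    receive
        {reply, MRef, Result} ->
            erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The VM tells us the process died (or never existed: noproc).
            {error, {process_down, Reason}}
    after Timeout ->
        erlang:demonitor(MRef, [flush]),
        {error, timeout}
    end.
```

With this shape the caller gets one of three outcomes and decides for itself whether to retry, fail over to another process, or give up.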

So this might answer your question. Because you implement the protocols yourself, it's your choice whether messages get sent more than once or not.


3. What is fault-tolerance

Picking one of the above strategies does depend on what fault-tolerance means to you. In some cases, people mean it to say "no data is ever lost, no task ever fails." Other people use fault-tolerance to say "the user never sees a crash." In the case of Erlang systems, the usual meaning is about keeping the system running: it's alright to maybe have a single user dropping a phone call rather than having everyone dropping it.

Here the idea is then to let stuff that fails fail, but keep the rest running. To achieve this, there are a few things the VM gives you:

  • You can know when a process dies and why it did
  • You can force processes that depend on each other to die together if one of them goes wrong
  • You can run a logger that automatically logs every uncaught exception for you, and even define your own
  • Nodes can be monitored so you know when they went down (or got disconnected)
  • You can restart failed processes (or groups of failed processes)
  • Have whole applications restarting on different nodes if one fails
  • And a lot more with the OTP framework

With these tools and a few of the standard library's modules handling different scenarios for you, you can implement pretty much what you want on top of Erlang's asynchronous semantics, although it usually pays to be able to use Erlang's definition of fault tolerance.
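As a toy illustration of the first and fifth points (knowing why a process died, and restarting it), here is a small sketch built on `spawn_monitor`. The module `watch_demo` and its functions are hypothetical; in a real system you would normally use an OTP supervisor rather than a hand-rolled loop like this:

```erlang
-module(watch_demo).
-export([run_once/1, keep_alive/2]).

%% Run Fun in a separate process and return the reason it exited with.
run_once(Fun) ->
    {Pid, MRef} = spawn_monitor(Fun),
    receive
        {'DOWN', MRef, process, Pid, Reason} -> Reason
    end.

%% Re-run Fun after every abnormal exit, at most MaxRestarts times.
keep_alive(Fun, MaxRestarts) ->
    keep_alive(Fun, MaxRestarts, 0).

keep_alive(Fun, MaxRestarts, Restarts) ->
    case run_once(Fun) of
        normal ->
            {ok, Restarts};
        _Reason when Restarts < MaxRestarts ->
            keep_alive(Fun, MaxRestarts, Restarts + 1);
        Reason ->
            {error, {too_many_restarts, Reason}}
    end.
```

The restart limit mirrors, very loosely, the idea of a supervisor's restart intensity: if something keeps crashing, the failure is escalated instead of being retried forever.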


4. A few notes

My personal opinion here is that it's pretty hard to have more assumptions than what exists in Erlang unless you want pure transactional semantics. One problem you'll always have trouble with is nodes going down. You can never know if they went down because the server actually crashed or because the network failed.

In the case of a server crash, simply re-doing the tasks is easy enough. However with a net split, you have to make sure some vital operations are not done twice, but not lost either.

It usually boils down to the CAP theorem, which basically names three properties, of which you can only have two at once:

  1. Consistency
  2. Partition tolerance
  3. Availability

Depending on where you position yourself, different approaches will be needed. The CAP theorem is usually used to describe databases, but I believe similar questions are to be asked whenever you need some level of fault tolerance when processing data.

5
votes

The Erlang/OTP system is fault tolerant. That doesn't relieve you of the need to build equally fault-tolerant apps on it. If you use Erlang and OTP, there are a few things you can rely on.

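  1. When a supervised process dies, its supervisor restarts it (according to the supervisor's restart strategy).
  2. For the most part, a process crashing won't bring down your whole app.
  3. When a message is sent, it will be delivered to the receiver's mailbox provided the receiver exists (and, across nodes, provided the connection stays up).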

As far as I know, messages in Erlang are not subject to duplication. If you send a message and the process receives it, then the message is gone from the queue. However, if you send a message and the process receives that message but crashes while processing it, then that message is gone and unhandled. That fact should be considered in the design of your system. OTP helps you handle all of this by using processes to isolate infrastructure-critical code (e.g. supervisors, gen_servers, ...) from application code that might be subject to crashes.

For instance, you might have a gen_server that dispatches work to a process pool. The processes in the pool might crash and get restarted. But the gen_server remains up, since its entire purpose is just to receive messages and dispatch them to the pool to work on. This allows the whole system to stay up despite errors and crashes in the pool, and there is always something waiting for your message.
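A simplified sketch of that idea follows. It is only an illustration under stated assumptions: the module name `dispatch_demo` and its API are made up, and instead of a fixed pool it spawns one monitored worker per job. The point is that a crashing job takes down only its own worker, while the dispatcher stays up and remembers which jobs failed:

```erlang
-module(dispatch_demo).
-behaviour(gen_server).
-export([start_link/0, submit/2, failed/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() -> gen_server:start_link(?MODULE, [], []).

%% Hand a job (a zero-arity fun) to the dispatcher.
submit(Server, Job) -> gen_server:cast(Server, {submit, Job}).

%% Jobs whose worker crashed; the caller decides whether to resubmit them.
failed(Server) -> gen_server:call(Server, failed).

init([]) -> {ok, #{running => #{}, failed => []}}.

handle_cast({submit, Job}, State = #{running := Running}) ->
    %% The job runs in its own process; the dispatcher only monitors it.
    {_Pid, MRef} = spawn_monitor(Job),
    {noreply, State#{running := Running#{MRef => Job}}}.

handle_call(failed, _From, State = #{failed := Failed}) ->
    {reply, Failed, State}.

handle_info({'DOWN', MRef, process, _Pid, Reason},
            State = #{running := Running, failed := Failed}) ->
    {Job, Rest} = maps:take(MRef, Running),
    NewFailed = case Reason of
                    normal -> Failed;
                    _ -> [{Job, Reason} | Failed]
                end,
    {noreply, State#{running := Rest, failed := NewFailed}};
handle_info(_Other, State) ->
    {noreply, State}.
```

Whether a failed job should be retried is still an application decision: if the job had side effects before crashing, blindly resubmitting it may repeat them.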

Just because the system is fault tolerant doesn't mean your algorithm is.

2
votes

I think the answer has nothing to do with Erlang at all. It lies in the semantics of client-server interaction, where you can choose to implement "at least once", "at most once" or "exactly once" guarantees in your client-server protocol. All of these invocation semantics can be implemented by combining unique tags, retries, and logging client requests on both client and server before sending or executing them, so that they can be picked up by the server after a crash. Besides duplicates, you can get lost, orphaned or delayed messages.
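To make the "unique tags plus retries" idea concrete, here is a small sketch. Everything in it is an assumption for illustration: the module name `dedup_demo`, the `{req, From, ReqId, Op}` message format, and the toy `{add, N}` operation. The client retries with the same request id (at-least-once delivery), and the server remembers the reply per id so that a retried request is never executed twice:

```erlang
-module(dedup_demo).
-export([start/0, request/4]).

%% Hypothetical server: keeps a counter and the reply for every request id seen.
start() -> spawn(fun() -> loop(#{}, 0) end).

loop(Seen, Counter) ->
    receive
        {req, From, ReqId, {add, N}} ->
            case maps:find(ReqId, Seen) of
                {ok, Reply} ->
                    %% Duplicate: resend the stored reply, do not re-execute.
                    From ! {reply, ReqId, Reply},
                    loop(Seen, Counter);
                error ->
                    NewCounter = Counter + N,
                    From ! {reply, ReqId, NewCounter},
                    loop(Seen#{ReqId => NewCounter}, NewCounter)
            end
    end.

%% Send the request, retrying with the SAME ReqId up to Retries times.
request(_Server, _ReqId, _Op, 0) ->
    {error, no_reply};
request(Server, ReqId, Op, Retries) ->
    Server ! {req, self(), ReqId, Op},
    receive
        {reply, ReqId, Reply} -> {ok, Reply}
    after 500 ->
        request(Server, ReqId, Op, Retries - 1)
    end.
```

In this sketch the table of seen ids lives only in the server's memory, so it is lost if the server crashes. Surviving a crash would require logging requests durably, as described above.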