Building truly distributed systems remains one of the most challenging aspects of modern software development. While many languages and frameworks offer tools to address distribution, Elixir and the underlying BEAM virtual machine provide native capabilities that make distributed programming more accessible and robust. In this article, we’ll explore how Elixir’s distribution model works, examine practical patterns for building distributed applications, and provide concrete examples from real-world systems.
The Erlang Distribution Model
To understand Elixir’s approach to distribution, we need to start with the foundations provided by Erlang and the BEAM virtual machine.
Fundamentals of BEAM Distribution
The BEAM’s distribution model is built on several key principles:
- Location Transparency: Processes can communicate with each other regardless of whether they’re on the same machine or different nodes in a cluster.
- Message Passing: All inter-process communication happens through asynchronous message passing, creating natural boundaries that work well in distributed environments.
- Process Isolation: Processes are fully isolated with no shared memory, which eliminates many classes of concurrency issues.
- Fault Tolerance: The “let it crash” philosophy, combined with supervision trees, creates systems that can recover from failures automatically.
This foundation provides a natural fit for distributed systems, where partial failures are inevitable and resilience is essential.
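To make the “let it crash” philosophy concrete, here is a minimal sketch of a supervisor restarting a crashed worker (the MyApp.Worker module and its registered name are illustrative):
defmodule MyApp.Worker do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{}}
end

## A one_for_one supervisor restarts the worker whenever it exits abnormally
{:ok, _sup} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)

## Kill the worker; the supervisor immediately starts a fresh copy
Process.exit(Process.whereis(MyApp.Worker), :kill)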
Nodes and Connections
In the BEAM model, a node is an instance of the virtual machine, typically running on a single server. Nodes can connect to form a cluster:
## Starting a named node
$ iex --name node1@192.168.1.100 --cookie secret_cookie
## Connecting nodes
iex(node1@192.168.1.100)> Node.connect(:"node2@192.168.1.101")
true
## Listing connected nodes
iex(node1@192.168.1.100)> Node.list()
[:"node2@192.168.1.101"]
When nodes connect, they form a fully connected mesh network—each node maintains a direct connection to every other node. This architecture simplifies communication but can limit scalability for very large clusters.
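Processes can also react to membership changes by subscribing to node up/down events, which is often the basis for custom cluster-aware logic. A minimal sketch:
defmodule ClusterWatcher do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # Ask the kernel to deliver {:nodeup, node} / {:nodedown, node} messages to this process
    :net_kernel.monitor_nodes(true)
    {:ok, %{}}
  end

  @impl true
  def handle_info({:nodeup, node}, state) do
    IO.puts("node joined: #{node}")
    {:noreply, state}
  end

  def handle_info({:nodedown, node}, state) do
    IO.puts("node left: #{node}")
    {:noreply, state}
  end
end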
Distributed Process Communication
Communication between processes on different nodes uses the same primitives as local communication:
## Send a message to a process on another node
iex(node1@192.168.1.100)> send({:process_name, :"node2@192.168.1.101"}, {:hello, self()})
## Receive a response
iex(node1@192.168.1.100)> receive do
{:response, message} -> message
end
This transparency makes it easier to develop distributed systems because the same programming model applies regardless of whether processes are local or remote.
Building Blocks for Distributed Elixir Applications
Elixir provides several abstractions that facilitate building distributed systems:
Global Process Registry
The :global module provides a cluster-wide process registry:
## Register a process globally
:global.register_name(:my_service, self())
## Find and send a message to a globally registered process
pid = :global.whereis_name(:my_service)
send(pid, {:request, data})
The :global registry also deals with the naming conflicts that arise when partitioned nodes reconnect, using configurable conflict-resolution functions.
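When a partition heals and the same name has been registered on both sides, :global invokes a resolve function to decide which registration survives. The resolvers below ship with OTP; the process being registered is simply the calling process:
## Default behaviour: keep one pid and exit the other
:global.register_name(:my_service, self(), &:global.random_exit_name/3)
## Alternative: keep a random pid and only notify the losing one
:global.register_name(:my_service, self(), &:global.random_notify_name/3)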
Distributed Named Processes with GenServer
GenServers can be registered globally, making them accessible from any node in the cluster:
defmodule MyService do
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: {:global, __MODULE__})
end
def call(request) do
GenServer.call({:global, __MODULE__}, request)
end
# Implementation of callbacks...
end
Clients can call this service from any node:
## This works from any connected node
result = MyService.call(:some_request)
Task Distribution
The Task module supports executing functions on remote nodes:
## Start a task on a specific node
task = Task.Supervisor.async({TaskSupervisor, :"node2@192.168.1.101"}, fn ->
# This function executes on node2
perform_computation()
end)
## Wait for the result
result = Task.await(task)
This provides a simple way to distribute computation across a cluster.
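For the remote call above to work, a Task.Supervisor must already be running under the name TaskSupervisor on the target node, typically by starting one in each node’s supervision tree:
children = [
  {Task.Supervisor, name: TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)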
Phoenix PubSub
For more complex event distribution, Phoenix PubSub offers a distributed publish-subscribe mechanism:
## Add Phoenix.PubSub to your application’s supervision tree (phoenix_pubsub 2.x)
children = [
  {Phoenix.PubSub, name: MyApp.PubSub}
]
## Subscribe to a topic
Phoenix.PubSub.subscribe(MyApp.PubSub, "updates:123")
## Broadcast to all subscribers, across all nodes
Phoenix.PubSub.broadcast(MyApp.PubSub, "updates:123", {:new_data, %{...}})
This makes it easy to implement real-time features that work across a cluster.
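Subscribers receive broadcasts as ordinary messages, so in a GenServer they arrive in handle_info/2. A minimal sketch, assuming the {:new_data, ...} payload shape used above:
defmodule MyApp.UpdatesListener do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # The subscription is cluster-wide: broadcasts from any node are delivered here
    Phoenix.PubSub.subscribe(MyApp.PubSub, "updates:123")
    {:ok, %{}}
  end

  @impl true
  def handle_info({:new_data, data}, state) do
    IO.inspect(data, label: "update received")
    {:noreply, state}
  end
end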
Distributed Application Patterns
Building on these foundations, several patterns have emerged for creating robust distributed applications in Elixir.
Distributed State Management
Managing state across a cluster is a common challenge. Here are three approaches with different tradeoffs:
1. GenServer with Global Registry
For simple cases, a GenServer registered globally can maintain state:
defmodule Counter do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, 0, name: {:global, __MODULE__})
end
def increment do
GenServer.call({:global, __MODULE__}, :increment)
end
def get do
GenServer.call({:global, __MODULE__}, :get)
end
# Callbacks
def init(state), do: {:ok, state}
def handle_call(:increment, _from, count) do
{:reply, count + 1, count + 1}
end
def handle_call(:get, _from, count) do
{:reply, count, count}
end
end
This approach is simple but has limitations:
- The state exists only on one node
- If that node fails, the state is lost
- All operations must go through a single process, limiting throughput
2. Distributed ETS with :mnesia
For more robust state management, :mnesia provides a distributed database that’s part of the Erlang standard library:
defmodule DistributedCounter do
def setup do
nodes = [node() | Node.list()]
# The schema must be created while :mnesia is stopped, and :mnesia.start/0
# must be called on every participating node (only the local node is started here)
:mnesia.create_schema(nodes)
:mnesia.start()
:mnesia.create_table(:counters, [
attributes: [:key, :value],
disc_copies: nodes
])
end
def increment(key) do
:mnesia.transaction(fn ->
case :mnesia.read({:counters, key}) do
[{:counters, ^key, value}] ->
:mnesia.write({:counters, key, value + 1})
value + 1
[] ->
:mnesia.write({:counters, key, 1})
1
end
end)
end
def get(key) do
:mnesia.transaction(fn ->
case :mnesia.read({:counters, key}) do
[{:counters, ^key, value}] -> value
[] -> 0
end
end)
end
end
This approach offers:
- Distributed, replicated state
- ACID transactions
- Persistence options
- Fault tolerance
However, it also comes with challenges:
- Schema management across nodes
- Conflict resolution
- Performance considerations for large datasets
3. Eventually Consistent State with CRDTs
For scenarios where eventual consistency is acceptable, Conflict-Free Replicated Data Types (CRDTs) provide an elegant solution:
defmodule DistributedCounter.CRDT do
def new do
%{by_node: %{}}
end
def increment(counter, node_name) do
update_in(counter, [:by_node, node_name], fn
nil -> 1
count -> count + 1
end)
end
def value(counter) do
counter.by_node
|> Map.values()
|> Enum.sum()
end
# Merge function handles conflicts by taking the maximum count for each node
def merge(counter1, counter2) do
merged_by_node = Map.merge(
counter1.by_node,
counter2.by_node,
fn _key, count1, count2 -> max(count1, count2) end
)
%{by_node: merged_by_node}
end
end
In practice, you might use a library like delta_crdt to implement this pattern:
{:ok, counter} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
## Put a value under a key in the add-wins, last-write-wins map
DeltaCrdt.mutate(counter, :add, ["hits", 1])
## Read the whole CRDT state back as a plain map, e.g. %{"hits" => 1}
DeltaCrdt.read(counter)
## Replicas on other nodes are connected with DeltaCrdt.set_neighbours/2
This approach offers excellent availability and partition tolerance, but with eventual rather than strong consistency.
Distributed Task Processing
Processing tasks across a cluster enables parallel execution and load distribution:
Work Queues with Distributed Task Supervisor
For distributing tasks across a cluster, we can use a combination of the Task module and distributed supervisors:
defmodule WorkDistributor do
def start_link do
# Start a task supervisor on each node
children = [
{Task.Supervisor, name: WorkDistributor.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
def process_in_parallel(work_items) do
# Get all nodes including this one
nodes = [node() | Node.list()]
# Distribute work across nodes in round-robin fashion
work_items
|> Enum.with_index()
|> Enum.map(fn {item, index} ->
# Select a node using round-robin
target_node = Enum.at(nodes, rem(index, length(nodes)))
# Start task on the selected node
Task.Supervisor.async({WorkDistributor.TaskSupervisor, target_node}, fn ->
process_item(item)
end)
end)
|> Enum.map(&Task.await/1)
end
defp process_item(item) do
# Process the work item...
IO.puts("Processing #{inspect(item)} on node #{inspect(node())}")
# Return the result
{:processed, item, node()}
end
end
This pattern is useful for CPU-intensive work that can be parallelized.
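Once WorkDistributor.start_link/0 has been called on every node, a single call from any node fans the items out across the cluster:
work_items = [1, 2, 3, 4, 5, 6]
results = WorkDistributor.process_in_parallel(work_items)
## Each result tuple records which node did the work, e.g. {:processed, 1, :"node2@192.168.1.101"}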
Distributed Job Processing with Oban
For more robust job processing, libraries like Oban provide persistence and retry capabilities:
defmodule MyApp.Jobs.ImageProcessor do
use Oban.Worker, queue: :media
@impl Oban.Worker
def perform(%Oban.Job{args: %{"path" => path}}) do
# This job might run on any node in the cluster
# Process the image at the given path
process_image(path)
:ok
end
defp process_image(path) do
# Image processing logic...
end
end
## Enqueue a job (can be done from any node)
%{path: "/uploads/image.jpg"}
|> MyApp.Jobs.ImageProcessor.new()
|> Oban.insert()
Combined with a load balancer in front of your application nodes, this creates a scalable job processing system.
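Oban itself is configured per node and started in each node’s supervision tree. A minimal sketch, assuming an Ecto repo named MyApp.Repo and the :media queue used above:
## config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [media: 10]

## In MyApp.Application, after the repo
children = [
  MyApp.Repo,
  {Oban, Application.fetch_env!(:my_app, Oban)}
]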
Distributed Real-Time Systems
Elixir excels at building distributed real-time systems, where multiple nodes need to coordinate live updates:
Phoenix LiveView with PubSub
Phoenix LiveView combined with PubSub enables real-time features that work across a cluster:
defmodule MyAppWeb.DashboardLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket) do
Phoenix.PubSub.subscribe(MyApp.PubSub, "dashboard:updates")
end
metrics = fetch_initial_metrics()
{:ok, assign(socket, metrics: metrics)}
end
def handle_info({:metrics_update, new_metrics}, socket) do
{:noreply, assign(socket, metrics: new_metrics)}
end
end
## From anywhere in the cluster, broadcast updates:
def publish_metrics(metrics) do
Phoenix.PubSub.broadcast(
MyApp.PubSub,
"dashboard:updates",
{:metrics_update, metrics}
)
end
This pattern enables features like real-time dashboards, collaborative editing, and live notifications that work seamlessly across a cluster.
Distributed Presence Tracking
Phoenix also provides Presence for tracking who/what is online:
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end
## Track a user joining a room
MyAppWeb.Presence.track(
self(),
"room:lobby",
user_id,
%{status: "online", last_active: System.system_time(:second)}
)
## Get current presence information
MyAppWeb.Presence.list("room:lobby")
Presence uses a CRDT-based algorithm to maintain a consistent view of who’s online across all nodes, handling node joins and failures gracefully.
Advanced Distribution Patterns
Beyond the basic patterns, several advanced techniques enable more sophisticated distributed systems.
Node Discovery and Cluster Formation
In production environments, automatic node discovery and cluster formation are essential. Libraries like libcluster simplify this process:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
topologies = [
example: [
strategy: Cluster.Strategy.Kubernetes.DNS,
config: [
service: "myapp-headless",
application_name: "myapp"
]
]
]
children = [
{Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]},
# Other children...
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
end
This example uses Kubernetes DNS for service discovery, but libcluster supports multiple strategies (a gossip example follows this list), including:
- Gossip-based discovery
- DNS-based discovery
- Kubernetes API
- Amazon EC2 autoscaling groups
- Static configuration
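For local development or small clusters without an orchestrator, the gossip strategy is a convenient starting point; the port and secret below are illustrative:
topologies = [
  gossip: [
    strategy: Cluster.Strategy.Gossip,
    config: [
      port: 45892,
      secret: "gossip_secret"
    ]
  ]
]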
Distributed Data Consistency
Managing consistency across distributed nodes requires careful consideration:
Consensus with Raft
For strong consistency requirements, consensus algorithms like Raft can be used through libraries such as ra or rafter. The callback-style sketch below illustrates the general shape of a replicated key-value store:
defmodule MyApp.ConsensusStore do
use Rafter.Server
@impl Rafter.Server
def init(_args) do
{:ok, %{data: %{}}}
end
@impl Rafter.Server
def handle_command({:set, key, value}, state) do
new_state = put_in(state.data[key], value)
{:ok, value, new_state}
end
@impl Rafter.Server
def handle_command({:get, key}, state) do
value = get_in(state.data, [key])
{:ok, value, state}
end
end
This provides:
- Leader election
- Linearizable operations
- Distributed consensus
- Strong consistency guarantees
However, these benefits come at the cost of reduced availability during network partitions, as the CAP theorem implies.
Conflict Resolution with CRDTs
For scenarios where availability is prioritized over consistency, CRDTs offer a principled approach to conflict resolution:
defmodule MyApp.ReplicatedSet do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end
def add(server, element) do
GenServer.call(server, {:add, element})
end
def contains?(server, element) do
GenServer.call(server, {:contains, element})
end
def sync(server, remote_server) do
GenServer.cast(server, {:sync, remote_server})
end
# GenServer callbacks
def init(_opts) do
# Using a simple add-wins set CRDT
{:ok, %{elements: MapSet.new(), node: node()}}
end
def handle_call({:add, element}, _from, state) do
new_state = %{state | elements: MapSet.put(state.elements, element)}
{:reply, :ok, new_state}
end
def handle_call({:contains, element}, _from, state) do
result = MapSet.member?(state.elements, element)
{:reply, result, state}
end
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
def handle_cast({:sync, remote_server}, state) do
# Fetch the remote replica's state
remote_state = GenServer.call(remote_server, :get_state)
# Merge states (set union for an add-wins set)
merged_elements = MapSet.union(state.elements, remote_state.elements)
{:noreply, %{state | elements: merged_elements}}
end
end
This simple example implements an add-wins set CRDT. More sophisticated CRDT implementations support operations like counters, maps, and sequences with principled conflict resolution.
Monitoring and Debugging Distributed Systems
Observability becomes particularly important in distributed systems:
Distributed Tracing
Libraries like opentelemetry enable distributed tracing across service boundaries:
defmodule MyApp.UserService do
require OpenTelemetry.Tracer
def get_user(id) do
OpenTelemetry.Tracer.with_span "get_user" do
# Add context to the current span
OpenTelemetry.Tracer.set_attributes([{"user.id", id}])
# Perform the operation
user = Database.get_user(id)
if user do
# Make a call to another service, propagating the trace context
profile = MyApp.ProfileService.get_profile(user.profile_id)
%{user: user, profile: profile}
else
# Record an error event
OpenTelemetry.Tracer.add_event("error", [
{"error.type", "user_not_found"},
{"user.id", id}
])
nil
end
end
end
end
This enables end-to-end tracing of requests as they flow through your distributed system.
Metrics Collection
The :telemetry package provides a standard way to emit instrumentation events that metrics reporters can consume:
defmodule MyApp.MetricsExample do
def process_item(item) do
start_time = System.monotonic_time()
# Measure the frequency of operations
:telemetry.execute([:myapp, :item, :process, :start], %{}, %{item_id: item.id})
result = do_process_item(item)
# Measure the duration of operations
duration = System.monotonic_time() - start_time
:telemetry.execute(
[:myapp, :item, :process, :stop],
%{duration: duration},
%{item_id: item.id, result: result}
)
result
end
defp do_process_item(item) do
# Processing logic...
end
end
Combined with reporters for systems like Prometheus, this provides valuable insights into system behavior.
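For example, the telemetry_metrics and telemetry_metrics_prometheus packages (assumed as dependencies here) can turn the events above into Prometheus metrics:
defmodule MyApp.Metrics do
  import Telemetry.Metrics

  def metrics do
    [
      # Count how many items were processed
      counter("myapp.item.process.stop.duration"),
      # Total time spent processing, converted from native units to milliseconds
      sum("myapp.item.process.stop.duration", unit: {:native, :millisecond})
    ]
  end
end

## In the supervision tree: exposes an HTTP /metrics endpoint for Prometheus to scrape
children = [
  {TelemetryMetricsPrometheus, metrics: MyApp.Metrics.metrics()}
]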
Distributed Logging
Centralized logging is essential for understanding distributed systems:
defmodule MyApp.Logger do
require Logger
def info(message, metadata \\ []) do
metadata = Keyword.merge(default_metadata(), metadata)
Logger.info(message, metadata)
end
def error(message, metadata \\ []) do
metadata = Keyword.merge(default_metadata(), metadata)
Logger.error(message, metadata)
end
defp default_metadata do
[node: node(), application: :myapp]
end
end
This can be combined with a logger backend that ships logs to a centralized service like the ELK stack or Datadog.
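For the node metadata to actually appear in log output, the backend must be configured to include it; for the default console backend:
## config/config.exs
config :logger, :console,
  metadata: [:node, :application, :request_id]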
Real-World Example: A Distributed Chat System
To illustrate these concepts in action, let’s build a simple distributed chat system:
defmodule ChatSystem.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.PubSub, name: ChatSystem.PubSub},
ChatSystem.Presence,
{ChatSystem.MessageStore, name: ChatSystem.MessageStore},
ChatSystemWeb.Endpoint
]
opts = [strategy: :one_for_one, name: ChatSystem.Supervisor]
Supervisor.start_link(children, opts)
end
end
defmodule ChatSystem.Presence do
use Phoenix.Presence,
otp_app: :chat_system,
pubsub_server: ChatSystem.PubSub
end
defmodule ChatSystem.MessageStore do
use GenServer
# Client API
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end
def add_message(room, sender, content) do
GenServer.cast(__MODULE__, {:add_message, room, sender, content})
end
def get_recent_messages(room, limit \\ 50) do
GenServer.call(__MODULE__, {:get_recent, room, limit})
end
# Server callbacks
def init(_opts) do
# Initialize with empty message store
{:ok, %{rooms: %{}}}
end
def handle_cast({:add_message, room, sender, content}, state) do
# Create message record
message = %{
id: generate_id(),
sender: sender,
content: content,
timestamp: :os.system_time(:millisecond)
}
# Add to room's message list
new_rooms = Map.update(
state.rooms,
room,
[message],
fn messages -> [message | messages] |> Enum.take(100) end
)
# Broadcast the new message to all nodes
Phoenix.PubSub.broadcast(
ChatSystem.PubSub,
"room:#{room}",
{:new_message, message}
)
{:noreply, %{state | rooms: new_rooms}}
end
def handle_call({:get_recent, room, limit}, _from, state) do
messages =
state.rooms
|> Map.get(room, [])
|> Enum.take(limit)
|> Enum.reverse() # Return in chronological order
{:reply, messages, state}
end
defp generate_id, do: System.unique_integer([:positive, :monotonic])
end
defmodule ChatSystemWeb.RoomChannel do
use Phoenix.Channel
alias ChatSystem.{Presence, MessageStore}
def join("room:" <> room_id, _params, socket) do
send(self(), :after_join)
# Fetch recent messages
recent_messages = MessageStore.get_recent_messages(room_id)
{:ok, %{messages: recent_messages}, assign(socket, :room_id, room_id)}
end
def handle_info(:after_join, socket) do
# Track user presence (:user_id is assumed to be assigned on the socket at connect time)
Presence.track(
self(),
"room:#{socket.assigns.room_id}:presence",
socket.assigns.user_id,
%{
online_at: :os.system_time(:millisecond)
}
)
# Send current presence state
push(socket, "presence_state", Presence.list("room:#{socket.assigns.room_id}:presence"))
{:noreply, socket}
end
def handle_in("new_message", %{"content" => content}, socket) do
room_id = socket.assigns.room_id
user_id = socket.assigns.user_id
# Store the message in this node's MessageStore; it broadcasts the message to the rest of the cluster via PubSub
MessageStore.add_message(room_id, user_id, content)
{:reply, :ok, socket}
end
end
This example demonstrates several key distributed patterns:
- Distributed presence with Phoenix.Presence for tracking users across nodes
- Real-time messaging with Phoenix.PubSub for cross-node communication
- Per-node state held in a GenServer message store, with new messages fanned out via PubSub
- Event propagation across the cluster
In a real-world system, you might extend this with:
- Persistence using a database
- CRDT-based message ordering
- Cluster formation with libcluster
- More sophisticated presence features
Deployment Considerations
Successfully running a distributed Elixir application in production requires careful deployment planning:
Network Configuration
Ensure proper network configuration between nodes:
- EPMD port (typically 4369) must be accessible between nodes
- Inter-node communication uses dynamically assigned ports by default; restrict them to a known range (e.g. 9000-9100) with the inet_dist_listen_min/max kernel parameters (see the sketch after this list)
- Use a firewall to prevent external access to these ports
- Consider using IPsec or VPN tunnels for secure communication
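With releases, that port range can be pinned in vm.args so firewalls only need to open a known window (the 9000-9100 range is just an example):
## rel/vm.args.eex
-kernel inet_dist_listen_min 9000
-kernel inet_dist_listen_max 9100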
Node Naming and Discovery
In production environments, use fully qualified domain names or static IPs:
--name app@172.16.0.1 --cookie my_secure_cookie
For containerized deployments, libraries like libcluster simplify node discovery:
## For Kubernetes-based deployments
config :libcluster,
topologies: [
k8s: [
strategy: Cluster.Strategy.Kubernetes.DNS,
config: [
service: "myapp-headless",
application_name: "myapp"
]
]
]
Cookie Security
The Erlang distribution cookie controls node authentication:
- Use a strong, random cookie value
- Keep the cookie value secure (treat it like a password)
- Consider using environment variables or runtime configuration (see the example after this list)
- Different environments should use different cookies
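With Mix releases, for instance, the cookie can be supplied at runtime through the RELEASE_COOKIE environment variable rather than baked into the build (the release name my_app is illustrative):
## Provide the cookie when starting the release
$ export RELEASE_COOKIE="some_long_random_value"
$ bin/my_app start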
Load Balancing
For web applications, configure load balancers to support distributed session management:
- Use sticky sessions if sessions are node-local
- Ensure WebSocket connections are properly handled
- Consider session storage in a distributed database if sticky sessions aren’t desired
Conclusion
Elixir and the BEAM virtual machine provide a robust foundation for building distributed systems. By embracing the actor model, message passing, and process isolation, Elixir offers a programming model that naturally extends from concurrent to distributed applications.
The patterns we’ve explored—from distributed state management to real-time features—demonstrate how Elixir’s built-in distribution capabilities can be applied to solve real-world challenges. Libraries like Phoenix, libcluster, and various CRDT implementations extend these capabilities further, providing battle-tested solutions for common distributed system problems.
When building distributed systems with Elixir, remember these key principles:
- Embrace asynchronous communication through message passing
- Design for failure with supervision trees and crash recovery
- Consider data consistency needs carefully, choosing appropriate patterns
- Implement proper observability with tracing, metrics, and logging
- Test thoroughly, including partition scenarios
By leveraging Elixir’s strengths and following these patterns, you can build distributed systems that are not only functional but also resilient, maintainable, and scalable.
References
- Cesarini, F., & Vinoski, S. (2016). Designing for Scalability with Erlang/OTP. O’Reilly Media.
- Gospodinov, S. (2021). Concurrent Data Processing in Elixir: Fast, Resilient Applications with OTP, GenStage, Flow, and Broadway. The Pragmatic Bookshelf.
- Taylor, S. (2020). Real-Time Phoenix: Build Highly Scalable Systems with Channels. The Pragmatic Bookshelf.
- Shapiro, M., Preguiça, N., Baquero, C., & Zawirski, M. (2011). Conflict-Free Replicated Data Types. https://hal.inria.fr/inria-00609399v1/document
- Phoenix Framework Documentation. https://hexdocs.pm/phoenix/Phoenix.html