Hanso Group

Building Distributed Systems with Elixir

Marasy Phi 25 minutes read

Building truly distributed systems remains one of the most challenging aspects of modern software development. While many languages and frameworks offer tools to address distribution, Elixir and the underlying BEAM virtual machine provide native capabilities that make distributed programming more accessible and robust. In this article, we’ll explore how Elixir’s distribution model works, examine practical patterns for building distributed applications, and provide concrete examples from real-world systems.

The Erlang Distribution Model

To understand Elixir’s approach to distribution, we need to start with the foundations provided by Erlang and the BEAM virtual machine.

Fundamentals of BEAM Distribution

The BEAM’s distribution model is built on several key principles:

  1. Location Transparency: Processes can communicate with each other regardless of whether they’re on the same machine or different nodes in a cluster.

  2. Message Passing: All inter-process communication happens through asynchronous message passing, creating natural boundaries that work well in distributed environments.

  3. Process Isolation: Processes are fully isolated with no shared memory, which eliminates many classes of concurrency issues.

  4. Fault Tolerance: The “let it crash” philosophy, combined with supervision trees, creates systems that can recover from failures automatically.

This foundation provides a natural fit for distributed systems, where partial failures are inevitable and resilience is essential.
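These principles can be observed on a single node before any cluster exists. The following is a minimal sketch; the `Greeter` module and the message shapes are illustrative assumptions, not part of any particular system:

```elixir
defmodule Greeter do
  # Each process owns its state (here, a counter); the only way in or out is a message.
  def loop(count) do
    receive do
      {:greet, from, name} ->
        send(from, {:greeting, "Hello, #{name}!", count + 1})
        loop(count + 1)

      :stop ->
        :ok
    end
  end
end

pid = spawn(Greeter, :loop, [0])

# send/2 is asynchronous: it returns immediately and the reply arrives as another message.
send(pid, {:greet, self(), "world"})

reply =
  receive do
    {:greeting, text, n} -> {text, n}
  after
    1_000 -> :timeout
  end

send(pid, :stop)
```

The same `send/2` and `receive` calls work unchanged when `pid` refers to a process on another connected node.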

Nodes and Connections

In the BEAM model, a node is a named instance of the virtual machine. Several nodes can share one host, although in production each node usually runs on its own server. Nodes connect to one another to form a cluster:

## Starting a named node
$ iex --name node1@192.168.1.100 --cookie secret_cookie

## Connecting nodes
iex(node1@192.168.1.100)> Node.connect(:"node2@192.168.1.101")
true

## Listing connected nodes
iex(node1@192.168.1.100)> Node.list()
[:"node2@192.168.1.101"]

When nodes connect, they form a fully connected mesh by default: each node maintains a direct connection to every other node. This simplifies communication, but the number of connections grows quadratically with cluster size, which limits scalability for very large clusters.
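To get a feel for how quickly a full mesh grows, the connection count for n nodes is n(n - 1)/2. A small sketch (the `MeshMath` module name is made up for this illustration):

```elixir
defmodule MeshMath do
  # Number of links in a full mesh of n nodes: n * (n - 1) / 2
  def connections(n) when is_integer(n) and n >= 0, do: div(n * (n - 1), 2)
end

for n <- [3, 10, 50, 100] do
  IO.puts("#{n} nodes -> #{MeshMath.connections(n)} connections")
end
```

Ten nodes need 45 connections, while one hundred nodes already need 4950.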

Distributed Process Communication

Communication between processes on different nodes uses the same primitives as local communication:

## Send a message to a process on another node
iex(node1@192.168.1.100)> send({:process_name, :"node2@192.168.1.101"}, {:hello, self()})

## Receive a response
iex(node1@192.168.1.100)> receive do
  {:response, message} -> message
end

This transparency makes it easier to develop distributed systems because the same programming model applies regardless of whether processes are local or remote.
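The snippet above assumes that a process registered under a name exists on the target node and answers with a `{:response, _}` tuple. A minimal sketch of such a process follows. It uses the local node so it can be tried in a single session, and the `:echo_service` name and reply shape are assumptions for illustration:

```elixir
# A process that answers one {:hello, from} message.
server =
  spawn(fn ->
    receive do
      {:hello, from} -> send(from, {:response, {:hello_from, node()}})
    end
  end)

Process.register(server, :echo_service)

# {name, node} addressing also works for the local node; replace node()
# with a remote node name once the nodes are connected.
send({:echo_service, node()}, {:hello, self()})

answer =
  receive do
    {:response, message} -> message
  after
    # Messages to an unreachable node are dropped silently, so always bound the wait.
    1_000 -> :timeout
  end
```

The `after` clause matters more in a distributed setting than locally, because a send to an unreachable node does not raise.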

Building Blocks for Distributed Elixir Applications

Elixir provides several abstractions that facilitate building distributed systems:

Global Process Registry

The :global module provides a cluster-wide process registry:

## Register a process globally
:global.register_name(:my_service, self())

## Find and send a message to a globally registered process
pid = :global.whereis_name(:my_service)
send(pid, {:request, data})

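The :global registry keeps names unique across the cluster. When a network partition heals and the same name turns out to be registered on both sides, :global calls a resolve function to pick a winner; the default strategy keeps one of the processes at random and terminates the other.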
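One detail worth handling explicitly: `:global.whereis_name/1` returns `:undefined` when nothing is registered under the name, and passing that to `send/2` raises. A small sketch of a safer lookup (the `lookup` helper and the `:demo_service` name are illustrative):

```elixir
# Wrap the lookup so that callers get a tagged tuple instead of :undefined.
lookup = fn name ->
  case :global.whereis_name(name) do
    :undefined -> {:error, :not_registered}
    pid when is_pid(pid) -> {:ok, pid}
  end
end

# register_name/2 returns :yes on success and :no if the name is taken.
:yes = :global.register_name(:demo_service, self())

found = lookup.(:demo_service)
missing = lookup.(:missing_service)

:global.unregister_name(:demo_service)
```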

Distributed Named Processes with GenServer

GenServers can be registered globally, making them accessible from any node in the cluster:

defmodule MyService do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: {:global, __MODULE__})
  end

  def call(request) do
    GenServer.call({:global, __MODULE__}, request)
  end

  # Implementation of callbacks...
end

Clients can call this service from any node:

## This works from any connected node
result = MyService.call(:some_request)

Task Distribution

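Task.Supervisor supports starting tasks under a supervisor that runs on a remote node. The anonymous function is sent to that node, so the same version of the calling module must be loaded there: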

## Start a task on a specific node
task = Task.Supervisor.async({TaskSupervisor, :"node2@192.168.1.101"}, fn ->
  # This function executes on node2
  perform_computation()
end)

## Wait for the result
result = Task.await(task)

This provides a simple way to distribute computation across a cluster.
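When the nodes in a cluster may run different code versions, the module/function/arguments form of `Task.Supervisor.async` avoids shipping an anonymous function. A minimal sketch, runnable on one node; `Workload` and `DemoTaskSupervisor` are hypothetical names:

```elixir
defmodule Workload do
  # A named, exported function can be invoked on any node that has this module loaded.
  def square(x), do: x * x
end

{:ok, _sup} = Task.Supervisor.start_link(name: DemoTaskSupervisor)

# Replace node() with a remote node name such as :"node2@192.168.1.101".
task = Task.Supervisor.async({DemoTaskSupervisor, node()}, Workload, :square, [7])

result = Task.await(task, 5_000)
```

The remote node only needs a `Task.Supervisor` registered under the same name and the `Workload` module on its code path.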

Phoenix PubSub

For more complex event distribution, Phoenix PubSub offers a distributed publish-subscribe mechanism:

## In your application's supervision tree (Phoenix.PubSub 2.x)
children = [
  {Phoenix.PubSub, name: MyApp.PubSub}
]

## Subscribe to a topic
Phoenix.PubSub.subscribe(MyApp.PubSub, "updates:123")

## Broadcast to all subscribers, across all nodes
## (payload stands for whatever map your application sends)
Phoenix.PubSub.broadcast(MyApp.PubSub, "updates:123", {:new_data, payload})

This makes it easy to implement real-time features that work across a cluster.

Distributed Application Patterns

Building on these foundations, several patterns have emerged for creating robust distributed applications in Elixir.

Distributed State Management

Managing state across a cluster is a common challenge. Here are three approaches with different tradeoffs:

1. GenServer with Global Registry

For simple cases, a GenServer registered globally can maintain state:

defmodule Counter do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, 0, name: {:global, __MODULE__})
  end

  def increment do
    GenServer.call({:global, __MODULE__}, :increment)
  end

  def get do
    GenServer.call({:global, __MODULE__}, :get)
  end

  # Callbacks
  def init(state), do: {:ok, state}

  def handle_call(:increment, _from, count) do
    {:reply, count + 1, count + 1}
  end

  def handle_call(:get, _from, count) do
    {:reply, count, count}
  end
end

This approach is simple but has limitations:

  • The state exists only on one node
  • If that node fails, the state is lost
  • All operations must go through a single process, limiting throughput

2. Distributed ETS with :mnesia

For more robust state management, :mnesia provides a distributed database that’s part of the Erlang standard library:

defmodule DistributedCounter do
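  def setup do
    nodes = [node() | Node.list()]

    # The schema must be created while Mnesia is stopped on every node
    :mnesia.create_schema(nodes)

    # Start Mnesia on every node, not only on the local one
    :rpc.multicall(nodes, :mnesia, :start, [])

    :mnesia.create_table(:counters, [
      attributes: [:key, :value],
      disc_copies: nodes
    ])
  end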

  def increment(key) do
    :mnesia.transaction(fn ->
      case :mnesia.read({:counters, key}) do
        [{:counters, ^key, value}] ->
          :mnesia.write({:counters, key, value + 1})
          value + 1
        [] ->
          :mnesia.write({:counters, key, 1})
          1
      end
    end)
  end

  def get(key) do
    :mnesia.transaction(fn ->
      case :mnesia.read({:counters, key}) do
        [{:counters, ^key, value}] -> value
        [] -> 0
      end
    end)
  end
end
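Note that `:mnesia.transaction/1` wraps its result: callers receive `{:atomic, value}` on success or `{:aborted, reason}` on failure. The sketch below shows this on a single node with an in-memory table; the `:demo_counters` table and the `bump` helper are illustrative names, and the read takes a write lock because the row is about to be updated:

```elixir
# Start Mnesia with an in-memory schema on the local node only.
:ok = :mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:demo_counters, attributes: [:key, :value])

bump = fn key ->
  :mnesia.transaction(fn ->
    # Lock the row for writing up front, since we update it right after reading.
    next =
      case :mnesia.read(:demo_counters, key, :write) do
        [{:demo_counters, ^key, value}] -> value + 1
        [] -> 1
      end

    :ok = :mnesia.write({:demo_counters, key, next})
    next
  end)
end

first = bump.(:page_views)
second = bump.(:page_views)
```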

This approach offers:

  • Distributed, replicated state
  • ACID transactions
  • Persistence options
  • Fault tolerance

However, it also comes with challenges:

  • Schema management across nodes
  • No automatic recovery from network partitions; a split cluster must be reconciled by the application or an operator
  • Performance considerations for large datasets

3. Eventually Consistent State with CRDTs

For scenarios where eventual consistency is acceptable, Conflict-Free Replicated Data Types (CRDTs) provide an elegant solution:

defmodule DistributedCounter.CRDT do
  def new do
    %{by_node: %{}}
  end

  def increment(counter, node_name) do
    update_in(counter, [:by_node, node_name], fn
      nil -> 1
      count -> count + 1
    end)
  end

  def value(counter) do
    counter.by_node
    |> Map.values()
    |> Enum.sum()
  end

  # Merge function handles conflicts by taking the maximum count for each node
  def merge(counter1, counter2) do
    merged_by_node = Map.merge(
      counter1.by_node,
      counter2.by_node,
      fn _key, count1, count2 -> max(count1, count2) end
    )

    %{by_node: merged_by_node}
  end
end
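The reason `merge/2` takes the per-node maximum rather than a sum is that replicas may exchange state in any order and more than once. A short sketch of the properties this gives, using plain maps with made-up node names:

```elixir
# Per-node counts as seen by two replicas.
replica_a = %{node_a: 2, node_b: 1}
replica_b = %{node_a: 1, node_b: 3}

# Same merge rule as above: keep the larger count for each node.
merge = fn x, y -> Map.merge(x, y, fn _node, c1, c2 -> max(c1, c2) end) end

merged = merge.(replica_a, replica_b)

# Order of merging does not matter...
commutative? = merge.(replica_a, replica_b) == merge.(replica_b, replica_a)

# ...and merging the same state again changes nothing.
idempotent? = merge.(merged, replica_a) == merged

total = merged |> Map.values() |> Enum.sum()
```

Summing the counts on merge would double count increments whenever the same state is delivered twice.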

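In practice, you might use a library like delta_crdt, which provides a replicated add-wins, last-write-wins map and handles synchronization between replicas for you:

{:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)

## Add a key to the CRDT (API of recent delta_crdt releases; older releases used mutate/3)
DeltaCrdt.put(crdt, "hits", 1)

## Read the whole map (older releases expose this as DeltaCrdt.read/1)
DeltaCrdt.to_map(crdt)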

This approach offers excellent availability and partition tolerance, but with eventual rather than strong consistency.

Distributed Task Processing

Processing tasks across a cluster enables parallel execution and load distribution:

Work Queues with Distributed Task Supervisor

For distributing tasks across a cluster, we can use a combination of the Task module and distributed supervisors:

defmodule WorkDistributor do
  def start_link do
    # Start a task supervisor on each node
    children = [
      {Task.Supervisor, name: WorkDistributor.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  def process_in_parallel(work_items) do
    # Get all nodes including this one
    nodes = [node() | Node.list()]

    # Distribute work across nodes in round-robin fashion
    work_items
    |> Enum.with_index()
    |> Enum.map(fn {item, index} ->
      # Select a node using round-robin
      target_node = Enum.at(nodes, rem(index, length(nodes)))

      # Start task on the selected node
      Task.Supervisor.async({WorkDistributor.TaskSupervisor, target_node}, fn ->
        process_item(item)
      end)
    end)
    |> Enum.map(&Task.await/1)
  end

  defp process_item(item) do
    # Process the work item...
    IO.puts("Processing #{inspect(item)} on node #{inspect(node())}")

    # Return the result
    {:processed, item, node()}
  end
end

This pattern is useful for CPU-intensive work that can be parallelized.
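One caveat of `Task.Supervisor.async` is that each task is linked to the caller, so a single failing work item takes the caller down with it. A hedged sketch of a more tolerant collection step, using `async_nolink` and `Task.yield_many`; it runs on one node, and `DemoWorkSupervisor` and the work items are invented for the example:

```elixir
{:ok, _sup} = Task.Supervisor.start_link(name: DemoWorkSupervisor)

tasks =
  for item <- [1, 2, :boom, 4] do
    # async_nolink: a crash in the task does not crash the caller.
    Task.Supervisor.async_nolink(DemoWorkSupervisor, fn ->
      if item == :boom, do: raise("bad item"), else: item * 10
    end)
  end

results =
  tasks
  |> Task.yield_many(5_000)
  |> Enum.map(fn
    {_task, {:ok, value}} ->
      {:ok, value}

    {_task, {:exit, _reason}} ->
      {:error, :crashed}

    {task, nil} ->
      # Still running after the timeout: stop it and report the timeout.
      Task.shutdown(task, :brutal_kill)
      {:error, :timeout}
  end)
```

The crashed task is logged by the runtime, and the remaining results are still returned in order.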

Distributed Job Processing with Oban

For more robust job processing, libraries like Oban provide persistence and retry capabilities:

defmodule MyApp.Jobs.ImageProcessor do
  use Oban.Worker, queue: :media

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"path" => path}}) do
    # This job might run on any node in the cluster
    # Process the image at the given path
    process_image(path)

    :ok
  end

  defp process_image(path) do
    # Image processing logic...
  end
end

## Enqueue a job (can be done from any node)
%{path: "/uploads/image.jpg"}
|> MyApp.Jobs.ImageProcessor.new()
|> Oban.insert()

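Oban coordinates workers through a shared database (typically PostgreSQL) rather than through Erlang distribution, so every node that runs the :media queue can pick up jobs. Adding nodes therefore scales job throughput without any extra routing layer.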

Distributed Real-Time Systems

Elixir excels at building distributed real-time systems, where multiple nodes need to coordinate live updates:

Phoenix LiveView with PubSub

Phoenix LiveView combined with PubSub enables real-time features that work across a cluster:

defmodule MyAppWeb.DashboardLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    if connected?(socket) do
      Phoenix.PubSub.subscribe(MyApp.PubSub, "dashboard:updates")
    end

    metrics = fetch_initial_metrics()
    {:ok, assign(socket, metrics: metrics)}
  end

  def handle_info({:metrics_update, new_metrics}, socket) do
    {:noreply, assign(socket, metrics: new_metrics)}
  end
end

## From anywhere in the cluster, broadcast updates
## (MyApp.Metrics is an illustrative module name):
defmodule MyApp.Metrics do
  def publish_metrics(metrics) do
    Phoenix.PubSub.broadcast(
      MyApp.PubSub,
      "dashboard:updates",
      {:metrics_update, metrics}
    )
  end
end

This pattern enables features like real-time dashboards, collaborative editing, and live notifications that work seamlessly across a cluster.

Distributed Presence Tracking

Phoenix also provides Presence for tracking who/what is online:

defmodule MyAppWeb.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub
end

## Track a user joining a room
MyAppWeb.Presence.track(
  self(),
  "room:lobby",
  user_id,
  %{status: "online", last_active: System.system_time(:second)}
)

## Get current presence information
MyAppWeb.Presence.list("room:lobby")

Presence uses a CRDT-based algorithm to replicate this information, so every node converges on the same view of who’s online, and node joins and failures are handled without a central coordinator.

Advanced Distribution Patterns

Beyond the basic patterns, several advanced techniques enable more sophisticated distributed systems.

Node Discovery and Cluster Formation

In production environments, automatic node discovery and cluster formation are essential. Libraries like libcluster simplify this process:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    topologies = [
      example: [
        strategy: Cluster.Strategy.Kubernetes.DNS,
        config: [
          service: "myapp-headless",
          application_name: "myapp"
        ]
      ]
    ]

    children = [
      {Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]},
      # Other children...
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

This example uses Kubernetes DNS for service discovery, but libcluster supports multiple strategies including:

  • Gossip-based discovery
  • DNS-based discovery
  • Kubernetes API
  • Amazon EC2 autoscaling groups (through third-party strategy packages)
  • Static configuration
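As a point of comparison with the Kubernetes example, here is a sketch of two of the simpler strategies expressed as application configuration. The topology names and host names are placeholders, and a deployment would normally pick one topology rather than both:

```elixir
# config/runtime.exs (sketch; topology and host names are placeholders)
import Config

config :libcluster,
  topologies: [
    # Fixed list of nodes, suitable for small static deployments
    static: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"app@10.0.0.1", :"app@10.0.0.2"]]
    ],
    # UDP multicast gossip, convenient on a LAN or in local development
    gossip: [
      strategy: Cluster.Strategy.Gossip
    ]
  ]
```

With this style of configuration, the application reads the topologies with `Application.get_env(:libcluster, :topologies)` and passes them to `Cluster.Supervisor`.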

Distributed Data Consistency

Managing consistency across distributed nodes requires careful consideration:

Consensus with rafter

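For strong consistency requirements, consensus algorithms like Raft can be used through libraries such as rafter. The module below is a schematic sketch of a replicated key-value state machine: treat the Rafter.Server behaviour and its callbacks as illustrative, and check your chosen library’s documentation for the exact behaviour it expects: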

defmodule MyApp.ConsensusStore do
  use Rafter.Server

  @impl Rafter.Server
  def init(_args) do
    {:ok, %{data: %{}}}
  end

  @impl Rafter.Server
  def handle_command({:set, key, value}, state) do
    new_state = put_in(state.data[key], value)
    {:ok, value, new_state}
  end

  @impl Rafter.Server
  def handle_command({:get, key}, state) do
    value = get_in(state.data, [key])
    {:ok, value, state}
  end
end

This provides:

  • Leader election
  • Linearizable operations
  • Distributed consensus
  • Strong consistency guarantees

However, these benefits come at the cost of reduced availability during network partitions, following the implications of the CAP theorem.
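The availability cost can be made concrete: a Raft cluster only makes progress while a majority of its members can reach each other. A small sketch of the arithmetic (the `QuorumMath` module is a name invented for this example):

```elixir
defmodule QuorumMath do
  # Smallest majority of an n-member cluster
  def quorum(n) when is_integer(n) and n > 0, do: div(n, 2) + 1

  # Members that can fail, or be partitioned away, while a majority remains
  def tolerated_failures(n), do: n - quorum(n)
end

for n <- [3, 4, 5] do
  IO.puts("#{n} members: quorum #{QuorumMath.quorum(n)}, tolerates #{QuorumMath.tolerated_failures(n)}")
end
```

A four-member cluster tolerates no more failures than a three-member one, which is why odd cluster sizes are the usual choice.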

Conflict Resolution with CRDTs

For scenarios where availability is prioritized over consistency, CRDTs offer a principled approach to conflict resolution:

defmodule MyApp.ReplicatedSet do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  def add(server, element) do
    GenServer.call(server, {:add, element})
  end

  def contains?(server, element) do
    GenServer.call(server, {:contains, element})
  end

  def sync(server, remote_server) do
    GenServer.cast(server, {:sync, remote_server})
  end

  # GenServer callbacks

  def init(_opts) do
    # A grow-only set (G-Set): elements can be added but never removed
    {:ok, %{elements: MapSet.new(), node: node()}}
  end

  def handle_call({:add, element}, _from, state) do
    new_state = %{state | elements: MapSet.put(state.elements, element)}
    {:reply, :ok, new_state}
  end

  def handle_call({:contains, element}, _from, state) do
    result = MapSet.member?(state.elements, element)
    {:reply, result, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:sync, remote_server}, state) do
    # Get the remote state. This call blocks the server, so two replicas
    # that sync with each other at the same moment will time out.
    remote_state = GenServer.call(remote_server, :get_state)

    # Merge states (set union is commutative, associative, and idempotent)
    merged_elements = MapSet.union(state.elements, remote_state.elements)
    new_state = %{state | elements: merged_elements}

    {:noreply, new_state}
  end
end

This simple example implements a grow-only set (G-Set), the simplest set CRDT: because elements are never removed, merging by union always converges. Supporting removal requires a richer type such as an add-wins (observed-remove) set. More sophisticated CRDT implementations also cover counters, maps, and sequences with principled conflict resolution.

Monitoring and Debugging Distributed Systems

Observability becomes particularly important in distributed systems:

Distributed Tracing

Libraries like opentelemetry enable distributed tracing across service boundaries:

defmodule MyApp.UserService do
  # with_span, set_attributes, and add_event are macros, so the module must be required
  require OpenTelemetry.Tracer, as: Tracer

  def get_user(id) do
    Tracer.with_span "get_user" do
      # Add context to the current span
      Tracer.set_attributes([{"user.id", id}])

      # Perform the operation
      user = Database.get_user(id)

      if user do
        # Make a call to another service, propagating the trace context
        profile = MyApp.ProfileService.get_profile(user.profile_id)
        %{user: user, profile: profile}
      else
        # Record an error event on the current span
        Tracer.add_event("error", [
          {"error.type", "user_not_found"},
          {"user.id", id}
        ])
        nil
      end
    end
  end
end

This enables end-to-end tracing of requests as they flow through your distributed system.

Metrics Collection

The :telemetry package provides a standard way to emit events, which handlers can turn into metrics:

defmodule MyApp.MetricsExample do
  def process_item(item) do
    start_time = System.monotonic_time()

    # Measure the frequency of operations
    :telemetry.execute([:myapp, :item, :process, :start], %{}, %{item_id: item.id})

    result = do_process_item(item)

    # Measure the duration of operations
    duration = System.monotonic_time() - start_time
    :telemetry.execute(
      [:myapp, :item, :process, :stop],
      %{duration: duration},
      %{item_id: item.id, result: result}
    )

    result
  end

  defp do_process_item(item) do
    # Processing logic...
  end
end

Combined with reporters for systems like Prometheus, this provides valuable insights into system behavior.
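The duration computed with `System.monotonic_time/0` is expressed in the VM’s native time unit, which differs between platforms. A handler or reporter should convert it before display; a minimal sketch using only the standard library:

```elixir
start_time = System.monotonic_time()

# Stand-in for real work
Process.sleep(20)

duration = System.monotonic_time() - start_time

# Convert from the platform-specific native unit to milliseconds before reporting.
duration_ms = System.convert_time_unit(duration, :native, :millisecond)

IO.puts("processing took #{duration_ms} ms")
```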

Distributed Logging

Centralized logging is essential for understanding distributed systems:

defmodule MyApp.Logger do
  require Logger

  def info(message, metadata \\ []) do
    metadata = Keyword.merge(default_metadata(), metadata)
    Logger.info(message, metadata)
  end

  def error(message, metadata \\ []) do
    metadata = Keyword.merge(default_metadata(), metadata)
    Logger.error(message, metadata)
  end

  defp default_metadata do
    [node: node(), application: :myapp]
  end
end

This can be combined with a logger backend that ships logs to a centralized service like the ELK stack or Datadog.

Real-World Example: A Distributed Chat System

To illustrate these concepts in action, let’s build a simple distributed chat system:

defmodule ChatSystem.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Phoenix.PubSub, name: ChatSystem.PubSub},
      {ChatSystem.Presence, name: ChatSystem.Presence},
      {ChatSystem.MessageStore, name: ChatSystem.MessageStore},
      ChatSystemWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: ChatSystem.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

defmodule ChatSystem.Presence do
  use Phoenix.Presence,
    otp_app: :chat_system,
    pubsub_server: ChatSystem.PubSub
end

defmodule ChatSystem.MessageStore do
  use GenServer

  # Client API

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  def add_message(room, sender, content) do
    GenServer.cast(__MODULE__, {:add_message, room, sender, content})
  end

  def get_recent_messages(room, limit \\ 50) do
    GenServer.call(__MODULE__, {:get_recent, room, limit})
  end

  # Server callbacks

  def init(_opts) do
    # Initialize with empty message store
    {:ok, %{rooms: %{}}}
  end

  def handle_cast({:add_message, room, sender, content}, state) do
    # Create message record
    message = %{
      id: generate_id(),
      sender: sender,
      content: content,
      timestamp: :os.system_time(:millisecond)
    }

    # Add to room's message list
    new_rooms = Map.update(
      state.rooms,
      room,
      [message],
      fn messages -> [message | messages] |> Enum.take(100) end
    )

    # Broadcast the new message to all nodes
    Phoenix.PubSub.broadcast(
      ChatSystem.PubSub,
      "room:#{room}",
      {:new_message, message}
    )

    {:noreply, %{state | rooms: new_rooms}}
  end

  def handle_call({:get_recent, room, limit}, _from, state) do
    messages =
      state.rooms
      |> Map.get(room, [])
      |> Enum.take(limit)
      |> Enum.reverse() # Return in chronological order

    {:reply, messages, state}
  end

  defp generate_id, do: System.unique_integer([:positive, :monotonic])
end

defmodule ChatSystemWeb.RoomChannel do
  use Phoenix.Channel
  alias ChatSystem.{Presence, MessageStore}

  def join("room:" <> room_id, _params, socket) do
    send(self(), :after_join)

    # Fetch recent messages
    recent_messages = MessageStore.get_recent_messages(room_id)

    {:ok, %{messages: recent_messages}, assign(socket, :room_id, room_id)}
  end

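  def handle_info(:after_join, socket) do
    # Track user presence on this channel's topic, so that presence_diff
    # broadcasts reach the clients that joined the room
    {:ok, _ref} =
      Presence.track(socket, socket.assigns.user_id, %{
        online_at: :os.system_time(:millisecond)
      })

    # Send current presence state
    push(socket, "presence_state", Presence.list(socket))

    {:noreply, socket}
  end

  # MessageStore broadcasts plain tuples over PubSub; forward them to the client
  def handle_info({:new_message, message}, socket) do
    push(socket, "new_message", message)
    {:noreply, socket}
  end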

  def handle_in("new_message", %{"content" => content}, socket) do
    room_id = socket.assigns.room_id
    user_id = socket.assigns.user_id

    # Store the message on this node and broadcast it to subscribers on every node
    MessageStore.add_message(room_id, user_id, content)

    {:reply, :ok, socket}
  end
end

This example demonstrates several key distributed patterns:

  1. Distributed presence with Phoenix.Presence for tracking users across nodes
  2. Real-time messaging with Phoenix.PubSub for cross-node communication
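  3. Node-local state with a GenServer for the message store (each node keeps only the messages it handled, so shared history needs a globally registered process or a database)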
  4. Event propagation across the cluster

In a real-world system, you might extend this with:

  • Persistence using a database
  • CRDT-based message ordering
  • Cluster formation with libcluster
  • More sophisticated presence features

Deployment Considerations

Successfully running a distributed Elixir application in production requires careful deployment planning:

Network Configuration

Ensure proper network configuration between nodes:

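  • EPMD port (4369 by default) must be accessible between nodes
  • Distribution ports are assigned dynamically by default; restrict them to a fixed range (for example 9000-9100) with the kernel parameters inet_dist_listen_min and inet_dist_listen_max
  • Use a firewall to prevent external access to these ports
  • Distribution traffic is unencrypted by default; consider TLS distribution, IPsec, or VPN tunnels for secure communication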

Node Naming and Discovery

In production environments, use fully qualified domain names or static IPs:

--name app@172.16.0.1 --cookie my_secure_cookie

For containerized deployments, libraries like libcluster simplify node discovery:

## For Kubernetes-based deployments
config :libcluster,
  topologies: [
    k8s: [
      strategy: Cluster.Strategy.Kubernetes.DNS,
      config: [
        service: "myapp-headless",
        application_name: "myapp"
      ]
    ]
  ]

Cookie Security

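The Erlang distribution cookie is a shared secret that nodes use to authenticate each other. It does not encrypt traffic, and any node that knows it can run arbitrary code on the cluster: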

  • Use a strong, random cookie value
  • Keep the cookie value secure (treat it like a password)
  • Consider using environment variables or runtime configuration
  • Different environments should use different cookies
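As one way to follow the first and third points, a cookie can be generated from the operating system’s secure random source and supplied through the environment. This is a sketch; `DEMO_NODE_COOKIE` is a hypothetical variable name, not one that any tool reads by default:

```elixir
# 32 random bytes from a cryptographically secure source, hex-encoded so the
# value contains no characters that need quoting in shells or config files.
cookie =
  32
  |> :crypto.strong_rand_bytes()
  |> Base.encode16(case: :lower)

# Prefer a value injected at runtime; fall back to the generated one for local use.
configured_cookie = System.get_env("DEMO_NODE_COOKIE", cookie)
```

Generate the value once per environment and store it in your secret manager, rather than generating a new one on every boot, since all nodes in a cluster must share the same cookie.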

Load Balancing

For web applications, configure load balancers to support distributed session management:

  • Use sticky sessions if sessions are node-local
  • Ensure WebSocket connections are properly handled
  • Consider session storage in a distributed database if sticky sessions aren’t desired

Conclusion

Elixir and the BEAM virtual machine provide a robust foundation for building distributed systems. By embracing the actor model, message passing, and process isolation, Elixir offers a programming model that naturally extends from concurrent to distributed applications.

The patterns we’ve explored—from distributed state management to real-time features—demonstrate how Elixir’s built-in distribution capabilities can be applied to solve real-world challenges. Libraries like Phoenix, libcluster, and various CRDT implementations extend these capabilities further, providing battle-tested solutions for common distributed system problems.

When building distributed systems with Elixir, remember these key principles:

  1. Embrace asynchronous communication through message passing
  2. Design for failure with supervision trees and crash recovery
  3. Consider data consistency needs carefully, choosing appropriate patterns
  4. Implement proper observability with tracing, metrics, and logging
  5. Test thoroughly, including partition scenarios

By leveraging Elixir’s strengths and following these patterns, you can build distributed systems that are not only functional but also resilient, maintainable, and scalable.
