Hanso Group

Concurrency Best Practices in Elixir

Julian Lindner 21 minutes read

Elixir’s concurrency model, built on the battle-tested Erlang VM, provides powerful tools for building robust, scalable, and fault-tolerant applications. Using those tools well takes some care: choosing the right concurrency primitive for each problem is crucial for creating systems that perform well under real-world conditions. This article explores best practices for concurrency in Elixir, focusing on practical examples and patterns that you can apply to your projects today.

Understanding Elixir’s Concurrency Model

Before diving into best practices, let’s briefly review the fundamentals of Elixir’s concurrency model:

  • Processes: Lightweight, isolated units of execution (not OS processes)
  • Message Passing: Processes communicate by sending each other asynchronous messages, which are copied into the receiver’s mailbox (no memory is shared)
  • Supervision: Hierarchical process monitoring and recovery strategies
  • Distribution: Ability to run processes across multiple nodes, even on different machines

These building blocks enable the “Let it crash” philosophy: rather than defensive programming and complex error handling, Elixir encourages designing systems where components can fail independently and be automatically restarted.
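
As a minimal sketch of that restart behaviour, the snippet below supervises a single Agent, kills it, and checks that the supervisor starts a replacement. The names :demo and :demo_agent and the polling helper are illustrative assumptions, not part of any real application.

```elixir
# One Agent child, registered under a hypothetical name.
children = [
  %{id: :demo, start: {Agent, :start_link, [fn -> 0 end, [name: :demo_agent]]}}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

old_pid = Process.whereis(:demo_agent)

# Simulate a crash and wait until the old process is really gone.
ref = Process.monitor(old_pid)
Process.exit(old_pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

# The restart is asynchronous, so poll briefly until the name is registered again.
wait_for = fn wait_for, attempts ->
  case Process.whereis(:demo_agent) do
    nil when attempts > 0 ->
      Process.sleep(10)
      wait_for.(wait_for, attempts - 1)

    pid_or_nil ->
      pid_or_nil
  end
end

new_pid = wait_for.(wait_for, 100)

IO.inspect(is_pid(new_pid) and new_pid != old_pid, label: "restarted with a new pid")
```

The replacement starts from the initial state (0 here), which is why "let it crash" works best when a process can rebuild its state from a known-good starting point.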

Choosing the Right Abstraction

Elixir offers several abstractions for concurrent programming, each suited to different use cases:

Processes and Message Passing

The most fundamental concurrency primitives in Elixir are processes and message passing:

# Spawn a process
pid = spawn(fn ->
  receive do
    {:compute, n, caller} ->
      result = n * n
      send(caller, {:result, result})
  end
end)

# Send a message
send(pid, {:compute, 42, self()})

# Receive the response
receive do
  {:result, value} -> IO.puts("Result: #{value}")
after
  5000 -> IO.puts("Timeout")
end

This low-level approach gives you complete control, but you must handle timeouts, failure detection, and matching replies to requests yourself.
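
To illustrate what handling those concerns by hand can look like, here is a hedged sketch of a request/reply helper built on raw processes. The module name Squarer and its message shapes are hypothetical. It tags each request with a monitor reference so that replies can be matched to requests and a dead server is detected instead of waiting for the timeout.

```elixir
defmodule Squarer do
  # Starts a process that answers {:compute, ref, n, caller} requests in a loop.
  def start do
    spawn(fn -> loop() end)
  end

  # Sends a request tagged with a unique reference and waits for the matching reply.
  def square(pid, n, timeout \\ 5_000) do
    ref = Process.monitor(pid)
    send(pid, {:compute, ref, n, self()})

    receive do
      {:result, ^ref, value} ->
        Process.demonitor(ref, [:flush])
        {:ok, value}

      {:DOWN, ^ref, :process, ^pid, reason} ->
        # The server died (or was never alive) before replying.
        {:error, reason}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end

  defp loop do
    receive do
      {:compute, ref, n, caller} ->
        send(caller, {:result, ref, n * n})
        loop()
    end
  end
end

pid = Squarer.start()
IO.inspect(Squarer.square(pid, 42))
```

This is roughly the bookkeeping that GenServer.call/3 does for you, which is one reason to prefer the higher-level abstractions described next.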

Tasks

Tasks provide a higher-level abstraction for asynchronous computation:

# Run an async task
task = Task.async(fn ->
  # Simulate work
  Process.sleep(1000)
  42 * 42
end)

# Do other work here...

# Await the result
result = Task.await(task)
IO.puts("Result: #{result}")

Tasks are ideal for:

  • One-off asynchronous operations
  • Concurrent mapping operations
  • Operations where you need the result
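
For the concurrent-mapping case, Task.async_stream/3 is usually a better fit than starting one task per element by hand, because it caps how many tasks run at once. A minimal sketch, with the squaring function standing in for real work:

```elixir
# Square each number concurrently, with at most 4 tasks running at a time.
# Results are returned in input order by default.
results =
  1..10
  |> Task.async_stream(fn n -> n * n end, max_concurrency: 4, timeout: 5_000)
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results)
```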

GenServer

GenServers provide a standard structure for building stateful, long-running processes:

defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial_count) do
    GenServer.start_link(__MODULE__, initial_count, name: __MODULE__)
  end

  def increment(by \\ 1) do
    GenServer.call(__MODULE__, {:increment, by})
  end

  def get_count do
    GenServer.call(__MODULE__, :get_count)
  end

  # Server callbacks
  @impl true
  def init(initial_count) do
    {:ok, initial_count}
  end

  @impl true
  def handle_call({:increment, by}, _from, count) do
    new_count = count + by
    {:reply, new_count, new_count}
  end

  @impl true
  def handle_call(:get_count, _from, count) do
    {:reply, count, count}
  end
end

GenServers are ideal for:

  • Managing shared state
  • Rate limiting or coordinating access
  • Long-running services

Agent

Agents provide a simpler abstraction for state management:

{:ok, agent} = Agent.start_link(fn -> %{} end)

# Update state
Agent.update(agent, fn map -> Map.put(map, :key, "value") end)

# Get state
value = Agent.get(agent, fn map -> Map.get(map, :key) end)

Agents are ideal for:

  • Simple state storage without complex logic
  • When you don’t need the full GenServer behavior
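
One detail worth knowing: a separate Agent.get followed by Agent.update is two messages, so another process can slip in between them. The sketch below, using a hypothetical ID counter, shows Agent.get_and_update/2 doing the read and the write in a single step inside the agent process.

```elixir
{:ok, counter} = Agent.start_link(fn -> 0 end)

# The function returns {value_to_return, new_state} and runs inside the agent,
# so no other caller can observe or change the state in between.
next_id = fn ->
  Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
end

ids =
  1..50
  |> Task.async_stream(fn _ -> next_id.() end, max_concurrency: 10)
  |> Enum.map(fn {:ok, id} -> id end)

IO.inspect(Enum.sort(ids) == Enum.to_list(1..50), label: "every caller got a unique id")
```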

Best Practice 1: Process Isolation and Error Handling

One of Elixir’s core principles is fault isolation. Processes should be designed to have a single responsibility, allowing them to fail independently without affecting the entire system.

Separate Concerns into Independent Processes

Rather than building a monolithic process, divide your application into smaller processes with well-defined responsibilities:

defmodule UserService do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      {UserRepository, []},
      {UserAuthenticator, []},
      {UserNotifier, []}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

This approach allows each component to be managed independently. If the UserNotifier process crashes, it won’t affect the UserRepository or UserAuthenticator.

Let It Crash (Strategically)

Instead of defensive programming, let processes crash and restart when encountering unexpected conditions:

defmodule ImageProcessor do
  use GenServer

  # ... other callbacks ...

  @impl true
  def handle_call({:process, image_data}, _from, state) do
    # Don't handle every possible error case
    result = process_image(image_data)
    {:reply, result, state}
  end

  defp process_image(image_data) do
    # If this fails with unhandled exceptions, the process will crash
    # and be restarted by its supervisor
    image_data
    |> decode_image()
    |> apply_filters()
    |> optimize()
  end
end

Combined with proper supervision, this approach leads to more robust systems.

Use Supervision Trees Effectively

Supervisors should group processes with similar lifecycle requirements:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Database connections and caches
      MyApp.Repo,
      {Finch, name: MyApp.Finch},
      {Cachex, name: MyApp.Cache},

      # Core business logic services
      MyApp.UserService,
      MyApp.BillingService,

      # Web interface
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyAppWeb.Telemetry,
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Choose the appropriate supervision strategy:

  • :one_for_one - If one child process terminates, only that process is restarted
  • :one_for_all - If one child process terminates, all other child processes are terminated and restarted
  • :rest_for_one - If one child process terminates, the rest of the child processes (that were started after the terminated one) are terminated and restarted
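
The difference between the strategies is easiest to see by watching pids change. The following sketch assumes three throwaway Agents named :a, :b, and :c (hypothetical names) under a :rest_for_one supervisor, kills the middle one, and compares pids before and after.

```elixir
# Three Agents started in the order :a, :b, :c.
children =
  for name <- [:a, :b, :c] do
    %{id: name, start: {Agent, :start_link, [fn -> name end, [name: name]]}}
  end

{:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)

# Returns a map of child id => pid as currently known to the supervisor.
pids = fn ->
  Map.new(Supervisor.which_children(sup), fn {id, pid, _type, _mods} -> {id, pid} end)
end

before = pids.()

# Kill the middle child.
Process.exit(before.b, :kill)

# The restart is asynchronous, so poll until :b and :c have fresh pids.
wait = fn wait, attempts ->
  now = pids.()
  restarted? = is_pid(now.b) and is_pid(now.c) and now.b != before.b and now.c != before.c

  if restarted? or attempts == 0 do
    now
  else
    Process.sleep(10)
    wait.(wait, attempts - 1)
  end
end

later = wait.(wait, 100)

IO.inspect(later.a == before.a, label: ":a kept its pid")
IO.inspect(later.b != before.b, label: ":b was restarted")
IO.inspect(later.c != before.c, label: ":c was restarted")
```

With :one_for_one only :b would get a new pid, and with :one_for_all all three would.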

Best Practice 2: Handling Concurrency Patterns

Different concurrency patterns are suited to different problems. Here are some common patterns and when to use them:

Fan-Out / Fan-In (Scatter/Gather)

This pattern distributes work across multiple processes and then collects the results:

defmodule ImageResizer do
  def resize_images(image_files) do
    # Fan out - start concurrent tasks
    tasks = Enum.map(image_files, fn file ->
      Task.async(fn -> resize_image(file) end)
    end)

    # Fan in - collect all results
    results = Task.await_many(tasks, 30000)

    # Process combined results
    Enum.count(results, fn result -> result == :ok end)
  end

  defp resize_image(file) do
    # Image resizing logic
    # ...
    :ok
  end
end

This pattern is ideal for:

  • CPU-bound operations that can be parallelized
  • IO-bound operations where you want to process multiple requests concurrently
  • When you need all results before proceeding
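
One caveat with plain Task.async is that a crashing task also takes down the caller, since the two are linked. A hedged variation, assuming hypothetical file names and a stand-in for the resize function, runs the fan-out under a Task.Supervisor with async_stream_nolink so that failures come back as values. It also bounds concurrency instead of starting one task per file.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Stand-in for real resizing logic; "broken.png" simulates a failure.
resize = fn
  "broken.png" -> raise "cannot decode"
  _file -> :ok
end

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    ["a.png", "b.png", "broken.png"],
    resize,
    max_concurrency: 2,
    timeout: 30_000
  )
  |> Enum.map(fn
    {:ok, :ok} -> :ok
    # A crashed task is reported as {:exit, reason} instead of crashing the caller.
    {:exit, _reason} -> :error
  end)

IO.inspect(results)
```

The crashed task still logs an error report, which is usually what you want; the difference is that the caller survives and can decide what to do with the partial results.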

Worker Pools

Worker pools manage a group of worker processes to handle tasks:

defmodule WorkerPool do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 5)

    children = Enum.map(1..pool_size, fn i ->
      Supervisor.child_spec({Worker, []}, id: {Worker, i})
    end)

    Supervisor.init(children, strategy: :one_for_one)
  end

  def process_job(job) do
    # Get a random worker
    workers = Supervisor.which_children(__MODULE__)
    {_id, pid, _type, _modules} = Enum.random(workers)

    Worker.process(pid, job)
  end
end

defmodule Worker do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  def process(pid, job) do
    GenServer.call(pid, {:process, job})
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:process, job}, _from, state) do
    result = do_process(job)
    {:reply, result, state}
  end

  defp do_process(job) do
    # Process the job
    # ...
    :ok
  end
end

For production use, consider using established libraries like Poolboy or NimblePool instead of implementing your own pool.

Worker pools are ideal for:

  • Managing access to limited resources (like database connections)
  • Processing a stream of jobs with controlled concurrency
  • Distributing load across multiple workers

Pipeline Pattern

The pipeline pattern processes data through a series of stages:

defmodule DataPipeline do
  def start_link do
    # Start each stage as a GenStage producer/consumer
    {:ok, producer} = DataProducer.start_link()
    {:ok, transformer} = DataTransformer.start_link()
    {:ok, consumer} = DataConsumer.start_link()

    # Connect the stages
    GenStage.sync_subscribe(transformer, to: producer)
    GenStage.sync_subscribe(consumer, to: transformer)

    {:ok, %{producer: producer, transformer: transformer, consumer: consumer}}
  end
end

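For a simplified pipeline without GenStage, lazy streams pull one element at a time through every stage. Note that this version runs sequentially in the calling process:

defmodule SimplePipeline do
  def process_file(file_path) do
    file_path
    # Stream the file line by line (the default mode) without loading it all into memory
    |> File.stream!()
    |> Stream.map(&parse_line/1)
    |> Stream.map(&transform_data/1)
    |> Stream.map(&validate_record/1)
    |> Stream.map(&save_record/1)
    |> Stream.run()
  end

  # Placeholder stages; replace each body with real logic
  defp parse_line(line), do: String.trim(line)
  defp transform_data(data), do: data
  defp validate_record(record), do: record
  defp save_record(record), do: record
end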

Pipelines are ideal for:

  • Data processing with multiple stages
  • When there’s a natural flow of data through different processing steps
  • Managing backpressure in systems with different processing rates

Best Practice 3: Performance Optimization

Effective concurrency isn’t just about creating many processes; it’s about optimizing the overall system performance.

Process Communication Patterns

Communication between processes introduces overhead. Choose the right pattern:

# Pattern 1: Fire and forget
def notify_user(user_id, message) do
  spawn(fn ->
    user = Users.get(user_id)
    Notifications.send(user.email, message)
  end)
  :ok
end

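# Pattern 2: Async with timeout
def process_payment(payment) do
  task = Task.async(fn ->
    PaymentGateway.charge(payment)
  end)

  # Task.yield/2 returns nil on timeout instead of exiting the caller.
  # Task.shutdown/2 then stops the still-running task; killing a linked
  # task with Process.exit/2 would also take down the caller.
  case Task.yield(task, 10_000) || Task.shutdown(task, :brutal_kill) do
    {:ok, result} -> result
    {:exit, reason} -> {:error, reason}
    nil -> {:error, :timeout}
  end
end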

# Pattern 3: Pooled resources
def query_database(query) do
  :poolboy.transaction(:db_pool, fn conn ->
    DBConnection.execute(conn, query, [])
  end)
end

Choose based on your needs:

  • Fire and forget: When you don’t need a response
  • Async with timeout: When you need a response but want concurrency
  • Pooled resources: For managing limited resources
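
A bare spawn leaves the fire-and-forget process unsupervised, so crashes are easy to miss and the process is not shut down with the application. One hedged alternative is to start such work under a Task.Supervisor. In this sketch the supervisor name Demo.TaskSupervisor and the message shape are assumptions, and the send back to the caller exists only so the example can show that the work ran.

```elixir
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

notify = fn parent, message ->
  # start_child/2 returns immediately; the task is not linked to the caller.
  {:ok, _pid} =
    Task.Supervisor.start_child(Demo.TaskSupervisor, fn ->
      send(parent, {:delivered, message})
    end)

  :ok
end

:ok = notify.(self(), "welcome")

receive do
  {:delivered, msg} -> IO.puts("delivered: #{msg}")
after
  1_000 -> IO.puts("not delivered")
end
```

In an application, the Task.Supervisor would normally be listed as a child in the supervision tree rather than started inline.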

Process Linking and Monitoring

Understand the difference between linking and monitoring:

require Logger

# Linking - the two processes are tied together: if either one exits abnormally,
# the other receives an exit signal and terminates too (unless it traps exits)
def start_critical_component do
  spawn_link(fn ->
    critical_process()
  end)
end

# Monitoring - get a message if the monitored process dies
def start_monitored_component do
  # spawn_monitor/1 spawns the process and monitors it in one atomic step
  {pid, ref} = spawn_monitor(fn ->
    monitored_process()
  end)

  receive do
    {:DOWN, ^ref, :process, ^pid, reason} ->
      Logger.warning("Monitored process terminated: #{inspect(reason)}")
      # Handle the failure
  end
end

  • Use linking when processes have dependent lifecycles
  • Use monitoring when you need to know about process termination but want to decide how to handle it
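
There is a middle ground between the two: a linked process that traps exits receives the exit signal as an ordinary message instead of dying, which is how supervisors observe their children. A minimal sketch follows; the demo runs in its own process so that trapping exits does not affect the caller.

```elixir
parent = self()

spawn(fn ->
  # With trap_exit set, exit signals from linked processes arrive as
  # {:EXIT, pid, reason} messages instead of terminating this process.
  Process.flag(:trap_exit, true)

  child = spawn_link(fn -> exit(:boom) end)

  receive do
    {:EXIT, ^child, reason} -> send(parent, {:child_exited, reason})
  after
    1_000 -> send(parent, :no_exit_seen)
  end
end)

result =
  receive do
    message -> message
  after
    2_000 -> :timeout
  end

IO.inspect(result)
```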

Message Passing Optimization

Be mindful of message passing performance:

# Avoid this - sending large data structures
def process_large_data(large_data) do
  parent = self()

  spawn(fn ->
    # The closure captures large_data, so it is copied into the new process
    result = do_process(large_data)
    # Reply to the caller; self() in here would refer to the spawned process
    send(parent, {:result, result})
  end)
end

# Better - pass references to data when possible
def process_large_file(file_path) do
  parent = self()

  spawn(fn ->
    # Only the file path is copied, not the contents
    result = process_file(file_path)
    send(parent, {:result, result})
  end)
end

For large data structures, consider:

  • Passing file paths or database IDs instead of entire data structures
  • Using ETS (Erlang Term Storage) for shared access to large data
  • Breaking large data into smaller chunks

Best Practice 4: Managing Process State

Properly managing state in concurrent systems is crucial for correctness and performance.

Immutable Data Structures

Leverage Elixir’s immutable data structures to simplify concurrent programming:

defmodule UserStats do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def record_login(user_id) do
    GenServer.cast(__MODULE__, {:login, user_id})
  end

  def get_stats do
    GenServer.call(__MODULE__, :get_stats)
  end

  @impl true
  def init(_) do
    {:ok, %{logins: %{}}}
  end

  @impl true
  def handle_cast({:login, user_id}, state) do
    # Create a new immutable state rather than modifying the existing one
    new_logins = Map.update(state.logins, user_id, 1, &(&1 + 1))
    {:noreply, %{state | logins: new_logins}}
  end

  @impl true
  def handle_call(:get_stats, _from, state) do
    {:reply, state, state}
  end
end

Immutable data structures ensure that data isn’t inadvertently shared or modified.
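
A small sketch makes the point concrete: a value handed to another process cannot be changed from there, because an "update" always produces a new value. The map contents here are made up for illustration.

```elixir
original = %{logins: %{"alice" => 1}}

# The task works on its own copy and returns a new map.
task = Task.async(fn -> put_in(original, [:logins, "alice"], 99) end)
updated = Task.await(task)

IO.inspect(original.logins["alice"], label: "original")
IO.inspect(updated.logins["alice"], label: "updated")
```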

Process Dictionary Considerations

The process dictionary can be tempting but should be used sparingly:

# Avoid this pattern for application state
def set_config(key, value) do
  Process.put(key, value)
end

def get_config(key) do
  Process.get(key)
end

# Better approach - use a dedicated process
defmodule Config do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def set(key, value) do
    GenServer.cast(__MODULE__, {:set, key, value})
  end

  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:set, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

The process dictionary has legitimate uses (like storing process-specific context), but for shared application state, dedicated processes are better.
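
The reason it does not work for shared state is that each process has its own dictionary. A minimal sketch, using a hypothetical :request_id key:

```elixir
Process.put(:request_id, "req-123")

# A task is a separate process, so it does not see the caller's dictionary.
seen_in_task = Task.await(Task.async(fn -> Process.get(:request_id) end))

IO.inspect(Process.get(:request_id), label: "caller")
IO.inspect(seen_in_task, label: "task")
```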

ETS Tables for Shared State

For high-performance shared state, consider ETS tables:

defmodule Cache do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def put(key, value) do
    :ets.insert(:cache, {key, value})
  end

  def get(key) do
    case :ets.lookup(:cache, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end

  @impl true
  def init(_) do
    :ets.new(:cache, [:set, :named_table, :public, read_concurrency: true])
    {:ok, nil}
  end
end

ETS tables are ideal for:

  • Read-heavy workloads with occasional writes
  • When you need faster access than message passing
  • When multiple processes need to access the same data
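
Concurrent writes need a little more care than reads: a lookup followed by an insert from several processes can lose updates. For counters, :ets.update_counter/4 performs the increment atomically inside ETS. The table name :hits and the :page key below are illustrative assumptions.

```elixir
table = :ets.new(:hits, [:set, :public, write_concurrency: true])

1..100
|> Task.async_stream(
  # {2, 1} means "add 1 to the element at position 2";
  # {:page, 0} is the row to insert first if the key does not exist yet.
  fn _ -> :ets.update_counter(table, :page, {2, 1}, {:page, 0}) end,
  max_concurrency: 20
)
|> Stream.run()

[{:page, count}] = :ets.lookup(table, :page)
IO.inspect(count, label: "hits")
```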

Best Practice 5: Testing Concurrent Systems

Testing concurrent systems presents unique challenges. Here are some approaches:

Unit Testing Concurrent Code

Test the behavior of individual components in isolation:

defmodule WorkerTest do
  use ExUnit.Case

  test "worker processes jobs correctly" do
    {:ok, pid} = Worker.start_link([])

    result = Worker.process(pid, %{type: :test})

    assert result == :ok
  end

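  test "worker handles errors gracefully" do
    {:ok, pid} = Worker.start_link([])

    # Assumes do_process/1 returns {:error, :invalid_job} for unknown job types;
    # the Worker sketch shown earlier always returns :ok
    result = Worker.process(pid, %{type: :invalid})

    assert result == {:error, :invalid_job}
  end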
end

Testing Race Conditions and Timing Issues

For timing-sensitive code, use explicit synchronization in tests:

defmodule ConcurrencyTest do
  use ExUnit.Case

  test "multiple processes can update state concurrently" do
    # Counter registers itself under its module name, so the client
    # functions take no pid (increment/1 expects the amount, not a pid)
    {:ok, _pid} = Counter.start_link(0)
    parent = self()

    # Start multiple processes that increment the counter
    for _ <- 1..100 do
      spawn_link(fn ->
        Counter.increment()
        send(parent, :done)
      end)
    end

    # Wait for all processes to complete
    for _ <- 1..100 do
      assert_receive :done, 1000
    end

    # Verify the final count
    assert Counter.get_count() == 100
  end
end
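
The same idea, waiting on a message rather than sleeping for a fixed time, also applies when a test needs to check that a process has stopped. The sketch below is self-contained; the module name and the :stop message are made up for the example.

```elixir
ExUnit.start()

defmodule CrashDetectionTest do
  use ExUnit.Case, async: true

  test "a monitored process reports why it stopped" do
    pid =
      spawn(fn ->
        receive do
          :stop -> exit(:shutdown)
        end
      end)

    ref = Process.monitor(pid)
    send(pid, :stop)

    # assert_receive waits up to the timeout instead of sleeping a fixed time,
    # so the test is fast when things work and not flaky under load.
    assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown}, 1_000
  end
end
```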

Property-Based Testing for Concurrency

For complex concurrent systems, consider property-based testing:

defmodule QueuePropertyTest do
  use ExUnit.Case
  use PropCheck
  use PropCheck.StateM

  # Assumes SafeQueue is an OTP process registered under its module name,
  # so the generated calls below do not need to pass a pid.
  # run_commands/2 executes the generated commands sequentially.
  property "enqueues and dequeues maintain the count invariant" do
    forall cmds <- commands(__MODULE__) do
      {:ok, queue} = SafeQueue.start_link()

      {_history, state, result} = run_commands(__MODULE__, cmds)

      # Verify the final queue count matches our model
      final_count = SafeQueue.count()
      GenServer.stop(queue)

      result == :ok and final_count == state.count
    end
  end

  # Model state: only the number of items expected in the queue
  def initial_state, do: %{count: 0}

  # Define commands that can be executed
  def command(_state) do
    frequency([
      {3, {:call, SafeQueue, :enqueue, [term()]}},
      {2, {:call, SafeQueue, :dequeue, []}}
    ])
  end

  def precondition(_state, _call), do: true
  def postcondition(_state, _call, _result), do: true

  # Define state transitions
  def next_state(state, _result, {:call, SafeQueue, :enqueue, [_]}) do
    %{state | count: state.count + 1}
  end

  def next_state(state, _result, {:call, SafeQueue, :dequeue, []}) do
    if state.count > 0 do
      %{state | count: state.count - 1}
    else
      state
    end
  end
end

Best Practice 6: Monitoring and Debugging

Effective monitoring and debugging are essential for maintaining concurrent systems in production.

Process Naming and Registration

Name important processes for easier debugging:

defmodule ImportantService do
  use GenServer

  def start_link(_) do
    # Register with a well-known name
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def start_link_pool(id) do
    # For multiple instances, use a derived name
    GenServer.start_link(__MODULE__, nil, name: via_tuple(id))
  end

  defp via_tuple(id) do
    # Using Registry for dynamic registration
    {:via, Registry, {MyApp.Registry, {__MODULE__, id}}}
  end
end
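
To show the via tuple in use, here is a hedged, self-contained sketch with a throwaway registry named Demo.Registry and Agents standing in for real services (both are assumptions for the example).

```elixir
{:ok, _registry} = Registry.start_link(keys: :unique, name: Demo.Registry)

via = fn id -> {:via, Registry, {Demo.Registry, {:worker, id}}} end

{:ok, pid1} = Agent.start_link(fn -> :first end, name: via.(1))
{:ok, _pid2} = Agent.start_link(fn -> :second end, name: via.(2))

# The via tuple works anywhere a pid or registered name is accepted.
IO.inspect(Agent.get(via.(2), fn state -> state end), label: "worker 2 state")

# Registry.lookup/2 returns [{pid, value}] for a registered key.
[{found_pid, _value}] = Registry.lookup(Demo.Registry, {:worker, 1})
IO.inspect(found_pid == pid1, label: "lookup finds worker 1")

# A duplicate key is rejected rather than silently replacing the process.
duplicate = Agent.start_link(fn -> :dup end, name: via.(1))
IO.inspect(duplicate, label: "duplicate registration")
```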

Process Tracing and Debugging

Use :sys and :observer for runtime debugging:

# Get the current state of a GenServer
:sys.get_state(MyService)

# Trace calls to a specific process
:sys.trace(MyService, true)

# Start the observer GUI (run in IEx)
:observer.start()

For debugging production or remote nodes, consider using Recon. (AppMon was removed from OTP in favor of Observer.)
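
When no GUI is available, a few standard calls already answer the most common questions about a process. The sketch below uses an Agent as a stand-in for a real service.

```elixir
{:ok, pid} = Agent.start_link(fn -> %{jobs: 0} end)

# :sys.get_state/1 works on any OTP-compliant process, including Agents.
IO.inspect(:sys.get_state(pid), label: "state")

# Process.info/2 exposes runtime details. A message queue that keeps
# growing is a common sign that a process has become a bottleneck.
IO.inspect(
  Process.info(pid, [:message_queue_len, :memory, :current_function]),
  label: "info"
)
```

Both calls are intended for debugging; :sys.get_state/1 in particular should not be used as a regular way to read application state.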

Logging and Telemetry

Implement proper observability for concurrent systems:

defmodule Worker do
  use GenServer
  require Logger

  # ... other callbacks ...

  @impl true
  def handle_call({:process, job_id}, _from, state) do
    start_time = System.monotonic_time()

    Logger.info("Starting job #{job_id}")

    result =
      try do
        do_job(job_id)
      rescue
        e ->
          Logger.error("Job #{job_id} failed: #{inspect(e)}")
          {:error, :failed}
      end

    end_time = System.monotonic_time()
    duration = System.convert_time_unit(end_time - start_time, :native, :millisecond)

    :telemetry.execute(
      [:worker, :job, :completed],
      %{duration: duration},
      %{job_id: job_id, result: result}
    )

    {:reply, result, state}
  end
end

For production systems, consider using Telemetry.Metrics, together with a reporter, to aggregate and export these events.

Conclusion

Elixir’s concurrency model provides powerful tools for building robust, scalable applications. By following these best practices, you can harness the full potential of the BEAM while avoiding common pitfalls.

Remember that effective concurrency is about more than just creating processes—it’s about designing systems that communicate efficiently, handle failures gracefully, and maintain consistent state. The right approach often involves a combination of different patterns, each suited to specific parts of your application.

As you apply these practices, always keep the Elixir philosophy in mind: embrace process isolation, let processes crash and restart, and build systems that are resilient by design. With these principles, you can create concurrent applications that are not only powerful but also maintainable and reliable.

