Elixir’s concurrency model, built on the battle-tested Erlang VM, provides powerful tools for building robust, scalable, and fault-tolerant applications. But with great power comes great responsibility. Understanding how to effectively leverage Elixir’s concurrency primitives is crucial for creating systems that perform well under real-world conditions. This article explores best practices for concurrency in Elixir, focusing on practical examples and patterns that you can apply to your projects today.
Understanding Elixir’s Concurrency Model
Before diving into best practices, let’s briefly review the fundamentals of Elixir’s concurrency model:
- Processes: Lightweight, isolated units of execution (not OS processes)
- Message Passing: Communication between processes via message passing (not shared memory)
- Supervision: Hierarchical process monitoring and recovery strategies
- Distribution: Ability to run processes across multiple nodes, even on different machines
These building blocks enable the “Let it crash” philosophy: rather than defensive programming and complex error handling, Elixir encourages designing systems where components can fail independently and be automatically restarted.
Choosing the Right Abstraction
Elixir offers several abstractions for concurrent programming, each suited to different use cases:
Processes and Message Passing
The most fundamental concurrency primitives in Elixir are processes and message passing:
# Spawn a process
pid = spawn(fn ->
  receive do
    {:compute, n, caller} ->
      result = n * n
      send(caller, {:result, result})
  end
end)

# Send a message
send(pid, {:compute, 42, self()})

# Receive the response
receive do
  {:result, value} -> IO.puts("Result: #{value}")
after
  5000 -> IO.puts("Timeout")
end
This low-level approach gives you complete control but requires manual handling of many concerns.
Tasks
Tasks provide a higher-level abstraction for asynchronous computation:
# Run an async task
task = Task.async(fn ->
  # Simulate work
  Process.sleep(1000)
  42 * 42
end)

# Do other work here...

# Await the result
result = Task.await(task)
IO.puts("Result: #{result}")
Tasks are ideal for:
- One-off asynchronous operations
- Concurrent mapping operations (see the Task.async_stream sketch after this list)
- Operations where you need the result
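For concurrent mapping in particular, Task.async_stream/3 is often the most convenient option because it caps how many tasks run at once. A minimal sketch, assuming a hypothetical fetch_url/1 helper:

urls = ["https://example.com/a", "https://example.com/b"]

results =
  urls
  |> Task.async_stream(&fetch_url/1, max_concurrency: 10, timeout: 5_000)
  |> Enum.to_list()

# Each element is {:ok, result}; pass on_timeout: :kill_task to get
# {:exit, :timeout} entries instead of crashing the caller on a timeout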
GenServer
GenServers provide a standard structure for building stateful, long-running processes:
defmodule Counter do
  use GenServer

  # Client API
  def start_link(initial_count) do
    GenServer.start_link(__MODULE__, initial_count, name: __MODULE__)
  end

  def increment(by \\ 1) do
    GenServer.call(__MODULE__, {:increment, by})
  end

  def get_count do
    GenServer.call(__MODULE__, :get_count)
  end

  # Server callbacks
  @impl true
  def init(initial_count) do
    {:ok, initial_count}
  end

  @impl true
  def handle_call({:increment, by}, _from, count) do
    new_count = count + by
    {:reply, new_count, new_count}
  end

  @impl true
  def handle_call(:get_count, _from, count) do
    {:reply, count, count}
  end
end
GenServers are ideal for:
- Managing shared state
- Rate limiting or coordinating access
- Long-running services
Agent
Agents provide a simpler abstraction for state management:
{:ok, agent} = Agent.start_link(fn -> %{} end)

# Update state
Agent.update(agent, fn map -> Map.put(map, :key, "value") end)

# Get state
value = Agent.get(agent, fn map -> Map.get(map, :key) end)
Agents are ideal for:
- Simple state storage without complex logic
- When you don’t need the full GenServer behavior
Best Practice 1: Process Isolation and Error Handling
One of Elixir’s core principles is fault isolation. Processes should be designed to have a single responsibility, allowing them to fail independently without affecting the entire system.
Separate Concerns into Independent Processes
Rather than building a monolithic process, divide your application into smaller processes with well-defined responsibilities:
defmodule UserService do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      {UserRepository, []},
      {UserAuthenticator, []},
      {UserNotifier, []}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
This approach allows each component to be managed independently. If the UserNotifier process crashes, it won't affect the UserRepository or UserAuthenticator.
Let It Crash (Strategically)
Instead of defensive programming, let processes crash and restart when encountering unexpected conditions:
defmodule ImageProcessor do
  use GenServer

  # ... other callbacks ...

  @impl true
  def handle_call({:process, image_data}, _from, state) do
    # Don't handle every possible error case
    result = process_image(image_data)
    {:reply, result, state}
  end

  defp process_image(image_data) do
    # If this fails with unhandled exceptions, the process will crash
    # and be restarted by its supervisor
    image_data
    |> decode_image()
    |> apply_filters()
    |> optimize()
  end
end
Combined with proper supervision, this approach leads to more robust systems.
Use Supervision Trees Effectively
Supervisors should group processes with similar lifecycle requirements:
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Database connections and caches
      MyApp.Repo,
      {Finch, name: MyApp.Finch},
      {Cachex, name: MyApp.Cache},

      # Core business logic services
      MyApp.UserService,
      MyApp.BillingService,

      # Web interface
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyAppWeb.Telemetry,
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Choose the appropriate supervision strategy:
- :one_for_one - if one child process terminates, only that process is restarted
- :one_for_all - if one child process terminates, all other child processes are terminated and restarted
- :rest_for_one - if one child process terminates, the child processes started after it are terminated and restarted along with it
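For example, :rest_for_one suits dependency chains. In this hypothetical sketch, MyApp.Cache is populated from MyApp.Connection, so when the connection crashes the cache is restarted with it:

defmodule MyApp.DataSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      MyApp.Connection,  # if this crashes...
      MyApp.Cache        # ...this is restarted too
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end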
Best Practice 2: Handling Concurrency Patterns
Different concurrency patterns are suited to different problems. Here are some common patterns and when to use them:
Fan-Out / Fan-In (Scatter/Gather)
This pattern distributes work across multiple processes and then collects the results:
defmodule ImageResizer do
  def resize_images(image_files) do
    # Fan out - start concurrent tasks
    tasks = Enum.map(image_files, fn file ->
      Task.async(fn -> resize_image(file) end)
    end)

    # Fan in - collect all results
    results = Task.await_many(tasks, 30_000)

    # Process combined results
    Enum.count(results, fn result -> result == :ok end)
  end

  defp resize_image(_file) do
    # Image resizing logic
    # ...
    :ok
  end
end
This pattern is ideal for:
- CPU-bound operations that can be parallelized
- IO-bound operations where you want to process multiple requests concurrently
- When you need all results before proceeding
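Note that Task.await_many/2 exits the caller as soon as any task times out. When partial results are acceptable, Task.yield_many/2 plus Task.shutdown/2 lets you harvest whatever finished; a sketch under that assumption:

tasks = Enum.map(image_files, fn file ->
  Task.async(fn -> resize_image(file) end)
end)

results =
  tasks
  |> Task.yield_many(30_000)
  |> Enum.map(fn {task, result} ->
    # result is {:ok, value}, {:exit, reason}, or nil if still running;
    # shut down anything that didn't finish in time
    result || Task.shutdown(task, :brutal_kill)
  end)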
Worker Pools
Worker pools manage a group of worker processes to handle tasks:
defmodule WorkerPool do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 5)

    children = Enum.map(1..pool_size, fn i ->
      Supervisor.child_spec({Worker, []}, id: {Worker, i})
    end)

    Supervisor.init(children, strategy: :one_for_one)
  end

  def process_job(job) do
    # Get a random worker
    workers = Supervisor.which_children(__MODULE__)
    {_id, pid, _type, _modules} = Enum.random(workers)
    Worker.process(pid, job)
  end
end

defmodule Worker do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  def process(pid, job) do
    GenServer.call(pid, {:process, job})
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:process, job}, _from, state) do
    result = do_process(job)
    {:reply, result, state}
  end

  defp do_process(_job) do
    # Process the job
    # ...
    :ok
  end
end
For production use, consider established libraries like Poolboy or NimblePool instead of implementing your own pool (a Poolboy usage sketch follows the list below).
Worker pools are ideal for:
- Managing access to limited resources (like database connections)
- Processing a stream of jobs with controlled concurrency
- Distributing load across multiple workers
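For comparison, here is roughly what the same idea looks like with Poolboy; the pool name, size, and max_overflow values are illustrative, and the snippet assumes poolboy is declared as a dependency:

# Pool definition, added to a supervision tree
poolboy_config = [
  name: {:local, :worker_pool},
  worker_module: Worker,
  size: 5,          # workers kept alive
  max_overflow: 2   # extra workers allowed under load
]

children = [
  :poolboy.child_spec(:worker_pool, poolboy_config)
]

# Checking a worker out for the duration of one job
def process_job(job) do
  :poolboy.transaction(:worker_pool, fn pid ->
    Worker.process(pid, job)
  end)
end

Unlike the random-worker approach above, checkout blocks when all workers are busy, which gives you backpressure for free.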
Pipeline Pattern
The pipeline pattern processes data through a series of stages:
defmodule DataPipeline do
  def start_link do
    # Start each stage as a GenStage producer/consumer
    {:ok, producer} = DataProducer.start_link()
    {:ok, transformer} = DataTransformer.start_link()
    {:ok, consumer} = DataConsumer.start_link()

    # Connect the stages
    GenStage.sync_subscribe(transformer, to: producer)
    GenStage.sync_subscribe(consumer, to: transformer)

    {:ok, %{producer: producer, transformer: transformer, consumer: consumer}}
  end
end
For a simplified pipeline without the backpressure handling of GenStage:
defmodule SimplePipeline do
  def process_file(file_path) do
    file_path
    |> File.stream!()
    |> Stream.map(&parse_line/1)
    |> Stream.map(&transform_data/1)
    |> Stream.map(&validate_record/1)
    |> Stream.map(&save_record/1)
    |> Stream.run()
  end

  # Stubs - replace with real parsing, transformation, validation,
  # and persistence logic
  defp parse_line(line), do: line
  defp transform_data(data), do: data
  defp validate_record(record), do: record
  defp save_record(record), do: record
end
Pipelines are ideal for:
- Data processing with multiple stages
- When there’s a natural flow of data through different processing steps
- Managing backpressure in systems with different processing rates
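For completeness, here is what the producer end of a GenStage pipeline can look like. This minimal sketch just emits consecutive integers on demand and assumes the gen_stage dependency:

defmodule DataProducer do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, 0, opts)
  end

  @impl true
  def init(counter) do
    {:producer, counter}
  end

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # Emit `demand` events; a real stage would pull from a queue,
    # a database, or an external source
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end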
Best Practice 3: Performance Optimization
Effective concurrency isn’t just about creating many processes; it’s about optimizing the overall system performance.
Process Communication Patterns
Communication between processes introduces overhead. Choose the right pattern:
# Pattern 1: Fire and forget
def notify_user(user_id, message) do
  spawn(fn ->
    user = Users.get(user_id)
    Notifications.send(user.email, message)
  end)

  :ok
end

# Pattern 2: Async with timeout
def process_payment(payment) do
  task = Task.async(fn ->
    PaymentGateway.charge(payment)
  end)

  # Task.yield/2 returns nil on timeout; Task.shutdown/1 then stops the
  # task safely. (Killing task.pid directly would crash the caller via
  # the link that Task.async sets up.)
  case Task.yield(task, 10_000) || Task.shutdown(task) do
    {:ok, result} -> result
    nil -> {:error, :timeout}
  end
end

# Pattern 3: Pooled resources
def query_database(query) do
  :poolboy.transaction(:db_pool, fn conn ->
    DBConnection.execute(conn, query, [])
  end)
end
Choose based on your needs:
- Fire and forget: When you don’t need a response
- Async with timeout: When you need a response but want concurrency
- Pooled resources: For managing limited resources
Process Linking and Monitoring
Understand the difference between linking and monitoring:
# Linking - if the spawned process dies, this process dies too
# (unless it traps exits)
def start_critical_component do
  spawn_link(fn ->
    critical_process()
  end)
end

# Monitoring - get a message if the monitored process dies
def start_monitored_component do
  pid = spawn(fn ->
    monitored_process()
  end)

  Process.monitor(pid)

  receive do
    {:DOWN, _ref, :process, ^pid, reason} ->
      Logger.warning("Monitored process terminated: #{inspect(reason)}")
      # Handle the failure
  end
end
- Use linking when processes have dependent lifecycles
- Use monitoring when you need to know about process termination but want to decide how to handle it
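There is a middle ground: a linked process that traps exits receives crash notifications as ordinary messages instead of dying with the other process. A small sketch:

# With trap_exit set, a linked crash arrives as an {:EXIT, ...} message
# instead of killing this process
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> raise "boom" end)

receive do
  {:EXIT, ^pid, reason} ->
    IO.puts("Linked process exited: #{inspect(reason)}")
end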
Message Passing Optimization
Be mindful of message passing performance:
# Avoid this - sending large data structures
def process_large_data(large_data) do
  # Capture the caller's pid first; self() inside the spawned fun would
  # refer to the new process, not this one
  caller = self()

  spawn(fn ->
    # Large data is copied across process boundaries
    result = do_process(large_data)
    send(caller, {:result, result})
  end)
end

# Better - pass references to data when possible
def process_large_file(file_path) do
  caller = self()

  spawn(fn ->
    # Only the file path is copied, not the contents
    result = process_file(file_path)
    send(caller, {:result, result})
  end)
end
For large data structures, consider:
- Passing file paths or database IDs instead of entire data structures
- Using ETS (Erlang Term Storage) for shared access to large data
- Breaking large data into smaller chunks
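As a sketch of the ETS option (do_process/1 is a placeholder), a large term can be published once to a public table rather than copied into every message:

# A public table lets any process read the data directly
table = :ets.new(:shared_data, [:set, :public, read_concurrency: true])
:ets.insert(table, {:dataset, large_data})

spawn(fn ->
  # Only the table identifier is captured by the closure
  [{:dataset, data}] = :ets.lookup(table, :dataset)
  do_process(data)
end)

An ETS lookup still copies the term into the reading process, but you avoid a copy per message and per mailbox, and large binaries are reference-counted off-heap rather than copied.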
Best Practice 4: Managing Process State
Properly managing state in concurrent systems is crucial for correctness and performance.
Immutable Data Structures
Leverage Elixir’s immutable data structures to simplify concurrent programming:
defmodule UserStats do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def record_login(user_id) do
    GenServer.cast(__MODULE__, {:login, user_id})
  end

  def get_stats do
    GenServer.call(__MODULE__, :get_stats)
  end

  @impl true
  def init(_) do
    {:ok, %{logins: %{}}}
  end

  @impl true
  def handle_cast({:login, user_id}, state) do
    # Create a new immutable state rather than modifying the existing one
    new_logins = Map.update(state.logins, user_id, 1, &(&1 + 1))
    {:noreply, %{state | logins: new_logins}}
  end

  @impl true
  def handle_call(:get_stats, _from, state) do
    {:reply, state, state}
  end
end
Immutable data structures ensure that data isn’t inadvertently shared or modified.
Process Dictionary Considerations
The process dictionary can be tempting but should be used sparingly:
# Avoid this pattern for application state
def set_config(key, value) do
  Process.put(key, value)
end

def get_config(key) do
  Process.get(key)
end

# Better approach - use a dedicated process
defmodule Config do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def set(key, value) do
    GenServer.cast(__MODULE__, {:set, key, value})
  end

  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:set, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end
The process dictionary has legitimate uses (like storing process-specific context), but for shared application state, dedicated processes are better.
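Logger is a good example of a legitimate use: it keeps per-process metadata in the process dictionary, which is exactly the kind of process-local context the dictionary suits:

# Attach request-scoped context to the current process; subsequent
# Logger calls in this process include it automatically
Logger.metadata(request_id: "req-abc123")
Logger.info("handling request")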
ETS Tables for Shared State
For high-performance shared state, consider ETS tables:
defmodule Cache do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def put(key, value) do
    :ets.insert(:cache, {key, value})
  end

  def get(key) do
    case :ets.lookup(:cache, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end

  @impl true
  def init(_) do
    # The owning GenServer keeps the table alive; :public lets callers
    # read and write without going through this process
    :ets.new(:cache, [:set, :named_table, :public, read_concurrency: true])
    {:ok, nil}
  end
end
ETS tables are ideal for:
- Read-heavy workloads with occasional writes
- When you need faster access than message passing
- When multiple processes need to access the same data
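ETS also offers atomic updates, which sidestep read-modify-write races between concurrent writers. For example, counting cache hits in the table above:

# Atomically add 1 to the element at tuple position 2 under :hits,
# inserting {:hits, 0} first if the key is missing
:ets.update_counter(:cache, :hits, {2, 1}, {:hits, 0})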
Best Practice 5: Testing Concurrent Systems
Testing concurrent systems presents unique challenges. Here are some approaches:
Unit Testing Concurrent Code
Test the behavior of individual components in isolation:
defmodule WorkerTest do
  use ExUnit.Case

  test "worker processes jobs correctly" do
    {:ok, pid} = Worker.start_link([])
    result = Worker.process(pid, %{type: :test})
    assert result == :ok
  end

  test "worker handles errors gracefully" do
    {:ok, pid} = Worker.start_link([])
    result = Worker.process(pid, %{type: :invalid})
    assert result == {:error, :invalid_job}
  end
end
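In ExUnit it is usually better to start processes with start_supervised!/1 than with start_link/1 directly, since the test framework then shuts them down cleanly between tests:

test "worker is stopped between tests" do
  # ExUnit stops this worker when the test exits
  pid = start_supervised!(Worker)
  assert Worker.process(pid, %{type: :test}) == :ok
end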
Testing Race Conditions and Timing Issues
For timing-sensitive code, use explicit synchronization in tests:
defmodule ConcurrencyTest do
  use ExUnit.Case

  test "multiple processes can update state concurrently" do
    # Counter registers itself under its module name, so no pid is needed
    {:ok, _pid} = Counter.start_link(0)
    parent = self()

    # Start multiple processes that increment the counter
    for _ <- 1..100 do
      spawn_link(fn ->
        Counter.increment()
        send(parent, :done)
      end)
    end

    # Wait for all processes to complete
    for _ <- 1..100 do
      assert_receive :done, 1000
    end

    # Verify the final count
    assert Counter.get_count() == 100
  end
end
Property-Based Testing for Concurrency
For complex concurrent systems, consider property-based testing. The sketch below is simplified and loosely follows PropCheck's stateful-testing style; a real test would implement the full PropCheck.StateM behaviour:
defmodule QueuePropertyTest do
  use ExUnit.Case
  use PropCheck

  property "concurrent enqueues and dequeues maintain count invariant" do
    forall cmds <- commands() do
      initial_state = %{count: 0}
      {:ok, queue} = SafeQueue.start_link()
      run_result = run_commands(cmds, initial_state, queue)

      # Verify the final queue count matches our model
      final_count = SafeQueue.count(queue)
      model_count = run_result.state.count
      assert final_count == model_count
    end
  end

  # Define commands that can be executed
  def commands do
    let operations <- list(
      frequency([
        {3, {:call, SafeQueue, :enqueue, [term()]}},
        {2, {:call, SafeQueue, :dequeue, []}}
      ])
    ) do
      operations
    end
  end

  # Define state transitions
  def next_state(state, _result, {:call, SafeQueue, :enqueue, [_]}) do
    %{state | count: state.count + 1}
  end

  def next_state(state, _result, {:call, SafeQueue, :dequeue, []}) do
    if state.count > 0 do
      %{state | count: state.count - 1}
    else
      state
    end
  end
end
Best Practice 6: Monitoring and Debugging
Effective monitoring and debugging are essential for maintaining concurrent systems in production.
Process Naming and Registration
Name important processes for easier debugging:
defmodule ImportantService do
  use GenServer

  def start_link(_) do
    # Register with a well-known name
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def start_link_pool(id) do
    # For multiple instances, use a derived name
    GenServer.start_link(__MODULE__, nil, name: via_tuple(id))
  end

  defp via_tuple(id) do
    # Using Registry for dynamic registration
    {:via, Registry, {MyApp.Registry, {__MODULE__, id}}}
  end
end
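The Registry referenced in via_tuple/1 must itself be started, typically in the application's supervision tree; registered instances can then be looked up by key (id here stands for whatever identifier was registered):

# In the application supervisor
children = [
  {Registry, keys: :unique, name: MyApp.Registry}
]

# Finding a dynamically registered instance
[{pid, _value}] = Registry.lookup(MyApp.Registry, {ImportantService, id})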
Process Tracing and Debugging
Use :sys and :observer for runtime debugging:
# Get the current state of a GenServer
:sys.get_state(MyService)

# Trace calls to a specific process
:sys.trace(MyService, true)

# Start the observer GUI (run in IEx)
:observer.start()
For distributed debugging, consider using Recon or AppMon.
Logging and Telemetry
Implement proper observability for concurrent systems:
defmodule Worker do
  use GenServer
  require Logger

  # ... other callbacks ...

  @impl true
  def handle_call({:process, job_id}, _from, state) do
    start_time = System.monotonic_time()
    Logger.info("Starting job #{job_id}")

    result =
      try do
        do_job(job_id)
      rescue
        e ->
          Logger.error("Job #{job_id} failed: #{inspect(e)}")
          {:error, :failed}
      end

    end_time = System.monotonic_time()
    duration = System.convert_time_unit(end_time - start_time, :native, :millisecond)

    :telemetry.execute(
      [:worker, :job, :completed],
      %{duration: duration},
      %{job_id: job_id, result: result}
    )

    {:reply, result, state}
  end
end
For production systems, consider using TelemetryMetrics to collect and report metrics.
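On the receiving side, a handler attached to the event above might look like this (the handler id string is arbitrary; handlers run synchronously in the emitting process, so keep them fast):

:telemetry.attach(
  "log-job-durations",
  [:worker, :job, :completed],
  fn _event, %{duration: duration}, %{job_id: job_id}, _config ->
    IO.puts("Job #{job_id} completed in #{duration}ms")
  end,
  nil
)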
Conclusion
Elixir’s concurrency model provides powerful tools for building robust, scalable applications. By following these best practices, you can harness the full potential of the BEAM while avoiding common pitfalls.
Remember that effective concurrency is about more than just creating processes—it’s about designing systems that communicate efficiently, handle failures gracefully, and maintain consistent state. The right approach often involves a combination of different patterns, each suited to specific parts of your application.
As you apply these practices, always keep the Elixir philosophy in mind: embrace process isolation, let processes crash and restart, and build systems that are resilient by design. With these principles, you can create concurrent applications that are not only powerful but also maintainable and reliable.