I was searching for an implementation for the Producer-Consumer problem in Erlang, but apparently Google wasn’t on my side:

Having had no choice, and seeing enough examples in Java that made my eyes bleed, I was left to try out an implementation on my own.

Here is how it looks like

Here’s a running example of the implementation I did, with 2 producers and 5 workers:

If you are interested, the repo is located on GitHub.

The Moving Parts

My implementation consists of 3 moving parts: the producer, consumer, and manager. Let’s start with the manager.ex.

manager.ex

defmodule Manager do

  def start(num_workers) do
    spawn(fn -> 
      consumer_pids = Enum.map(1..num_workers, fn _ -> Consumer.start(self) end)
      loop(consumer_pids)
    end)
  end

  def loop([]) do
    receive do
      {:request, producer_pid} ->
        loop([])

      {:done, consumer_pid} ->
        loop([consumer_pid])
    end
  end

  def loop([first | rest] = consumer_pids) do
    receive do
      {:request, producer_pid} ->
        send producer_pid, {:ok, first}
        loop(rest)

      {:done, consumer_pid} ->
        loop([consumer_pid | consumer_pids])
    end
  end

end

We begin with Manager.start/1:

defmodule Manager do

  def start(num_workers) do
    spawn(fn -> 
      consumer_pids = Enum.map(1..num_workers, fn _ -> Consumer.start(self) end)
      loop(consumer_pids)
    end)
  end
  # ...
end

The start/1 function takes in a non-negative integer, and in return, spawns up num_workers of Consumers. The resulting consumer_pids, a list of consumer pids, that are the passed in as the argument for loop/1.

The main idea is consumers that are available will be part of the loo’s arguments. This leads naturally to 2 cases, either we have consumers, or we don’t:

Case 1: No more consumers

This means that all our consumers are busy.

defmodule Manager do
  # ...
  def loop([]) do
    receive do
      {:request, producer_pid} ->
        loop([])

      {:done, consumer_pid} ->
        loop([consumer_pid])
    end
  end

Whenever a producer had something to produce, it will send a {:request, producer_pid} message to the manager. In this case, the manager is busy, so it will simply not respond, and call loop on itself.

Whenever a consumer had completed processing something, it will send a message to the manager with a {:done, consumer_pid}. This also means that the said consumer is available for a new job. Therefore the loop contains that consumer_pid.

Case 2: Happy path

defmodule Manager do
  # ...
  def loop([first | rest] = consumer_pids) do
    receive do
      {:request, producer_pid} ->
        send producer_pid, {:ok, first}
        loop(rest)

      {:done, consumer_pid} ->
        loop([consumer_pid | consumer_pids])
    end
  end
end

Compare this with the previous case. Here, we can reply to the producer with {:of, first}, where first is the first pid in the list of consumer_pids. When {:done, consumer_pid} is received from the consumer, then consumer_pid is prepended to the list of consumer_pids.

producer.ex

Now let’s turn our attention to the producer:

defmodule Producer do
  @timeout 2000

  def start(manager_pid) do
    spawn(fn -> run(manager_pid) end)
  end

  def run(manager_pid) do
    Stream.repeatedly(fn ->
      send manager_pid, {:request, self} 

      receive do
        {:ok, consumer_pid} ->
          :random.seed(:erlang.now)
          send consumer_pid, {:run, :random.uniform(3)}

      after @timeout ->
        send manager_pid, {:request, self} 
      end
    end) |> Enum.to_list
  end

end

When we start the producer, we supply the manager_pid, so that both parties can communicate:

defmodule Producer do
  @timeout 2000

  def start(manager_pid) do
    spawn(fn -> run(manager_pid) end)
  end
  # omitted ...
end

In order to simulate a never ending stream, I used Stream.repeatedly/1 and Enum.to_list:

defmodule Producer do
  # ....
  def run(manager_pid) do
    Stream.repeatedly(fn ->
      # ...
    end) |> Enum.to_list
  end
end

Now, let’s take a look at the messages being sent and received:

defmodule Producer do
  # ....
  def run(manager_pid) do
    Stream.repeatedly(fn ->
      send manager_pid, {:request, self} 

      receive do
        {:ok, consumer_pid} ->
          :random.seed(:erlang.now)
          send consumer_pid, {:run, :random.uniform(3)}

      after @timeout ->
        send manager_pid, {:request, self} 
      end
    end) |> Enum.to_list
  end
end

Just before the receive block, the producer first sends a request to the manager for a consumer_pid. If it gets one, then it will send a message to the consumer to run the job.

Here’s the slightly more interesting bit: If it doesn’t receive a reply, we timeout after @timeout (2 seconds, in this case), and send the another request again. Why will this timeout in the first place? That’s because the manager will simply not answer a request if there are no consumers available.

consumer.ex

Implementing the consumer is simple:

defmodule Consumer do

  def start(manager_pid) do
    spawn(fn -> loop(manager_pid) end)
  end

  def loop(manager_pid) do
    receive do
      {:run, work} ->
        :timer.sleep(work * 1000)
        send manager_pid, {:done, self}
        loop(manager_pid)
    end
  end

end

Similar to the producer, we pass in the manager_pid because the consumer needs to talk to the manager. The consumer only responds to one message, {:run, work}. All it does is sleep a couple of seconds (to simulate doing work), then inform the manager that it should be added back into the list of available consumer_pids.

That’s it!

This little exercise took me quite a while to complete, because I hit into starvation issues along the way, although that was due to a bad pattern match on my part.

This technique might come in useful when the producer produces more data than the consumers can handle, and we have to tell the producer to slow down somehow.

One more thing …

I’ve started reading Erlang in Anger, a free ebook by Fred Hebert, of Learn You Some Erlang for Great Good! fame. Lots of interesting stuff and advice on running Erlang in production. Highly recommend reading it.

Thanks for reading!