Streams are fascinating because they allow us to model infinite data. In addition, a stream can be composed together to form another stream. The possibilities are endless – pun intended!

I have covered streams in a previous post many moons ago. But let’s revisit the basics a bit, because why not?

After a quick revision, we will learn how to create our own streams, along with a very fun project to make sure the concepts sink in. Let’s do this!

If you are impatient, here’s a taste:

Stream Basics

Streams are enumerables that are composable and lazy. Let’s talk being lazy first, or rather, what it means to be not lazy.

Streams are Lazy

When you use map/2 of the Enum module, the values are eagerly computed. That’s just a fancy of saying that the results return immediately:

iex> [1, 2, 3] |> Enum.map(&(&1 * &1))
[1, 4, 9]

By contrast, what happens when we use Stream module instead?

[1, 2, 3] |> Stream.map(&(&1 * &1))
#Stream<[enum: [1, 2, 3],
 funs: [#Function<45.29647706/1 in Stream.map/2>]]>

Well, we see a Stream being returned, but no [1, 4, 9] in sight. This is what is meant by being lazy. Unless absolutely necessary, the stream will not return a value. There are a couple of ways to compel the Stream to return a result. All of them involve a call to one of the functions in Enum:

iex> [1,2,3] |> Stream.map(&(&1 * &1)) |> Enum.take(2)            [1, 4]

Calling the eager functions in Enum basically breaks the chain of laziness of the stream. This brings us to the next point.

Streams are Composable

We know that streams are lazy. Because streams don’t evaluate immediately, we can happily compose them together to combine computations. But can’t we do this with existing Enum functions? Sure you can, if your input is manageable. Let’s imagine we have a crazy large input:

wikipedia_titles = [...] # A list of a gazillion elements in it

Then let’s say we wanted to capitalize the Wikipedia titles. Therefore, we need to map/2 the String.upcase/1 function on wikipedia_titles:

wikipedia_titles |> Enum.map(&(String.upcase(&1)))

Finally, let’s just assume that we are interested in the first 10 articles. take/2 is just the function for that:

wikipedia_titles 
    |> Enum.map(&(String.upcase(&1)) 
    |> Enum.map(&(String.reverse(&1)) 
    |> Enum.take(10)

Trace the program and think about what would happen. You would realise that although we only wanted 10 capitalized Wikipedia titles, because of eager evaluation, we had to capitalized all gazillion entries first, then reverse all gazillion entries, before picking out the first 10. That is just plain wasteful.

Consider this version instead using Stream instead:

wikipedia_titles 
    |> Stream.map(&(String.upcase(&1)) 
    |> Stream.map(&(String.reverse(&1)) 
    |> Enum.take(10)

When we replace Enum.map/2 with Stream.map/2, both String.upcase/2 and String.reverse/2 are invoked only 10 times each. We don’t have to worry that mapping will take long because we are doing the bare minimum when we use the Stream module. This is why streams are awesome!

Building Your Own Streams

We are now going to learn how to create our own streams. We will go through two examples. The first example will be lame, but it will give you a feel of the general idea. The second is even more impractical, but will be very fun to play with.

An Infinite Number Stream

Let’s assume for a moment that we are blissfully unaware of functions that allow us to create infinite streams of numbers. Therefore, we are going to implement this functionality in a module called Streamy.

If you are following along go ahead and save the following in a file called streamy.ex:

defmodule Streamy do

  def from(start) do
    Stream.resource(
      fn -> start end,
      fn(num) -> 
        case num do 
          num when num < 1000 -> 
            {[num + 1], num + 1} 
          _ ->
            {:halt, num}
        end
      end,
      fn(num) -> num end
    )
  end

end

Here’s what I am after:

% iex streamy.ex
iex> Streamy.from(10) |> Stream.map(&(&1*2)) |> Enum.take 5
[22, 24, 26, 28, 30]

Building Streams with Stream.resource/3

The key to creating your own streams is Stream.resource/3. The key to understanding Stream.resource/3 is to pay attention to the three input arguments. In particular, the inputs and outputs of the functions that are to be passed to Stream.resource/3:

Stream.resource(start_function, 
                next_function, 
                after_function)

Time to learn the rules.

Argument 1 – The Start Function

The start_function that you pass into Stream.resource/3 sets up the resource and returns it. This function:

The resource could be a file handle, socket connection, or in our case, the initial number:

fn -> start end

See? Super simple.

Argument 3 – The After Function

Let’s do the after_function before doing next_function. It is called “after” because this is the final function that is called once the stream is done spitting out values.

This function:

fn resource ->
  # handle clean up of the resource
  # e.g. File.close(resource), IO.close(resource)
end

Argument 2 - The Next Function

The next_function is where all the action is at. Conceptually, it is simple: This function:

The return value must conform to a specific format. In particular, it must:

fn resource ->
    case read_from_resource do
    data -> {[data], accumulator}
end

The point here is to take note of the return value, because that is what Stream.resource/3 expects you to return. Any deviation and you get weird errors.

Let’s say you encounter an error, or you have exhausted the resource (end of file, for example). In that case, you should return {:halt, accumulator}

fn resource ->
    case read_from_resource do
    data -> {[data], accumulator}
    _    -> {:halt, accumulator}
end

What is the Accumulator?

Accumulator means different things given different situations. Here’s the way I like to think about it:

If you are setting up a file, socket, database connection etc, then accumulator is that file handle, socket or database connection. There’s nothing to accumulate per se, but it’s more like “bringing forward” to the next invocation of the stream.

Here’s an example take right out of the docs:

Stream.resource(fn -> File.open!("sample") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> {[data], file}
                    _ -> {:halt, file}
                  end
                end,
                fn file -> File.close(file) end)

Notice that the accumulator is always file. The only sane thing to do is pass along the file handle each time we request a value from the stream. Also notice that data is wrapped in a list. I got stung by this a few times before – you’ve been warned.

On the other hand, if the value you generate now depends on the previous value generated, then accumulator functions in the truest sense of the word. Here’s the infinite number generator again:

defmodule Streamy do

  def from(start) do
    Stream.resource(
      fn -> start end,
      fn(num) -> 
        case num do 
          num when num < 1000 -> 
            {[num + 1], num + 1} 
          _ ->
            {:halt, num}
        end
      end,
      fn(num) -> num end
    )
  end

end

Notice what happens when we purposely halt the stream by starting with a number close to 1000:

iex> Streamy.from(996) |> Stream.map(&(&1*2)) |> Enum.take 5
[1994, 1996, 1998, 2000]

In this case, only four values were generated. Hopefully by now, the infinite number generator makes sense, and you have a better idea how it works.

Click Stream

In the next example, we going to generate a stream using mouse coordinates as the data source. The full source code can be found on GitHub.

Setting Up the Project

The first order of things is to set up a project with mix:

% mix new click_stream

Here’s the source in its entirety, to be placed in lib/click_stream.ex:

defmodule ClickStream do
  require Record
  Record.defrecordp :wx, Record.extract(:wx, from_lib: "wx/include/wx.hrl")

  @title 'Click Stream'

  def create_stream_x do
    Stream.resource(fn -> create_frame end,
                    fn(frame) -> loop_x(frame) end,
                    fn(frame) -> destroy_frame(frame) end)
  end 

  def create_stream_y do
    Stream.resource(fn -> create_frame end,
                    fn(frame) -> loop_y(frame) end,
                    fn(frame) -> destroy_frame(frame) end)
  end 

  def create_frame do
    wx    = :wx.new
    frame = :wxFrame.new(wx, -1, @title)
    :wxWindow.connect(frame, :close_window)
    :wxWindow.connect(frame, :motion)
    :wxFrame.show(frame)
    frame
  end

  def loop_x(frame) do
    receive do
      {:wx, _, _, _, {:wxMouse, :motion, x, _y, _, _, _, _, _, _, _, _, _, _}} ->
        IO.puts "x: #{x}"
        {[x], frame}
      {:wx, _, {:wx_ref, _, :wxFrame, []}, [], {:wxClose, :close_window}} ->
        {:halt, frame}
      _ ->
        {:halt, frame}
    end
  end

  def loop_y(frame) do
    receive do
      {:wx, _, _, _, {:wxMouse, :motion, _x, y, _, _, _, _, _, _, _, _, _, _}} ->
        IO.puts "y: #{y}"
        {[y], frame}
      {:wx, _, {:wx_ref, _, :wxFrame, []}, [], {:wxClose, :close_window}} ->
        {:halt, frame}
      _ ->
        {:halt, frame}
    end
  end

  def destroy_frame(frame) do
    :wxFrame.destroy(frame)
  end

end

Although the file seems relatively lengthy, a closer inspection would reveal that create_stream_x/0 and create_stream_y/0 are almost identical, except for the invocation of the loop_x/0 and loop_y/0 respectively.

create_stream_x/0 reports the mouse’s x-coordinates. Same story for create_stream_y/0.

A Quick Demo

Here is what we want to do:

A Revisit to Stream.resource/3

Let’s take a closer look at create_stream_x/0:

def create_stream_x do
  Stream.resource(fn -> create_frame end,
                  fn(frame) -> loop_x(frame) end,
                  fn(frame) -> destroy_frame(frame) end)
end 

Recall that the first argument of Stream.resource/3 takes in a function that sets up and returns the resource. In this case, the resource is a wxWidget frame. No worries if you have no idea what wxWidget is. All you have to know is that wxWidget is a GUI library, and frame is a reference to a GUI window.

The last argument tears down the resource. In this case, we teardown the resource by destroying the frame.

The fun part is in loop_x/1:

def loop_x(frame) do
  receive do
    {:wx, _, _, _, {:wxMouse, :motion, x, _y, _, _, _, _, _, _, _, _, _, _}} ->
      IO.puts "x: #{x}"
      {[x], frame}
    {:wx, _, {:wx_ref, _, :wxFrame, []}, [], {:wxClose, :close_window}} ->
      {:halt, frame}
    _ ->
      {:halt, frame}
  end
end

The loop here executes the receive block each time it is called. In wXwidget, messages are sent to self whenever an event is triggered.

What kinds of events are there? In the above code, we are only concerned with two kinds – the motion event triggered by mouse movement, and the closing of the window.

I’d be the first to admit that the pattern to be matched looks extremely funky, but hey, it gets the job done. When the first pattern matches ({:wx, _, _, _, {:wxMouse, :motion, x, _y, _, _, _, _, _, _, _, _, _, _}}), the tuple {[x], frame} is returned. Once again, x – the x-coordinate of the mouse position` – is wrapped in a list. It is followed by the accumulator, which in this case, is the frame – the resource.

If the window is closed, or we get an unexpected message, we simply signal a close of the stream by returning {:halt, frame}.

That is really all to it!

Running Click Stream

Let’s run the project:

% iex -S mix

Next, we will create a stream that lazily reports the x-coordinates of the mouse movement:

iex> stream = ClickStream.create_stream_x
#Function<25.29647706/2 in Stream.resource/3>

Let’s only display the first 10 x coordinates that are less than 100. When you run the code, you will see a window appear. Run your mouse over it.

Because we placed an IO.puts/1, we can see all the x values that are being reported. Once 10 eligible values are created, the window is closed, and the return result is displayed:

iex> stream |> Stream.filter(fn x -> x < 100 end) |> Enum.take 10
x: 376
x: 376
x: 376
x: 13
x: 13
x: 15
x: 16
x: 17
x: 18
x: 20
x: 21
x: 22
x: 22
[13, 13, 15, 16, 17, 18, 20, 21, 22, 22]

Recall that streams are composable. This means that we can create another stream from existing ones. One way we can do that is through the Stream.zip/2 function, which takes in two streams and zips them up.

First, we create a new stream from two other streams:

iex> new_stream = Stream.zip(ClickStream.create_stream_x, ClickStream.create_stream_y)
#Function<6.29647706/2 in Stream.zip/2>

Let’s take 10 values from this new stream. Just like in the previous case, a window pops up. Fiddle with the mouse a bit until 10 values are generated:

iex> new_stream |> Enum.take 10
x: 380
y: 144
x: 359
y: 147
x: 340
y: 150
x: 329
y: 153
x: 316
y: 157
x: 307
y: 160
x: 299
y: 160
x: 298
y: 156
x: 296
y: 154
x: 282
y: 148
[{380, 144}, {359, 147}, {340, 150}, {329, 153}, {316, 157},
 {307, 160}, {299, 160}, {298, 156}, {296, 154}, {282, 148}]

Other Examples of Streams (Or: When GitHub > Google)

When I was learning how Stream.resource/3 worked, looking at other people’s code and finding patterns between them helped a lot.

Turns out, GitHub has a very handy feature that lets you search through code and filter it by language:

Xpra6sn

For example, here’s the search results for Stream.resource/3.

There are a few interesting examples. Here are two of my favourites:

  1. ExTwitter is a wonderful example to see how Elixir Streams work with Twitter’s Streaming API.

  2. DirWalker by Dave Thomas is a file-system directory tree walker that can handle large filesystems by traversing the directory tree lazily.

Thanks for reading! <3!