Articles

Getting started with Elixir and EventStoreDB with the Spear gRPC client

Michael Davis  |  15 June 2021

Elixir is a dynamic, functional language built on the Erlang VM. Elixir has powerful built-in abstractions for writing reactive, eventually-consistent applications, making it perfect for Event Sourcing.

In this post we'll explore how we can leverage EventStoreDB and Spear, a new open-source Elixir client for the EventStoreDB, to build some basic components of an Event Sourced system.

Installation

If you already have Elixir set up locally, you can jump into IEx or add Spear to your dependencies in amix.exs. If you're starting from scratch, this docker-compose will set up EventStoreDB and Elixir containers:

# docker-compose.yml
version: '3.8'
services:
  eventstoredb:
    image: eventstore/eventstore:latest
    ports:
    - 2113:2113
    command: /opt/eventstore/eventstored --insecure --run-projections=All
  elixir:
    image: elixir:1.12.1
    depends_on:
    - eventstoredb
    volumes:
    - ./:/app
    command: tail -f /dev/null
    

Now let's bring up the containers and start a bash shell in the elixir container:

docker-compose up --detach
docker-compose exec elixir bash

Now that we've got a shell going, let's see what we have for Elixir:

$ mix --version
Erlang/OTP 24 [erts-12.0.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Mix 1.12.1 (compiled with Erlang/OTP 24)

Mix is Elixir's build tool, similar to NodeJS's npm or yarn and Rust's cargo. We'll use it to generate a new project, fetch dependencies and compile our project, but it also has modern utilities for formatting, running test suites, publishing libraries and building fully-contained releases. Let's use Mix to generate a new project:

$ mix new hello_elixir --sup
$ cd hello_elixir
$ tree
.
├── README.md
├── lib
│   ├── hello_elixir
│   │   └── application.ex
│   └── hello_elixir.ex
├── mix.exs
└── test
    ├── hello_elixir_test.exs
    └── test_helper.exs

Let's open up the mix.exs in an editor and add some dependencies:

defmodule HelloElixir.MixProject do
  use Mix.Project

  def project do
    [
      app: :hello_elixir,
      version: "0.1.0",
      elixir: "~> 1.12",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: {HelloElixir.Application, []}
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
    ]
  end
end

The mix.exs is a Mix project's manifest, just like a package.json for a NodeJS project. Let's add some dependencies on Spear, an open-source EventStoreDB gRPC client, and Jason, a JSON parser/generator.

defp deps do
  [
    {:spear, "~> 0.9"},
    {:jason, "~> 1.0"}
  ]
end

And now we'll fetch our dependencies and start up an IEx shell.

$ mix deps.get
$ iex -S mix

Interactive Elixir (IEx) is a REPL: a language shell that can be used to manually interact with a code-base or live system for exploration or debugging. In this post, we'll use IEx to explore Spear and EventStoreDB. For example, we can use our open IEx shell to see what version of Spear we have installed.

iex> :application.get_key(:spear, :vsn)
{:ok, '0.9.1'}

Or to look up documentation on a function, module, or type:

iex> h Spear.Connection.start_link
..
Starts a connection process...

And press Control+C twice to exit the shell. Now that we've got all the setup out of the way, we're ready to connect!

Connecting to the EventStoreDB and processes

Let's spawn a process representing our connection to the EventStoreDB:

$ iex -S mix
iex> {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://eventstoredb:2113")
{:ok, #PID<0.1012.0>}
iex> conn
#PID<0.1012.0>

There are two interesting parts of this snippet. First, the left-hand side is a data structure instead of a name like you might see in many languages. In Elixir, you create bindings through pattern matching instead of creating variables through assignment. Here we're matching on a two element tuple where the first element is the :ok atom (similar to a symbol in Ruby or a keyword in Clojure) and the second element is our connection process. The other interesting part in this snippet is the process. Where in object-oriented languages you might instantiate a new object, in Elixir you typically spawn a new process. What we see here is that we've spawned a new process and bound the name "conn" to the process' ID (or PID for short). We can check the connection with a simple ping

iex> Spear.ping(conn)
:pong

Reading and writing events

Like other clients, Spear can write a list of events to the EventStoreDB.

iex> events = [Spear.Event.new("TestEvent", %{}), Spear.Event.new("TestEvent", %{})]
[%Spear.Event{..}, %Spear.Event{..}]
iex> Spear.append(events, conn, "TestStream")
:ok

While Spear accepts lists, it is designed to use Elixir's lazy Stream data structure. Streams can efficiently describe and operate on large (or even infinite) collections. For example, if we have a large CSV of charges, we can append them to a stream like so:

iex> File.stream!("charges.csv", read_ahead: 100_000)
...> |> MyCsvParser.parse_stream()
...> |> Stream.map(fn [code, amount] ->
...>   Spear.Event.new("ChargeIncurred", %{charge_code: code, amount: amount})
...> end)
...> |> Spear.append(conn, "ChargesFromCsvs")
:ok

The chunks of the CSV will only be read and turned into events as the events are passed over-the-wire to the EventStoreDB, so even if the CSV is much larger than the available memory on a machine, this can succeed! By the way, what's that |> operator doing? That's the pipe operator, a macro that transforms the code to make the result of left-hand side of the operator the first argument to the right-hand side. For example

iex> Kernel.+(2, 3)
5
iex> 2 |> Kernel.+(3)
5

The pipe operator is popular in functional languages like Elm, F#, and Clojure. Function programs often transform values by passing them through conceptual pipelines of functions. Normally this would lead to some ugly nested calls, but the pipe operator keeps the chain of calls readable. Similar to writing many events with a Stream, we can read a large EventStoreDB stream with Elixir Streams. Let's sum all of the charges we read from the CSV.

iex> Spear.stream!(conn, "ChargesFromCsvs", chunk_size: 100)
...> |> Enum.reduce(%{}, fn %Spear.Event{body: %{charge_code: code, amount: amount}}, acc ->
...>   Map.update(acc, charge_code, amount, &(&1 + amount))
...> end)
%{"fuel" => 4_006_000, "linehaul" => 12_300_000, ..}

Here we fold through the EventStoreDB stream in chunks of 100 events as we sum the charges, never holding more than 100 events in memory. We can also stream through the special $all log: a collection of all of the readable events in the EventStoreDB. Let's see all of the event types we've written so far:

iex> Spear.stream!(conn, :all, from: :start, direction: :forwards)
...> |> Stream.map(fn %Spear.Event{type: event_type} -> type end)
...> |> Enum.uniq()
["TestEvent", "ChargeIncurred", ..]

Subscriptions

Subscriptions in Spear work a bit differently than subscriptions in other languages. In JavaScript, for example, you might write a subscription by passing a callback function:

client
  .subscribeToStream(myStreamName, { fromRevision: START })
  .on("data", myCallbackFunction);

Spear subscriptions instead work with message passing between the connection process and some subscriber process. Let's see some message passing in our IEx session:

iex> self()
#PID<0.106.0>
iex> send(self(), :hello)
iex> send(self(), :world)
iex> flush()
:hello
:world
:ok

Here we've sent ourselves some messages with the send/2 function and then flushed the mailbox out with the flush/0 function. Now let's sign up our IEx process for a subscription:

iex> {:ok, subscription} = Spear.subscribe(conn, self(), "ChargesFromCsvs")
{:ok, #Reference<0.644476470.1070071811.35868>}

Here we've asked the connection process to send the IEx process messages from the subscription as they arrive from the EventStoreDB. If we flush out our IEx process' mailbox, we'll see the events we wrote earlier.

iex> flush()
%Spear.Event{type: "ChargeIncurred", ..}
%Spear.Event{type: "ChargeIncurred", ..}
%Spear.Event{type: "ChargeIncurred", ..}
..
:ok
iex> Spear.cancel_subscription(subscription)
:ok

This doesn't provide much utility inside IEx, but this asynchronous message passing is an important building-block for writing reactive, eventually-consistent actors. For example, let's write a process resembling a read model using the powerful GenServer abstraction from the Erlang Open Telecom Platform (OTP).

# lib/hello_elixir/my_subscriber.ex
defmodule HelloElixir.MySubscriber do
  use GenServer

  def start_link(conn) do
    GenServer.start_link(__MODULE__, conn, name: __MODULE__)
  end

  @impl GenServer
  def init(conn) do
    {:ok, _subscription} = Spear.subscribe(conn, self(), "ChargesFromCsvs")

    {:ok, %{}}
  end

  @impl GenServer
  def handle_info(
        %Spear.Event{
          type: "ChargeIncurred",
          body: %{"charge_code" => charge_code, "amount" => amount}
        },
        state
      ) do
    new_state = Map.update(state, charge_code, amount, &(&1 + amount))

    {:noreply, new_state}
  end

  @impl GenServer
  def handle_call({:fetch_amount_by_charge_code, charge_code}, _from, state) do
    {:reply, Map.fetch(state, charge_code), state}
  end
end
$ iex -S mix
iex> {:ok, server} = GenServer.start_link(MySubscriber, conn)
{:ok, #PID<0.276.0>}
iex> GenServer.call(server, {:fetch_amount_by_charge_code, "fuel"})
{:ok, 4_006_000}

This process will start up, subscribe to the stream, and await new messages, performing the GenServer.handle_info/2 callback any time it receives an event from the subscription. As we write more charges to the stream, this process stays up-to-date on the latest amounts. Other processes can ask our make-shift read-model about amounts with the synchronous message-passing function GenServer.call/3. Along with pattern matching, Elixir allows one to write multiple clauses for a single function name. Whichever clause matches first will be invoked, so if we expect other event types in the "ChargesFromCsvs" stream, we can add function clauses to match them. For example, if we have a ChargeVoided event type, we may wish to subtract the amounts:

@impl GenServer
def handle_info(
      %Spear.Event{
        type: "ChargeIncurred",
        body: %{"charge_code" => charge_code, "amount" => amount}
      },
      state
    ) do
  new_state = Map.update(state, charge_code, amount, &(&1 + amount))

  {:noreply, new_state}
end

def handle_info(
      %Spear.Event{
        type: "ChargeVoided",
        body: %{"charge_code" => charge_code, "amount" => amount}
      },
      state
    ) do
  new_state = Map.update(state, charge_code, amount, &(&1 - amount))

  {:noreply, new_state}
end

Supervision

It's a bit inconvenient to have to start all of these processes by hand. Luckily there's a special type of process perfect for the job: a supervisor. Let's open up our project's application definition.

# lib/hello_elixir/application.ex
defmodule HelloElixir.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Starts a worker by calling: HelloElixir.Worker.start_link(arg)
      # {HelloElixir.Worker, arg}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: HelloElixir.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

A project's application is the root of its supervision hierarchy. Here we start a supervisor that will overlook child processes. We'll edit that list of children to add our connection process and the read-model.

children = [
  {Spear.Connection, name: HelloElixir.Connection, connection_string: "esdb://eventstoredb:2113"},
  {HelloElixir.MySubscriber, HelloElixir.Connection}
]

When we start up our IEx session again, our application will start up a supervision tree with a connection process and our read-model. If the processes exit, the supervisor will take care of performing restarts. Now in a few lines of Elixir we've built a reactive, eventually-consistent read model with built-in fault-tolerance. This is just the beginning of what's possible with Elixir and Event Sourcing, though. We can extend a subscriber like the one above with the popular GenStage and Broadway libraries to take advantage of features like concurrent processing, batching, and partitioning.

Source Code

Spear is Open Sourced under the Apache 2.0 License. Find the source code and license in the GitHub repository. Spear is published to the Hex package manager and online documentation can be found on HexDocs. Issues and pull requests are always welcome!


Photo of Michael Davis

Michael Davis Michael Davis is a Platform Developer at NFI Industries. He focuses on writing libraries and tools in Elixir. In his free time, Michael can be found playing arcade games and skateboarding around the city of Chicago. Michael can be found on GitHub as @the-mikedavis.


Comment on this post