Event Store blog


Microsoft Orleans with EventStoreDB

Derek Comartin  |  05 August 2020

Microsoft Orleans is a cross-platform framework for building distributed applications using Virtual Actors. A Virtual Actor, known as a Grain, is the primary building block that provides identity, behavior, and state.

Grain state can be persisted using any storage system; however, EventStoreDB and event sourcing provide several advantages over standard state persistence.

Event sourcing and virtual actors complement each other in two ways:

  1. Event Sourcing clearly defines and separates the state and behavior of a Grain.
  2. A Grain’s state is kept in memory while the Grain is active, which improves application performance.

Separation

Using event sourcing to persist application state provides a clear separation between behavior and state. All state transitions are modeled as events, which are persisted in one EventStoreDB stream per unique Grain. These events are then applied as state transitions within the Grain state to maintain the current state that is kept in memory.

To demonstrate, here’s a simple example of a Shipment. A shipment represents a package being picked up at one location and delivered to another. To model this, we will create two events to represent this workflow.

public class PickedUp
{
    public PickedUp(DateTime dateTime)
    {
        DateTime = dateTime;
    }

    public DateTime DateTime { get; }
}

public class Delivered
{
    public Delivered(DateTime dateTime)
    {
        DateTime = dateTime;
    }

    public DateTime DateTime { get; }
}

Next, we can implement the state transitions for our grain with these events.

public enum TransitStatus
{
    AwaitingPickup,
    InTransit,
    Delivered
}

public class ShipmentState
{
    public TransitStatus Status { get; set; } = TransitStatus.AwaitingPickup;
    public DateTime? LastEventDateTime { get; set; }

    public ShipmentState Apply(PickedUp pickedUpEvent)
    {
        Status = TransitStatus.InTransit;
        LastEventDateTime = pickedUpEvent.DateTime;
        return this;
    }
    
    public ShipmentState Apply(Delivered delivered)
    {
        Status = TransitStatus.Delivered;
        LastEventDateTime = delivered.DateTime;
        return this;
    }
}
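To make the role of the Apply() methods concrete, here is a minimal sketch, outside of Orleans, of how an ordered list of events folds into a ShipmentState. The ShipmentReplay helper is hypothetical and is not part of Orleans or of the grain code in this article; the event and state types are trimmed copies of the ones above so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Trimmed copies of the article's types so this sketch compiles on its own.
public class PickedUp
{
    public PickedUp(DateTime dateTime) { DateTime = dateTime; }
    public DateTime DateTime { get; }
}

public class Delivered
{
    public Delivered(DateTime dateTime) { DateTime = dateTime; }
    public DateTime DateTime { get; }
}

public enum TransitStatus { AwaitingPickup, InTransit, Delivered }

public class ShipmentState
{
    public TransitStatus Status { get; set; } = TransitStatus.AwaitingPickup;
    public DateTime? LastEventDateTime { get; set; }

    public ShipmentState Apply(PickedUp pickedUp)
    {
        Status = TransitStatus.InTransit;
        LastEventDateTime = pickedUp.DateTime;
        return this;
    }

    public ShipmentState Apply(Delivered delivered)
    {
        Status = TransitStatus.Delivered;
        LastEventDateTime = delivered.DateTime;
        return this;
    }
}

// Hypothetical helper (an assumption for illustration, not an Orleans API).
public static class ShipmentReplay
{
    // Folds an ordered event history into a fresh ShipmentState.
    public static ShipmentState Replay(IEnumerable<object> history)
    {
        var state = new ShipmentState();
        foreach (var @event in history)
        {
            switch (@event)
            {
                case PickedUp pickedUp:
                    state.Apply(pickedUp);
                    break;
                case Delivered delivered:
                    state.Apply(delivered);
                    break;
                default:
                    throw new InvalidOperationException(
                        $"Unknown event: {@event.GetType().Name}");
            }
        }
        return state;
    }
}
```

Replaying a PickedUp event followed by a Delivered event leaves the state with Status set to TransitStatus.Delivered and LastEventDateTime set to the delivery time. The state is never stored directly; it is always derived from the events.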

The ShipmentState represents the current state of the shipment.

Finally, we can implement our grain, which contains the business logic and raises events when they are applicable. Once an event is raised, the appropriate Apply() method on ShipmentState will be invoked by Orleans through the JournaledGrain base class.

public interface IShipment : IGrainWithGuidKey
{
    Task Pickup();
    Task Deliver();
    Task<TransitStatus> GetStatus();
}

public class Shipment : JournaledGrain<ShipmentState>, IShipment
{
    public Task Pickup()
    {
        if (State.Status == TransitStatus.InTransit)
        {
            throw new InvalidOperationException("Shipment has already been picked up.");
        }
        
        if (State.Status == TransitStatus.Delivered)
        {
            throw new InvalidOperationException("Shipment has already been delivered.");
        }
        
        RaiseEvent(new PickedUp(DateTime.UtcNow));
        return ConfirmEvents();
    }

    public Task Deliver()
    {
        if (State.Status == TransitStatus.AwaitingPickup)
        {
            throw new InvalidOperationException("Shipment has not yet been picked up.");
        }
        
        if (State.Status == TransitStatus.Delivered)
        {
            throw new InvalidOperationException("Shipment has already been delivered.");
        }
        
        RaiseEvent(new Delivered(DateTime.UtcNow));
        return ConfirmEvents();
    }

    public Task<TransitStatus> GetStatus()
    {
        return Task.FromResult(State.Status);
    }
}

In-Memory Projection

The ShipmentState we’ve created is a projection that our grain uses. Because active grains are kept in memory, we don’t have to re-fetch and process all of the prior events to rebuild the current state on every call; the current state is already in memory.

Depending on the model, your grain may always need to be in its current state to process other behaviors. In the Shipment example, you cannot Deliver a shipment that has not yet been picked up or that has already been delivered.

If a grain hasn’t been accessed for some time, Orleans will deactivate it and remove it from memory. This matters for our Shipment example so far, because it is not persisting the events to any durable storage. This is where EventStoreDB comes in.

When a deactivated grain is requested again, Orleans activates it and calls the OnActivateAsync lifecycle method, which is where we can rehydrate our Grain. There we retrieve all of the prior events from EventStoreDB to bring our ShipmentState back to the most current state.

Grain activations are single-threaded, as is any grain method invocation. Each request must finish, including activation if required, before other requests are processed. This also solves the issue of concurrency on our event stream, provided grains are the only writers to that stream.

Here’s a new implementation of the Shipment grain that persists events to an EventStoreDB stream that is unique per grain instance. On activation, it also rebuilds ShipmentState to the current state by reading all the events from the stream.

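using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using EventStore.Client;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Orleans;
using Orleans.EventSourcing;

public class Shipment : JournaledGrain<ShipmentState>, IShipment
{
    private readonly ILogger<Shipment> _logger;
    private readonly EventStoreClient _esClient;
    private string _stream;

    public Shipment(ILogger<Shipment> logger)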
    {
        _logger = logger;

        var settings = new EventStoreClientSettings
        {
            ConnectivitySettings =
            {
                Address = new Uri("http://localhost:2113")
            }
        };

        // For demo, assume we're running EventStoreDB locally
        _esClient = new EventStoreClient(settings);
    }
    
    public override async Task OnActivateAsync()
    {
        _logger.LogDebug("OnActivateAsync");

        _stream = $"Shipment-{this.GetPrimaryKey()}";

        var stream = _esClient.ReadStreamAsync(
            Direction.Forwards, 
            _stream, 
            StreamPosition.Start);

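        if (await stream.ReadState == ReadState.StreamNotFound)
        {
            // Brand-new shipment: there is nothing to replay, so stop here
            // rather than enumerating a stream that does not exist.
            await ConfirmEvents();
            return;
        }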

        await foreach (var resolvedEvent in stream)
        {
            object eventObj;
            var json = Encoding.UTF8.GetString(resolvedEvent.Event.Data.Span);

            switch (resolvedEvent.Event.EventType)
            {
                case "PickedUp":
                    eventObj = JsonConvert.DeserializeObject<PickedUp>(json);
                    break;
                case "Delivered":
                    eventObj = JsonConvert.DeserializeObject<Delivered>(json);
                    break;
                default:
                    throw new InvalidOperationException(
                        $"Unknown Event: {resolvedEvent.Event.EventType}");
            }

            base.RaiseEvent(eventObj);
        }

        await ConfirmEvents();
    }

    public override Task OnDeactivateAsync()
    {
        _logger.LogDebug("OnDeactivateAsync");

        return base.OnDeactivateAsync();
    }

    private async Task RaiseEvent<T>(string eventName, T raisedEvent)
    {
        var json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(raisedEvent));
        var eventData = new EventData(Uuid.NewUuid(), eventName, json);

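        // Version is the number of confirmed events. For a brand-new stream it is 0,
        // so Version - 1 cannot be converted to an unsigned revision; expect "no stream" instead.
        // (StreamState is assumed to be the name in your client version; older releases may differ.)
        var events = new List<EventData> {eventData};
        if (Version == 0)
        {
            await _esClient.AppendToStreamAsync(_stream, StreamState.NoStream, events);
        }
        else
        {
            await _esClient.AppendToStreamAsync(_stream, Convert.ToUInt64(Version - 1), events);
        }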

        base.RaiseEvent(raisedEvent);

        await ConfirmEvents();
    }
    
    public async Task Pickup()
    {
        if (State.Status == TransitStatus.InTransit)
        {
            throw new InvalidOperationException("Shipment has already been picked up.");
        }
        
        if (State.Status == TransitStatus.Delivered)
        {
            throw new InvalidOperationException("Shipment has already been delivered.");
        }
        
        await RaiseEvent("PickedUp", new PickedUp(DateTime.UtcNow));

        await ConfirmEvents();
    }

    public async Task Deliver()
    {
        if (State.Status == TransitStatus.AwaitingPickup)
        {
            throw new InvalidOperationException("Shipment has not yet been picked up.");
        }
        
        if (State.Status == TransitStatus.Delivered)
        {
            throw new InvalidOperationException("Shipment has already been delivered.");
        }
        
        await RaiseEvent("Delivered", new Delivered(DateTime.UtcNow));

        await ConfirmEvents();
    }

    public Task<TransitStatus> GetStatus()
    {
        return Task.FromResult(State.Status);
    }
}

The new RaiseEvent method persists the events from our behavior methods, Pickup() and Deliver(), to the event stream.

OnActivateAsync fetches all the events from the EventStoreDB event stream for the specific grain instance. It passes each event to RaiseEvent on the base class, which calls the appropriate Apply() on ShipmentState. This is essentially replaying all of the historical events to get to the current state.

It’s important to note that the Apply() methods in ShipmentState should never fail. There should be no reason for them to reject a state transition, because events are facts that have already occurred. Once RaiseEvent() emits and persists an event, it is declaring a state transition. All validation and business logic should live in the Grain behavior methods, Pickup() and Deliver().

For the purpose of this example, all of the persistence and rehydration logic is in the Grain itself. In a real application, it should be moved out of the grain so the grain can focus solely on behaviors.
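One way to move persistence out of the grain is to put it behind a small interface that the grain receives through its constructor. The sketch below is only an illustration: IEventLog and InMemoryEventLog are hypothetical names, and the in-memory implementation stands in for an EventStoreDB-backed one that would wrap the ReadStreamAsync and AppendToStreamAsync calls shown earlier. It also shows the expected-version check that guards a stream against conflicting writers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstraction; the name and shape are assumptions for illustration.
public interface IEventLog
{
    // Returns every event stored for the stream, oldest first.
    Task<IReadOnlyList<object>> ReadAsync(string stream);

    // expectedVersion is the number of events the caller believes are already stored.
    Task AppendAsync(string stream, int expectedVersion, object @event);
}

// In-memory stand-in, useful for tests. Not thread-safe and not durable.
public sealed class InMemoryEventLog : IEventLog
{
    private readonly Dictionary<string, List<object>> _streams =
        new Dictionary<string, List<object>>();

    public Task<IReadOnlyList<object>> ReadAsync(string stream)
    {
        // Hand back a copy so callers cannot mutate the stored history.
        IReadOnlyList<object> events = _streams.TryGetValue(stream, out var stored)
            ? stored.ToArray()
            : Array.Empty<object>();
        return Task.FromResult(events);
    }

    public Task AppendAsync(string stream, int expectedVersion, object @event)
    {
        if (!_streams.TryGetValue(stream, out var stored))
        {
            stored = new List<object>();
            _streams[stream] = stored;
        }

        // Optimistic concurrency: reject the write if someone else appended first.
        if (stored.Count != expectedVersion)
        {
            return Task.FromException(new InvalidOperationException(
                $"Expected version {expectedVersion} but stream '{stream}' is at {stored.Count}."));
        }

        stored.Add(@event);
        return Task.CompletedTask;
    }
}
```

With something like this in place, the grain’s OnActivateAsync would read from the log and replay the events, and its behavior methods would append through the log, passing the grain’s current Version as the expected version.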

Outcome

The Shipment defines behaviors, and ShipmentState is an in-memory projection of all the prior events. There is a clear separation between behavior and state. We’ve also gained both read and write performance: active grains do not need to rehydrate by fetching the entire event stream to rebuild the current state before invoking behaviors.

The benefits we’ve gained from pairing Orleans and EventStoreDB are:

  1. Separation of Behavior and State
  2. Faster reads & writes because active Grains are in-memory.
  3. Bonus: Event Stream Concurrency is a non-issue since Grains are single-threaded

Derek Comartin is a software developer and Microsoft MVP with two decades of professional experience that span enterprise, startups, professional services, and product development.

He’s written software for a variety of business domains such as consumer goods, distribution, transportation, manufacturing, and accounting.

Derek runs the Windsor-Essex .NET User Group and has a very active blog at codeopinion.com that focuses on .NET, CQRS, Event Sourcing, HTTP APIs and Hypermedia.