José F. Romaniello

The adventures and misadventures of a coder.

This is the second part of my post about Event Aggregator. The idea is to show you an easy way to create and use an event aggregator in your application.

The interface of IEventAggregator in Prism looks as follows:

public interface IEventAggregator
{
    TEventType GetEvent<TEventType>() 
        where TEventType : EventBase;
}

Let’s see what EventBase is:

///<summary>
/// Defines a base class to publish and subscribe to events.
///</summary>
public abstract class EventBase
{
    private readonly List<IEventSubscription> _subscriptions = new List<IEventSubscription>();


    protected ICollection<IEventSubscription> Subscriptions
    {
        get { return _subscriptions; }
    }

    protected virtual SubscriptionToken InternalSubscribe(IEventSubscription eventSubscription)
    {
        eventSubscription.SubscriptionToken = new SubscriptionToken();
        lock (Subscriptions)
        {
            Subscriptions.Add(eventSubscription);
        }
        return eventSubscription.SubscriptionToken;
    }

    protected virtual void InternalPublish(params object[] arguments)
    {
        List<Action<object[]>> executionStrategies = PruneAndReturnStrategies();
        foreach (var executionStrategy in executionStrategies)
        {
            executionStrategy(arguments);
        }
    }

    public virtual void Unsubscribe(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            if (subscription != null)
            {
                Subscriptions.Remove(subscription);
            }
        }
    }

    public virtual bool Contains(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            return subscription != null;
        }
    }

    private List<Action<object[]>> PruneAndReturnStrategies()
    {
        List<Action<object[]>> returnList = new List<Action<object[]>>();

        lock (Subscriptions)
        {
            for (var i = Subscriptions.Count - 1; i >= 0; i--)
            {
                Action<object[]> listItem =
                    _subscriptions[i].GetExecutionStrategy();

                if (listItem == null)
                {
                    // Prune from main list. Log?
                    _subscriptions.RemoveAt(i);
                }
                else
                {
                    returnList.Add(listItem);
                }
            }
        }

        return returnList;
    }
}

Although it looks fine, I think we can do less and achieve more with the Reactive Extensions (Rx) framework.

So, for this example my interface will look as follows:

public interface IEventPublisher
{
  void Publish<TEvent>(TEvent sampleEvent);
  IObservable<TEvent> GetEvent<TEvent>();
}

The first method publishes a TEvent, and the second returns an IObservable of TEvent. I like this approach because it lets me leverage many features of the reactive framework. There is also no restriction on TEvent: any class can be an event. Another interesting point is that IObservable is now part of the .NET Framework itself (since .NET 4.0).

Usage examples

Simple subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.True();

Unsubscribe

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

var subscription = eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

subscription.Dispose();
eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.False();

Selective subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Where(se => se.Status == 1)
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent{Status = 1});
eventWasRaised.Should().Be.True();

Subscribe to projection

var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

Observe on dispatcher

No matter which thread the event was published on, the handler runs on the UI thread.

var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .ObserveOnDispatcher()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

These are just examples of what you can do when you combine Reactive Extensions with Event Aggregator.

The implementation

This is the whole implementation:

public class EventPublisher : IEventPublisher
{
    // One subject per event type, created lazily on the first GetEvent call.
    private readonly ConcurrentDictionary<Type, object> subjects
        = new ConcurrentDictionary<Type, object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        var subject = 
            (ISubject<TEvent>) subjects.GetOrAdd(typeof (TEvent), 
                        t => new Subject<TEvent>());
        // Hide the subject so that subscribers cannot call OnNext themselves.
        return subject.AsObservable();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject;
        // No subject means nobody ever asked for this event type,
        // so there is nobody to notify.
        if (subjects.TryGetValue(typeof(TEvent), out subject))
        {
            ((ISubject<TEvent>)subject)
                .OnNext(sampleEvent);
        }
    }
}
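The implementation above leans entirely on Rx's Subject, so it may help to see roughly what a subject does. The following is only a simplified sketch built on the IObservable and IObserver interfaces that ship with .NET; SimpleSubject and ActionObserver are hypothetical names invented for this illustration, and the real Rx Subject handles completion, errors, and concurrency far more carefully.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for Rx's Subject<T>: it is an observer
// (OnNext) and an observable (Subscribe) at the same time. Not the Rx class.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate) { observers.Add(observer); }
        // Disposing the returned token removes the observer again.
        return new Unsubscriber(() => { lock (gate) { observers.Remove(observer); } });
    }

    public void OnNext(T value)
    {
        foreach (var observer in Snapshot()) observer.OnNext(value);
    }

    // Unlike the real Subject<T>, this sketch only forwards the notification
    // and does not stop accepting values afterwards.
    public void OnError(Exception error)
    {
        foreach (var observer in Snapshot()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in Snapshot()) observer.OnCompleted();
    }

    // Copy the list under the lock so that handlers may subscribe or
    // unsubscribe while a notification is in progress.
    private IObserver<T>[] Snapshot()
    {
        lock (gate) { return observers.ToArray(); }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action remove;
        private bool disposed;

        public Unsubscriber(Action remove) { this.remove = remove; }

        public void Dispose()
        {
            if (disposed) return; // repeated Dispose calls are harmless
            disposed = true;
            remove();
        }
    }
}

// Hypothetical helper: adapts a delegate to IObserver<T>, which is what
// Rx's Subscribe(Action<T>) extension method does for you.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;

    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this in mind, EventPublisher is little more than a dictionary that maps each event type to one such subject, and disposing a subscription simply removes the observer from the list.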

Finally

In my opinion, Prism gets this wrong. Your events should be like POCOs; put the subscription logic elsewhere.


The idea behind event aggregation is to build loosely coupled components. In this post I’d like to introduce a simple scenario for event aggregation.

Imagine that you are building Microsoft SQL Server Management Studio Express. The “shell” of this program has a Create Connection panel and an Object Explorer panel. When the user presses the Connect button, the connection must be added to the Object Explorer panel.

We will focus on this simple question:

Who is responsible for notifying the Object Explorer panel that a connection has been added?

I will not talk about MVVM, MVP, WPF, or WinForms; this is purely theoretical.

Option A: direct notification

After processing the connect operation, the Connect use case calls a method on the Object Explorer:

objectExplorer.Add(theNewConnection);

The Connect use case needs to know too much about the Object Explorer use case: it needs a reference to the Object Explorer. This is bad design because it violates almost every letter in S.O.L.I.D.

Option B: common events

The Connect use case has an event named ConnectionAdded, and the Object Explorer subscribes to this event. We are only moving the problem to the other side: the Connect use case is now agnostic about the Object Explorer, but the Object Explorer needs to know about the Connect use case. This solution has the same problem as the previous one.
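To make the coupling in Option B concrete, here is a minimal sketch using a plain .NET event. The ConnectUseCase class, its Connect(string) signature, and the Name property on Connection are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal connection type for the sketch.
public class Connection
{
    public string Name { get; set; }
}

// The publisher exposes an ordinary .NET event and knows nothing
// about who listens to it.
public class ConnectUseCase
{
    public event Action<Connection> ConnectionAdded;

    public void Connect(string name)
    {
        // ... the actual connect logic would go here ...
        var handler = ConnectionAdded;
        if (handler != null) handler(new Connection { Name = name });
    }
}

public class ObjectExplorer
{
    private readonly List<Connection> connections = new List<Connection>();

    // The explorer must be handed the concrete publisher: this is the
    // coupling that Option B merely moves to the other side.
    public ObjectExplorer(ConnectUseCase connectUseCase)
    {
        connectUseCase.ConnectionAdded += connections.Add;
    }

    public IEnumerable<Connection> Connections
    {
        get { return connections; }
    }
}
```

The publisher is now clean, but any other component that wants to announce a new connection would need its own event, and the Object Explorer would have to know about that component too.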

Option C: event publishing

The Object Explorer subscribes to an event named ConnectionAdded, which is defined as a class.

public class ObjectExplorer{

  // Backing list; IEnumerable has no Add method, so we add to this field.
  private readonly List<Connection> connections = new List<Connection>();

  public ObjectExplorer(IEventAggregator eventAggregator)
  {
     eventAggregator.Subscribe<ConnectionAdded>(AddConnection);
  }
  
  public IEnumerable<Connection> Connections{ get { return connections; } }

  private void AddConnection(ConnectionAdded connectionAdded)
  {
     connections.Add(connectionAdded.Connection);
  }
}

The Connect use case publishes the event when it is ready:

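public class AddConnectionUseCase{

  private readonly IEventAggregator eventAggregator;

  public AddConnectionUseCase(IEventAggregator eventAggregator)
  {
     this.eventAggregator = eventAggregator;
  }
  
  private void Connect()
  {
     //do stuff
     // Placeholder: the real connection comes out of the connect logic above.
     var connection = new Connection();
     eventAggregator.Publish<ConnectionAdded>(
        new ConnectionAdded { Connection = connection });
  }
}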

That is it. Any part of our system can publish this event, and any part of our system can subscribe to it. This architecture is extensible, flexible, easy to test, and easy to maintain.
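The snippets above call Subscribe and Publish on an IEventAggregator without defining it. The following is a rough sketch of what such an interface and a trivial in-memory implementation could look like; the interface shape is inferred from those two calls, and InMemoryEventAggregator is a hypothetical name for illustration, not the concrete implementation this series builds.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the interface used in the snippets above
// (this is not Prism's IEventAggregator).
public interface IEventAggregator
{
    void Subscribe<TEvent>(Action<TEvent> handler);
    void Publish<TEvent>(TEvent theEvent);
}

// Hypothetical in-memory implementation: a list of handlers per event type.
public class InMemoryEventAggregator : IEventAggregator
{
    private readonly object gate = new object();
    private readonly Dictionary<Type, List<Delegate>> handlers =
        new Dictionary<Type, List<Delegate>>();

    public void Subscribe<TEvent>(Action<TEvent> handler)
    {
        lock (gate)
        {
            List<Delegate> list;
            if (!handlers.TryGetValue(typeof(TEvent), out list))
            {
                list = new List<Delegate>();
                handlers[typeof(TEvent)] = list;
            }
            list.Add(handler);
        }
    }

    public void Publish<TEvent>(TEvent theEvent)
    {
        Delegate[] snapshot;
        lock (gate)
        {
            List<Delegate> list;
            // Nobody subscribed to this event type: nothing to do.
            if (!handlers.TryGetValue(typeof(TEvent), out list)) return;
            snapshot = list.ToArray();
        }

        // Invoke outside the lock so that handlers may subscribe or publish.
        foreach (var handler in snapshot)
        {
            ((Action<TEvent>)handler)(theEvent);
        }
    }
}
```

Because publishers and subscribers depend only on the interface, a test can pass in this in-memory version (or a mock) and assert on what was published, without constructing the other side at all.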

The concept is very close to Domain Events.

You can read more in the Composite Application Guidance; Ayende Rahien also has a similar artifact in his Effectus application.

In my next post I’ll show you a concrete implementation of IEventAggregator.
