José F. Romaniello

The adventures and misadventures of a coder.

This is the second part of my post about Event Aggregator. The idea is to show you an easy way to create and use an event aggregator in your application.

The interface of IEventAggregator in Prism looks as follows:

public interface IEventAggregator
{
    TEventType GetEvent<TEventType>() 
        where TEventType : EventBase;
}

Let's see what EventBase is:

///<summary>
/// Defines a base class to publish and subscribe to events.
///</summary>
public abstract class EventBase
{
    private readonly List<IEventSubscription> _subscriptions = new List<IEventSubscription>();


    protected ICollection<IEventSubscription> Subscriptions
    {
        get { return _subscriptions; }
    }

    protected virtual SubscriptionToken InternalSubscribe(IEventSubscription eventSubscription)
    {
        eventSubscription.SubscriptionToken = new SubscriptionToken();
        lock (Subscriptions)
        {
            Subscriptions.Add(eventSubscription);
        }
        return eventSubscription.SubscriptionToken;
    }

    protected virtual void InternalPublish(params object[] arguments)
    {
        List<Action<object[]>> executionStrategies = PruneAndReturnStrategies();
        foreach (var executionStrategy in executionStrategies)
        {
            executionStrategy(arguments);
        }
    }

    public virtual void Unsubscribe(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            if (subscription != null)
            {
                Subscriptions.Remove(subscription);
            }
        }
    }

    public virtual bool Contains(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            return subscription != null;
        }
    }

    private List<Action<object[]>> PruneAndReturnStrategies()
    {
        List<Action<object[]>> returnList = new List<Action<object[]>>();

        lock (Subscriptions)
        {
            for (var i = Subscriptions.Count - 1; i >= 0; i--)
            {
                Action<object[]> listItem =
                    _subscriptions[i].GetExecutionStrategy();

                if (listItem == null)
                {
                    // Prune from main list. Log?
                    _subscriptions.RemoveAt(i);
                }
                else
                {
                    returnList.Add(listItem);
                }
            }
        }

        return returnList;
    }
}

Although it looks fine, I think we can do less and achieve more with the Reactive Extensions framework.

So, for this example my interface will look as follows:

public interface IEventPublisher
{
  void Publish<TEvent>(TEvent sampleEvent);
  IObservable<TEvent> GetEvent<TEvent>();
}

The first method publishes a TEvent, and the second returns an IObservable of TEvent. I like this approach because it lets me leverage many features of the reactive framework. There is also no restriction on TEvent; any class can be an event. Another interesting point is that IObservable is now part of the framework itself (it ships with .NET 4).
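The usage examples publish a SampleEvent with a Status property. Since any class can be an event, it could be as small as the following sketch. The int type of Status is an assumption, inferred from the `se.Status == 1` filter used later.

```csharp
// Hypothetical event type for the usage examples. Any plain class works,
// because IEventPublisher places no constraint on TEvent.
public class SampleEvent
{
    // Assumed to be an int, based on the `se.Status == 1` comparison.
    public int Status { get; set; }
}
```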

Usage examples

Simple subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.True();

Unsubscribe

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

var subscription = eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

subscription.Dispose();
eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.False();

Selective subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Where(se => se.Status == 1)
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent{Status = 1});
eventWasRaised.Should().Be.True();

Subscribe to projection

var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

Observe on dispatcher

No matter which thread the event was published on, the handler runs on the UI thread.

var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .ObserveOnDispatcher()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

These are just examples of what you can do when you combine Reactive Extensions with Event Aggregator.

The implementation

This is the whole implementation:

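public class EventPublisher : IEventPublisher
{
    // One subject per event type, stored as object because the
    // generic argument differs from entry to entry.
    private readonly ConcurrentDictionary<Type, object> subjects
        = new ConcurrentDictionary<Type, object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        // Create the subject for TEvent on first request, then reuse it.
        var subject = 
            (ISubject<TEvent>) subjects.GetOrAdd(typeof (TEvent), 
                        t => new Subject<TEvent>());
        // Hide the subject so subscribers cannot call OnNext themselves.
        return subject.AsObservable();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        // If nobody has requested this event type yet, there is no
        // subject and the event is dropped.
        object subject;
        if (subjects.TryGetValue(typeof(TEvent), out subject))
        {
            ((ISubject<TEvent>)subject)
                .OnNext(sampleEvent);
        }
    }
}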
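Two consequences of this implementation are easy to miss: Publish looks up the subject by the compile-time type argument, and an event published before anyone has called GetEvent for that type is dropped. The sketch below demonstrates both. It assumes a current System.Reactive package (whose namespaces differ from the 2010 builds) and repeats the class without the interface so that it compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class EventPublisher
{
    private readonly ConcurrentDictionary<Type, object> subjects
        = new ConcurrentDictionary<Type, object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        var subject = (ISubject<TEvent>)subjects.GetOrAdd(
            typeof(TEvent), t => new Subject<TEvent>());
        return subject.AsObservable();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject;
        if (subjects.TryGetValue(typeof(TEvent), out subject))
        {
            ((ISubject<TEvent>)subject).OnNext(sampleEvent);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var publisher = new EventPublisher();
        var received = new List<string>();

        // No subject exists for string yet, so this event is dropped.
        publisher.Publish("lost");

        using (publisher.GetEvent<string>().Subscribe(s => received.Add(s)))
        {
            publisher.Publish("first");

            // TEvent is inferred as object here, so string subscribers
            // never see this event even though the value is a string.
            object boxed = "typed as object";
            publisher.Publish(boxed);
        }

        // The subscription was disposed by the using block.
        publisher.Publish("after dispose");

        Console.WriteLine(string.Join(",", received)); // prints "first"
    }
}
```

If subscribers need to react to a base type, the publisher has to be called with that base type as TEvent, or the event has to be published once per type of interest.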

Finally

Prism is just wrong. Your events should be like POCOs, put the subscription elsewhere.


29 comments:

Jose, developers have been asking me to add an event aggregator to Caliburn for a long time.  I was wondering how you would feel about me using your implementation.  It's so simple and so powerful. It's exactly what I've been looking for.  Thoughts?

José F. Romaniello said...

Of course Rob go ahead!

José F. Romaniello said...

I'm not really sure about threading issues.... I'm just trying this
implementation. What do you think?

Well, the good thing is... if there is a threading issue... it's only 23 lines of code to search through to find it :) It seems pretty straightforward to me.

Yes, I agree with you. I'll send you a gist by tweet with my test fixture.

Hi Jose,
May we add this to RxContrib (http://rxcontrib.codeplex.com/)?

José F. Romaniello said...

Of course, go ahead!
You can use this explanation in the wiki or add a link to this post.

Welcome to Reactive Contrib :)

http://rxcontrib.codeplex.com/releases/view/44812

José F. Romaniello said...

really nice!

Hi Jose - I have been having trouble getting these snippets to work. Are you able to post a full sample?

José F. Romaniello said...

What problems did you have?

Hi Jose - I got your example working... a case of me rushing through some of your code and Rob's implementation in Caliburn while at work... It makes sense now that I am home and have some time to think.

Thanks.

José F. Romaniello said...

glad to hear that!

How are you solving the memory leak issue with pub/sub? Most impls I've seen use some kind of Weak event pattern internally.

Thanks

José F. Romaniello said...

Other implementations use weak events instead of CLR events; I don't use CLR events at all.
There is no memory leak, because the Subscribe method (from the Reactive Extensions framework, not mine) returns an IDisposable. If you call its Dispose method, you simply "unsubscribe" from that "kind of event".
If you don't call Dispose explicitly, the garbage collector will do the unsubscription (eventually).
You have to think of IObservable as an IEnumerable, and of the publish mechanism as a "yield return", not in terms of CLR events.

Thanks for your comment.

Amol Kulkarni said...

Hi, I am using these classes. After I call the GetEvent method, I do not get the Select and Where methods mentioned in your code above. Is there anything special that I need to do?

Can you please share an example of how to use these classes?

José F. Romaniello said...

"Where" comes from Reactive Extensions; more specifically, it is in the System.Linq namespace of System.Reactive.dll.
http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx

José F. Romaniello said...

see my last post:
http://jfromaniello.blogspot.com/2010/07/event-aggregator-subscriptors.html

OK. I tried using this but it is not working. I just incorporated these classes, as I am using Caliburn 1.1. Do you think that may be the cause?

José F. Romaniello said...

Could you put together a test fixture and post it on the Caliburn list?
I will answer you there.

Rajendra Rajput said...

Hi Jose,

I have posted the code snippet on the Caliburn list.
Check this discussion: http://caliburn.codeplex.com/Thread/View.aspx?ThreadId=218875

José F. Romaniello said...

Just to keep this post up to date: the GC will not dispose of the subscription eventually. You have to call Dispose() when you dispose the ViewModel. Follow the conversation here:
http://caliburn.codeplex.com/Thread/View.aspx?ThreadId=218875
and here:
http://caliburn.codeplex.com/Thread/View.aspx?ThreadId=220210
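
A minimal sketch of that advice, assuming a current System.Reactive package: the view model keeps every subscription in a CompositeDisposable and releases them all in its own Dispose. SampleViewModel and LastStatus are hypothetical names, and the interface and event type are repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Reactive.Disposables;

public interface IEventPublisher
{
    void Publish<TEvent>(TEvent sampleEvent);
    IObservable<TEvent> GetEvent<TEvent>();
}

public class SampleEvent
{
    public int Status { get; set; }
}

// Hypothetical view model that owns its subscriptions.
public sealed class SampleViewModel : IDisposable
{
    // Collects every subscription so they can be released together.
    private readonly CompositeDisposable subscriptions = new CompositeDisposable();

    public int LastStatus { get; private set; }

    public SampleViewModel(IEventPublisher eventPublisher)
    {
        // The publisher's subject holds a reference to this handler
        // (and therefore to the view model) until the subscription is disposed.
        subscriptions.Add(
            eventPublisher.GetEvent<SampleEvent>()
                .Subscribe(se => LastStatus = se.Status));
    }

    public void Dispose()
    {
        // Unsubscribes from every event registered above.
        subscriptions.Dispose();
    }
}
```

Whatever owns the view model (a screen conductor, a container, the parent view model) then has to call Dispose when the view model is closed.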

José F. Romaniello said...

It is part of the Reactive Extensions framework.
*Amr ElGarhy*

Andrea Pierini said...

Fabulous...

but how can a poor Silverlight guy like me leverage your terrific EventPublisher implementation, since ConcurrentDictionary is not supported?

Anyway, great job!!

José F. Romaniello said...

Thanks Andrea, you can easily replace the concurrent dictionary with a
plain dictionary guarded by an external locking mechanism. Have a look at
DefaultEventPublisher in Caliburn.Reactive; it is this same code without
the concurrent dictionary.
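
A minimal sketch of that idea, assuming a plain Dictionary guarded by a lock object. This is an illustration only, not the actual DefaultEventPublisher from Caliburn.Reactive; LockingEventPublisher and gate are hypothetical names, and the using directives are those of a current System.Reactive package, which may differ on Silverlight-era builds.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical variant of EventPublisher for platforms that
// lack ConcurrentDictionary.
public class LockingEventPublisher
{
    private readonly Dictionary<Type, object> subjects
        = new Dictionary<Type, object>();

    // Guards every read and write of the dictionary.
    private readonly object gate = new object();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        lock (gate)
        {
            object subject;
            if (!subjects.TryGetValue(typeof(TEvent), out subject))
            {
                subject = new Subject<TEvent>();
                subjects.Add(typeof(TEvent), subject);
            }
            return ((ISubject<TEvent>)subject).AsObservable();
        }
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject;
        bool found;
        lock (gate)
        {
            found = subjects.TryGetValue(typeof(TEvent), out subject);
        }

        // OnNext runs outside the lock, so a handler that calls
        // GetEvent or Publish does not run while the lock is held.
        if (found)
        {
            ((ISubject<TEvent>)subject).OnNext(sampleEvent);
        }
    }
}
```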

Many thanks!

There is one on my blog that should work: http://keith-woods.com/Blog/post/Rx-Event-Aggregator.aspx

Nice work, I posted something similar I've been using for a while yesterday. But hadn't thought of using ConcurrentDictionary. Mine is here: http://keith-woods.com/Blog/post/Another-version-of-the-EventAggregator.aspx ;)
