RX – What thread am I on?

Introduction

Threading in RX is usually controlled by a set of classes called schedulers, which implement the IScheduler interface.  I have interviewed a fair few people who don’t fully understand which thread they are on at various points in the pipeline, especially once a few ObserveOn and SubscribeOn calls are thrown in.

What is SubscribeOn?

As the name suggests, SubscribeOn specifies which scheduler should be used to execute the subscription.  When I ask people what exactly the subscription is, there are usually some blank faces.  I think this is because it depends on the circumstances.

SubscribeOn for Observable.Create

If you have an Observable.Create, then the factory you pass to it is the subscription.  If you add a SubscribeOn(TaskPoolScheduler.Default), the factory will run on the TaskPool, i.e.

Observable.Create<int>(observer =>
           {
               // This is run on the TaskPool
               return Disposable.Empty;
           })
          .SubscribeOn(TaskPoolScheduler.Default)
          .Subscribe(); // The factory only runs once something subscribes
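
To check this for yourself, here is a minimal sketch that records which kind of thread the factory ran on. The class and method names are made up for illustration, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class, not part of RX itself.
public static class CreateSubscribeOnDemo
{
    // Returns true when the Observable.Create factory ran on a thread-pool thread.
    public static bool FactoryRunsOnPoolThread()
    {
        var factoryOnPool = false;
        var done = new ManualResetEventSlim();

        var source = Observable.Create<int>(observer =>
        {
            // This is the "subscription" that SubscribeOn moves.
            factoryOnPool = Thread.CurrentThread.IsThreadPoolThread;
            observer.OnCompleted();
            return Disposable.Empty;
        });

        using (source.SubscribeOn(TaskPoolScheduler.Default)
                     .Subscribe(_ => { }, () => done.Set()))
        {
            // Block the caller until the factory has completed the stream.
            done.Wait(TimeSpan.FromSeconds(5));
        }

        return factoryOnPool;
    }
}
```

Remove the SubscribeOn and the factory should instead run synchronously on whichever thread calls Subscribe.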

SubscribeOn for Subjects

If you have a subject, then there is no factory, or “subscription”, to run on the scheduler.  Values are delivered on whichever thread calls OnNext on the subject, so adding a SubscribeOn to something that is ultimately a Subject is quite pointless.  I guess this is one of the reasons why “you should never create your own streams using a subject” is bandied about by various developers (besides the other obvious issues, such as breaking the contract RX provides).
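
A small sketch of the subject case, again with made-up names and assuming the System.Reactive package. It adds a SubscribeOn to a Subject and then checks that the handler still ran on the thread that pushed the value.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class, not part of RX itself.
public static class SubjectSubscribeOnDemo
{
    // Returns true when the OnNext handler ran on the thread that called subject.OnNext.
    public static bool HandlerRunsOnProducerThread()
    {
        var subject = new Subject<int>();
        var handlerThreadId = -1;

        using (subject.SubscribeOn(TaskPoolScheduler.Default)
                      .Subscribe(_ => handlerThreadId = Environment.CurrentManagedThreadId))
        {
            // SubscribeOn only defers attaching the observer to the subject, so wait
            // until it is attached; a value pushed before then would be missed.
            if (!SpinWait.SpinUntil(() => subject.HasObservers, TimeSpan.FromSeconds(5)))
                throw new TimeoutException("Observer was never attached.");

            subject.OnNext(42); // pushed from the calling thread
        }

        return handlerThreadId == Environment.CurrentManagedThreadId;
    }
}
```

The wait on HasObservers hints at a second downside: because the observer is attached asynchronously, anything the subject emits in the meantime is lost.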

Where should I add ObserveOn in the pipeline?

I was interested to discover that a fair few people did not know that where you call ObserveOn in the pipeline actually matters.  I think this is because most online examples do the following:

this.stream.SubscribeOn(TaskPoolScheduler.Default)
           .ObserveOn(DispatcherScheduler.Current)
           .Subscribe(_ => { });

Anything after the ObserveOn (in this case, just the OnNext delegate) will happen on the dispatcher.  Anything before it will run on whichever thread the source emits on, which for a source that emits from within its factory is the thread the subscription ran on.  For example:

this.stream.SubscribeOn(TaskPoolScheduler.Default)
           .Select(x => x /* This will run on the TaskPool. */)
           .ObserveOn(DispatcherScheduler.Current)
           .Select(x => x /* This will run on the Dispatcher. */)
           .Do(_ => { /* This will run on the Dispatcher. */ })
           .Subscribe(_ => { /* This will run on the Dispatcher. */ });

And of course, when I say the first Select will run on the TaskPool, that’s assuming the stream is created using some kind of factory (rather than just a subject).

Having a clear understanding of this means you can do any expensive processing in a Select() before the ObserveOn, which avoids doing unnecessary work on the dispatcher.
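
The effect of ObserveOn placement can be checked outside WPF with a sketch like the one below. It assumes the System.Reactive package and uses an EventLoopScheduler (a single dedicated thread) as a stand-in for the dispatcher; the class and method names are made up for illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical demo class, not part of RX itself.
public static class ObserveOnPlacementDemo
{
    // Reports whether the Select before and the Select after ObserveOn
    // ran on a thread-pool thread.
    public static (bool beforeOnPool, bool afterOnPool) Run()
    {
        var before = false;
        var after = true;
        var done = new ManualResetEventSlim();

        var source = Observable.Create<int>(observer =>
        {
            // Emits from within the factory, so values start on the subscription thread.
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        using (var dispatcherStandIn = new EventLoopScheduler())
        using (source
            .SubscribeOn(TaskPoolScheduler.Default)
            .Select(x => { before = Thread.CurrentThread.IsThreadPoolThread; return x; })
            .ObserveOn(dispatcherStandIn)
            .Select(x => { after = Thread.CurrentThread.IsThreadPoolThread; return x; })
            .Subscribe(_ => { }, () => done.Set()))
        {
            // Block the caller until the pipeline has completed.
            done.Wait(TimeSpan.FromSeconds(5));
        }

        return (before, after);
    }
}
```

The first Select should report a pool thread and the second should not, since everything downstream of ObserveOn runs on the event loop's own thread.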

SubscribeOn and ObserveOn in a Multi Layer App

A lot of my current work is with WPF and we use RX heavily.  It tends to become pervasive throughout the application: for example, if you start using it in a data access layer, you’ll quickly find it spreading through all your layers down to the view models.

In these kinds of applications, I like to keep things simple and only manage threading in one place.  For me the ideal place is the view model, since this is usually where you’ll want to come off the dispatcher to get or process some data and then come back to the dispatcher.

I find things start to become messy when there are various SubscribeOn or ObserveOn calls scattered throughout the layers.  It’s hard to know what thread you’ll be on, and it’s likely you’ll make various thread switches for no reason.
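
One way this might look is sketched below. The view model, its property and the doubling step are all hypothetical, and it assumes the System.Reactive package. The lower layers expose a plain IObservable with no threading of their own, and the view model is the only place that picks schedulers.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical view model: the only layer that decides which threads are used.
public sealed class PricesViewModel : IDisposable
{
    private readonly IDisposable subscription;

    public int LatestPrice { get; private set; }

    public PricesViewModel(IObservable<int> prices, IScheduler background, IScheduler ui)
    {
        this.subscription = prices
            .SubscribeOn(background)   // come off the dispatcher to get the data
            .Select(p => p * 2)        // stand-in for processing done off the dispatcher
            .ObserveOn(ui)             // come back to the dispatcher
            .Subscribe(p => this.LatestPrice = p);
    }

    public void Dispose() => this.subscription.Dispose();
}
```

In a WPF app the two schedulers would typically be TaskPoolScheduler.Default and DispatcherScheduler.Current. Passing them in as parameters is a design choice of this sketch; it also lets a test supply Scheduler.Immediate for both so that everything runs synchronously.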

Conclusion

This article only touches on some of the threading aspects in RX.  Things can quickly become more complicated, especially when the pipeline contains a few switches and combines many streams.
