RX – Everything is a stream!

Introduction

I’m a huge fan of Reactive Extensions (RX) and have been using it in a professional context for many years.  One of the interesting topics of debate between co-workers is when to use Tasks and when to use RX.  This post sets out my opinion, which may well be off the mark, but at least I have shared it and will hopefully receive some compelling counter-arguments.

When to use Tasks or RX?

The answer stems from what problem you’re trying to solve.  To state the obvious, RX is best for streaming data or data that changes over time.  It’s surprising how much data out there is not “static”; even most so-called “static data” is not actually static.  In a finance application, you might consider a list of currency pairs to be static.  Whilst they don’t change often, they certainly do change.

Tasks, on the other hand, offer a really nice abstraction for parallelising work on (genuinely) static data.  It makes a lot of sense to run 10 tasks and then do something when they all complete.  You can parallelise in RX too, but the abstraction is weaker: it makes less sense to subscribe to 10 “streams” and combine the latest value from each.
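As a rough illustration of the “run 10 tasks, then do something when they all complete” pattern, here is a minimal sketch that uses Python’s asyncio as a stand-in for .NET Tasks; asyncio.gather plays roughly the role of Task.WhenAll. The fetch_rate coroutine and the PAIRn names are hypothetical placeholders for real pricing calls.

```python
import asyncio


async def fetch_rate(pair: str) -> float:
    # Hypothetical stand-in for an I/O-bound call such as a pricing request.
    await asyncio.sleep(0.01)
    return float(len(pair))  # placeholder value, not a real rate


async def fetch_all(pairs: list[str]) -> dict[str, float]:
    # Run one coroutine per pair concurrently and resume only once all of
    # them have completed, similar in spirit to await Task.WhenAll(...) in C#.
    # gather returns results in the same order as its arguments.
    rates = await asyncio.gather(*(fetch_rate(p) for p in pairs))
    return dict(zip(pairs, rates))


if __name__ == "__main__":
    pairs = [f"PAIR{i}" for i in range(10)]
    print(asyncio.run(fetch_all(pairs)))
```

Because the input list does not change while the work runs, a single “all done” signal is all the consumer needs; by default gather also propagates the first exception raised by any of the coroutines.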

Another compelling reason to use Tasks, especially in an API, is that they are unambiguous to the consumer.  It’s clear the API will return a single value at a later time (or an exception).  If the API returns a stream, it’s not clear how many events you will receive.  There is an argument that you may not care: the consumer can choose to take only one event, or they may decide to consume updates or future events too.  Either way, the clarity of a Task-returning API is a trade-off against composability (as discussed in the next section).
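To make the API-shape argument concrete, here is a hedged Python sketch with asyncio standing in for .NET: a coroutine that resolves once plays the part of a Task-returning method, while an async generator plays the part of a stream whose signature says nothing about how many values will arrive. The function names and prices are hypothetical.

```python
import asyncio
from typing import AsyncIterator


async def get_spot_price(pair: str) -> float:
    # Single-value API: exactly one result (or an exception), like Task<T>.
    await asyncio.sleep(0)
    return 1.10  # hypothetical placeholder price


async def spot_prices(pair: str) -> AsyncIterator[float]:
    # Streaming API: the caller cannot tell from the signature how many
    # values will be produced before the stream completes.
    for price in (1.10, 1.11, 1.09):  # hypothetical placeholder ticks
        await asyncio.sleep(0)
        yield price


async def take_first(pair: str) -> float:
    # The consumer may choose to take only the first event...
    async for price in spot_prices(pair):
        return price
    raise ValueError("stream completed without producing a value")


async def take_all(pair: str) -> list[float]:
    # ...or to consume every update until the stream completes.
    return [price async for price in spot_prices(pair)]
```

The streaming version composes more flexibly, but a reader of get_spot_price knows exactly what to expect without reading any documentation.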

I think that a lot (if not all) of what can be achieved using Tasks can be achieved in RX too, and in some circumstances RX provides a more compelling case.

Why is RX superior in many cases?

There are a large number of advantages to choosing RX over Tasks; these are the strongest arguments:

  • Composability: With Tasks, when we are awaiting the results from many running tasks, we are pretty limited in what we can do.  We can use Task.WhenAny or Task.WhenAll, although that’s about as advanced as it gets.  In the RX world, we can CombineLatest (roughly our WhenAll), Merge (roughly our WhenAny), Zip or Join.
  • Extensions: There are a lot of functions we can perform on a stream through the various extension methods, such as filtering, mapping, throttling, sampling, buffering and loads more.  You don’t have this same flexibility with Tasks and you’ll likely be left writing a lot more code with more defects to achieve the same thing.
  • Simple, intuitive API: There are three clear channels (OnNext, OnError, OnCompleted) within an RX stream, and doing something on any of these is trivial.  You simply provide a delegate when you subscribe (if using the Subscribe extension method) and away you go.  With Tasks, the API feels really clunky and just isn’t intuitive.  You have to call ContinueWith, passing some TaskContinuationOptions, and chain multiple continuations for all the permutations you care about.  Every time, I have to read these options carefully to select the right one: OnlyOnFaulted, NotOnRanToCompletion and NotOnCanceled, to name a few!
  • Testability: RX is really easy to test, and mocking out things like schedulers is really simple thanks to the good use of interfaces (IScheduler).  This makes running the code synchronously in tests really easy.  If you have complex temporal aspects, there is a TestScheduler provided that allows you to control time.  There is also a base class called ReactiveTest for creating and recording streams for test cases.  The Task world is really lacking any decent API, in my opinion.  There’s an abstract TaskScheduler class you can pass around, but no interfaces that might help out.
  • Switching Threads: You have fine-grained control in RX over which part of the pipe runs on which thread.  It’s trivial to switch between threads, run things on an event loop or spawn new tasks or threads.  Tasks are more limited: they typically run on the thread pool, and then you can pick which thread you want to come back on, usually via the SynchronizationContext.  If you don’t need to come back to the original context, you can use the aptly named ConfigureAwait(false).
  • Consumers are in control: I prefer to avoid any concurrency or thread switching in the RX pipe until it’s consumed (i.e. near the Subscribe).  This allows the consumer to dictate which thread they would like to subscribe and observe on (with SubscribeOn and ObserveOn).  In the Task world, the consumer has little control over which thread the task actually runs on.
  • Clean-up: In the RX world, you’ll end up with a disposable that represents your subscription.  Disposing this cleans up the entire chain and stops the subscription, so the subscription and its underlying connection to the data source are disposed of cleanly.  Achieving the same thing with Tasks requires some development.  You only provide a function to the task, so there’s no built-in way to clean up if you want to cancel; cancellation requires a CancellationToken, which just feels overly verbose.  Admittedly, in RX, if you’re inside the Observable.Create factory (for example in a while-true loop), there is also no way to “stop” it executing.
  • Ubiquitous platform: Once you have mastered RX, you’ll find it’s easy to use your skills elsewhere, whether in Python, JavaScript or C++.
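To make the three channels, the operator style and the clean-up story concrete, here is a deliberately tiny, hypothetical Observable written from scratch in Python. It is not the API of Rx.NET or RxPY; it only shows the shape of the idea: observers receive on_next, on_error and on_completed calls, operators such as map and filter wrap one observable in another, and disposing the subscription returned at the end detaches the whole chain from the source. PriceFeed is an illustrative hot source, not a real library type.

```python
from typing import Any, Callable, List, Optional


def _reraise(error: Exception) -> None:
    # Default error channel: surface errors instead of swallowing them.
    raise error


class Disposable:
    """Runs a clean-up action at most once."""

    def __init__(self, action: Callable[[], None]) -> None:
        self._action: Optional[Callable[[], None]] = action

    def dispose(self) -> None:
        action, self._action = self._action, None
        if action is not None:
            action()


class Observer:
    """Bundles the three channels: on_next, on_error, on_completed."""

    def __init__(self, on_next, on_error=None, on_completed=None) -> None:
        self.on_next = on_next
        self.on_error = on_error or _reraise
        self.on_completed = on_completed or (lambda: None)


class Observable:
    def __init__(self, subscribe_fn: Callable[[Observer], Disposable]) -> None:
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next, on_error=None, on_completed=None) -> Disposable:
        return self._subscribe_fn(Observer(on_next, on_error, on_completed))

    def map(self, fn: Callable[[Any], Any]) -> "Observable":
        def subscribe_fn(downstream: Observer) -> Disposable:
            def on_next(value: Any) -> None:
                try:
                    result = fn(value)
                except Exception as error:  # route failures to on_error
                    downstream.on_error(error)
                    return
                downstream.on_next(result)
            # Returning the upstream disposable means disposing the end of
            # the chain also unsubscribes from the source.
            return self.subscribe(on_next, downstream.on_error, downstream.on_completed)
        return Observable(subscribe_fn)

    def filter(self, predicate: Callable[[Any], bool]) -> "Observable":
        def subscribe_fn(downstream: Observer) -> Disposable:
            def on_next(value: Any) -> None:
                if predicate(value):
                    downstream.on_next(value)
            return self.subscribe(on_next, downstream.on_error, downstream.on_completed)
        return Observable(subscribe_fn)


class PriceFeed:
    """Hypothetical hot source pushing values to current subscribers."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def as_observable(self) -> Observable:
        def subscribe_fn(observer: Observer) -> Disposable:
            self._observers.append(observer)

            def detach() -> None:
                if observer in self._observers:
                    self._observers.remove(observer)
            return Disposable(detach)
        return Observable(subscribe_fn)

    def push(self, value: Any) -> None:
        for observer in list(self._observers):
            observer.on_next(value)

    def complete(self) -> None:
        for observer in list(self._observers):
            observer.on_completed()
        self._observers.clear()


if __name__ == "__main__":
    feed = PriceFeed()
    received: List[Any] = []
    subscription = (feed.as_observable()
                    .filter(lambda price: price > 1.0)
                    .map(lambda price: round(price * 100))
                    .subscribe(received.append))
    for price in (0.9, 1.1, 1.2):
        feed.push(price)
    subscription.dispose()  # detaches the whole chain from the feed
    feed.push(1.3)          # no longer delivered
    print(received)         # [110, 120]
```

Real RX libraries add far more (schedulers, thread safety, serialised notifications, many more operators), but the single disposable at the end of the chain is the same idea the clean-up point describes.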

It’s worth mentioning that async/await significantly improves on a few of these points, especially around the “Simple, intuitive API” point.
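As a hedged illustration of why async/await helps here, the following Python asyncio sketch (an analogy, not the .NET API) handles success, failure and cancellation with ordinary try/except/else/finally around a single await, rather than by choosing between continuation options such as OnlyOnFaulted or NotOnCanceled. load_quote is a hypothetical operation; in C# the equivalent would be try/catch/finally around an await.

```python
import asyncio


async def load_quote(fail: bool) -> float:
    # Hypothetical operation that may fault.
    await asyncio.sleep(0)
    if fail:
        raise ConnectionError("pricing service unavailable")
    return 1.10


async def describe_quote(fail: bool) -> str:
    # One try/except/else/finally covers the cases that would otherwise need
    # separate continuations (success, fault, and "run regardless").
    try:
        quote = await load_quote(fail)
    except ConnectionError as error:
        return f"error: {error}"
    else:
        return f"quote: {quote}"
    finally:
        print("request finished")


async def was_cancelled() -> bool:
    # Cancellation surfaces as an exception at the await point, so it is
    # handled with the same language construct.
    task = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return True
    return False
```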

RX on the Web

The web is a hugely up-and-coming area and the technology here is really exciting.  We are starting to see RX being used ubiquitously in JavaScript, especially in frameworks such as Angular.  I think one reason for this is JavaScript’s lack of async/await, which leads to “callback hell” and large amounts of state when combining multiple callbacks.  Netflix have been using RX for a long time and posted an interesting video some time ago about their experience with RX and promises (similar to Tasks).

Further Reading

If you want to know how async/await works under the covers, I would highly recommend Jon Skeet’s Eduasync series.

If you want to learn more about RX, then Lee Campbell’s Intro to RX is the de facto reference in the .NET world.

Summary

I find myself using RX much more often than Tasks, although that might be more about the types of applications and problems I am solving at work in the financial sector.  I think this sector perhaps lends itself more to streaming technologies, since traders typically want live, streaming data delivered to their desktops.  I find myself writing much more asynchronous code, and I find the user experience is better.

Tasks and async/await still have their place, and extension methods such as ToTask and ToObservable make it easier to use the two together.  It’s also worth noting that observables are awaitable.  Tasks are perhaps best suited to parallelising work on data that does not change, or to cases where you have an older synchronous API you need to connect to.  Recently I have found myself using RX and Tasks with async/await together in harmony, for example using async/await inside an Observable.Create factory, then calling into parallel, blocking APIs using Tasks.
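The bridging ideas above can be sketched in Python as an analogy (this is not the Rx.NET API). Here from_awaitable loosely mirrors ToObservable by turning a one-shot result into a one-element stream; last_value loosely mirrors ToTask, which in Rx.NET completes with the last value of the sequence; and squares shows async code inside a stream’s producer handing a blocking call to a worker thread via asyncio.to_thread (Python 3.9+). All function names, including blocking_lookup as a stand-in for a legacy synchronous API, are hypothetical.

```python
import asyncio
import time
from typing import AsyncIterator, Awaitable, TypeVar

T = TypeVar("T")


async def from_awaitable(awaitable: Awaitable[T]) -> AsyncIterator[T]:
    # Rough analogue of ToObservable: a one-shot result as a one-element stream.
    yield await awaitable


async def last_value(stream: AsyncIterator[T]) -> T:
    # Rough analogue of ToTask: wait for the stream to complete and return
    # its last value, failing if it produced nothing.
    sentinel = object()
    last: object = sentinel
    async for value in stream:
        last = value
    if last is sentinel:
        raise ValueError("stream completed without producing a value")
    return last  # type: ignore[return-value]


def blocking_lookup(n: int) -> int:
    # Hypothetical legacy synchronous API.
    time.sleep(0.01)
    return n * n


async def squares(count: int) -> AsyncIterator[int]:
    # Async code inside the stream's producer, offloading each blocking call
    # to a worker thread so the event loop stays responsive.
    for n in range(count):
        yield await asyncio.to_thread(blocking_lookup, n)
```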

 
