Skip to content

Conversation

akarnokd
Copy link
Collaborator

@akarnokd akarnokd commented May 4, 2018

This PR implements the Concat(IObservable<IObservable<T>>) directly in an inlined (less allocation and indirection) and lock-free fashion. (The original was delegated to Merge(sources, 1) which I'd assume has more overhead because dealing with an arbitrary max-concurrency requires a lot of work in Rx.NET too.)

I didn't add extra unit tests as this code is meant to be a drop-in replacement of the previous delegated version and is supposed to be well tested already on its own, right?

There is one scenario that I'm not sure about: where both the outer and the current inner sources signal an OnError, it is not expected the outer error to stop the inner sequence immediately, and whoever's error wins, that is signaled on the boundary between two inner sources. Either I happened to implement this replacement correctly, or the behavior is not verified or specified for this operator.

@akarnokd
Copy link
Collaborator Author

There are some slight improvements over the standard implementation:

Cross-map performance:
image
(milliseconds, smaller is better, green=improvement)

var outer = new int[100000 / times];
var inner = new int[times];

concat = outer.ToObservable().Select(v => inner.ToObservable()).Concat();

concatMany = outer.ToObservable().Select(v => inner.ToObservable()).ConcatMany();
BenchmarkDotNet=v0.10.14, OS=Windows 7 SP1 (6.1.7601.0)
Intel Core i7-4770K CPU 3.50GHz (Haswell), 1 CPU, 8 logical and 4 physical cores

Frequency=3418046 Hz, Resolution=292.5648 ns, Timer=TSC
  [Host]     : .NET Framework 4.6.1 (CLR 4.0.30319.42000), 32bit LegacyJIT-v4.7.2558.0
  Job-YSPJNA : .NET Framework 4.6.1 (CLR 4.0.30319.42000), 32bit LegacyJIT-v4.7.2558.0

IterationTime=1.0000 s  LaunchCount=1  TargetCount=5  WarmupCount=5

     Method |      N |      Mean |     Error |    StdDev |
----------- |------- |----------:|----------:|----------:|
     Concat |      1 | 301.05 ms | 1.6440 ms | 0.4270 ms |
 ConcatMany |      1 | 286.74 ms | 2.6475 ms | 0.6877 ms |
     Concat |     10 |  72.51 ms | 1.3039 ms | 0.3387 ms |
 ConcatMany |     10 |  72.86 ms | 0.9030 ms | 0.2345 ms |
     Concat |    100 |  50.71 ms | 0.6166 ms | 0.1602 ms |
 ConcatMany |    100 |  49.52 ms | 0.9418 ms | 0.2446 ms |
     Concat |   1000 |  47.70 ms | 0.6130 ms | 0.1592 ms |
 ConcatMany |   1000 |  46.84 ms | 0.2921 ms | 0.0759 ms |
     Concat |  10000 |  47.25 ms | 0.6051 ms | 0.1572 ms |
 ConcatMany |  10000 |  46.66 ms | 0.8227 ms | 0.2137 ms |
     Concat | 100000 |  48.09 ms | 1.0049 ms | 0.2610 ms |
 ConcatMany | 100000 |  46.61 ms | 0.6448 ms | 0.1675 ms |

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants