Skip to content

Commit eb64149

Browse files
akarnokdOren Novotny
authored andcommitted
4.x: Optimize Concat(IObservable<IObservable<T>>) (#491)
* 4.x: Optimize Concat(IObservable<IObservable<T>>) * Add license header. * Fix error handling
1 parent d1f88cc commit eb64149

File tree

2 files changed

+238
-1
lines changed

2 files changed

+238
-1
lines changed
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Concurrent;
6+
using System.Reactive.Disposables;
7+
using System.Threading;
8+
9+
namespace System.Reactive.Linq.ObservableImpl
10+
{
11+
internal sealed class ConcatMany<T> : IObservable<T>
12+
{
13+
readonly IObservable<IObservable<T>> sources;
14+
15+
internal ConcatMany(IObservable<IObservable<T>> sources)
16+
{
17+
this.sources = sources;
18+
}
19+
20+
public IDisposable Subscribe(IObserver<T> observer)
21+
{
22+
var parent = new ConcatManyOuterObserver(observer);
23+
var d = sources.SubscribeSafe(parent);
24+
parent.OnSubscribe(d);
25+
return parent;
26+
}
27+
28+
internal sealed class ConcatManyOuterObserver : IObserver<IObservable<T>>, IDisposable
29+
{
30+
readonly IObserver<T> downstream;
31+
32+
readonly ConcurrentQueue<IObservable<T>> queue;
33+
34+
readonly InnerObserver innerObserver;
35+
36+
IDisposable upstream;
37+
38+
int trampoline;
39+
40+
Exception error;
41+
42+
bool done;
43+
44+
int active;
45+
46+
internal ConcatManyOuterObserver(IObserver<T> downstream)
47+
{
48+
this.downstream = downstream;
49+
this.queue = new ConcurrentQueue<IObservable<T>>();
50+
this.innerObserver = new InnerObserver(this);
51+
}
52+
53+
internal void OnSubscribe(IDisposable d)
54+
{
55+
if (Interlocked.CompareExchange(ref upstream, d, null) != null)
56+
{
57+
d?.Dispose();
58+
}
59+
}
60+
61+
public void Dispose()
62+
{
63+
innerObserver.Dispose();
64+
DisposeMain();
65+
}
66+
67+
void DisposeMain()
68+
{
69+
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
70+
}
71+
72+
bool IsDisposed()
73+
{
74+
return Volatile.Read(ref upstream) == BooleanDisposable.True;
75+
}
76+
77+
public void OnCompleted()
78+
{
79+
Volatile.Write(ref done, true);
80+
Drain();
81+
}
82+
83+
public void OnError(Exception error)
84+
{
85+
if (Interlocked.CompareExchange(ref this.error, error, null) == null)
86+
{
87+
Volatile.Write(ref done, true);
88+
Drain();
89+
}
90+
}
91+
92+
public void OnNext(IObservable<T> value)
93+
{
94+
queue.Enqueue(value);
95+
Drain();
96+
}
97+
98+
void InnerNext(T item)
99+
{
100+
downstream.OnNext(item);
101+
}
102+
103+
void InnerError(Exception error)
104+
{
105+
if (innerObserver.Finish())
106+
{
107+
if (Interlocked.CompareExchange(ref this.error, error, null) == null)
108+
{
109+
Volatile.Write(ref done, true);
110+
Volatile.Write(ref active, 0);
111+
Drain();
112+
}
113+
}
114+
}
115+
116+
void InnerComplete()
117+
{
118+
if (innerObserver.Finish())
119+
{
120+
Volatile.Write(ref active, 0);
121+
Drain();
122+
}
123+
}
124+
125+
void Drain()
126+
{
127+
if (Interlocked.Increment(ref trampoline) != 1)
128+
{
129+
return;
130+
}
131+
132+
do
133+
{
134+
if (IsDisposed())
135+
{
136+
while (queue.TryDequeue(out var _)) ;
137+
}
138+
else
139+
{
140+
if (Volatile.Read(ref active) == 0)
141+
{
142+
var isDone = Volatile.Read(ref done);
143+
144+
if (isDone)
145+
{
146+
var ex = Volatile.Read(ref error);
147+
if (ex != null)
148+
{
149+
downstream.OnError(ex);
150+
DisposeMain();
151+
continue;
152+
}
153+
}
154+
155+
if (queue.TryDequeue(out var source))
156+
{
157+
var sad = new SingleAssignmentDisposable();
158+
if (innerObserver.SetDisposable(sad))
159+
{
160+
Interlocked.Exchange(ref active, 1);
161+
sad.Disposable = source.SubscribeSafe(innerObserver);
162+
}
163+
}
164+
else
165+
{
166+
if (isDone)
167+
{
168+
downstream.OnCompleted();
169+
DisposeMain();
170+
}
171+
}
172+
}
173+
}
174+
} while (Interlocked.Decrement(ref trampoline) != 0);
175+
}
176+
177+
internal sealed class InnerObserver : IObserver<T>, IDisposable
178+
{
179+
readonly ConcatManyOuterObserver parent;
180+
181+
internal SingleAssignmentDisposable upstream;
182+
183+
static readonly SingleAssignmentDisposable DISPOSED;
184+
185+
static InnerObserver()
186+
{
187+
DISPOSED = new SingleAssignmentDisposable();
188+
DISPOSED.Dispose();
189+
}
190+
191+
internal InnerObserver(ConcatManyOuterObserver parent)
192+
{
193+
this.parent = parent;
194+
}
195+
196+
internal bool SetDisposable(SingleAssignmentDisposable sad)
197+
{
198+
return Interlocked.CompareExchange(ref upstream, sad, null) == null;
199+
}
200+
201+
internal bool Finish()
202+
{
203+
var sad = Volatile.Read(ref upstream);
204+
if (sad != DISPOSED)
205+
{
206+
if (Interlocked.CompareExchange(ref upstream, null, sad) == sad)
207+
{
208+
sad.Dispose();
209+
return true;
210+
}
211+
}
212+
return false;
213+
}
214+
215+
public void Dispose()
216+
{
217+
Interlocked.Exchange(ref upstream, DISPOSED)?.Dispose();
218+
}
219+
220+
public void OnCompleted()
221+
{
222+
parent.InnerComplete();
223+
}
224+
225+
public void OnError(Exception error)
226+
{
227+
parent.InnerError(error);
228+
}
229+
230+
public void OnNext(T value)
231+
{
232+
parent.InnerNext(value);
233+
}
234+
}
235+
}
236+
}
237+
}

Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> s
230230

231231
private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)
232232
{
233-
return Merge(sources, 1);
233+
return new ConcatMany<TSource>(sources);
234234
}
235235

236236
#endregion

0 commit comments

Comments
 (0)