Skip to content

Commit 3ff2e66

Browse files
authored
Rewrite transformers as extension methods (dart-archive/stream_transform#83)
The usage is much nicer as extension methods. In most cases a small bit of code is copied for the replacement, though in some cases nearly the entire implementation is copied. This was done over extracting shared pieces so that the deprecated methods can be removed outright in the next major version change. Update and clean up some doc comments in the extension method version of these functions. Migrate tests to use the extension method versions. The old copies are now untested but they won't be modified before they are removed. Since there aren't useful needs within this package for `chainTransformers` or `map` they are deprecated without a replacement.
1 parent dbc38a6 commit 3ff2e66

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+837
-271
lines changed

pkgs/stream_transform/.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ branches:
33
only: [master]
44
dart:
55
- dev
6-
- 2.2.0
6+
# 2.2.0
77
cache:
88
directories:
99
- $HOME/.pub-cache

pkgs/stream_transform/CHANGELOG.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,25 @@
1+
## 0.0.20-dev
2+
3+
- Add extension methods for most transformers. These should be used in place
4+
of the current methods. All current implementations are deprecated and will
5+
be removed in the next major version bump.
6+
- Migrating typical use: Instead of
7+
`stream.transform(debounce(Duration(seconds: 1)))` use
8+
`stream.debounce(Duration(seconds: 1))`.
9+
- To migrate a usage where a `StreamTransformer` instance is stored or
10+
passed see "Getting a StreamTransformer instance" on the README.
11+
- The `map` and `chainTransformers` utilities are no longer useful with the
12+
new patterns so they are deprecated without a replacement. If you still have
13+
a need for them they can be replicated with `StreamTransformer.fromBind`:
14+
15+
```
16+
// Replace `map(convert)`
17+
StreamTransformer.fromBind((s) => s.map(convert));
18+
19+
// Replace `chainTransformers(first, second)`
20+
StreamTransformer.fromBind((s) => s.transform(first).transform(second));
21+
```
22+
123
## 0.0.19
224
325
- Add `asyncMapSample` transform.

pkgs/stream_transform/README.md

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,100 @@
1-
Utility methods to create `StreamTransfomer` instances to manipulate Streams.
1+
Extension methods on `Stream` adding common transform operators.
22

3-
# asyncMapBuffer
3+
# Operators
4+
5+
## asyncMapBuffer
46

57
Like `asyncMap` but events are buffered in a List until previous events have
68
been processed rather than being called for each element individually.
79

8-
# asyncMapSample
10+
## asyncMapSample
911

1012
Like `asyncMap` but events are discarded, keeping only the latest, until
1113
previous events have been processed rather than being called for every element.
1214

13-
# asyncWhere
15+
## asyncWhere
1416

1517
Like `where` but allows an asynchronous predicate.
1618

17-
# audit
19+
## audit
1820

19-
Audit waits for a period of time after receiving a value and then only emits
20-
the most recent value.
21+
Waits for a period of time after receiving a value and then only emits the most
22+
recent value.
2123

22-
# buffer
24+
## buffer
2325

2426
Collects values from a source stream until a `trigger` stream fires and the
2527
collected values are emitted.
2628

27-
# combineLatest
29+
## combineLatest
2830

2931
Combine the most recent event from two streams through a callback and emit the
3032
result.
3133

32-
# combineLatestAll
34+
## combineLatestAll
3335

3436
Combines the latest events emitted from multiple source streams and yields a
3537
list of the values.
3638

37-
# debounce, debounceBuffer
39+
## debounce, debounceBuffer
3840

3941
Prevents a source stream from emitting too frequently by dropping or collecting
4042
values that occur within a given duration.
4143

42-
# concurrentAsyncMap
44+
## concurrentAsyncMap
4345

4446
Like `asyncMap` but the convert callback can be called with subsequent values
4547
before it has finished for previous values.
4648

47-
# followedBy
49+
## followedBy
4850

4951
Appends the values of a stream after another stream finishes.
5052

51-
# merge, mergeAll
53+
## merge, mergeAll
5254

5355
Interleaves events from multiple streams into a single stream.
5456

55-
# scan
57+
## scan
5658

5759
Scan is like fold, but instead of producing a single value it yields each
5860
intermediate accumulation.
5961

60-
# startWith, startWithMany, startWithStream
62+
## startWith, startWithMany, startWithStream
6163

6264
Prepend a value, an iterable, or a stream to the beginning of another stream.
6365

64-
# switchMap, switchLatest
66+
## switchMap, switchLatest
6567

6668
Flatten a Stream of Streams into a Stream which forwards values from the most
6769
recent Stream
6870

69-
# takeUntil
71+
## takeUntil
7072

7173
Let values through until a Future fires.
7274

73-
# tap
75+
## tap
7476

7577
Taps into a single-subscriber stream to react to values as they pass, without
7678
being a real subscriber.
7779

78-
# throttle
80+
## throttle
7981

8082
Blocks events for a duration after an event is successfully emitted.
8183

82-
# whereType
84+
## whereType
8385

8486
Like `Iterable.whereType` for a stream.
87+
88+
# Getting a `StreamTransformer` instance
89+
90+
It may be useful to pass an instance of `StreamTransformer` so that it can be
91+
used with `stream.transform` calls rather than reference the specific operator
92+
in place. Any operator on `Stream` that returns a `Stream` can be modeled as a
93+
`StreamTransformer` using the [`fromBind` constructor][fromBind].
94+
95+
```dart
96+
final debounce = StreamTransformer.fromBind(
97+
(s) => s.debounce(const Duration(milliseconds: 100)));
98+
```
99+
100+
[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html

pkgs/stream_transform/lib/src/async_map_buffer.dart

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,62 @@ import 'buffer.dart';
99
import 'chain_transformers.dart';
1010
import 'from_handlers.dart';
1111

12+
extension AsyncMap<T> on Stream<T> {
13+
/// Like [asyncMap] but events are buffered until previous events have been
14+
/// processed by [convert].
15+
///
16+
/// If the source stream is a broadcast stream the result will be as well. When
17+
/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
18+
/// that the [convert] function is only called once per event, rather than once
19+
/// per listener per event.
20+
///
21+
/// The first event from the source stream is always passed to [convert] as a
22+
/// List with a single element. After that events are buffered until the
23+
/// previous Future returned from [convert] has fired.
24+
///
25+
/// Errors from the source stream are forwarded directly to the result stream.
26+
/// Errors during the conversion are also forwarded to the result stream and
27+
/// are considered completing work so the next values are let through.
28+
///
29+
/// The result stream will not close until the source stream closes and all
30+
/// pending conversions have finished.
31+
Stream<S> asyncMapBuffer<S>(Future<S> Function(List<T>) convert) {
32+
var workFinished = StreamController<void>()
33+
// Let the first event through.
34+
..add(null);
35+
return this
36+
.buffer(workFinished.stream)
37+
.transform(_asyncMapThen(convert, workFinished.add));
38+
}
39+
40+
/// Like [asyncMap] but events are discarded while work is happening in
41+
/// [convert].
42+
///
43+
/// If the source stream is a broadcast stream the result will be as well. When
44+
/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
45+
/// that the [convert] function is only called once per event, rather than once
46+
/// per listener per event.
47+
///
48+
/// If no work is happening when an event is emitted it will be immediately
49+
/// passed to [convert]. If there is ongoing work when an event is emitted it
50+
/// will be held until the work is finished. New events emitted will replace a
51+
/// pending event.
52+
///
53+
/// Errors from the source stream are forwarded directly to the result stream.
54+
/// Errors during the conversion are also forwarded to the result stream and are
55+
/// considered completing work so the next values are let through.
56+
///
57+
/// The result stream will not close until the source stream closes and all
58+
/// pending conversions have finished.
59+
Stream<S> asyncMapSample<S>(Future<S> Function(T) convert) {
60+
var workFinished = StreamController<void>()
61+
// Let the first event through.
62+
..add(null);
63+
return transform(AggregateSample(workFinished.stream, _dropPrevious))
64+
.transform(_asyncMapThen(convert, workFinished.add));
65+
}
66+
}
67+
1268
/// Like [Stream.asyncMap] but events are buffered until previous events have
1369
/// been processed by [convert].
1470
///
@@ -27,6 +83,7 @@ import 'from_handlers.dart';
2783
///
2884
/// The result stream will not close until the source stream closes and all
2985
/// pending conversions have finished.
86+
@Deprecated('Use the extension instead')
3087
StreamTransformer<S, T> asyncMapBuffer<S, T>(
3188
Future<T> Function(List<S>) convert) {
3289
var workFinished = StreamController<void>()
@@ -55,6 +112,7 @@ StreamTransformer<S, T> asyncMapBuffer<S, T>(
55112
///
56113
/// The result stream will not close until the source stream closes and all
57114
/// pending conversions have finished.
115+
@Deprecated('Use the extension instead')
58116
StreamTransformer<S, T> asyncMapSample<S, T>(Future<T> Function(S) convert) {
59117
var workFinished = StreamController<void>()
60118
// Let the first event through.

pkgs/stream_transform/lib/src/async_where.dart

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,43 @@ import 'dart:async';
55

66
import 'from_handlers.dart';
77

8+
extension AsyncWhere<T> on Stream<T> {
9+
/// Like [where] but allows the [test] to return a [Future].
10+
///
11+
/// Events on the result stream will be emitted in the order that [test]
12+
/// completes which may not match the order of the original stream.
13+
///
14+
/// If the source stream is a broadcast stream the result will be as well. When
15+
/// used with a broadcast stream behavior also differs from [Stream.where] in
16+
/// that the [test] function is only called once per event, rather than once
17+
/// per listener per event.
18+
///
19+
/// Errors from the source stream are forwarded directly to the result stream.
20+
/// Errors from [test] are also forwarded to the result stream.
21+
///
22+
/// The result stream will not close until the source stream closes and all
23+
/// pending [test] calls have finished.
24+
Stream<T> asyncWhere(FutureOr<bool> test(T element)) {
25+
var valuesWaiting = 0;
26+
var sourceDone = false;
27+
return transform(fromHandlers(handleData: (element, sink) {
28+
valuesWaiting++;
29+
() async {
30+
try {
31+
if (await test(element)) sink.add(element);
32+
} catch (e, st) {
33+
sink.addError(e, st);
34+
}
35+
valuesWaiting--;
36+
if (valuesWaiting <= 0 && sourceDone) sink.close();
37+
}();
38+
}, handleDone: (sink) {
39+
sourceDone = true;
40+
if (valuesWaiting <= 0) sink.close();
41+
}));
42+
}
43+
}
44+
845
/// Like [Stream.where] but allows the [test] to return a [Future].
946
///
1047
/// Events on the result stream will be emitted in the order that [test]
@@ -20,6 +57,7 @@ import 'from_handlers.dart';
2057
///
2158
/// The result stream will not close until the source stream closes and all
2259
/// pending [test] calls have finished.
60+
@Deprecated('Use the extension instead')
2361
StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
2462
var valuesWaiting = 0;
2563
var sourceDone = false;

pkgs/stream_transform/lib/src/audit.dart

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,58 @@ import 'dart:async';
55

66
import 'from_handlers.dart';
77

8+
extension Audit<T> on Stream<T> {
9+
/// Returns a Stream which only emits once per [duration], at the end of the
10+
/// period.
11+
///
12+
/// If the source stream is a broadcast stream, the result will be as well.
13+
/// Errors are forwarded immediately.
14+
///
15+
/// If there is no pending event when the source stream closes the output
16+
/// stream will close immediately. If there is a pending event the output
17+
/// stream will wait to emit it before closing.
18+
///
19+
/// Differs from `throttle` in that it always emits the most recently received
20+
/// event rather than the first in the period. The events that are emitted are
21+
/// always delayed by some amount. If the event that started the period is the
22+
/// one that is emitted it will be delayed by [duration]. If a later event
23+
/// comes in within the period it's delay will be shorter by the difference in
24+
/// arrival times.
25+
///
26+
/// Differs from `debounce` in that a value will always be emitted after
27+
/// [duration], the output will not be starved by values coming in repeatedly
28+
/// within [duration].
29+
///
30+
/// For example:
31+
///
32+
/// source.audit(Duration(seconds: 5));
33+
///
34+
/// source: a------b--c----d--|
35+
/// output: -----a------c--------d|
36+
Stream<T> audit(Duration duration) {
37+
Timer timer;
38+
var shouldClose = false;
39+
T recentData;
40+
41+
return transform(fromHandlers(handleData: (T data, EventSink<T> sink) {
42+
recentData = data;
43+
timer ??= Timer(duration, () {
44+
sink.add(recentData);
45+
timer = null;
46+
if (shouldClose) {
47+
sink.close();
48+
}
49+
});
50+
}, handleDone: (EventSink<T> sink) {
51+
if (timer != null) {
52+
shouldClose = true;
53+
} else {
54+
sink.close();
55+
}
56+
}));
57+
}
58+
}
59+
860
/// Creates a StreamTransformer which only emits once per [duration], at the
961
/// end of the period.
1062
///
@@ -16,6 +68,7 @@ import 'from_handlers.dart';
1668
/// Differs from `debounce` in that a value will always be emitted after
1769
/// [duration], the output will not be starved by values coming in repeatedly
1870
/// within [duration].
71+
@Deprecated('Use the extension instead')
1972
StreamTransformer<T, T> audit<T>(Duration duration) {
2073
Timer timer;
2174
var shouldClose = false;

pkgs/stream_transform/lib/src/buffer.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,21 @@ import 'dart:async';
66

77
import 'aggregate_sample.dart';
88

9+
extension Buffer<T> on Stream<T> {
10+
/// Returns a Stream which collects values and emits when it sees a value on
11+
/// [trigger].
12+
///
13+
/// If there are no pending values when [trigger] emits, the next value on the
14+
/// source Stream will immediately flow through. Otherwise, the pending values
15+
/// are released when [trigger] emits.
16+
///
17+
/// If the source stream is a broadcast stream, the result will be as well.
18+
/// Errors from the source stream or the trigger are immediately forwarded to
19+
/// the output.
20+
Stream<List<T>> buffer(Stream<void> trigger) =>
21+
transform(AggregateSample<T, List<T>>(trigger, _collect));
22+
}
23+
924
/// Creates a [StreamTransformer] which collects values and emits when it sees a
1025
/// value on [trigger].
1126
///
@@ -15,6 +30,7 @@ import 'aggregate_sample.dart';
1530
///
1631
/// Errors from the source stream or the trigger are immediately forwarded to
1732
/// the output.
33+
@Deprecated('Use the extension instead')
1834
StreamTransformer<T, List<T>> buffer<T>(Stream<void> trigger) =>
1935
AggregateSample<T, List<T>>(trigger, _collect);
2036

pkgs/stream_transform/lib/src/chain_transformers.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import 'dart:async';
1717
/// /// values.transform(utf8.decoder).transform(const LineSplitter())
1818
/// final splitDecoded = chainTransformers(utf8.decoder, const LineSplitter());
1919
/// ```
20+
@Deprecated('This utility should not be needed with extension methods')
2021
StreamTransformer<S, T> chainTransformers<S, I, T>(
2122
StreamTransformer<S, I> first, StreamTransformer<I, T> second) =>
2223
StreamTransformer.fromBind(

0 commit comments

Comments
 (0)