diff --git a/.travis.yml b/.travis.yml index 5cdff4c..0bd0f9c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ branches: only: [master] dart: - dev - - 2.2.0 + # 2.2.0 cache: directories: - $HOME/.pub-cache diff --git a/CHANGELOG.md b/CHANGELOG.md index e631a97..52b1c51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,25 @@ +## 0.0.20-dev + +- Add extension methods for most transformers. These should be used in place + of the current methods. All current implementations are deprecated and will + be removed in the next major version bump. + - Migrating typical use: Instead of + `stream.transform(debounce(Duration(seconds: 1)))` use + `stream.debounce(Duration(seconds: 1))`. + - To migrate a usage where a `StreamTransformer` instance is stored or + passed see "Getting a StreamTransformer instance" on the README. +- The `map` and `chainTransformers` utilities are no longer useful with the + new patterns so they are deprecated without a replacement. If you still have + a need for them they can be replicated with `StreamTransformer.fromBind`: + + ``` + // Replace `map(convert)` + StreamTransformer.fromBind((s) => s.map(convert)); + + // Replace `chainTransformers(first, second)` + StreamTransformer.fromBind((s) => s.transform(first).transform(second)); + ``` + ## 0.0.19 - Add `asyncMapSample` transform. diff --git a/README.md b/README.md index fc44207..313e6f1 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,100 @@ -Utility methods to create `StreamTransfomer` instances to manipulate Streams. +Extension methods on `Stream` adding common transform operators. -# asyncMapBuffer +# Operators + +## asyncMapBuffer Like `asyncMap` but events are buffered in a List until previous events have been processed rather than being called for each element individually. -# asyncMapSample +## asyncMapSample Like `asyncMap` but events are discarded, keeping only the latest, until previous events have been processed rather than being called for every element. -# asyncWhere +## asyncWhere Like `where` but allows an asynchronous predicate. -# audit +## audit -Audit waits for a period of time after receiving a value and then only emits -the most recent value. +Waits for a period of time after receiving a value and then only emits the most +recent value. -# buffer +## buffer Collects values from a source stream until a `trigger` stream fires and the collected values are emitted. -# combineLatest +## combineLatest Combine the most recent event from two streams through a callback and emit the result. -# combineLatestAll +## combineLatestAll Combines the latest events emitted from multiple source streams and yields a list of the values. -# debounce, debounceBuffer +## debounce, debounceBuffer Prevents a source stream from emitting too frequently by dropping or collecting values that occur within a given duration. -# concurrentAsyncMap +## concurrentAsyncMap Like `asyncMap` but the convert callback can be called with subsequent values before it has finished for previous values. -# followedBy +## followedBy Appends the values of a stream after another stream finishes. -# merge, mergeAll +## merge, mergeAll Interleaves events from multiple streams into a single stream. -# scan +## scan Scan is like fold, but instead of producing a single value it yields each intermediate accumulation. -# startWith, startWithMany, startWithStream +## startWith, startWithMany, startWithStream Prepend a value, an iterable, or a stream to the beginning of another stream. -# switchMap, switchLatest +## switchMap, switchLatest Flatten a Stream of Streams into a Stream which forwards values from the most recent Stream -# takeUntil +## takeUntil Let values through until a Future fires. -# tap +## tap Taps into a single-subscriber stream to react to values as they pass, without being a real subscriber. -# throttle +## throttle Blocks events for a duration after an event is successfully emitted. -# whereType +## whereType Like `Iterable.whereType` for a stream. + +# Getting a `StreamTransformer` instance + +It may be useful to pass an instance of `StreamTransformer` so that it can be +used with `stream.transform` calls rather than reference the specific operator +in place. Any operator on `Stream` that returns a `Stream` can be modeled as a +`StreamTransformer` using the [`fromBind` constructor][fromBind]. + +```dart +final debounce = StreamTransformer.fromBind( + (s) => s.debounce(const Duration(milliseconds: 100))); +``` + +[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html diff --git a/lib/src/async_map_buffer.dart b/lib/src/async_map_buffer.dart index 70c5bdb..57defc2 100644 --- a/lib/src/async_map_buffer.dart +++ b/lib/src/async_map_buffer.dart @@ -9,6 +9,62 @@ import 'buffer.dart'; import 'chain_transformers.dart'; import 'from_handlers.dart'; +extension AsyncMap on Stream { + /// Like [asyncMap] but events are buffered until previous events have been + /// processed by [convert]. + /// + /// If the source stream is a broadcast stream the result will be as well. When + /// used with a broadcast stream behavior also differs from [Stream.asyncMap] in + /// that the [convert] function is only called once per event, rather than once + /// per listener per event. + /// + /// The first event from the source stream is always passed to [convert] as a + /// List with a single element. After that events are buffered until the + /// previous Future returned from [convert] has fired. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// Errors during the conversion are also forwarded to the result stream and + /// are considered completing work so the next values are let through. + /// + /// The result stream will not close until the source stream closes and all + /// pending conversions have finished. + Stream asyncMapBuffer(Future Function(List) convert) { + var workFinished = StreamController() + // Let the first event through. + ..add(null); + return this + .buffer(workFinished.stream) + .transform(_asyncMapThen(convert, workFinished.add)); + } + + /// Like [asyncMap] but events are discarded while work is happening in + /// [convert]. + /// + /// If the source stream is a broadcast stream the result will be as well. When + /// used with a broadcast stream behavior also differs from [Stream.asyncMap] in + /// that the [convert] function is only called once per event, rather than once + /// per listener per event. + /// + /// If no work is happening when an event is emitted it will be immediately + /// passed to [convert]. If there is ongoing work when an event is emitted it + /// will be held until the work is finished. New events emitted will replace a + /// pending event. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// Errors during the conversion are also forwarded to the result stream and are + /// considered completing work so the next values are let through. + /// + /// The result stream will not close until the source stream closes and all + /// pending conversions have finished. + Stream asyncMapSample(Future Function(T) convert) { + var workFinished = StreamController() + // Let the first event through. + ..add(null); + return transform(AggregateSample(workFinished.stream, _dropPrevious)) + .transform(_asyncMapThen(convert, workFinished.add)); + } +} + /// Like [Stream.asyncMap] but events are buffered until previous events have /// been processed by [convert]. /// @@ -27,6 +83,7 @@ import 'from_handlers.dart'; /// /// The result stream will not close until the source stream closes and all /// pending conversions have finished. +@Deprecated('Use the extension instead') StreamTransformer asyncMapBuffer( Future Function(List) convert) { var workFinished = StreamController() @@ -55,6 +112,7 @@ StreamTransformer asyncMapBuffer( /// /// The result stream will not close until the source stream closes and all /// pending conversions have finished. +@Deprecated('Use the extension instead') StreamTransformer asyncMapSample(Future Function(S) convert) { var workFinished = StreamController() // Let the first event through. diff --git a/lib/src/async_where.dart b/lib/src/async_where.dart index 734f65f..a611063 100644 --- a/lib/src/async_where.dart +++ b/lib/src/async_where.dart @@ -5,6 +5,43 @@ import 'dart:async'; import 'from_handlers.dart'; +extension AsyncWhere on Stream { + /// Like [where] but allows the [test] to return a [Future]. + /// + /// Events on the result stream will be emitted in the order that [test] + /// completes which may not match the order of the original stream. + /// + /// If the source stream is a broadcast stream the result will be as well. When + /// used with a broadcast stream behavior also differs from [Stream.where] in + /// that the [test] function is only called once per event, rather than once + /// per listener per event. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// Errors from [test] are also forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and all + /// pending [test] calls have finished. + Stream asyncWhere(FutureOr test(T element)) { + var valuesWaiting = 0; + var sourceDone = false; + return transform(fromHandlers(handleData: (element, sink) { + valuesWaiting++; + () async { + try { + if (await test(element)) sink.add(element); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, handleDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + })); + } +} + /// Like [Stream.where] but allows the [test] to return a [Future]. /// /// Events on the result stream will be emitted in the order that [test] @@ -20,6 +57,7 @@ import 'from_handlers.dart'; /// /// The result stream will not close until the source stream closes and all /// pending [test] calls have finished. +@Deprecated('Use the extension instead') StreamTransformer asyncWhere(FutureOr test(T element)) { var valuesWaiting = 0; var sourceDone = false; diff --git a/lib/src/audit.dart b/lib/src/audit.dart index 7b4d767..b175e6d 100644 --- a/lib/src/audit.dart +++ b/lib/src/audit.dart @@ -5,6 +5,58 @@ import 'dart:async'; import 'from_handlers.dart'; +extension Audit on Stream { + /// Returns a Stream which only emits once per [duration], at the end of the + /// period. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If there is no pending event when the source stream closes the output + /// stream will close immediately. If there is a pending event the output + /// stream will wait to emit it before closing. + /// + /// Differs from `throttle` in that it always emits the most recently received + /// event rather than the first in the period. The events that are emitted are + /// always delayed by some amount. If the event that started the period is the + /// one that is emitted it will be delayed by [duration]. If a later event + /// comes in within the period it's delay will be shorter by the difference in + /// arrival times. + /// + /// Differs from `debounce` in that a value will always be emitted after + /// [duration], the output will not be starved by values coming in repeatedly + /// within [duration]. + /// + /// For example: + /// + /// source.audit(Duration(seconds: 5)); + /// + /// source: a------b--c----d--| + /// output: -----a------c--------d| + Stream audit(Duration duration) { + Timer timer; + var shouldClose = false; + T recentData; + + return transform(fromHandlers(handleData: (T data, EventSink sink) { + recentData = data; + timer ??= Timer(duration, () { + sink.add(recentData); + timer = null; + if (shouldClose) { + sink.close(); + } + }); + }, handleDone: (EventSink sink) { + if (timer != null) { + shouldClose = true; + } else { + sink.close(); + } + })); + } +} + /// Creates a StreamTransformer which only emits once per [duration], at the /// end of the period. /// @@ -16,6 +68,7 @@ import 'from_handlers.dart'; /// Differs from `debounce` in that a value will always be emitted after /// [duration], the output will not be starved by values coming in repeatedly /// within [duration]. +@Deprecated('Use the extension instead') StreamTransformer audit(Duration duration) { Timer timer; var shouldClose = false; diff --git a/lib/src/buffer.dart b/lib/src/buffer.dart index 3478a52..cf8b0c3 100644 --- a/lib/src/buffer.dart +++ b/lib/src/buffer.dart @@ -6,6 +6,21 @@ import 'dart:async'; import 'aggregate_sample.dart'; +extension Buffer on Stream { + /// Returns a Stream which collects values and emits when it sees a value on + /// [trigger]. + /// + /// If there are no pending values when [trigger] emits, the next value on the + /// source Stream will immediately flow through. Otherwise, the pending values + /// are released when [trigger] emits. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// Errors from the source stream or the trigger are immediately forwarded to + /// the output. + Stream> buffer(Stream trigger) => + transform(AggregateSample>(trigger, _collect)); +} + /// Creates a [StreamTransformer] which collects values and emits when it sees a /// value on [trigger]. /// @@ -15,6 +30,7 @@ import 'aggregate_sample.dart'; /// /// Errors from the source stream or the trigger are immediately forwarded to /// the output. +@Deprecated('Use the extension instead') StreamTransformer> buffer(Stream trigger) => AggregateSample>(trigger, _collect); diff --git a/lib/src/chain_transformers.dart b/lib/src/chain_transformers.dart index 549befc..1e812d9 100644 --- a/lib/src/chain_transformers.dart +++ b/lib/src/chain_transformers.dart @@ -17,6 +17,7 @@ import 'dart:async'; /// /// values.transform(utf8.decoder).transform(const LineSplitter()) /// final splitDecoded = chainTransformers(utf8.decoder, const LineSplitter()); /// ``` +@Deprecated('This utility should not be needed with extension methods') StreamTransformer chainTransformers( StreamTransformer first, StreamTransformer second) => StreamTransformer.fromBind( diff --git a/lib/src/combine_latest.dart b/lib/src/combine_latest.dart index 489f344..7fb9ac9 100644 --- a/lib/src/combine_latest.dart +++ b/lib/src/combine_latest.dart @@ -4,6 +4,76 @@ import 'dart:async'; +extension CombineLatest on Stream { + /// Returns a stream which combines the latest value from the source stream + /// with the latest value from [other] using [combine]. + /// + /// No event will be emitted until both the source stream and [other] have + /// each emitted at least one event. If either the source stream or [other] + /// emit multiple events before the other emits the first event, all but the + /// last value will be discarded. Once both streams have emitted at least + /// once, the result stream will emit any time either input stream emits. + /// + /// The result stream will not close until both the source stream and [other] + /// have closed. + /// + /// For example: + /// + /// source.combineLatest(other, (a, b) => a + b); + /// + /// source: --1--2--------4--| + /// other: -------3--| + /// result: -------5------7--| + /// + /// Errors thrown by [combine], along with any errors on the source stream or + /// [other], are forwarded to the result stream. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of [other]'s type. If a single subscription stream is + /// combined with a broadcast stream it may never be canceled. + Stream combineLatest( + Stream other, FutureOr Function(T, T2) combine) => + transform(_CombineLatest(other, combine)); + + /// Combine the latest value emitted from the source stream with the latest + /// values emitted from [others]. + /// + /// [combineLatestAll] subscribes to the source stream and [others] and when + /// any one of the streams emits, the result stream will emit a [List] of + /// the latest values emitted from all streams. + /// + /// No event will be emitted until all source streams emit at least once. If a + /// source stream emits multiple values before another starts emitting, all + /// but the last value will be discarded. Once all source streams have emitted + /// at least once, the result stream will emit any time any source stream + /// emits. + /// + /// The result stream will not close until all source streams have closed. When + /// a source stream closes, the result stream will continue to emit the last + /// value from the closed stream when the other source streams emit until the + /// result stream has closed. If a source stream closes without emitting any + /// value, the result stream will close as well. + /// + /// For example: + /// + /// final combined = first + /// .combineLatestAll([second, third]) + /// .map((data) => data.join()); + /// + /// first: a----b------------------c--------d---| + /// second: --1---------2-----------------| + /// third: -------&----------%---| + /// combined: -------b1&--b2&---b2%---c2%------d2%-| + /// + /// Errors thrown by any source stream will be forwarded to the result stream. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of the types of [others]. If a single subscription stream + /// is combined with a broadcast source stream, it may never be canceled. + Stream> combineLatestAll(Iterable> others) => + transform(_CombineLatestAll(others)); +} + /// Combine the latest value from the source stream with the latest value from /// [other] using [combine]. /// @@ -31,6 +101,7 @@ import 'dart:async'; /// If the source stream is a broadcast stream, the result stream will be as /// well, regardless of [other]'s type. If a single subscription stream is /// combined with a broadcast stream it may never be canceled. +@Deprecated('Use the extension instead') StreamTransformer combineLatest( Stream other, FutureOr Function(S, T) combine) => _CombineLatest(other, combine); @@ -144,3 +215,110 @@ class _CombineLatest extends StreamTransformerBase { return controller.stream; } } + +/// Combine the latest value emitted from the source stream with the latest +/// values emitted from [others]. +/// +/// [combineLatestAll] subscribes to the source stream and [others] and when +/// any one of the streams emits, the result stream will emit a [List] of +/// the latest values emitted from all streams. +/// +/// The result stream will not emit until all source streams emit at least +/// once. If a source stream emits multiple values before another starts +/// emitting, all but the last value will be lost. +/// +/// The result stream will not close until all source streams have closed. When +/// a source stream closes, the result stream will continue to emit the last +/// value from the closed stream when the other source streams emit until the +/// result stream has closed. If a source stream closes without emitting any +/// value, the result stream will close as well. +/// +/// Errors thrown by any source stream will be forwarded to the result stream. +/// +/// If the source stream is a broadcast stream, the result stream will be as +/// well, regardless of the types of [others]. If a single subscription stream +/// is combined with a broadcast source stream, it may never be canceled. +/// +/// ## Example +/// +/// (Suppose first, second, and third are Stream) +/// final combined = first +/// .transform(combineLatestAll([second, third])) +/// .map((data) => data.join()); +/// +/// first: a----b------------------c--------d---| +/// second: --1---------2-----------------| +/// third: -------&----------%---| +/// combined: -------b1&--b2&---b2%---c2%------d2%-| +/// +@Deprecated('Use the extension instead') +StreamTransformer> combineLatestAll(Iterable> others) => + _CombineLatestAll(others); + +class _CombineLatestAll extends StreamTransformerBase> { + final Iterable> _others; + + _CombineLatestAll(this._others); + + @override + Stream> bind(Stream source) { + final controller = source.isBroadcast + ? StreamController>.broadcast(sync: true) + : StreamController>(sync: true); + + var allStreams = [source]..addAll(_others); + if (source.isBroadcast) { + allStreams = allStreams + .map((s) => s.isBroadcast ? s : s.asBroadcastStream()) + .toList(); + } + + List> subscriptions; + + controller.onListen = () { + assert(subscriptions == null); + + final latestData = List(allStreams.length); + final hasEmitted = {}; + void handleData(int index, T data) { + latestData[index] = data; + hasEmitted.add(index); + if (hasEmitted.length == allStreams.length) { + controller.add(List.from(latestData)); + } + } + + var activeStreamCount = 0; + subscriptions = allStreams.map((stream) { + final index = activeStreamCount; + activeStreamCount++; + return stream.listen((data) => handleData(index, data), + onError: controller.addError, onDone: () { + if (--activeStreamCount <= 0 || !hasEmitted.contains(index)) { + controller.close(); + } + }); + }).toList(); + if (!source.isBroadcast) { + controller + ..onPause = () { + for (var subscription in subscriptions) { + subscription.pause(); + } + } + ..onResume = () { + for (var subscription in subscriptions) { + subscription.resume(); + } + }; + } + controller.onCancel = () { + final toCancel = subscriptions; + subscriptions = null; + if (activeStreamCount <= 0) return null; + return Future.wait(toCancel.map((s) => s.cancel())); + }; + }; + return controller.stream; + } +} diff --git a/lib/src/combine_latest_all.dart b/lib/src/combine_latest_all.dart deleted file mode 100644 index 045ccdc..0000000 --- a/lib/src/combine_latest_all.dart +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; - -/// Combine the latest value emitted from the source stream with the latest -/// values emitted from [others]. -/// -/// [combineLatestAll] subscribes to the source stream and [others] and when -/// any one of the streams emits, the result stream will emit a [List] of -/// the latest values emitted from all streams. -/// -/// The result stream will not emit until all source streams emit at least -/// once. If a source stream emits multiple values before another starts -/// emitting, all but the last value will be lost. -/// -/// The result stream will not close until all source streams have closed. When -/// a source stream closes, the result stream will continue to emit the last -/// value from the closed stream when the other source streams emit until the -/// result stream has closed. If a source stream closes without emitting any -/// value, the result stream will close as well. -/// -/// Errors thrown by any source stream will be forwarded to the result stream. -/// -/// If the source stream is a broadcast stream, the result stream will be as -/// well, regardless of the types of [others]. If a single subscription stream -/// is combined with a broadcast source stream, it may never be canceled. -/// -/// ## Example -/// -/// (Suppose first, second, and third are Stream) -/// final combined = first -/// .transform(combineLatestAll([second, third])) -/// .map((data) => data.join()); -/// -/// first: a----b------------------c--------d---| -/// second: --1---------2-----------------| -/// third: -------&----------%---| -/// combined: -------b1&--b2&---b2%---c2%------d2%-| -/// -StreamTransformer> combineLatestAll(Iterable> others) => - _CombineLatestAll(others); - -class _CombineLatestAll extends StreamTransformerBase> { - final Iterable> _others; - - _CombineLatestAll(this._others); - - @override - Stream> bind(Stream source) { - final controller = source.isBroadcast - ? StreamController>.broadcast(sync: true) - : StreamController>(sync: true); - - var allStreams = [source]..addAll(_others); - if (source.isBroadcast) { - allStreams = allStreams - .map((s) => s.isBroadcast ? s : s.asBroadcastStream()) - .toList(); - } - - List> subscriptions; - - controller.onListen = () { - assert(subscriptions == null); - - final latestData = List(allStreams.length); - final hasEmitted = {}; - void handleData(int index, T data) { - latestData[index] = data; - hasEmitted.add(index); - if (hasEmitted.length == allStreams.length) { - controller.add(List.from(latestData)); - } - } - - var activeStreamCount = 0; - subscriptions = allStreams.map((stream) { - final index = activeStreamCount; - activeStreamCount++; - return stream.listen((data) => handleData(index, data), - onError: controller.addError, onDone: () { - if (--activeStreamCount <= 0 || !hasEmitted.contains(index)) { - controller.close(); - } - }); - }).toList(); - if (!source.isBroadcast) { - controller - ..onPause = () { - for (var subscription in subscriptions) { - subscription.pause(); - } - } - ..onResume = () { - for (var subscription in subscriptions) { - subscription.resume(); - } - }; - } - controller.onCancel = () { - final toCancel = subscriptions; - subscriptions = null; - if (activeStreamCount <= 0) return null; - return Future.wait(toCancel.map((s) => s.cancel())); - }; - }; - return controller.stream; - } -} diff --git a/lib/src/concurrent_async_map.dart b/lib/src/concurrent_async_map.dart index 791c84c..c0fae2f 100644 --- a/lib/src/concurrent_async_map.dart +++ b/lib/src/concurrent_async_map.dart @@ -6,6 +6,45 @@ import 'dart:async'; import 'from_handlers.dart'; +extension ConcurrentAsyncMap on Stream { + /// Like [asyncMap] but the [convert] callback may be called for an element + /// before processing for the previous element is finished. + /// + /// Events on the result stream will be emitted in the order that [convert] + /// completed which may not match the order of the original stream. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// When used with a broadcast stream behavior also differs from [asyncMap] in + /// that the [convert] function is only called once per event, rather than + /// once per listener per event. The [convert] callback won't be called for + /// events while a broadcast stream has no listener. + /// + /// Errors from [convert] or the source stream are forwarded directly to the + /// result stream. + /// + /// The result stream will not close until the source stream closes and all + /// pending conversions have finished. + Stream concurrentAsyncMap(FutureOr convert(T event)) { + var valuesWaiting = 0; + var sourceDone = false; + return transform(fromHandlers(handleData: (element, sink) { + valuesWaiting++; + () async { + try { + sink.add(await convert(element)); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, handleDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + })); + } +} + /// Like [Stream.asyncMap] but the [convert] callback may be called for an /// element before processing for the previous element is finished. /// @@ -23,6 +62,7 @@ import 'from_handlers.dart'; /// /// The result stream will not close until the source stream closes and all /// pending conversions have finished. +@Deprecated('Use the extension instead') StreamTransformer concurrentAsyncMap(FutureOr convert(S event)) { var valuesWaiting = 0; var sourceDone = false; diff --git a/lib/src/debounce.dart b/lib/src/debounce.dart index ff3f21f..0987ec6 100644 --- a/lib/src/debounce.dart +++ b/lib/src/debounce.dart @@ -5,12 +5,53 @@ import 'dart:async'; import 'from_handlers.dart'; +extension Debounce on Stream { + /// Returns a Stream which only emits when the source stream does not emit for + /// [duration]. + /// + /// Values will always be delayed by at least [duration], and values which + /// come within this time will replace the old values, only the most + /// recent value will be emitted. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If there is an event waiting during the debounce period when the source + /// stream closes the returned stream will wait to emit it following the + /// debounce period before closing. If there is no pending debounced event + /// when the source stream closes the returned stream will close immediately. + /// + /// To collect values emitted during the debounce period see [debounceBuffer]. + Stream debounce(Duration duration) => + transform(_debounceAggregate(duration, _dropPrevious)); + + /// Returns a Stream which collects values until the source stream does not + /// emit for [duration] then emits the collected values. + /// + /// Values will always be delayed by at least [duration], and values which + /// come within this time will be aggregated into the same list. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If there are events waiting during the debounce period when the source + /// stream closes the returned stream will wait to emit them following the + /// debounce period before closing. If there are no pending debounced events + /// when the source stream closes the returned stream will close immediately. + /// + /// To keep only the most recent event during the debounce perios see + /// [debounce]. + Stream> debounceBuffer(Duration duration) => + transform(_debounceAggregate(duration, _collectToList)); +} + /// Creates a StreamTransformer which only emits when the source stream does not /// emit for [duration]. /// /// Source values will always be delayed by at least [duration], and values /// which come within this time will replace the old values, only the most /// recent value will be emitted. +@Deprecated('Use the extension instead') StreamTransformer debounce(Duration duration) => _debounceAggregate(duration, _dropPrevious); @@ -19,6 +60,7 @@ StreamTransformer debounce(Duration duration) => /// /// This differs from [debounce] in that values are aggregated instead of /// skipped. +@Deprecated('Use the extension instead') StreamTransformer> debounceBuffer(Duration duration) => _debounceAggregate(duration, _collectToList); diff --git a/lib/src/followed_by.dart b/lib/src/followed_by.dart index 3c5c6b3..41c7718 100644 --- a/lib/src/followed_by.dart +++ b/lib/src/followed_by.dart @@ -4,6 +4,25 @@ import 'dart:async'; +extension FollowedBy on Stream { + /// Returns a stream which emits values and errors from [next] after the + /// original stream is complete. + /// + /// If the source stream never finishes, the [next] stream will never be + /// listened to. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// If a single-subscription follows a broadcast stream it may be listened + /// to and never canceled since there may be broadcast listeners added later. + /// + /// If a broadcast stream follows any other stream it will miss any events or + /// errors which occur before the first stream is done. If a broadcast stream + /// follows a single-subscription stream, pausing the stream while it is + /// listening to the second stream will cause events to be dropped rather than + /// buffered. + Stream followedBy(Stream next) => transform(_FollowedBy(next)); +} + /// Starts emitting values from [next] after the original stream is complete. /// /// If the initial stream never finishes, the [next] stream will never be @@ -16,6 +35,7 @@ import 'dart:async'; /// occur before the first stream is done. If a broadcast stream follows a /// single-subscription stream, pausing the stream while it is listening to the /// second stream will cause events to be dropped rather than buffered. +@Deprecated('Use the extension instead') StreamTransformer followedBy(Stream next) => _FollowedBy(next); class _FollowedBy extends StreamTransformerBase { diff --git a/lib/src/map.dart b/lib/src/map.dart index 09ea0d6..e1f7137 100644 --- a/lib/src/map.dart +++ b/lib/src/map.dart @@ -16,5 +16,6 @@ import 'dart:async'; /// final sinkMapper = new StreamSinkTransformer.fromStreamTransformer( /// map((v) => '$v')); /// ``` +@Deprecated('This utility should not be needed with extension methods') StreamTransformer map(T convert(S event)) => StreamTransformer.fromBind((stream) => stream.map(convert)); diff --git a/lib/src/merge.dart b/lib/src/merge.dart index 74a1e1a..878a8d1 100644 --- a/lib/src/merge.dart +++ b/lib/src/merge.dart @@ -4,11 +4,63 @@ import 'dart:async'; +extension Merge on Stream { + /// Returns a stream which emits values and errors from the source stream and + /// [other] in any order as they arrive. + /// + /// The result stream will not close until both the source stream and [other] + /// have closed. + /// + /// For example: + /// + /// final result = source.merge(other); + /// + /// source: 1--2-----3--| + /// other: ------4-------5--| + /// result: 1--2--4--3----5--| + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of [other]'s type. If a single subscription stream is + /// merged into a broadcast stream it may never be canceled since there may be + /// broadcast listeners added later. + /// + /// If a broadcast stream is merged into a single-subscription stream any + /// events emitted by [other] before the result stream has a subscriber will + /// be discarded. + Stream merge(Stream other) => transform(_Merge([other])); + + /// Returns a stream which emits values and errors from the source stream and + /// any stream in [others] in any order as they arrive. + /// + /// The result stream will not close until the source stream and all streams + /// in [others] have closed. + /// + /// For example: + /// + /// final result = first.mergeAll([second, third]); + /// + /// first: 1--2--------3--| + /// second: ---------4-------5--| + /// third: ------6---------------7--| + /// result: 1--2--6--4--3----5----7--| + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless the types of streams in [others]. If a single + /// subscription stream is merged into a broadcast stream it may never be + /// canceled since there may be broadcast listeners added later. + /// + /// If a broadcast stream is merged into a single-subscription stream any + /// events emitted by that stream before the result stream has a subscriber + /// will be discarded. + Stream mergeAll(Iterable> others) => transform(_Merge(others)); +} + /// Emits values from the source stream and [other] in any order as they arrive. /// /// If the source stream is a broadcast stream, the result stream will be as /// well, regardless of [other]'s type. If a single subscription stream is /// merged into a broadcast stream it may never be canceled. +@Deprecated('Use the extension instead') StreamTransformer merge(Stream other) => _Merge([other]); /// Emits values from the source stream and all streams in [others] in any order @@ -18,6 +70,7 @@ StreamTransformer merge(Stream other) => _Merge([other]); /// well, regardless of the types of streams in [others]. If single /// subscription streams are merged into a broadcast stream they may never be /// canceled. +@Deprecated('Use the extension instead') StreamTransformer mergeAll(Iterable> others) => _Merge(others); diff --git a/lib/src/scan.dart b/lib/src/scan.dart index b2f76ef..be9899e 100644 --- a/lib/src/scan.dart +++ b/lib/src/scan.dart @@ -4,6 +4,28 @@ import 'dart:async'; +extension Scan on Stream { + /// Like [fold], but instead of producing a single value it yields each + /// intermediate accumulation. + /// + /// If [combine] returns a Future it will not be called again for subsequent + /// events from the source until it completes, therefor the combine callback + /// is always called for elements in order, and the result stream always + /// maintains the same order as the original. + Stream scan( + S initialValue, FutureOr combine(S previousValue, T element)) { + var accumulated = initialValue; + return asyncMap((value) { + var result = combine(accumulated, value); + if (result is Future) { + return result.then((r) => accumulated = r); + } else { + return accumulated = result as S; + } + }); + } +} + /// Scan is like fold, but instead of producing a single value it yields /// each intermediate accumulation. /// @@ -11,6 +33,7 @@ import 'dart:async'; /// events from the source until it completes, therefor the combine callback is /// always called for elements in order, and the result stream always maintains /// the same order as the original. +@Deprecated('Use the extension instead') StreamTransformer scan( T initialValue, FutureOr combine(T previousValue, S element)) => StreamTransformer.fromBind((source) { diff --git a/lib/src/start_with.dart b/lib/src/start_with.dart index fbaf46a..a9ecaef 100644 --- a/lib/src/start_with.dart +++ b/lib/src/start_with.dart @@ -6,9 +6,41 @@ import 'dart:async'; import 'followed_by.dart'; +extension StartWith on Stream { + /// Returns a stream which emits [initial] before any values from the original + /// stream. + /// + /// If the original stream is a broadcast stream the result will be as well. + Stream startWith(T initial) => + startWithStream(Future.value(initial).asStream()); + + /// Returns a stream which emits all values in [initial] before any values + /// from the original stream. + /// + /// If the original stream is a broadcast stream the result will be as well. + /// If the original stream is a broadcast stream it will miss any events which + /// occur before the initial values are all emitted. + Stream startWithMany(Iterable initial) => + startWithStream(Stream.fromIterable(initial)); + + /// Returns a stream which emits all values in [initial] before any values + /// from the original stream. + /// + /// If the original stream is a broadcast stream the result will be as well. If + /// the original stream is a broadcast stream it will miss any events which + /// occur before [initial] closes. + Stream startWithStream(Stream initial) { + if (isBroadcast && !initial.isBroadcast) { + initial = initial.asBroadcastStream(); + } + return initial.followedBy(this); + } +} + /// Emits [initial] before any values from the original stream. /// /// If the original stream is a broadcast stream the result will be as well. +@Deprecated('Use the extension instead') StreamTransformer startWith(T initial) => startWithStream(Future.value(initial).asStream()); @@ -17,6 +49,7 @@ StreamTransformer startWith(T initial) => /// If the original stream is a broadcast stream the result will be as well. If /// the original stream is a broadcast stream it will miss any events which /// occur before the initial values are all emitted. +@Deprecated('Use the extension instead') StreamTransformer startWithMany(Iterable initial) => startWithStream(Stream.fromIterable(initial)); @@ -25,6 +58,7 @@ StreamTransformer startWithMany(Iterable initial) => /// If the original stream is a broadcast stream the result will be as well. If /// the original stream is a broadcast stream it will miss any events which /// occur before [initial] closes. +@Deprecated('Use the extension instead') StreamTransformer startWithStream(Stream initial) => StreamTransformer.fromBind((values) { if (values.isBroadcast && !initial.isBroadcast) { diff --git a/lib/src/switch.dart b/lib/src/switch.dart index e02917d..e4f4787 100644 --- a/lib/src/switch.dart +++ b/lib/src/switch.dart @@ -1,10 +1,33 @@ // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. + import 'dart:async'; -import 'chain_transformers.dart'; -import 'map.dart'; +extension Switch on Stream { + /// Maps events to a Stream and emits values from the most recently created + /// Stream. + /// + /// When the source emits a value it will be converted to a [Stream] using + /// [convert] and the output will switch to emitting events from that result. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of the types of the streams produced by [convert]. + Stream switchMap(Stream convert(T event)) { + return map(convert).switchLatest(); + } +} + +extension SwitchLatest on Stream> { + /// Emits values from the most recently emitted Stream. + /// + /// When the source emits a stream the output will switch to emitting events + /// from that stream. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of the types of streams emitted. + Stream switchLatest() => transform(_SwitchTransformer()); +} /// Maps events to a Stream and emits values from the most recently created /// Stream. @@ -14,8 +37,10 @@ import 'map.dart'; /// /// If the source stream is a broadcast stream, the result stream will be as /// well, regardless of the types of the streams produced by [map]. +@Deprecated('Use the extension instead') StreamTransformer switchMap(Stream convert(S event)) => - chainTransformers(map(convert), switchLatest()); + StreamTransformer.fromBind( + (source) => source.map(convert).transform(switchLatest())); /// Emits values from the most recently emitted Stream. /// @@ -24,6 +49,7 @@ StreamTransformer switchMap(Stream convert(S event)) => /// /// If the source stream is a broadcast stream, the result stream will be as /// well, regardless of the types of streams emitted. +@Deprecated('Use the extension instead') StreamTransformer, T> switchLatest() => _SwitchTransformer(); class _SwitchTransformer extends StreamTransformerBase, T> { diff --git a/lib/src/take_until.dart b/lib/src/take_until.dart index ad12ae8..d415c24 100644 --- a/lib/src/take_until.dart +++ b/lib/src/take_until.dart @@ -4,12 +4,24 @@ import 'dart:async'; +extension TakeUntil on Stream { + /// Returns a stram which emits values from the source stream until [trigger] + /// fires. + /// + /// Completing [trigger] differs from canceling a subscription in that values + /// which are emitted before the trigger, but have further asynchronous delays + /// in transformations following the takeUtil, will still go through. + /// Cancelling a subscription immediately stops values. + Stream takeUntil(Future trigger) => transform(_TakeUntil(trigger)); +} + /// Emits values from the stream until [trigger] fires. /// /// Completing [trigger] differs from canceling a subscription in that values /// which are emitted before the trigger, but have further asynchronous delays /// in transformations following the takeUtil, will still go through. Cancelling /// a subscription immediately stops values. +@Deprecated('Use the extension instead') StreamTransformer takeUntil(Future trigger) => _TakeUntil(trigger); diff --git a/lib/src/tap.dart b/lib/src/tap.dart index 1b799fa..61ee198 100644 --- a/lib/src/tap.dart +++ b/lib/src/tap.dart @@ -5,6 +5,45 @@ import 'dart:async'; import 'from_handlers.dart'; +extension Tap on Stream { + /// Taps into this stream to allow additional handling on a single-subscriber + /// stream without first wrapping as a broadcast stream. + /// + /// The [onValue] callback will be called with every value from the source + /// stream before it is forwarded to listeners on the resulting stream. May be + /// null if only [onError] or [onDone] callbacks are needed. + /// + /// The [onError] callback will be called with every error from the source + /// stream before it is forwarded to listeners on the resulting stream. + /// + /// The [onDone] callback will be called after the source stream closes and + /// before the resulting stream is closed. + /// + /// Errors from any of the callbacks are caught and ignored. + /// + /// The callbacks may not be called until the tapped stream has a listener, + /// and may not be called after the listener has canceled the subscription. + Stream tap(void Function(T) onValue, + {void Function(Object, StackTrace) onError, + void Function() onDone}) => + transform(fromHandlers(handleData: (value, sink) { + try { + onValue?.call(value); + } catch (_) {/*Ignore*/} + sink.add(value); + }, handleError: (error, stackTrace, sink) { + try { + onError?.call(error, stackTrace); + } catch (_) {/*Ignore*/} + sink.addError(error, stackTrace); + }, handleDone: (sink) { + try { + onDone?.call(); + } catch (_) {/*Ignore*/} + sink.close(); + })); +} + /// Taps into a Stream to allow additional handling on a single-subscriber /// stream without first wrapping as a broadcast stream. /// @@ -22,6 +61,7 @@ import 'from_handlers.dart'; /// /// The callbacks may not be called until the tapped stream has a listener, and /// may not be called after the listener has canceled the subscription. +@Deprecated('Use the extension instead') StreamTransformer tap(void Function(T) onValue, {void Function(Object, StackTrace) onError, void Function() onDone}) => fromHandlers(handleData: (value, sink) { diff --git a/lib/src/throttle.dart b/lib/src/throttle.dart index 475ad92..bea2980 100644 --- a/lib/src/throttle.dart +++ b/lib/src/throttle.dart @@ -5,8 +5,29 @@ import 'dart:async'; import 'from_handlers.dart'; +extension Throttle on Stream { + /// Returns a stream which only emits once per [duration], at the beginning of + /// the period. + /// + /// Events emitted by the source stream within [duration] following an emitted + /// event will be discarded. Errors are always forwarded immediately. + Stream throttle(Duration duration) { + Timer timer; + + return transform(fromHandlers(handleData: (data, sink) { + if (timer == null) { + sink.add(data); + timer = Timer(duration, () { + timer = null; + }); + } + })); + } +} + /// Creates a StreamTransformer which only emits once per [duration], at the /// beginning of the period. +@Deprecated('Use the extension instead') StreamTransformer throttle(Duration duration) { Timer timer; diff --git a/lib/src/where_type.dart b/lib/src/where_type.dart index 9164f91..eb79bb5 100644 --- a/lib/src/where_type.dart +++ b/lib/src/where_type.dart @@ -4,6 +4,19 @@ import 'dart:async'; +extension WhereType on Stream { + /// Returns a stream which emits only the events which have type [S]. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// + /// [S] should be a subtype of the stream's generic type, otherwise nothing of + /// type [S] could possibly be emitted, however there is no static or runtime + /// checking that this is the case. + Stream whereType() => transform(_WhereType()); +} + /// Emits only the events which have type [R]. /// /// If the source stream is a broadcast stream the result will be as well. @@ -19,6 +32,7 @@ import 'dart:async'; /// [R] should be a subtype of the stream's generic type, otherwise nothing of /// type [R] could possibly be emitted, however there is no static or runtime /// checking that this is the case. +@Deprecated('Use the extension instead') StreamTransformer whereType() => _WhereType(); class _WhereType extends StreamTransformerBase { diff --git a/lib/stream_transform.dart b/lib/stream_transform.dart index b8b15f4..557fc82 100644 --- a/lib/stream_transform.dart +++ b/lib/stream_transform.dart @@ -8,7 +8,6 @@ export 'src/audit.dart'; export 'src/buffer.dart'; export 'src/chain_transformers.dart'; export 'src/combine_latest.dart'; -export 'src/combine_latest_all.dart'; export 'src/concat.dart'; export 'src/concurrent_async_map.dart'; export 'src/debounce.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index 38ad8d5..7542887 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -2,10 +2,10 @@ name: stream_transform description: A collection of utilities to transform and manipulate streams. author: Dart Team homepage: https://www.github.com/dart-lang/stream_transform -version: 0.0.19 +version: 0.0.20-dev environment: - sdk: ">=2.2.0 <3.0.0" + sdk: ">=2.6.0-dev.5.0 <3.0.0" dev_dependencies: pedantic: ^1.5.0 diff --git a/test/async_map_buffer_test.dart b/test/async_map_buffer_test.dart index 021755a..9b46e1f 100644 --- a/test/async_map_buffer_test.dart +++ b/test/async_map_buffer_test.dart @@ -52,7 +52,7 @@ void main() { isDone = false; finishWork = null; workArgument = null; - transformed = values.stream.transform(asyncMapBuffer(work)); + transformed = values.stream.asyncMapBuffer(work); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/async_map_sample_test.dart b/test/async_map_sample_test.dart index 1a6440e..119e0c8 100644 --- a/test/async_map_sample_test.dart +++ b/test/async_map_sample_test.dart @@ -52,7 +52,7 @@ void main() { isDone = false; finishWork = null; workArgument = null; - transformed = values.stream.transform(asyncMapSample(work)); + transformed = values.stream.asyncMapSample(work); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/async_where_test.dart b/test/async_where_test.dart index 9d0c9f2..a90210c 100644 --- a/test/async_where_test.dart +++ b/test/async_where_test.dart @@ -10,34 +10,34 @@ import 'package:stream_transform/stream_transform.dart'; void main() { test('forwards only events that pass the predicate', () async { var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(asyncWhere((e) async => e > 2)); + var filtered = values.asyncWhere((e) async => e > 2); expect(await filtered.toList(), [3, 4]); }); test('allows predicates that go through event loop', () async { var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(asyncWhere((e) async { + var filtered = values.asyncWhere((e) async { await Future(() {}); return e > 2; - })); + }); expect(await filtered.toList(), [3, 4]); }); test('allows synchronous predicate', () async { var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(asyncWhere((e) => e > 2)); + var filtered = values.asyncWhere((e) => e > 2); expect(await filtered.toList(), [3, 4]); }); test('can result in empty stream', () async { var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(asyncWhere((e) => e > 4)); + var filtered = values.asyncWhere((e) => e > 4); expect(await filtered.isEmpty, true); }); test('forwards values to multiple listeners', () async { var values = StreamController.broadcast(); - var filtered = values.stream.transform(asyncWhere((e) async => e > 2)); + var filtered = values.stream.asyncWhere((e) async => e > 2); var firstValues = []; var secondValues = []; filtered..listen(firstValues.add)..listen(secondValues.add); @@ -50,7 +50,7 @@ void main() { test('closes streams with multiple listeners', () async { var values = StreamController.broadcast(); var predicate = Completer(); - var filtered = values.stream.transform(asyncWhere((_) => predicate.future)); + var filtered = values.stream.asyncWhere((_) => predicate.future); var firstDone = false; var secondDone = false; filtered @@ -71,11 +71,11 @@ void main() { var errors = []; var emitted = []; var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(asyncWhere((e) async { + var filtered = values.asyncWhere((e) async { await Future(() {}); if (e.isEven) throw Exception('$e'); return true; - })); + }); var done = Completer(); filtered.listen(emitted.add, onError: errors.add, onDone: done.complete); await done.future; diff --git a/test/audit_test.dart b/test/audit_test.dart index 168227a..58cb95b 100644 --- a/test/audit_test.dart +++ b/test/audit_test.dart @@ -20,25 +20,21 @@ void main() { Stream transformed; StreamSubscription subscription; - void setUpStreams(StreamTransformer transformer) { - valuesCanceled = false; - values = createController(streamType) - ..onCancel = () { - valuesCanceled = true; - }; - emittedValues = []; - errors = []; - isDone = false; - transformed = values.stream.transform(transformer); - subscription = transformed - .listen(emittedValues.add, onError: errors.add, onDone: () { - isDone = true; - }); - } - group('audit', () { setUp(() async { - setUpStreams(audit(const Duration(milliseconds: 5))); + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = values.stream.audit(const Duration(milliseconds: 5)); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); }); test('cancels values', () async { diff --git a/test/buffer_test.dart b/test/buffer_test.dart index 3dcd505..5800586 100644 --- a/test/buffer_test.dart +++ b/test/buffer_test.dart @@ -41,7 +41,7 @@ void main() { emittedValues = []; errors = []; isDone = false; - transformed = values.stream.transform(buffer(trigger.stream)); + transformed = values.stream.buffer(trigger.stream); subscription = transformed.listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/combine_latest_all_test.dart b/test/combine_latest_all_test.dart index cdfedb0..2943449 100644 --- a/test/combine_latest_all_test.dart +++ b/test/combine_latest_all_test.dart @@ -16,9 +16,8 @@ void main() { final first = StreamController(); final second = StreamController(); final third = StreamController(); - final combined = first.stream - .transform(combineLatestAll([second.stream, third.stream])) - .map((data) => data.join()); + final combined = first.stream.combineLatestAll( + [second.stream, third.stream]).map((data) => data.join()); // first: a----b------------------c--------d---| // second: --1---------2-----------------| @@ -54,8 +53,7 @@ void main() { test('ends if a Stream closes without ever emitting a value', () async { final first = StreamController(); final second = StreamController(); - final combined = - first.stream.transform(combineLatestAll([second.stream])); + final combined = first.stream.combineLatestAll([second.stream]); // first: -a------b-------| // second: -----| @@ -74,8 +72,7 @@ void main() { final first = StreamController(); final second = StreamController(); final combined = first.stream - .transform(combineLatestAll([second.stream])) - .map((data) => data.join()); + .combineLatestAll([second.stream]).map((data) => data.join()); // first: -a---------| // second: ----1---# @@ -95,9 +92,8 @@ void main() { final second = StreamController(); var done = false; - first.stream - .transform(combineLatestAll([second.stream])) - .listen(null, onDone: () => done = true); + first.stream.combineLatestAll([second.stream]).listen(null, + onDone: () => done = true); // first: -a---| // second: --------1--| @@ -123,8 +119,7 @@ void main() { final first = StreamController.broadcast(); final second = StreamController.broadcast(); final combined = first.stream - .transform(combineLatestAll([second.stream])) - .map((data) => data.join()); + .combineLatestAll([second.stream]).map((data) => data.join()); // first: a------b----------------c------d----e---| // second: --1---------2---3---4------5-| diff --git a/test/combine_latest_test.dart b/test/combine_latest_test.dart index f8156bc..2b35e2c 100644 --- a/test/combine_latest_test.dart +++ b/test/combine_latest_test.dart @@ -17,9 +17,8 @@ void main() { int sum(int a, int b) => a + b; var results = []; - unawaited(source.stream - .transform(combineLatest(other.stream, sum)) - .forEach(results.add)); + unawaited( + source.stream.combineLatest(other.stream, sum).forEach(results.add)); source.add(1); await Future(() {}); @@ -49,7 +48,7 @@ void main() { var results = []; unawaited(source.stream - .transform(combineLatest(other.stream, times)) + .combineLatest(other.stream, times) .forEach(results.add)); source..add('a')..add('b'); @@ -76,7 +75,7 @@ void main() { var done = false; source.stream - .transform(combineLatest(other.stream, sum)) + .combineLatest(other.stream, sum) .listen(null, onDone: () => done = true); source.add(1); @@ -99,7 +98,7 @@ void main() { var done = false; source - .transform(combineLatest(other.stream, sum)) + .combineLatest(other.stream, sum) .listen(null, onDone: () => done = true); await Future(() {}); @@ -115,7 +114,7 @@ void main() { var done = false; source.stream - .transform(combineLatest(other, sum)) + .combineLatest(other, sum) .listen(null, onDone: () => done = true); await Future(() {}); @@ -130,7 +129,7 @@ void main() { var errors = []; source.stream - .transform(combineLatest(other.stream, sum)) + .combineLatest(other.stream, sum) .listen(null, onError: errors.add); source.addError(_NumberedException(1)); @@ -151,8 +150,7 @@ void main() { int combine(int a, int b) => a + b; var emittedValues = []; - var transformed = - source.stream.transform(combineLatest(other.stream, combine)); + var transformed = source.stream.combineLatest(other.stream, combine); var subscription = transformed.listen(emittedValues.add); diff --git a/test/concurrent_async_map_test.dart b/test/concurrent_async_map_test.dart index dceaa87..6da2101 100644 --- a/test/concurrent_async_map_test.dart +++ b/test/concurrent_async_map_test.dart @@ -42,7 +42,7 @@ void main() { isDone = false; finishWork = []; values = []; - transformed = controller.stream.transform(concurrentAsyncMap(convert)); + transformed = controller.stream.concurrentAsyncMap(convert); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/debounce_test.dart b/test/debounce_test.dart index 89fede9..cb027c5 100644 --- a/test/debounce_test.dart +++ b/test/debounce_test.dart @@ -21,7 +21,7 @@ void main() { StreamSubscription subscription; Stream transformed; - void setUpStreams(StreamTransformer transformer) { + setUp(() async { valuesCanceled = false; values = createController(streamType) ..onCancel = () { @@ -30,15 +30,11 @@ void main() { emittedValues = []; errors = []; isDone = false; - transformed = values.stream.transform(transformer); + transformed = values.stream.debounce(const Duration(milliseconds: 5)); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; }); - } - - setUp(() async { - setUpStreams(debounce(const Duration(milliseconds: 5))); }); test('cancels values', () async { @@ -103,7 +99,7 @@ void main() { emittedValues = []; errors = []; transformed = values.stream - .transform(debounceBuffer(const Duration(milliseconds: 5))) + .debounceBuffer(const Duration(milliseconds: 5)) ..listen(emittedValues.add, onError: errors.add); }); diff --git a/test/followd_by_test.dart b/test/followd_by_test.dart index c5f48aa..c8864b5 100644 --- a/test/followd_by_test.dart +++ b/test/followd_by_test.dart @@ -43,7 +43,7 @@ void main() { emittedValues = []; errors = []; isDone = false; - transformed = first.stream.transform(followedBy(second.stream)); + transformed = first.stream.followedBy(second.stream); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/merge_test.dart b/test/merge_test.dart index 4d3050a..24cd76b 100644 --- a/test/merge_test.dart +++ b/test/merge_test.dart @@ -13,7 +13,7 @@ void main() { test('includes all values', () async { var first = Stream.fromIterable([1, 2, 3]); var second = Stream.fromIterable([4, 5, 6]); - var allValues = await first.transform(merge(second)).toList(); + var allValues = await first.merge(second).toList(); expect(allValues, containsAllInOrder([1, 2, 3])); expect(allValues, containsAllInOrder([4, 5, 6])); expect(allValues, hasLength(6)); @@ -30,8 +30,7 @@ void main() { ..onCancel = () { secondCanceled = true; }; - var subscription = - first.stream.transform(merge(second.stream)).listen((_) {}); + var subscription = first.stream.merge(second.stream).listen((_) {}); await subscription.cancel(); expect(firstCanceled, true); expect(secondCanceled, true); @@ -41,7 +40,7 @@ void main() { var first = StreamController(); var second = StreamController(); var isDone = false; - first.stream.transform(merge(second.stream)).listen((_) {}, onDone: () { + first.stream.merge(second.stream).listen((_) {}, onDone: () { isDone = true; }); await first.close(); @@ -54,7 +53,7 @@ void main() { var first = StreamController.broadcast(); var second = StreamController(); var emittedValues = []; - var transformed = first.stream.transform(merge(second.stream)); + var transformed = first.stream.merge(second.stream); var subscription = transformed.listen(emittedValues.add); first.add(1); second.add(2); @@ -77,7 +76,7 @@ void main() { var first = Stream.fromIterable([1, 2, 3]); var second = Stream.fromIterable([4, 5, 6]); var third = Stream.fromIterable([7, 8, 9]); - var allValues = await first.transform(mergeAll([second, third])).toList(); + var allValues = await first.mergeAll([second, third]).toList(); expect(allValues, containsAllInOrder([1, 2, 3])); expect(allValues, containsAllInOrder([4, 5, 6])); expect(allValues, containsAllInOrder([7, 8, 9])); @@ -101,8 +100,8 @@ void main() { secondSingleCanceled = true; }; - var merged = first.stream - .transform(mergeAll([secondBroadcast.stream, secondSingle.stream])); + var merged = + first.stream.mergeAll([secondBroadcast.stream, secondSingle.stream]); var firstListenerValues = []; var secondListenerValues = []; diff --git a/test/scan_test.dart b/test/scan_test.dart index 662831c..4e58680 100644 --- a/test/scan_test.dart +++ b/test/scan_test.dart @@ -14,7 +14,7 @@ void main() { test('produces intermediate values', () async { var source = Stream.fromIterable([1, 2, 3, 4]); int sum(int x, int y) => x + y; - var result = await source.transform(scan(0, sum)).toList(); + var result = await source.scan(0, sum).toList(); expect(result, [1, 3, 6, 10]); }); @@ -22,7 +22,7 @@ void main() { test('can create a broadcast stream', () { var source = StreamController.broadcast(); - var transformed = source.stream.transform(scan(null, null)); + var transformed = source.stream.scan(null, null); expect(transformed.isBroadcast, true); }); @@ -34,7 +34,7 @@ void main() { var errors = []; - source.stream.transform(scan(0, sum)).listen(null, onError: errors.add); + source.stream.scan(0, sum).listen(null, onError: errors.add); source.addError(StateError('fail')); await Future(() {}); @@ -46,7 +46,7 @@ void main() { test('returns a Stream of non-futures', () async { var source = Stream.fromIterable([1, 2, 3, 4]); Future sum(int x, int y) async => x + y; - var result = await source.transform(scan(0, sum)).toList(); + var result = await source.scan(0, sum).toList(); expect(result, [1, 3, 6, 10]); }); @@ -54,9 +54,8 @@ void main() { test('can return a Stream of futures when specified', () async { var source = Stream.fromIterable([1, 2]); Future sum(Future x, int y) async => (await x) + y; - var result = await source - .transform(scan>(Future.value(0), sum)) - .toList(); + var result = + await source.scan>(Future.value(0), sum).toList(); expect(result, [ const TypeMatcher>(), @@ -78,8 +77,7 @@ void main() { var results = []; - unawaited( - source.stream.transform(scan(0, combine)).forEach(results.add)); + unawaited(source.stream.scan(0, combine).forEach(results.add)); source..add(1)..add(2); await Future(() {}); @@ -99,9 +97,7 @@ void main() { var errors = []; - source.stream - .transform(scan(0, combine)) - .listen(null, onError: errors.add); + source.stream.scan(0, combine).listen(null, onError: errors.add); source.add(1); await Future(() {}); diff --git a/test/start_with_test.dart b/test/start_with_test.dart index 8ce8075..3208903 100644 --- a/test/start_with_test.dart +++ b/test/start_with_test.dart @@ -19,18 +19,18 @@ void main() { bool isDone; setupForStreamType( - String streamType, StreamTransformer transformer) { + String streamType, Stream Function(Stream) transform) { emittedValues = []; isDone = false; values = createController(streamType); - transformed = values.stream.transform(transformer); + transformed = transform(values.stream); subscription = transformed.listen(emittedValues.add, onDone: () => isDone = true); } for (var streamType in streamTypes) { group('startWith then [$streamType]', () { - setUp(() => setupForStreamType(streamType, startWith(1))); + setUp(() => setupForStreamType(streamType, (s) => s.startWith(1))); test('outputs all values', () async { values..add(2)..add(3); @@ -65,7 +65,7 @@ void main() { group('startWithMany then [$streamType]', () { setUp(() async { - setupForStreamType(streamType, startWithMany([1, 2])); + setupForStreamType(streamType, (s) => s.startWithMany([1, 2])); // Ensure all initial values go through await Future(() {}); }); @@ -105,7 +105,8 @@ void main() { StreamController starting; setUp(() async { starting = createController(startingStreamType); - setupForStreamType(streamType, startWithStream(starting.stream)); + setupForStreamType( + streamType, (s) => s.startWithStream(starting.stream)); }); test('outputs all values', () async { diff --git a/test/switch_test.dart b/test/switch_test.dart index e32026a..ba26503 100644 --- a/test/switch_test.dart +++ b/test/switch_test.dart @@ -41,7 +41,7 @@ void main() { errors = []; isDone = false; subscription = outer.stream - .transform(switchLatest()) + .switchLatest() .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; }); @@ -129,9 +129,7 @@ void main() { var outer = StreamController>(); var values = []; - outer.stream - .transform(switchMap((l) => Stream.fromIterable(l))) - .listen(values.add); + outer.stream.switchMap((l) => Stream.fromIterable(l)).listen(values.add); outer.add([1, 2, 3]); await Future(() {}); @@ -143,7 +141,7 @@ void main() { test('can create a broadcast stream', () async { var outer = StreamController.broadcast(); - var transformed = outer.stream.transform(switchMap(null)); + var transformed = outer.stream.switchMap(null); expect(transformed.isBroadcast, true); }); diff --git a/test/take_until_test.dart b/test/take_until_test.dart index 74c5167..c25d342 100644 --- a/test/take_until_test.dart +++ b/test/take_until_test.dart @@ -32,7 +32,7 @@ void main() { errors = []; isDone = false; closeTrigger = Completer(); - transformed = values.stream.transform(takeUntil(closeTrigger.future)); + transformed = values.stream.takeUntil(closeTrigger.future); subscription = transformed .listen(emittedValues.add, onError: errors.add, onDone: () { isDone = true; diff --git a/test/tap_test.dart b/test/tap_test.dart index 816c80f..79d47b4 100644 --- a/test/tap_test.dart +++ b/test/tap_test.dart @@ -12,24 +12,22 @@ void main() { test('calls function for values', () async { var valuesSeen = []; var stream = Stream.fromIterable([1, 2, 3]); - await stream.transform(tap(valuesSeen.add)).last; + await stream.tap(valuesSeen.add).last; expect(valuesSeen, [1, 2, 3]); }); test('forwards values', () async { var stream = Stream.fromIterable([1, 2, 3]); - var values = await stream.transform(tap((_) {})).toList(); + var values = await stream.tap((_) {}).toList(); expect(values, [1, 2, 3]); }); test('calls function for errors', () async { dynamic error; var source = StreamController(); - source.stream - .transform(tap((_) {}, onError: (e, st) { - error = e; - })) - .listen((_) {}, onError: (_) {}); + source.stream.tap((_) {}, onError: (e, st) { + error = e; + }).listen((_) {}, onError: (_) {}); source.addError('error'); await Future(() {}); expect(error, 'error'); @@ -38,8 +36,7 @@ void main() { test('forwards errors', () async { dynamic error; var source = StreamController(); - source.stream.transform(tap((_) {}, onError: (e, st) {})).listen((_) {}, - onError: (e) { + source.stream.tap((_) {}, onError: (e, st) {}).listen((_) {}, onError: (e) { error = e; }); source.addError('error'); @@ -50,11 +47,9 @@ void main() { test('calls function on done', () async { var doneCalled = false; var source = StreamController(); - source.stream - .transform(tap((_) {}, onDone: () { - doneCalled = true; - })) - .listen((_) {}); + source.stream.tap((_) {}, onDone: () { + doneCalled = true; + }).listen((_) {}); await source.close(); expect(doneCalled, true); }); @@ -63,9 +58,9 @@ void main() { () async { var dataCallCount = 0; var source = StreamController.broadcast(); - source.stream.transform(tap((_) { + source.stream.tap((_) { dataCallCount++; - })) + }) ..listen((_) {}) ..listen((_) {}); source.add(1); @@ -78,9 +73,9 @@ void main() { () async { var errorCallCount = 0; var source = StreamController.broadcast(); - source.stream.transform(tap((_) {}, onError: (_, __) { + source.stream.tap((_) {}, onError: (_, __) { errorCallCount++; - })) + }) ..listen((_) {}, onError: (_, __) {}) ..listen((_) {}, onError: (_, __) {}); source.addError('error'); @@ -92,9 +87,9 @@ void main() { () async { var doneCallCount = 0; var source = StreamController.broadcast(); - source.stream.transform(tap((_) {}, onDone: () { + source.stream.tap((_) {}, onDone: () { doneCallCount++; - })) + }) ..listen((_) {}) ..listen((_) {}); await source.close(); @@ -105,7 +100,7 @@ void main() { var source = StreamController.broadcast(); var emittedValues1 = []; var emittedValues2 = []; - source.stream.transform(tap((_) {})) + source.stream.tap((_) {}) ..listen(emittedValues1.add) ..listen(emittedValues2.add); source.add(1); @@ -116,6 +111,6 @@ void main() { test('allows null callback', () async { var stream = Stream.fromIterable([1, 2, 3]); - await stream.transform(tap(null)).last; + await stream.tap(null).last; }); } diff --git a/test/throttle_test.dart b/test/throttle_test.dart index ae77b7c..e51ef84 100644 --- a/test/throttle_test.dart +++ b/test/throttle_test.dart @@ -19,23 +19,19 @@ void main() { Stream transformed; StreamSubscription subscription; - void setUpStreams(StreamTransformer transformer) { - valuesCanceled = false; - values = createController(streamType) - ..onCancel = () { - valuesCanceled = true; - }; - emittedValues = []; - isDone = false; - transformed = values.stream.transform(transformer); - subscription = transformed.listen(emittedValues.add, onDone: () { - isDone = true; - }); - } - group('throttle', () { setUp(() async { - setUpStreams(throttle(const Duration(milliseconds: 5))); + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + isDone = false; + transformed = values.stream.throttle(const Duration(milliseconds: 5)); + subscription = transformed.listen(emittedValues.add, onDone: () { + isDone = true; + }); }); test('cancels values', () async { diff --git a/test/where_type_test.dart b/test/where_type_test.dart index 75062c2..2ea4782 100644 --- a/test/where_type_test.dart +++ b/test/where_type_test.dart @@ -11,19 +11,19 @@ import 'package:stream_transform/stream_transform.dart'; void main() { test('forwards only events that match the type', () async { var values = Stream.fromIterable([1, 'a', 2, 'b']); - var filtered = values.transform(whereType()); + var filtered = values.whereType(); expect(await filtered.toList(), ['a', 'b']); }); test('can result in empty stream', () async { var values = Stream.fromIterable([1, 2, 3, 4]); - var filtered = values.transform(whereType()); + var filtered = values.whereType(); expect(await filtered.isEmpty, true); }); test('forwards values to multiple listeners', () async { var values = StreamController.broadcast(); - var filtered = values.stream.transform(whereType()); + var filtered = values.stream.whereType(); var firstValues = []; var secondValues = []; filtered..listen(firstValues.add)..listen(secondValues.add); @@ -35,7 +35,7 @@ void main() { test('closes streams with multiple listeners', () async { var values = StreamController.broadcast(); - var filtered = values.stream.transform(whereType()); + var filtered = values.stream.whereType(); var firstDone = false; var secondDone = false; filtered