Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Rewrite transformers as extension methods #83

Merged
merged 11 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ branches:
only: [master]
dart:
- dev
- 2.2.0
# 2.2.0
cache:
directories:
- $HOME/.pub-cache
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
## 0.0.20-dev

- Add extension methods for most transformers. These should be used in place
of the current methods. All current implementations are deprecated and will
be removed in the next major version bump.
- Migrating typical use: Instead of
`stream.transform(debounce(Duration(seconds: 1)))` use
`stream.debounce(Duration(seconds: 1))`.
- To migrate a usage where a `StreamTransformer` instance is stored or
passed see "Getting a StreamTransformer instance" on the README.
- The `map` and `chainTransformers` utilities are no longer useful with the
new patterns so they are deprecated without a replacement. If you still have
a need for them they can be replicated with `StreamTransformer.fromBind`:

```
// Replace `map(convert)`
StreamTransformer.fromBind((s) => s.map(convert));

// Replace `chainTransformers(first, second)`
StreamTransformer.fromBind((s) => s.transform(first).transform(second));
```

## 0.0.19

- Add `asyncMapSample` transform.
Expand Down
58 changes: 37 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,84 +1,100 @@
Utility methods to create `StreamTransfomer` instances to manipulate Streams.
Extension methods on `Stream` adding common transform operators.

# asyncMapBuffer
# Operators

## asyncMapBuffer

Like `asyncMap` but events are buffered in a List until previous events have
been processed rather than being called for each element individually.

# asyncMapSample
## asyncMapSample

Like `asyncMap` but events are discarded, keeping only the latest, until
previous events have been processed rather than being called for every element.

# asyncWhere
## asyncWhere

Like `where` but allows an asynchronous predicate.

# audit
## audit

Audit waits for a period of time after receiving a value and then only emits
the most recent value.
Waits for a period of time after receiving a value and then only emits the most
recent value.

# buffer
## buffer

Collects values from a source stream until a `trigger` stream fires and the
collected values are emitted.

# combineLatest
## combineLatest

Combine the most recent event from two streams through a callback and emit the
result.

# combineLatestAll
## combineLatestAll

Combines the latest events emitted from multiple source streams and yields a
list of the values.

# debounce, debounceBuffer
## debounce, debounceBuffer

Prevents a source stream from emitting too frequently by dropping or collecting
values that occur within a given duration.

# concurrentAsyncMap
## concurrentAsyncMap

Like `asyncMap` but the convert callback can be called with subsequent values
before it has finished for previous values.

# followedBy
## followedBy

Appends the values of a stream after another stream finishes.

# merge, mergeAll
## merge, mergeAll

Interleaves events from multiple streams into a single stream.

# scan
## scan

Scan is like fold, but instead of producing a single value it yields each
intermediate accumulation.

# startWith, startWithMany, startWithStream
## startWith, startWithMany, startWithStream

Prepend a value, an iterable, or a stream to the beginning of another stream.

# switchMap, switchLatest
## switchMap, switchLatest

Flatten a Stream of Streams into a Stream which forwards values from the most
recent Stream

# takeUntil
## takeUntil

Let values through until a Future fires.

# tap
## tap

Taps into a single-subscriber stream to react to values as they pass, without
being a real subscriber.

# throttle
## throttle

Blocks events for a duration after an event is successfully emitted.

# whereType
## whereType

Like `Iterable.whereType` for a stream.

# Getting a `StreamTransformer` instance

It may be useful to pass an instance of `StreamTransformer` so that it can be
used with `stream.transform` calls rather than reference the specific operator
in place. Any operator on `Stream` that returns a `Stream` can be modeled as a
`StreamTransformer` using the [`fromBind` constructor][fromBind].

```dart
final debounce = StreamTransformer.fromBind(
(s) => s.debounce(const Duration(milliseconds: 100)));
```

[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html
58 changes: 58 additions & 0 deletions lib/src/async_map_buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,62 @@ import 'buffer.dart';
import 'chain_transformers.dart';
import 'from_handlers.dart';

extension AsyncMap<T> on Stream<T> {
/// Like [asyncMap] but events are buffered until previous events have been
/// processed by [convert].
///
/// If the source stream is a broadcast stream the result will be as well. When
/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
/// that the [convert] function is only called once per event, rather than once
/// per listener per event.
///
/// The first event from the source stream is always passed to [convert] as a
/// List with a single element. After that events are buffered until the
/// previous Future returned from [convert] has fired.
///
/// Errors from the source stream are forwarded directly to the result stream.
/// Errors during the conversion are also forwarded to the result stream and
/// are considered completing work so the next values are let through.
///
/// The result stream will not close until the source stream closes and all
/// pending conversions have finished.
Stream<S> asyncMapBuffer<S>(Future<S> Function(List<T>) convert) {
var workFinished = StreamController<void>()
// Let the first event through.
..add(null);
return this
.buffer(workFinished.stream)
.transform(_asyncMapThen(convert, workFinished.add));
}

/// Like [asyncMap] but events are discarded while work is happening in
/// [convert].
///
/// If the source stream is a broadcast stream the result will be as well. When
/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
/// that the [convert] function is only called once per event, rather than once
/// per listener per event.
///
/// If no work is happening when an event is emitted it will be immediately
/// passed to [convert]. If there is ongoing work when an event is emitted it
/// will be held until the work is finished. New events emitted will replace a
/// pending event.
///
/// Errors from the source stream are forwarded directly to the result stream.
/// Errors during the conversion are also forwarded to the result stream and are
/// considered completing work so the next values are let through.
///
/// The result stream will not close until the source stream closes and all
/// pending conversions have finished.
Stream<S> asyncMapSample<S>(Future<S> Function(T) convert) {
var workFinished = StreamController<void>()
// Let the first event through.
..add(null);
return transform(AggregateSample(workFinished.stream, _dropPrevious))
.transform(_asyncMapThen(convert, workFinished.add));
}
}

/// Like [Stream.asyncMap] but events are buffered until previous events have
/// been processed by [convert].
///
Expand All @@ -27,6 +83,7 @@ import 'from_handlers.dart';
///
/// The result stream will not close until the source stream closes and all
/// pending conversions have finished.
@Deprecated('Use the extension instead')
StreamTransformer<S, T> asyncMapBuffer<S, T>(
Future<T> Function(List<S>) convert) {
var workFinished = StreamController<void>()
Expand Down Expand Up @@ -55,6 +112,7 @@ StreamTransformer<S, T> asyncMapBuffer<S, T>(
///
/// The result stream will not close until the source stream closes and all
/// pending conversions have finished.
@Deprecated('Use the extension instead')
StreamTransformer<S, T> asyncMapSample<S, T>(Future<T> Function(S) convert) {
var workFinished = StreamController<void>()
// Let the first event through.
Expand Down
38 changes: 38 additions & 0 deletions lib/src/async_where.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,43 @@ import 'dart:async';

import 'from_handlers.dart';

extension AsyncWhere<T> on Stream<T> {
/// Like [where] but allows the [test] to return a [Future].
///
/// Events on the result stream will be emitted in the order that [test]
/// completes which may not match the order of the original stream.
///
/// If the source stream is a broadcast stream the result will be as well. When
/// used with a broadcast stream behavior also differs from [Stream.where] in
/// that the [test] function is only called once per event, rather than once
/// per listener per event.
///
/// Errors from the source stream are forwarded directly to the result stream.
/// Errors from [test] are also forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and all
/// pending [test] calls have finished.
Stream<T> asyncWhere(FutureOr<bool> test(T element)) {
var valuesWaiting = 0;
var sourceDone = false;
return transform(fromHandlers(handleData: (element, sink) {
valuesWaiting++;
() async {
try {
if (await test(element)) sink.add(element);
} catch (e, st) {
sink.addError(e, st);
}
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
}, handleDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
}));
}
}

/// Like [Stream.where] but allows the [test] to return a [Future].
///
/// Events on the result stream will be emitted in the order that [test]
Expand All @@ -20,6 +57,7 @@ import 'from_handlers.dart';
///
/// The result stream will not close until the source stream closes and all
/// pending [test] calls have finished.
@Deprecated('Use the extension instead')
StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
var valuesWaiting = 0;
var sourceDone = false;
Expand Down
53 changes: 53 additions & 0 deletions lib/src/audit.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,58 @@ import 'dart:async';

import 'from_handlers.dart';

extension Audit<T> on Stream<T> {
/// Returns a Stream which only emits once per [duration], at the end of the
/// period.
///
/// If the source stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
/// If there is no pending event when the source stream closes the output
/// stream will close immediately. If there is a pending event the output
/// stream will wait to emit it before closing.
///
/// Differs from `throttle` in that it always emits the most recently received
/// event rather than the first in the period. The events that are emitted are
/// always delayed by some amount. If the event that started the period is the
/// one that is emitted it will be delayed by [duration]. If a later event
/// comes in within the period it's delay will be shorter by the difference in
/// arrival times.
///
/// Differs from `debounce` in that a value will always be emitted after
/// [duration], the output will not be starved by values coming in repeatedly
/// within [duration].
///
/// For example:
///
/// source.audit(Duration(seconds: 5));
///
/// source: a------b--c----d--|
/// output: -----a------c--------d|
Stream<T> audit(Duration duration) {
Timer timer;
var shouldClose = false;
T recentData;

return transform(fromHandlers(handleData: (T data, EventSink<T> sink) {
recentData = data;
timer ??= Timer(duration, () {
sink.add(recentData);
timer = null;
if (shouldClose) {
sink.close();
}
});
}, handleDone: (EventSink<T> sink) {
if (timer != null) {
shouldClose = true;
} else {
sink.close();
}
}));
}
}

/// Creates a StreamTransformer which only emits once per [duration], at the
/// end of the period.
///
Expand All @@ -16,6 +68,7 @@ import 'from_handlers.dart';
/// Differs from `debounce` in that a value will always be emitted after
/// [duration], the output will not be starved by values coming in repeatedly
/// within [duration].
@Deprecated('Use the extension instead')
StreamTransformer<T, T> audit<T>(Duration duration) {
Timer timer;
var shouldClose = false;
Expand Down
16 changes: 16 additions & 0 deletions lib/src/buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ import 'dart:async';

import 'aggregate_sample.dart';

extension Buffer<T> on Stream<T> {
/// Returns a Stream which collects values and emits when it sees a value on
/// [trigger].
///
/// If there are no pending values when [trigger] emits, the next value on the
/// source Stream will immediately flow through. Otherwise, the pending values
/// are released when [trigger] emits.
///
/// If the source stream is a broadcast stream, the result will be as well.
/// Errors from the source stream or the trigger are immediately forwarded to
/// the output.
Stream<List<T>> buffer(Stream<void> trigger) =>
transform(AggregateSample<T, List<T>>(trigger, _collect));
}

/// Creates a [StreamTransformer] which collects values and emits when it sees a
/// value on [trigger].
///
Expand All @@ -15,6 +30,7 @@ import 'aggregate_sample.dart';
///
/// Errors from the source stream or the trigger are immediately forwarded to
/// the output.
@Deprecated('Use the extension instead')
StreamTransformer<T, List<T>> buffer<T>(Stream<void> trigger) =>
AggregateSample<T, List<T>>(trigger, _collect);

Expand Down
1 change: 1 addition & 0 deletions lib/src/chain_transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'dart:async';
/// /// values.transform(utf8.decoder).transform(const LineSplitter())
/// final splitDecoded = chainTransformers(utf8.decoder, const LineSplitter());
/// ```
@Deprecated('This utility should not be needed with extension methods')
StreamTransformer<S, T> chainTransformers<S, I, T>(
StreamTransformer<S, I> first, StreamTransformer<I, T> second) =>
StreamTransformer.fromBind(
Expand Down
Loading