Skip to content

Commit 95d01bd

Browse files
committed
Add updatesSync stream
1 parent 3a30096 commit 95d01bd

File tree

4 files changed

+55
-12
lines changed

4 files changed

+55
-12
lines changed

sqlite3/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 2.9.0
2+
3+
- Add `CommonDatabase.updatesSync`, a synchronous variant of the updates stream.
4+
15
## 2.8.0
26

37
- Support creating changesets and patchsets via the session extension.

sqlite3/lib/src/database.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,21 @@ abstract class CommonDatabase {
5151
///
5252
/// See also:
5353
/// - [Data Change Notification Callbacks](https://www.sqlite.org/c3ref/update_hook.html)
54+
/// - [updatesSync], a synchronous stream.
5455
Stream<SqliteUpdate> get updates;
5556

57+
/// A _synchronous_ stream of data changes happening on this database.
58+
///
59+
/// This stream behaves similarly to [updates], except that listeners are
60+
/// invoked synchronously (before the update completes).
61+
///
62+
/// The purpose of this stream is to avoid a large internal buffer when a
63+
/// transaction updates a large amount of rows - instead, the updates can be
64+
/// handled one-by-one with this.
65+
///
66+
/// It is crucial that listeners on this stream don't modify the database.
67+
Stream<SqliteUpdate> get updatesSync;
68+
5669
/// The [VoidPredicate] that is used to filter out transactions before commiting.
5770
///
5871
/// This is run before every commit, i.e. before the end of an explicit

sqlite3/lib/src/implementation/database.dart

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ base class DatabaseImplementation implements CommonDatabase {
465465
@override
466466
Stream<SqliteUpdate> get updates => _updatesHandler().stream;
467467

468+
@override
469+
Stream<SqliteUpdate> get updatesSync => _updatesHandler().syncStream;
470+
468471
@override
469472
Stream<void> get rollbacks => _rollbackHandler().stream;
470473

@@ -589,7 +592,8 @@ final class DatabaseConfigImplementation extends DatabaseConfig {
589592
/// commits into rollbacks. This is represented by [_syncCallback].
590593
final class _StreamHandlers<T, SyncCallback> {
591594
final DatabaseImplementation _database;
592-
final List<MultiStreamController<T>> _asyncListeners = [];
595+
final List<({MultiStreamController<T> controller, bool sync})>
596+
_asyncListeners = [];
593597
SyncCallback? _syncCallback;
594598

595599
/// Registers a native callback on the database.
@@ -599,29 +603,33 @@ final class _StreamHandlers<T, SyncCallback> {
599603
final void Function() _unregister;
600604

601605
Stream<T>? _stream;
606+
Stream<T>? _syncStream;
602607

603-
Stream<T> get stream => _stream!;
608+
Stream<T> get stream => _stream ??= _generateStream(false);
609+
Stream<T> get syncStream => _syncStream ??= _generateStream(true);
604610

605611
_StreamHandlers({
606612
required DatabaseImplementation database,
607613
required void Function() register,
608614
required void Function() unregister,
609615
}) : _database = database,
610616
_register = register,
611-
_unregister = unregister {
612-
_stream = Stream.multi(
617+
_unregister = unregister;
618+
619+
Stream<T> _generateStream(bool dispatchSynchronously) {
620+
return Stream.multi(
613621
(newListener) {
614622
if (_database._isClosed) {
615623
newListener.close();
616624
return;
617625
}
618626

619627
void addListener() {
620-
_addAsyncListener(newListener);
628+
_addAsyncListener(newListener, dispatchSynchronously);
621629
}
622630

623631
void removeListener() {
624-
_removeAsyncListener(newListener);
632+
_removeAsyncListener(newListener, dispatchSynchronously);
625633
}
626634

627635
newListener
@@ -653,17 +661,17 @@ final class _StreamHandlers<T, SyncCallback> {
653661
}
654662
}
655663

656-
void _addAsyncListener(MultiStreamController<T> listener) {
664+
void _addAsyncListener(MultiStreamController<T> listener, bool sync) {
657665
final isFirstListener = !hasListener;
658-
_asyncListeners.add(listener);
666+
_asyncListeners.add((controller: listener, sync: sync));
659667

660668
if (isFirstListener) {
661669
_register();
662670
}
663671
}
664672

665-
void _removeAsyncListener(MultiStreamController<T> listener) {
666-
_asyncListeners.remove(listener);
673+
void _removeAsyncListener(MultiStreamController<T> listener, bool sync) {
674+
_asyncListeners.remove((controller: listener, sync: sync));
667675

668676
if (!hasListener && !_database._isClosed) {
669677
_unregister();
@@ -672,13 +680,17 @@ final class _StreamHandlers<T, SyncCallback> {
672680

673681
void deliverAsyncEvent(T event) {
674682
for (final listener in _asyncListeners) {
675-
listener.add(event);
683+
if (listener.sync) {
684+
listener.controller.addSync(event);
685+
} else {
686+
listener.controller.add(event);
687+
}
676688
}
677689
}
678690

679691
void close() {
680692
for (final listener in _asyncListeners) {
681-
listener.close();
693+
listener.controller.close();
682694
}
683695
_syncCallback = null;
684696
}

sqlite3/test/common/database.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,20 @@ void testDatabase(
747747
expect(database.updates.listen(null).asFuture(null), completes);
748748
database.dispose();
749749
});
750+
751+
test('can listen synchronously', () async {
752+
var notifications = 0;
753+
var asyncNotifications = 0;
754+
755+
database.updatesSync.listen((_) => notifications++);
756+
database.updates.listen((_) => asyncNotifications++);
757+
758+
database.execute('INSERT INTO tbl DEFAULT VALUES');
759+
expect(notifications, 1);
760+
expect(asyncNotifications, 0);
761+
await pumpEventQueue();
762+
expect(asyncNotifications, 1);
763+
});
750764
});
751765

752766
group('rollback stream', () {

0 commit comments

Comments
 (0)