diff --git a/benchmark/bin/native_pointers.dart b/benchmark/bin/native_pointers.dart index 0cacaab57..19c4aa24e 100644 --- a/benchmark/bin/native_pointers.dart +++ b/benchmark/bin/native_pointers.dart @@ -44,7 +44,10 @@ class AsTypedList extends Benchmark { void setup() => nativePtr = malloc(length); @override - void teardown() => malloc.free(nativePtr); + void teardown() { + malloc.free(nativePtr); + super.teardown(); + } } class AsTypedListUint64 extends Benchmark { @@ -65,5 +68,8 @@ class AsTypedListUint64 extends Benchmark { void setup() => nativePtr = malloc(length); @override - void teardown() => malloc.free(nativePtr); + void teardown() { + malloc.free(nativePtr); + super.teardown(); + } } diff --git a/benchmark/bin/query.dart b/benchmark/bin/query.dart new file mode 100644 index 000000000..ee3405071 --- /dev/null +++ b/benchmark/bin/query.dart @@ -0,0 +1,60 @@ +import 'package:objectbox_benchmark/benchmark.dart'; +import 'package:objectbox_benchmark/model.dart'; +import 'package:objectbox_benchmark/objectbox.g.dart'; + +const count = 10000; + +void main() { + QueryFind().report(); + QueryFindIds().report(); + QueryStream().report(); +} + +class QueryBenchmark extends DbBenchmark { + late final Query query; + + QueryBenchmark(String name) + : super(name, iterations: 1, coefficient: 1 / count); + + @override + void setup() { + box.putMany(prepareTestEntities(count)); + query = box + .query(TestEntity_.tInt + .lessOrEqual((count / 10).floor()) + .or(TestEntity_.tInt.greaterThan(count - (count / 10).floor()))) + .build(); + + if (query.count() != count / 5) { + throw Exception('Unexpected number of query results ' + '${query.count()} vs expected ${count / 5}'); + } + } + + @override + void teardown() { + query.close(); + super.teardown(); + } +} + +class QueryFind extends QueryBenchmark { + QueryFind() : super('${QueryFind}'); + + @override + void run() => query.find(); +} + +class QueryFindIds extends QueryBenchmark { + QueryFindIds() : super('${QueryFindIds}'); + + @override + void run() => query.findIds(); +} + +class QueryStream extends QueryBenchmark { + QueryStream() : super('${QueryStream}'); + + @override + void run() async => await Future.wait([query.stream().toList()]); +} diff --git a/benchmark/bin/write.dart b/benchmark/bin/write.dart index 523190cf7..fd1253aee 100644 --- a/benchmark/bin/write.dart +++ b/benchmark/bin/write.dart @@ -1,14 +1,20 @@ import 'package:objectbox_benchmark/benchmark.dart'; import 'package:objectbox_benchmark/objectbox.g.dart'; +const count = 10000; + void main() { Put().report(); PutInTx().report(); PutMany().report(); + PutAsync().report(); + PutAsync2().report(); + PutAsync3().report(); + PutQueued().report(); } class Put extends DbBenchmark { - static const count = 1000; + static const count = 100; final items = prepareTestEntities(count, assignedIds: true); Put() : super('${Put}', iterations: count); @@ -18,23 +24,16 @@ class Put extends DbBenchmark { } class PutInTx extends DbBenchmark { - static const count = 1000; final items = prepareTestEntities(count, assignedIds: true); - PutInTx() : super('${PutInTx}', iterations: count); + PutInTx() : super('${PutInTx}', iterations: 1, coefficient: 1 / count); @override - void run() { - store.runInTransaction(TxMode.write, () { - for (var i = 0; i < items.length; i++) { - box.put(items[i]); - } - }); - } + void runIteration(int i) => + store.runInTransaction(TxMode.write, () => items.forEach(box.put)); } class PutMany extends DbBenchmark { - static final count = 10000; final items = prepareTestEntities(count, assignedIds: true); PutMany() : super('${PutMany}', iterations: 1, coefficient: 1 / count); @@ -42,3 +41,55 @@ class PutMany extends DbBenchmark { @override void runIteration(int i) => box.putMany(items); } + +class PutAsync extends DbBenchmark { + final items = prepareTestEntities(count, assignedIds: true); + + PutAsync() + : super('${PutAsync}[wait(map())] ', + iterations: 1, coefficient: 1 / count); + + @override + void run() async => await Future.wait(items.map(box.putAsync)); +} + +// This is slightly different (slower) then the [PutAsync] - all futures are +// prepared beforehand, only then it starts to wait for them to complete. +class PutAsync2 extends DbBenchmark { + final items = prepareTestEntities(count, assignedIds: true); + + PutAsync2() + : super('${PutAsync2}[wait(map().toList())] ', + iterations: 1, coefficient: 1 / count); + + @override + void run() async { + final futures = items.map(box.putAsync).toList(growable: false); + await Future.wait(futures); + store.awaitAsyncSubmitted(); + } +} + +class PutAsync3 extends DbBenchmark { + final items = prepareTestEntities(count, assignedIds: true); + + PutAsync3() : super('${PutAsync3}[wait(putAsync(i))]', iterations: count); + + @override + void run() { + items.forEach((item) async => await box.putAsync(item)); + store.awaitAsyncSubmitted(); + } +} + +class PutQueued extends DbBenchmark { + final items = prepareTestEntities(count, assignedIds: true); + + PutQueued() : super('${PutQueued}', iterations: count); + + @override + void run() { + items.forEach(box.putQueued); + store.awaitAsyncSubmitted(); + } +} diff --git a/benchmark/lib/benchmark.dart b/benchmark/lib/benchmark.dart index ebccd6c6c..378798198 100644 --- a/benchmark/lib/benchmark.dart +++ b/benchmark/lib/benchmark.dart @@ -1,30 +1,78 @@ +import 'dart:cli'; import 'dart:io'; -import 'package:benchmark_harness/benchmark_harness.dart'; +import 'package:meta/meta.dart'; import 'package:objectbox/objectbox.dart'; import 'model.dart'; import 'objectbox.g.dart'; -class Benchmark extends BenchmarkBase { +class Benchmark { + final String name; final int iterations; + final double coefficient; + final watch = Stopwatch(); + final Emitter emitter; + + Benchmark(this.name, {this.iterations = 1, this.coefficient = 1}) + : emitter = Emitter(iterations, coefficient) { + print('-------------------------------------------------------------'); + print('$name(iterations): ${Emitter._format(iterations.toDouble())}'); + print( + '$name(count): ${Emitter._format(iterations / coefficient)}'); + // Measure the total time of the test - if it's too high, you should + // decrease the number of iterations. Expected time is between 2 and 3 sec. + watch.start(); + } - Benchmark(String name, {int iterations = 1, double coefficient = 1}) - : iterations = iterations, - super(name, emitter: Emitter(iterations, coefficient)); + // Not measured setup code executed prior to the benchmark runs. + void setup() {} - @override - void exercise() => run(); + @mustCallSuper + void teardown() { + final color = watch.elapsedMilliseconds > 3000 ? '\x1B[31m' : ''; + print('$name(total time taken): $color${watch.elapsed.toString()}\x1B[0m'); + } - @override - void run() { + void run() async { for (var i = 0; i < iterations; i++) runIteration(i); } - void runIteration(int iteration) {} + void runIteration(int iteration) async {} + + // Runs [f] for at least [minimumMillis] milliseconds. + static Future _measureFor(Function f, int minimumMillis) async { + final minimumMicros = minimumMillis * 1000; + var iter = 0; + final watch = Stopwatch()..start(); + var elapsed = 0; + while (elapsed < minimumMicros) { + await f(); + elapsed = watch.elapsedMicroseconds; + iter++; + } + return elapsed / iter; + } + + // Measures the score for the benchmark and returns it. + @nonVirtual + Future _measure() async { + setup(); + // Warmup for at least 100ms. Discard result. + await _measureFor(run, 100); + // Run the benchmark for at least 2000ms. + var result = await _measureFor(run, 2000); + teardown(); + return result; + } + + @nonVirtual + void report() { + emitter.emit(name, waitFor(_measure())); + } } -class Emitter implements ScoreEmitter { +class Emitter { static const usInSec = 1000000; final int iterations; @@ -32,20 +80,19 @@ class Emitter implements ScoreEmitter { const Emitter(this.iterations, this.coefficient); - @override void emit(String testName, double value) { final timePerIter = value / iterations; final timePerUnit = timePerIter * coefficient; - print('$testName(Single iteration): ${format(timePerIter)} us'); - print('$testName(Runtime per unit): ${format(timePerUnit)} us'); - print('$testName(Runs per second): ${format(usInSec / timePerIter)}'); - print('$testName(Units per second): ${format(usInSec / timePerUnit)}'); + print('$testName(Single iteration): ${_format(timePerIter)} us'); + print('$testName(Runtime per unit): ${_format(timePerUnit)} us'); + print('$testName(Runs per second): ${_format(usInSec / timePerIter)}'); + print('$testName(Units per second): ${_format(usInSec / timePerUnit)}'); } // Simple number formatting, maybe use a lib? // * the smaller the number, the more decimal places it has (one up to four). // * large numbers use thousands separator (defaults to non-breaking space). - String format(double num, [String thousandsSeparator = ' ']) { + static String _format(double num, [String thousandsSeparator = ' ']) { final decimalPoints = num < 1 ? 4 : num < 10 @@ -72,18 +119,24 @@ class Emitter implements ScoreEmitter { class DbBenchmark extends Benchmark { static final String dbDir = 'benchmark-db'; - final Store store; + late final Store store; late final Box box; DbBenchmark(String name, {int iterations = 1, double coefficient = 1}) - : store = Store(getObjectBoxModel(), directory: dbDir), - super(name, iterations: iterations, coefficient: coefficient) { + : super(name, iterations: iterations, coefficient: coefficient) { + deleteDbDir(); + store = Store(getObjectBoxModel(), directory: dbDir); box = Box(store); } @override void teardown() { store.close(); + deleteDbDir(); + super.teardown(); + } + + void deleteDbDir() { final dir = Directory(dbDir); if (dir.existsSync()) dir.deleteSync(recursive: true); } diff --git a/benchmark/pubspec.yaml b/benchmark/pubspec.yaml index 4cf71927b..50198fea5 100644 --- a/benchmark/pubspec.yaml +++ b/benchmark/pubspec.yaml @@ -7,7 +7,7 @@ environment: dependencies: objectbox: any - benchmark_harness: any + meta: ^1.3.0 dev_dependencies: objectbox_generator: any diff --git a/objectbox/CHANGELOG.md b/objectbox/CHANGELOG.md index 913c1420d..7eacb5527 100644 --- a/objectbox/CHANGELOG.md +++ b/objectbox/CHANGELOG.md @@ -1,15 +1,16 @@ ## latest -This is a 1.0 release candidate - we encourage everyone to try it out and provide any last-minute feedback, -especially to new/changed APIs. +This is a 1.0 release candidate - please try it out and give us any last-minute feedback, especially to new and changed APIs. -* Query now supports auto-closing. You can still call `close()` manually if you want to free native resources sooner +* New Box `putAsync()` returning a `Future` and `putQueued()` for asynchronous writes. +* Query now supports auto-closing. You can still call `close()` manually if you want to free native resources sooner than they would be by Dart's garbage collector, but it's not mandatory anymore. * Change the "meta-model" fields to provide completely type-safe query building. Conditions you specify are now checked at compile time to match the queried entity. * Make property queries fully typed, `PropertyQuery.find()` now returns the appropriate `List<...>` type without casts. +* Query conditions `inside()` renamed to `oneOf()`, `notIn()` and `notInList()` renamed to `notOneOf()`. * Query `stream` and `findStream()` are replaced by `QueryBuilder.watch()`, i.e. `box.query(...).watch()`. -* Query conditions `inside()` renamed to `oneOf()`, `notIn()` and `notInList()` renamed to `notOneOf()`. +* New Query `stream()` to stream objects all the while the query is executed in the background. * Store `subscribe()` renamed to `watch()`. * Store `subscribeAll()` replaced by a shared broadcast stream `entityChanges`. * Entities can now contain `final` fields and they're properly stored/loaded (must be constructor params). diff --git a/objectbox/example/README.md b/objectbox/example/README.md index 7e592b2cd..e70d04dcc 100644 --- a/objectbox/example/README.md +++ b/objectbox/example/README.md @@ -197,28 +197,34 @@ dateQuery.close(); query.close(); ``` -### Query streams +### Reactive queries -Streams can be created from queries. -Note: Dart Streams can be extended with [rxdart](https://github.com/ReactiveX/rxdart). +You can create a reactive query to get notified any time queried entity types change. ```dart -final sub1 = box.query(condition).watch().listen((query) { +Stream> watchedQuery = box.query(condition).watch(); +final sub1 = watchedQuery.listen((Query query) { + // This gets triggered any there are changes to the queried entity types. + // You can call any query method here, for example: print(query.count()); -}); - -// box.put() creates some data ... - -sub1.cancel(); - -final sub2 = box.query(condition).watch().listen((query) { print(query.find()); }); +... +sub1.cancel(); // cancel the subscription after you're done +``` -// clean up -sub2.cancel(); +Similarly to the previous example but with an initial event immediately after you start listening: +```dart +Stream> watchedQuery = box.query(condition).watch(); +final sub1 = watchedQuery.listen((Query query) { + // This gets triggered once right away and then after queried entity types changes. +}); +... +sub1.cancel(); // cancel the subscription after you're done ``` +> Note: Dart Streams can be extended with [rxdart](https://github.com/ReactiveX/rxdart). + Relations --------- diff --git a/objectbox/lib/src/native/box.dart b/objectbox/lib/src/native/box.dart index 306564280..38d864df8 100644 --- a/objectbox/lib/src/native/box.dart +++ b/objectbox/lib/src/native/box.dart @@ -1,8 +1,11 @@ +import 'dart:async'; import 'dart:ffi'; +import 'dart:isolate'; import 'package:ffi/ffi.dart'; import 'package:meta/meta.dart'; +import '../common.dart'; import '../modelinfo/index.dart'; import '../relations/info.dart'; import '../relations/to_many.dart'; @@ -43,6 +46,7 @@ class Box { final bool _hasToOneRelations; final bool _hasToManyRelations; final _builder = BuilderWithCBuffer(); + _AsyncBoxHelper? _async; /// Create a box for an Entity. factory Box(Store store) => store.box(); @@ -58,20 +62,6 @@ class Box { bool get _hasRelations => _hasToOneRelations || _hasToManyRelations; - static int _getOBXPutMode(PutMode mode) { - // TODO microbenchmark if this is fast or we should just return mode.index+1 - switch (mode) { - case PutMode.put: - return OBXPutMode.PUT; - case PutMode.insert: - return OBXPutMode.INSERT; - case PutMode.update: - return OBXPutMode.UPDATE; - default: - throw ArgumentError.value(mode, 'mode'); - } - } - /// Puts the given Object in the box (aka persisting it). /// /// If this is a new object (its ID property is 0), a new ID will be assigned @@ -95,6 +85,81 @@ class Box { } } + /// Puts the given object in the box (persisting it) asynchronously. + /// + /// The returned future completes with an ID of the object. If it is a new + /// object (its ID property is 0), a new ID will be assigned to the object + /// argument, after the returned [Future] completes. + /// + /// In extreme scenarios (e.g. having hundreds of thousands async operations + /// per second), this may fail as internal queues fill up if the disk can't + /// keep up. However, this should not be a concern for typical apps. + /// The returned future may also complete with an error if the put failed + /// for another reason, for example a unique constraint violation. In that + /// case the [object]'s id field remains unchanged (0 if it was a new object). + /// + /// See also [putQueued] which doesn't return a [Future] but a pre-allocated + /// ID immediately, even though the actual database put operation may fail. + Future putAsync(T object, {PutMode mode = PutMode.put}) async => + // Wrap with [Future.sync] to avoid mixing sync and async errors. + // Note: doesn't seem to decrease performance at all. + // https://dart.dev/guides/libraries/futures-error-handling#potential-problem-accidentally-mixing-synchronous-and-asynchronous-errors + Future.sync(() async { + if (_hasRelations) { + throw UnsupportedError( + 'putAsync() is currently not supported on entity ' + '${T.toString()} because it has relations.'); + } + _async ??= _AsyncBoxHelper(this); + + // Note: we can use the shared flatbuffer object, because: + // https://dart.dev/codelabs/async-await#execution-flow-with-async-and-await + // > An async function runs synchronously until the first await keyword. + // > This means that within an async function body, all synchronous code + // > before the first await keyword executes immediately. + _builder.fbb.reset(); + var id = _entity.objectToFB(object, _builder.fbb); + final newId = _async!.put(id, _builder, mode); + _builder.resetIfLarge(); // reset before `await` + if (id == 0) { + // Note: if the newId future completes with an error, ID isn't set. + _entity.setId(object, await newId); + } + return newId; + }); + + /// Schedules the given object to be put later on, by an asynchronous queue. + /// + /// The actual database put operation may fail even if this function returned + /// normally (and even if it returned a new ID for a new object). For example + /// if the database put failed because of a unique constraint violation. + /// Therefore, you should make sure the data you put is correct and you have + /// a fall back in place even if it eventually failed. + /// + /// In extreme scenarios (e.g. having hundreds of thousands async operations + /// per second), this may fail as internal queues fill up if the disk can't + /// keep up. However, this should not be a concern for typical apps. + /// + /// See also [putAsync] which returns a [Future] that only completes after an + /// actual database put was successful. + /// Use [Store.awaitAsyncCompletion] and [Store.awaitAsyncSubmitted] to wait + /// until all operations have finished. + int putQueued(T object, {PutMode mode = PutMode.put}) { + if (_hasRelations) { + throw UnsupportedError('putQueued() is currently not supported on entity ' + '${T.toString()} because it has relations.'); + } + _async ??= _AsyncBoxHelper(this); + + _builder.fbb.reset(); + var id = _entity.objectToFB(object, _builder.fbb); + final newId = C.async_put_object4(_async!._cAsync, _builder.bufPtr, + _builder.fbb.size, _getOBXPutMode(mode)); + id = _handlePutObjectResult(object, id, newId); + _builder.resetIfLarge(); + return newId; + } + int _put(T object, PutMode mode, Transaction? tx) { if (_hasRelations) { if (tx == null) { @@ -315,6 +380,62 @@ class Box { } } +int _getOBXPutMode(PutMode mode) { +// TODO microbenchmark if this is fast or we should just return mode.index+1 + switch (mode) { + case PutMode.put: + return OBXPutMode.PUT; + case PutMode.insert: + return OBXPutMode.INSERT; + case PutMode.update: + return OBXPutMode.UPDATE; + default: + throw ArgumentError.value(mode, 'mode'); + } +} + +class _AsyncBoxHelper { + final Pointer _cAsync; + + _AsyncBoxHelper(Box box) : _cAsync = C.async_1(box._cBox) { + initializeDartAPI(); + } + + Future put(int id, BuilderWithCBuffer fbb, PutMode mode) async { + final port = ReceivePort(); + final newId = C.dartc_async_put_object(_cAsync, port.sendPort.nativePort, + fbb.bufPtr, fbb.fbb.size, _getOBXPutMode(mode)); + + final completer = Completer(); + + // Zero is returned to indicate an immediate error, object won't be stored. + if (newId == 0) { + port.close(); + try { + throwLatestNativeError(context: 'putAsync failed'); + } catch (e) { + completer.completeError(e); + } + } + + port.listen((dynamic message) { + // Null is sent if the put was successful (there is no error, thus NULL) + if (message == null) { + completer.complete(newId); + } else if (message is String) { + completer.completeError(message.startsWith('Unique constraint') + ? UniqueViolationException(message) + : ObjectBoxException(message)); + } else { + completer.completeError(ObjectBoxException( + 'Unknown message type (${message.runtimeType}: $message')); + } + port.close(); + }); + return completer.future; + } +} + /// Internal only. @internal class InternalBoxAccess { diff --git a/objectbox/lib/src/native/query/query.dart b/objectbox/lib/src/native/query/query.dart index 5ec92a170..f6046843e 100644 --- a/objectbox/lib/src/native/query/query.dart +++ b/objectbox/lib/src/native/query/query.dart @@ -3,10 +3,12 @@ library query; import 'dart:async'; import 'dart:collection'; import 'dart:ffi'; +import 'dart:isolate'; import 'dart:typed_data'; import 'package:ffi/ffi.dart'; +import '../../common.dart'; import '../../modelinfo/entity_definition.dart'; import '../../modelinfo/modelproperty.dart'; import '../../modelinfo/modelrelation.dart'; @@ -733,6 +735,107 @@ class Query { return result; } + /// Finds Objects matching the query, streaming them while the query executes. + /// + /// Note: make sure you evaluate performance in your use case - streams come + /// with an overhead so a plain [find()] is usually faster. + Stream stream() => _stream1(); + + /// Stream items by sending full flatbuffers binary as a message. + Stream _stream1() { + initializeDartAPI(); + final port = ReceivePort(); + final cStream = checkObxPtr( + C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream'); + + var closed = false; + final close = () { + if (closed) return; + closed = true; + C.dartc_stream_close(cStream); + port.close(); + }; + + try { + final controller = StreamController(onCancel: close); + port.listen((dynamic message) { + // We expect Uint8List for data and NULL when the query has finished. + if (message is Uint8List) { + try { + controller.add(_entity.objectFromFB(_store, message)); + return; + } catch (e) { + controller.addError(e); + } + } else if (message is String) { + controller.addError( + ObjectBoxException('Query stream native exception: $message')); + } else if (message != null) { + controller.addError(ObjectBoxException( + 'Query stream received an invalid message type ' + '(${message.runtimeType}): $message')); + } + controller.close(); // done + close(); + }); + return controller.stream; + } catch (e) { + close(); + rethrow; + } + } + + /// Stream items by sending pointers from native code. + /// Interestingly this is slower even though it transfers only pointers... + /// Probably because of the slowness of `asTypedList()`, see native_pointers.dart benchmark + // Stream _stream2() { + // initializeDartAPI(); + // final port = ReceivePort(); + // final cStream = checkObxPtr( + // C.dartc_query_find_ptr(_cQuery, port.sendPort.nativePort), + // 'query stream'); + // + // var closed = false; + // final close = () { + // if (closed) return; + // closed = true; + // C.dartc_stream_close(cStream); + // port.close(); + // }; + // + // try { + // final controller = StreamController(onCancel: close); + // port.listen((dynamic message) { + // // We expect Uint8List for data and NULL when the query has finished. + // if (message is Uint8List) { + // try { + // final int64s = Int64List.view(message.buffer); + // assert(int64s.length == 2); + // final data = + // Pointer.fromAddress(int64s[0]).asTypedList(int64s[1]); + // controller.add(_entity.objectFromFB(_store, data)); + // return; + // } catch (e) { + // controller.addError(e); + // } + // } else if (message is String) { + // controller.addError( + // ObjectBoxException('Query stream native exception: $message')); + // } else if (message != null) { + // controller.addError(ObjectBoxException( + // 'Query stream received an invalid message type ' + // '(${message.runtimeType}): $message')); + // } + // controller.close(); // done + // close(); + // }); + // return controller.stream; + // } catch (e) { + // close(); + // rethrow; + // } + // } + /// For internal testing purposes. String describe() => dartStringFromC(C.query_describe(_ptr)); diff --git a/objectbox/lib/src/native/store.dart b/objectbox/lib/src/native/store.dart index e50779f9e..83130db35 100644 --- a/objectbox/lib/src/native/store.dart +++ b/objectbox/lib/src/native/store.dart @@ -235,6 +235,24 @@ class Store { /// available. Use [Sync.client()] to create one first. SyncClient? syncClient() => syncClientsStorage[this]; + /// Await for all (including future) async submissions to be completed + /// (the async queue becomes idle for a moment). + /// + /// Returns true if all submissions were completed or async processing was + /// not started; false if shutting down (or an internal error occurred). + /// + /// Use to wait until all puts by [Box.putQueued] have finished. + bool awaitAsyncCompletion() => C.store_await_async_submitted(_ptr); + + /// Await for previously submitted async operations to be completed + /// (the async queue does not have to become idle). + /// + /// Returns true if all submissions were completed or async processing was + /// not started; false if shutting down (or an internal error occurred). + /// + /// Use to wait until all puts by [Box.putQueued] have finished. + bool awaitAsyncSubmitted() => C.store_await_async_submitted(_ptr); + /// The low-level pointer to this store. @pragma('vm:prefer-inline') Pointer get _ptr { diff --git a/objectbox/test/box_test.dart b/objectbox/test/box_test.dart index ccf8533cb..e2778b8fa 100644 --- a/objectbox/test/box_test.dart +++ b/objectbox/test/box_test.dart @@ -45,6 +45,110 @@ void main() { expect(object.id, equals(putId)); // ID on the object is updated }); + test('.putAsync', () async { + final box = store.box(); + final a = TestEntityNonRel.filled(id: 0); + final b = TestEntityNonRel.filled(id: 0); + Future aId = box.putAsync(a); + expect(a.id, 0); + Future bId = box.putAsync(b); + expect(b.id, 0); + expect(await aId, 1); + expect(await bId, 2); + expect(a.id, 1); + expect(b.id, 2); + }); + + test('.putAsync failures', () async { + final box = store.box(); + expect( + () => box + .putAsync(TestEntity2(), mode: PutMode.update) + .timeout(defaultTimeout), + throwsA(predicate( + (ArgumentError e) => e.toString().contains('ID is not set')))); + + expect( + await box + .putAsync(TestEntity2(id: 1), mode: PutMode.insert) + .timeout(defaultTimeout), + 1); + expect( + () async => await box + .putAsync(TestEntity2(id: 5), mode: PutMode.update) + .timeout(defaultTimeout), + throwsA(predicate((ObjectBoxException e) => + e.toString().contains('object with the given ID not found')))); + expect(box.count(), 1); + + expect( + () async => await box + .putAsync(TestEntity2(id: 1), mode: PutMode.insert) + .timeout(defaultTimeout), + throwsA(predicate((ObjectBoxException e) => + e.toString().contains('object with the given ID already exists')))); + + { + // check unique constraint violation behavior + await box.putAsync(TestEntity2()..value = 42); + final object = TestEntity2()..value = 42; + final future = box.putAsync(object); + expect( + future, + throwsA(predicate((UniqueViolationException e) => + e.toString().contains('Unique constraint')))); + + try { + // paranoia, should already have waited on the above [expect()] + await future; + } catch (_) {} + + expect(object.id, isNull); // ID must remain unassigned + } + }); + + test('.putAsync many', () async { + final items = List.generate(1000, (i) => TestEntityNonRel.filled(id: 0)); + final futures = items.map(store.box().putAsync).toList(); + print('${futures.length} futures collected'); + final ids = await Future.wait(futures); + print('${ids.length} futures finished'); + for (int i = 0; i < items.length; i++) { + expect(items[i].id, ids[i]); + } + }); + + test('.putQueued', () { + final box = store.box(); + final items = List.generate(1000, (i) => TestEntityNonRel.filled(id: 0)); + final ids = items.map(box.putQueued).toList(); + for (int i = 0; i < items.length; i++) { + expect(items[i].id, ids[i]); + } + store.awaitAsyncSubmitted(); + expect(box.count(), 1000); + }); + + test('.putQueued failures', () async { + expect( + () => store + .box() + .putQueued(TestEntity2(), mode: PutMode.update), + throwsA(predicate( + (ArgumentError e) => e.toString().contains('ID is not set')))); + + expect( + () => store + .box() + .putQueued(TestEntityNonRel.filled(id: 5), mode: PutMode.insert), + throwsA(predicate((ArgumentError e) => + e.toString().contains('Use ID 0 (zero) to insert new entities')))); + + store.awaitAsyncCompletion(); + expect(store.box().count(), 0); + expect(store.box().count(), 0); + }); + test('.get() returns the correct item', () { final int putId = box.put(TestEntity( tString: 'Hello', diff --git a/objectbox/test/entity2.dart b/objectbox/test/entity2.dart index 990b5d2fe..8b2528813 100644 --- a/objectbox/test/entity2.dart +++ b/objectbox/test/entity2.dart @@ -6,6 +6,9 @@ class TestEntity2 { @Id(assignable: true) int? id; + @Unique() + int? value; + TestEntity2({this.id}); } diff --git a/objectbox/test/objectbox-model.json b/objectbox/test/objectbox-model.json index be842fc75..425b23ba0 100644 --- a/objectbox/test/objectbox-model.json +++ b/objectbox/test/objectbox-model.json @@ -201,7 +201,7 @@ }, { "id": "3:3569200127393812728", - "lastPropertyId": "1:2429256362396080523", + "lastPropertyId": "2:1699553834502252174", "name": "TestEntity2", "properties": [ { @@ -209,6 +209,13 @@ "name": "id", "type": 6, "flags": 129 + }, + { + "id": "2:1699553834502252174", + "name": "value", + "type": 6, + "flags": 40, + "indexId": "18:8736709610628523744" } ], "relations": [] @@ -396,7 +403,7 @@ } ], "lastEntityId": "7:6593746178940714095", - "lastIndexId": "17:1880235110135739998", + "lastIndexId": "18:8736709610628523744", "lastRelationId": "1:2155747579134420981", "lastSequenceId": "0:0", "modelVersion": 5, diff --git a/objectbox/test/query_test.dart b/objectbox/test/query_test.dart index 5d4980544..d66d1e3bf 100644 --- a/objectbox/test/query_test.dart +++ b/objectbox/test/query_test.dart @@ -1,3 +1,6 @@ +import 'dart:io'; + +import 'package:collection/collection.dart'; import 'package:test/test.dart'; import 'entity.dart'; @@ -588,4 +591,40 @@ void main() { ' AND tUint8List <= byte[4]{0x090A0B0C})')); q.close(); }); + + test('stream items', () async { + const count = 1000; + final items = List.generate( + count, (i) => TestEntity.filled(id: 0, tByte: i % 30)); + box.putMany(items); + expect(box.count(), count); + + final query = box.query(TestEntity_.tByte.lessThan(10)).build(); + final countMatching = + items.fold(0, (int c, item) => c + (item.tByte! < 10 ? 1 : 0)); + expect(query.count(), countMatching); + + final foundIds = query.findIds(); + final streamed = await query.stream().toList(); + expect(streamed.length, countMatching); + final streamedIds = streamed.map((e) => e.id).toList(growable: false); + + // this is much much slower: expect(streamedIds, sameAsList(foundIds)); + expect(const ListEquality().equals(streamedIds, foundIds), isTrue); + + // Test subscription cancellation doesn't leave non-freed resources. + final streamListenedItems = {}; + + final start = DateTime.now(); + final subscription = query.stream().listen(streamListenedItems.add); + for (int i = 0; i < 10 && streamListenedItems.isEmpty; i++) { + await Future.delayed(Duration(milliseconds: i)); + } + print('Received ${streamListenedItems.length} items in ' + '${DateTime.now().difference(start).inMilliseconds} milliseconds'); + await subscription.cancel(); + expect(streamListenedItems.length, isNonZero); + + query.close(); + }); }