diff --git a/lib/src/observable.dart b/lib/src/observable.dart index fcdf4bd05..8768e0c59 100644 --- a/lib/src/observable.dart +++ b/lib/src/observable.dart @@ -23,12 +23,12 @@ class _Observable { static void _anyCallback( Pointer user_data, Pointer mutated_ids, int mutated_count) { final storeAddress = user_data.address; - for (var i = 0; i < mutated_count; i++) { - // call schema's callback - if (_any.containsKey(storeAddress) && - _any[storeAddress] /*!*/ .containsKey(mutated_ids[i])) { - _any[storeAddress] /*!*/ - [mutated_ids[i]] /*!*/ (user_data, mutated_ids, mutated_count); + // call schema's callback + final storeCallbacks = _any[storeAddress]; + if (storeCallbacks != null) { + for (var i = 0; i < mutated_count; i++) { + storeCallbacks[mutated_ids[i]] + ?.call(user_data, mutated_ids, mutated_count); } } } @@ -71,19 +71,22 @@ extension Streamable on Query { _Observable._any[storeAddress] ??= {}; _Observable._any[storeAddress] /*!*/ [entityId] ??= (u, _, __) { // dummy value to trigger an event - _Observable.controller.add(u.address); + _Observable.controller.add(entityId); }; } Stream> findStream({int offset = 0, int limit = 0}) { _setup(); return _Observable.controller.stream + .where((e) => e == entityId) .map((_) => find(offset: offset, limit: limit)); } /// Use this for Query Property Stream> get stream { _setup(); - return _Observable.controller.stream.map((_) => this); + return _Observable.controller.stream + .where((e) => e == entityId) + .map((_) => this); } } diff --git a/test/stream_test.dart b/test/stream_test.dart index 1e3ed86e5..34decdcaf 100644 --- a/test/stream_test.dart +++ b/test/stream_test.dart @@ -4,6 +4,7 @@ import 'package:objectbox/observable.dart'; import 'package:test/test.dart'; import 'entity.dart'; +import 'entity2.dart'; import 'objectbox.g.dart'; import 'test_env.dart'; @@ -73,6 +74,65 @@ void main() { await subscription.cancel(); }); + test( + 'Only observers of a single entity are notified, no cross-entity observer notification', + () async { + // setup listeners + final box2 = Box(env.store); + + var counter1 = 0, counter2 = 0; + + final query2 = box2.query().build(); + final queryStream2 = query2.findStream(); + final subscription2 = queryStream2.listen((_) { + counter2++; + }); + + final query1 = box.query().build(); + final queryStream1 = query1.findStream(); + final subscription1 = queryStream1.listen((_) { + counter1++; + }); + + // counter2 test #.1 + final t2 = TestEntity2(); + box2.put(t2); + + await Future.delayed(Duration(seconds: 0)); + expect(counter1, 0); + expect(counter2, 1); + + // counter1 test #.1 + final t1 = TestEntity(); + box.put(t1); + + await Future.delayed(Duration(seconds: 0)); + expect(counter1, 1); + expect(counter2, 1); + + // counter1 many test #.2 + final ts1 = [1, 2, 3].map((i) => TestEntity(tInt: i)).toList(); + box.putMany(ts1); + + await Future.delayed(Duration(seconds: 0)); + expect(counter1, 2); + expect(counter2, 1); + + // counter2 many test #.2 + final ts2 = [1, 2, 3].map((i) => TestEntity2()).toList(); + box2.putMany(ts2); + + await Future.delayed(Duration(seconds: 0)); + expect(counter1, 2); + expect(counter2, 2); + + query1.close(); + query2.close(); + + await subscription1.cancel(); + await subscription2.cancel(); + }); + tearDown(() { env.close(); });