Skip to content

Commit 4faf387

Browse files
authored
Merge pull request #152 from RTrackerDev/main
Query stream issue #151
2 parents c5f820f + 3ff4d5e commit 4faf387

File tree

2 files changed

+71
-8
lines changed

2 files changed

+71
-8
lines changed

lib/src/observable.dart

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ class _Observable {
2323
static void _anyCallback(
2424
Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
2525
final storeAddress = user_data.address;
26-
for (var i = 0; i < mutated_count; i++) {
27-
// call schema's callback
28-
if (_any.containsKey(storeAddress) &&
29-
_any[storeAddress] /*!*/ .containsKey(mutated_ids[i])) {
30-
_any[storeAddress] /*!*/
31-
[mutated_ids[i]] /*!*/ (user_data, mutated_ids, mutated_count);
26+
// call schema's callback
27+
final storeCallbacks = _any[storeAddress];
28+
if (storeCallbacks != null) {
29+
for (var i = 0; i < mutated_count; i++) {
30+
storeCallbacks[mutated_ids[i]]
31+
?.call(user_data, mutated_ids, mutated_count);
3232
}
3333
}
3434
}
@@ -71,19 +71,22 @@ extension Streamable<T> on Query<T> {
7171
_Observable._any[storeAddress] ??= <int, Any>{};
7272
_Observable._any[storeAddress] /*!*/ [entityId] ??= (u, _, __) {
7373
// dummy value to trigger an event
74-
_Observable.controller.add(u.address);
74+
_Observable.controller.add(entityId);
7575
};
7676
}
7777

7878
Stream<List<T>> findStream({int offset = 0, int limit = 0}) {
7979
_setup();
8080
return _Observable.controller.stream
81+
.where((e) => e == entityId)
8182
.map((_) => find(offset: offset, limit: limit));
8283
}
8384

8485
/// Use this for Query Property
8586
Stream<Query<T>> get stream {
8687
_setup();
87-
return _Observable.controller.stream.map((_) => this);
88+
return _Observable.controller.stream
89+
.where((e) => e == entityId)
90+
.map((_) => this);
8891
}
8992
}

test/stream_test.dart

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'package:objectbox/observable.dart';
44
import 'package:test/test.dart';
55

66
import 'entity.dart';
7+
import 'entity2.dart';
78
import 'objectbox.g.dart';
89
import 'test_env.dart';
910

@@ -73,6 +74,65 @@ void main() {
7374
await subscription.cancel();
7475
});
7576

77+
test(
78+
'Only observers of a single entity are notified, no cross-entity observer notification',
79+
() async {
80+
// setup listeners
81+
final box2 = Box<TestEntity2>(env.store);
82+
83+
var counter1 = 0, counter2 = 0;
84+
85+
final query2 = box2.query().build();
86+
final queryStream2 = query2.findStream();
87+
final subscription2 = queryStream2.listen((_) {
88+
counter2++;
89+
});
90+
91+
final query1 = box.query().build();
92+
final queryStream1 = query1.findStream();
93+
final subscription1 = queryStream1.listen((_) {
94+
counter1++;
95+
});
96+
97+
// counter2 test #.1
98+
final t2 = TestEntity2();
99+
box2.put(t2);
100+
101+
await Future.delayed(Duration(seconds: 0));
102+
expect(counter1, 0);
103+
expect(counter2, 1);
104+
105+
// counter1 test #.1
106+
final t1 = TestEntity();
107+
box.put(t1);
108+
109+
await Future.delayed(Duration(seconds: 0));
110+
expect(counter1, 1);
111+
expect(counter2, 1);
112+
113+
// counter1 many test #.2
114+
final ts1 = [1, 2, 3].map((i) => TestEntity(tInt: i)).toList();
115+
box.putMany(ts1);
116+
117+
await Future.delayed(Duration(seconds: 0));
118+
expect(counter1, 2);
119+
expect(counter2, 1);
120+
121+
// counter2 many test #.2
122+
final ts2 = [1, 2, 3].map((i) => TestEntity2()).toList();
123+
box2.putMany(ts2);
124+
125+
await Future.delayed(Duration(seconds: 0));
126+
expect(counter1, 2);
127+
expect(counter2, 2);
128+
129+
query1.close();
130+
query2.close();
131+
132+
await subscription1.cancel();
133+
await subscription2.cancel();
134+
});
135+
76136
tearDown(() {
77137
env.close();
78138
});

0 commit comments

Comments
 (0)