Skip to content

Commit e5dc293

Browse files
committed
sync - change listener
1 parent 70bad33 commit e5dc293

File tree

2 files changed

+141
-49
lines changed

2 files changed

+141
-49
lines changed

objectbox/lib/src/sync.dart

+87-45
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import 'dart:async';
22
import 'dart:ffi';
33
import 'dart:isolate';
4-
import 'dart:typed_data' show Uint8List;
4+
import 'dart:typed_data';
55
import 'dart:convert' show utf8;
66

77
import 'package:ffi/ffi.dart';
88

9+
import '../objectbox.dart';
910
import 'store.dart';
1011
import 'util.dart';
1112
import 'bindings/bindings.dart';
@@ -68,6 +69,15 @@ enum SyncConnectionEvent { connected, disconnected }
6869

6970
enum SyncLoginEvent { loggedIn, credentialsRejected, unknownError }
7071

72+
class SyncChange {
73+
final int entityId;
74+
final Type entity;
75+
final List<int> puts;
76+
final List<int> removals;
77+
78+
SyncChange(this.entityId, this.entity, this.puts, this.removals);
79+
}
80+
7181
/// Sync client is used to provide ObjectBox Sync client capabilities to your application.
7282
class SyncClient {
7383
final Store _store;
@@ -107,6 +117,7 @@ class SyncClient {
107117
_connectionEvents?._stop();
108118
_loginEvents?._stop();
109119
_completionEvents?._stop();
120+
_changeEvents?._stop();
110121
final err = C.sync_close(_cSync);
111122
_cSync = nullptr;
112123
syncClientsStorage.remove(_store);
@@ -229,7 +240,7 @@ class SyncClient {
229240
/// Subscribe (listen) to the stream to actually start listening to events.
230241
Stream<SyncConnectionEvent> get connectionEvents {
231242
if (_connectionEvents == null) {
232-
// This stream combines events from two C listeners: connect & disconnect.
243+
// Combine events from two C listeners: connect & disconnect.
233244
_connectionEvents =
234245
_SyncListenerGroup<SyncConnectionEvent>('sync-connection');
235246

@@ -253,7 +264,7 @@ class SyncClient {
253264
/// Subscribe (listen) to the stream to actually start listening to events.
254265
Stream<SyncLoginEvent> get loginEvents {
255266
if (_loginEvents == null) {
256-
// This stream combines events from two C listeners: connect & disconnect.
267+
// Combine events from two C listeners: login & login-failure.
257268
_loginEvents = _SyncListenerGroup<SyncLoginEvent>('sync-login');
258269

259270
_loginEvents.add(_SyncListenerConfig(
@@ -296,6 +307,78 @@ class SyncClient {
296307
}
297308
return _completionEvents.stream;
298309
}
310+
311+
_SyncListenerGroup<List<SyncChange>> /*?*/ _changeEvents;
312+
313+
/// Get a broadcast stream of incoming synced data changes.
314+
///
315+
/// Subscribe (listen) to the stream to actually start listening to events.
316+
Stream<List<SyncChange>> get changeEvents {
317+
if (_changeEvents == null) {
318+
// This stream combines events from two C listeners: connect & disconnect.
319+
_changeEvents = _SyncListenerGroup<List<SyncChange>>('sync-change');
320+
321+
// create a map from Entity ID to Entity type (dart class)
322+
final entityTypesById = <int, Type>{};
323+
_store.defs.bindings.forEach((Type entity, EntityDefinition entityDef) =>
324+
entityTypesById[entityDef.model.id.id] = entity);
325+
326+
_changeEvents.add(_SyncListenerConfig(
327+
(int nativePort) => C.dart_sync_listener_change(ptr, nativePort),
328+
(syncChanges, controller) {
329+
if (syncChanges is! List) {
330+
controller.addError(Exception(
331+
'Received invalid data type from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
332+
return;
333+
}
334+
335+
// List<SyncChange> is flattened to List<dynamic>, with SyncChange object
336+
// properties always coming in groups of three (entityId, puts, removals)
337+
const numProperties = 3;
338+
if (syncChanges.length % numProperties != 0) {
339+
controller.addError(Exception(
340+
'Received invalid list length from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
341+
return;
342+
}
343+
344+
final changes = <SyncChange>[];
345+
for (var i = 0; i < syncChanges.length / numProperties; i++) {
346+
final entityId = syncChanges[i * numProperties + 0];
347+
final putsBytes = syncChanges[i * numProperties + 1];
348+
final removalsBytes = syncChanges[i * numProperties + 2];
349+
350+
final entityType = entityTypesById[entityId];
351+
if (entityType == null) {
352+
controller.addError(Exception(
353+
'Received sync change notification for an unknown entity ID $entityId'));
354+
return;
355+
}
356+
357+
if (entityId is! int ||
358+
putsBytes is! Uint8List ||
359+
removalsBytes is! Uint8List) {
360+
controller.addError(Exception(
361+
'Received invalid list items format from the core notification at i=${i}: '
362+
'entityId = (${entityId.runtimeType}) $entityId; '
363+
'putsBytes = (${putsBytes.runtimeType}) $putsBytes; '
364+
'removalsBytes = (${removalsBytes.runtimeType}) $removalsBytes'));
365+
return;
366+
}
367+
368+
changes.add(SyncChange(
369+
entityId,
370+
entityType,
371+
Uint64List.view(putsBytes.buffer).toList(),
372+
Uint64List.view(removalsBytes.buffer).toList()));
373+
}
374+
375+
controller.add(changes);
376+
}));
377+
378+
_changeEvents.finish();
379+
}
380+
return _changeEvents.stream;
381+
}
299382
}
300383

301384
/// Configuration for _SyncListenerGroup, setting up a single native listener.
@@ -368,7 +451,7 @@ class _SyncListenerGroup<StreamValueType> {
368451

369452
// Start the native listener.
370453
final cListener = config.cListenerInit(receivePort.sendPort.nativePort);
371-
if (cListener == null) {
454+
if (cListener == null || cListener == nullptr) {
372455
hasError = true;
373456
} else {
374457
_cListeners.add(cListener);
@@ -447,44 +530,3 @@ class Sync {
447530
return client;
448531
}
449532
}
450-
451-
/* BACKUP: Sync change listener async callback message handling
452-
ReceivePort()..listen((syncChanges) {
453-
if (syncChanges is! List) {
454-
observer.controller.addError(Exception(
455-
'Received invalid data type from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
456-
return;
457-
}
458-
459-
// List<SyncChange> is flattened to List<dynamic>, with SyncChange object
460-
// properties always coming in groups of three (entityId, puts, removals)
461-
const numProperties = 3;
462-
if (syncChanges.length % numProperties != 0) {
463-
observer.controller.addError(Exception(
464-
'Received invalid list length from the core notification: (${syncChanges.runtimeType}) $syncChanges'));
465-
return;
466-
}
467-
468-
for (var i = 0; i < syncChanges.length / numProperties; i++) {
469-
final entityId = syncChanges[i * numProperties + 0];
470-
final putsBytes = syncChanges[i * numProperties + 1];
471-
final removalsBytes = syncChanges[i * numProperties + 2];
472-
473-
if (entityId is! int ||
474-
putsBytes is! Uint8List ||
475-
removalsBytes is! Uint8List) {
476-
observer.controller.addError(Exception(
477-
'Received invalid list items format from the core notification at i=${i}: '
478-
'entityId = (${entityId.runtimeType}) $entityId; '
479-
'putsBytes = (${putsBytes.runtimeType}) $putsBytes; '
480-
'removalsBytes = (${removalsBytes.runtimeType}) $removalsBytes'));
481-
return;
482-
}
483-
484-
final puts = Uint64List.view(putsBytes.buffer).toList();
485-
final removals = Uint64List.view(removalsBytes.buffer).toList();
486-
487-
// forward the event with entityId, puts & removals
488-
}
489-
});
490-
*/

objectbox/test/sync_test.dart

+54-4
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import 'dart:math';
33
import 'dart:typed_data';
44

55
import 'package:objectbox/src/bindings/bindings.dart';
6-
import 'package:objectbox/objectbox.dart';
76
import 'package:test/test.dart';
87

98
import 'entity.dart';
9+
import 'entity2.dart';
1010
import 'objectbox.g.dart';
1111
import 'test_env.dart';
1212

@@ -277,6 +277,53 @@ void main() {
277277

278278
expect(env2.box.get(1) /*!*/ .tLong, 100);
279279
});
280+
281+
test('SyncClient listeners: changes', () async {
282+
final client = loggedInClient(store);
283+
final client2 = loggedInClient(env2.store);
284+
285+
final events = <List<SyncChange>>[];
286+
client2.changeEvents.listen(events.add);
287+
288+
expect(env2.box.get(1), isNull);
289+
290+
env.box.put(TestEntity(tString: 'foo'));
291+
env.store.runInTransaction(TxMode.Write, () {
292+
Box<TestEntity2>(env.store).put(TestEntity2()); // not synced
293+
env.box.put(TestEntity(tString: 'bar'));
294+
env.box.put(TestEntity(tString: 'oof'));
295+
env.box.remove(1);
296+
});
297+
298+
// wait for the data to be transferred
299+
expect(waitUntil(() => env2.box.count() == 2), isTrue);
300+
301+
// check the events
302+
await yieldExecution();
303+
expect(events.length, 2);
304+
305+
// env.box.put(TestEntity(tString: 'foo'));
306+
expect(events[0].length, 1);
307+
expect(events[0][0].entity, TestEntity);
308+
expect(events[0][0].entityId, 1);
309+
expect(events[0][0].puts, [1]);
310+
expect(events[0][0].removals, []);
311+
312+
// env.store.runInTransaction(TxMode.Write, () {
313+
// Box<TestEntity2>(env.store).put(TestEntity2()); // not synced
314+
// env.box.put(TestEntity(tString: 'bar'));
315+
// env.box.put(TestEntity(tString: 'oof'));
316+
// env.box.remove(1);
317+
// });
318+
expect(events[1].length, 1);
319+
expect(events[1][0].entity, TestEntity);
320+
expect(events[1][0].entityId, 1);
321+
expect(events[1][0].puts, [2, 3]);
322+
expect(events[1][0].removals, [1]);
323+
324+
client.close();
325+
client2.close();
326+
});
280327
},
281328
skip: SyncServer.isAvailable()
282329
? null
@@ -329,9 +376,12 @@ class SyncServer {
329376
final proc = process /*!*/;
330377
process = null;
331378
proc.kill(ProcessSignal.sigint);
332-
await stdout.addStream(proc.stdout);
333-
await stderr.addStream(proc.stderr);
334-
expect(await proc.exitCode, isZero);
379+
final exitCode = await proc.exitCode;
380+
if (exitCode != 0) {
381+
await stdout.addStream(proc.stdout);
382+
await stderr.addStream(proc.stderr);
383+
expect(await proc.exitCode, isZero);
384+
}
335385
if (!keepDb) _deleteDb();
336386
}
337387

0 commit comments

Comments
 (0)