Skip to content

Commit 112d995

Browse files
committed
implement async callbacks from C and switch observable.dart to use them
1 parent 9a792dd commit 112d995

10 files changed

+305
-244
lines changed

lib/src/bindings/bindings.dart

+12
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,15 @@ ObjectBoxC loadObjectBoxLib() {
3333
ObjectBoxC /*?*/ _cachedBindings;
3434

3535
ObjectBoxC get bindings => _cachedBindings ??= loadObjectBoxLib();
36+
37+
/// Init DartAPI in C for async callbacks - only needs to be called once.
38+
/// See the following issue:
39+
/// https://github.com/objectbox/objectbox-dart/issues/143
40+
void initializeDartAPI() {
41+
if (!_dartAPIinitialized) {
42+
_dartAPIinitialized = true;
43+
bindings.obx_dart_init_api(NativeApi.initializeApiDLData);
44+
}
45+
}
46+
47+
bool _dartAPIinitialized = false;

lib/src/bindings/objectbox-c.dart

+82-1
Original file line numberDiff line numberDiff line change
@@ -4187,7 +4187,7 @@ class ObjectBoxC {
41874187
/// /// Sets credentials to authenticate the client with the server.
41884188
/// /// See OBXSyncCredentialsType for available options.
41894189
/// /// The accepted OBXSyncCredentials type depends on your sync-server configuration.
4190-
/// /// @param data may be NULL, i.e. in combination with OBXSyncCredentialsType_UNCHECKED
4190+
/// /// @param data may be NULL, i.e. in combination with OBXSyncCredentialsType_NONE
41914191
int obx_sync_credentials(
41924192
ffi.Pointer<OBX_sync> sync_1,
41934193
int type,
@@ -4512,6 +4512,53 @@ class ObjectBoxC {
45124512
}
45134513

45144514
_dart_obx_sync_listener_change _obx_sync_listener_change;
4515+
4516+
int obx_dart_init_api(
4517+
ffi.Pointer<ffi.Void> data,
4518+
) {
4519+
_obx_dart_init_api ??=
4520+
_dylib.lookupFunction<_c_obx_dart_init_api, _dart_obx_dart_init_api>(
4521+
'obx_dart_init_api');
4522+
return _obx_dart_init_api(
4523+
data,
4524+
);
4525+
}
4526+
4527+
_dart_obx_dart_init_api _obx_dart_init_api;
4528+
4529+
/// /// @see obx_observe()
4530+
/// /// Note: use obx_observer_close() to free unassign the observer and free resources after you're done with it
4531+
ffi.Pointer<OBX_observer> obx_dart_observe(
4532+
ffi.Pointer<OBX_store> store,
4533+
int dart_native_port,
4534+
) {
4535+
_obx_dart_observe ??=
4536+
_dylib.lookupFunction<_c_obx_dart_observe, _dart_obx_dart_observe>(
4537+
'obx_dart_observe');
4538+
return _obx_dart_observe(
4539+
store,
4540+
dart_native_port,
4541+
);
4542+
}
4543+
4544+
_dart_obx_dart_observe _obx_dart_observe;
4545+
4546+
ffi.Pointer<OBX_observer> obx_dart_observe_single_type(
4547+
ffi.Pointer<OBX_store> store,
4548+
int type_id,
4549+
int dart_native_port,
4550+
) {
4551+
_obx_dart_observe_single_type ??= _dylib.lookupFunction<
4552+
_c_obx_dart_observe_single_type,
4553+
_dart_obx_dart_observe_single_type>('obx_dart_observe_single_type');
4554+
return _obx_dart_observe_single_type(
4555+
store,
4556+
type_id,
4557+
dart_native_port,
4558+
);
4559+
}
4560+
4561+
_dart_obx_dart_observe_single_type _obx_dart_observe_single_type;
45154562
}
45164563

45174564
abstract class OBXPropertyType {
@@ -4944,6 +4991,10 @@ const int OBX_ERROR_FILE_PAGES_CORRUPT = 10503;
49444991

49454992
const int OBX_ERROR_SCHEMA_OBJECT_NOT_FOUND = 10504;
49464993

4994+
const int OBX_ERROR_TIME_SERIES_NOT_AVAILABLE = 10601;
4995+
4996+
const int OBX_ERROR_SYNC_NOT_AVAILABLE = 10602;
4997+
49474998
typedef _c_obx_version = ffi.Void Function(
49484999
ffi.Pointer<ffi.Int32> major,
49495000
ffi.Pointer<ffi.Int32> minor,
@@ -7715,3 +7766,33 @@ typedef _dart_obx_sync_listener_change = void Function(
77157766
ffi.Pointer<ffi.NativeFunction<OBX_sync_listener_change>> listener,
77167767
ffi.Pointer<ffi.Void> listener_arg,
77177768
);
7769+
7770+
typedef _c_obx_dart_init_api = ffi.Int32 Function(
7771+
ffi.Pointer<ffi.Void> data,
7772+
);
7773+
7774+
typedef _dart_obx_dart_init_api = int Function(
7775+
ffi.Pointer<ffi.Void> data,
7776+
);
7777+
7778+
typedef _c_obx_dart_observe = ffi.Pointer<OBX_observer> Function(
7779+
ffi.Pointer<OBX_store> store,
7780+
ffi.Int64 dart_native_port,
7781+
);
7782+
7783+
typedef _dart_obx_dart_observe = ffi.Pointer<OBX_observer> Function(
7784+
ffi.Pointer<OBX_store> store,
7785+
int dart_native_port,
7786+
);
7787+
7788+
typedef _c_obx_dart_observe_single_type = ffi.Pointer<OBX_observer> Function(
7789+
ffi.Pointer<OBX_store> store,
7790+
ffi.Uint32 type_id,
7791+
ffi.Int64 dart_native_port,
7792+
);
7793+
7794+
typedef _dart_obx_dart_observe_single_type = ffi.Pointer<OBX_observer> Function(
7795+
ffi.Pointer<OBX_store> store,
7796+
int type_id,
7797+
int dart_native_port,
7798+
);

lib/src/bindings/objectbox.h

+13-1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ bool obx_supports_time_series(void);
165165
/// A requested schema object (e.g., an entity or a property) was not found in the schema
166166
#define OBX_ERROR_SCHEMA_OBJECT_NOT_FOUND 10504
167167

168+
/// Feature specific errors
169+
#define OBX_ERROR_TIME_SERIES_NOT_AVAILABLE 10601
170+
#define OBX_ERROR_SYNC_NOT_AVAILABLE 10602
171+
168172
//----------------------------------------------
169173
// Error info; obx_last_error_*
170174
//----------------------------------------------
@@ -1734,7 +1738,7 @@ obx_err obx_sync_close(OBX_sync* sync);
17341738
/// Sets credentials to authenticate the client with the server.
17351739
/// See OBXSyncCredentialsType for available options.
17361740
/// The accepted OBXSyncCredentials type depends on your sync-server configuration.
1737-
/// @param data may be NULL, i.e. in combination with OBXSyncCredentialsType_UNCHECKED
1741+
/// @param data may be NULL, i.e. in combination with OBXSyncCredentialsType_NONE
17381742
obx_err obx_sync_credentials(OBX_sync* sync, OBXSyncCredentialsType type, const void* data, size_t size);
17391743

17401744
/// Configures the maximum number of outgoing TX messages that can be sent without an ACK from the server.
@@ -1830,6 +1834,14 @@ void obx_sync_listener_complete(OBX_sync* sync, OBX_sync_listener_complete* list
18301834
/// @param listener_arg is a pass-through argument passed to the listener
18311835
void obx_sync_listener_change(OBX_sync* sync, OBX_sync_listener_change* listener, void* listener_arg);
18321836

1837+
obx_err obx_dart_init_api(void* data);
1838+
1839+
/// @see obx_observe()
1840+
/// Note: use obx_observer_close() to free unassign the observer and free resources after you're done with it
1841+
OBX_observer* obx_dart_observe(OBX_store* store, int64_t dart_native_port);
1842+
1843+
OBX_observer* obx_dart_observe_single_type(OBX_store* store, obx_schema_id type_id, int64_t dart_native_port);
1844+
18331845
#ifdef __cplusplus
18341846
}
18351847
#endif

lib/src/observable.dart

+107-68
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,131 @@
11
import 'dart:async';
22
import 'dart:ffi';
3+
import 'dart:isolate';
34

45
import 'bindings/bindings.dart';
6+
import 'bindings/helpers.dart';
7+
import 'modelinfo/entity_definition.dart';
58
import 'query/query.dart';
69
import 'store.dart';
7-
import 'util.dart';
8-
9-
// ignore_for_file: non_constant_identifier_names
10-
11-
// dart callback signature
12-
typedef Any = void Function(Pointer<Void>, Pointer<Uint32>, int);
13-
14-
class _Observable {
15-
static final _anyObserver = <int, Pointer<OBX_observer>>{};
16-
static final _any = <int, Map<int, Any>>{};
17-
18-
// sync:true -> ObjectBoxException: 10001 TX is not active anymore: #101
19-
static final controller = StreamController<int>.broadcast();
20-
21-
// The user_data is used to pass the store ptr address
22-
// in case there is no consensus on the entity id between stores
23-
static void _anyCallback(
24-
Pointer<Void> user_data, Pointer<Uint32> mutated_ids, int mutated_count) {
25-
final storeAddress = user_data.address;
26-
// call schema's callback
27-
final storeCallbacks = _any[storeAddress];
28-
if (storeCallbacks != null) {
29-
for (var i = 0; i < mutated_count; i++) {
30-
storeCallbacks[mutated_ids[i]]
31-
?.call(user_data, mutated_ids, mutated_count);
32-
}
33-
}
10+
11+
/// Simple wrapper used below in ObservableStore to reduce code duplication.
12+
/// Contains shared code for single-entity observer and the generic/global one.
13+
class _Observer<StreamValueType> {
14+
StreamController<StreamValueType> /*?*/ controller;
15+
Pointer<OBX_observer> /*?*/ _cObserver;
16+
ReceivePort /*?*/ receivePort;
17+
18+
int get nativePort => receivePort /*!*/ .sendPort.nativePort;
19+
20+
set cObserver(Pointer<OBX_observer> value) {
21+
_cObserver = checkObxPtr(value, 'observer initialization failed');
22+
_debugLog('started');
3423
}
3524

36-
static void subscribe(Store store) {
37-
syncOrObserversExclusive.mark(store);
25+
Stream<StreamValueType> get stream => controller /*!*/ .stream;
3826

39-
final callback = Pointer.fromFunction<obx_observer>(_anyCallback);
40-
final storePtr = store.ptr;
41-
_anyObserver[storePtr.address] =
42-
bindings.obx_observe(storePtr, callback, storePtr.cast<Void>());
43-
StoreCloseObserver.addListener(store, _anyObserver[storePtr.address], () {
44-
unsubscribe(store);
45-
});
27+
_Observer() {
28+
initializeDartAPI();
29+
}
30+
31+
// start() is called whenever user starts listen()-ing to the stream
32+
void init(void Function() start) {
33+
controller = StreamController<StreamValueType>(
34+
onListen: start, onPause: stop, onResume: start, onCancel: stop);
4635
}
4736

48-
// #53 ffi:Pointer finalizer
49-
static void unsubscribe(Store store) {
50-
final storeAddress = store.ptr.address;
51-
if (!_anyObserver.containsKey(storeAddress)) {
52-
return;
53-
}
54-
StoreCloseObserver.removeListener(store, _anyObserver[storeAddress]);
55-
bindings.obx_observer_close(_anyObserver[storeAddress]);
56-
_anyObserver.remove(storeAddress);
57-
syncOrObserversExclusive.unmark(store);
37+
// stop() is called when the stream subscription is paused or canceled
38+
void stop() {
39+
_debugLog('stopped');
40+
if (_cObserver != null) checkObx(bindings.obx_observer_close(_cObserver));
5841
}
5942

60-
static bool isSubscribed(Store store) =>
61-
_Observable._anyObserver.containsKey(store.ptr.address);
43+
void _debugLog(String message) {
44+
// print('Observer=${_cObserver?.address} ' + message);
45+
}
6246
}
6347

64-
extension Streamable<T> on Query<T> {
65-
void _setup() {
66-
if (!_Observable.isSubscribed(store)) {
67-
_Observable.subscribe(store);
68-
}
69-
final storeAddress = store.ptr.address;
70-
71-
_Observable._any[storeAddress] ??= <int, Any>{};
72-
_Observable._any[storeAddress] /*!*/ [entityId] ??= (u, _, __) {
73-
// dummy value to trigger an event
74-
_Observable.controller.add(entityId);
75-
};
48+
/// StreamController implementation inspired by the sample controller sample at:
49+
/// https://dart.dev/articles/libraries/creating-streams#honoring-the-pause-state
50+
/// https://dart.dev/articles/libraries/code/stream_controller.dart
51+
extension ObservableStore on Store {
52+
/// Create a stream to data changes on EntityT (stored Entity class).
53+
///
54+
/// The stream receives an event whenever an object of EntityT is created or
55+
/// changed or deleted. Make sure to close() the subscription after you're
56+
/// done with it to avoid hanging change listeners.
57+
Stream<void> subscribe<EntityT>() {
58+
final observer = _Observer<void>();
59+
final entityId = entityDef<EntityT>().model.id.id;
60+
61+
observer.init(() {
62+
// We're listening to events on single entity so there's no argument.
63+
// Ideally, controller.add() would work but it doesn't, even though we're
64+
// using StreamController<Void> so the argument type is `void`.
65+
observer.receivePort = ReceivePort()
66+
..listen((_) => observer.controller.add(null));
67+
observer.cObserver = bindings.obx_dart_observe_single_type(
68+
ptr, entityId, observer.nativePort);
69+
});
70+
71+
return observer.stream;
7672
}
7773

74+
/// Create a stream to data changes on all Entity types.
75+
///
76+
/// The stream receives an even whenever any data changes in the database.
77+
/// Make sure to close() the subscription after you're done with it to avoid
78+
/// hanging change listeners.
79+
Stream<Type> subscribeAll() {
80+
initializeDartAPI();
81+
final observer = _Observer<Type>();
82+
83+
// create a map from Entity ID to Entity type (dart class)
84+
final entityTypesById = <int, Type>{};
85+
defs.bindings.forEach((Type entity, EntityDefinition entityDef) =>
86+
entityTypesById[entityDef.model.id.id] = entity);
87+
88+
observer.init(() {
89+
// We're listening to a events for all entity types. C-API sends entity ID
90+
// and we must map it to a dart type (class) corresponding to that entity.
91+
observer.receivePort = ReceivePort()
92+
..listen((entityIds) {
93+
if (entityIds is! List) {
94+
observer.controller.addError(Exception(
95+
'Received invalid data format from the core notification: (${entityIds.runtimeType}) $entityIds'));
96+
return;
97+
}
98+
99+
entityIds.forEach((entityId) {
100+
if (entityId is! int) {
101+
observer.controller.addError(Exception(
102+
'Received invalid item data format from the core notification: (${entityId.runtimeType}) $entityId'));
103+
return;
104+
}
105+
106+
final entityType = entityTypesById[entityId];
107+
if (entityType == null) {
108+
observer.controller.addError(Exception(
109+
'Received data change notification for an unknown entity ID $entityId'));
110+
} else {
111+
observer.controller.add(entityType);
112+
}
113+
});
114+
});
115+
observer.cObserver = bindings.obx_dart_observe(ptr, observer.nativePort);
116+
});
117+
118+
return observer.stream;
119+
}
120+
}
121+
122+
extension Streamable<T> on Query<T> {
78123
Stream<List<T>> findStream({int offset = 0, int limit = 0}) {
79-
_setup();
80-
return _Observable.controller.stream
81-
.where((e) => e == entityId)
82-
.map((_) => find(offset: offset, limit: limit));
124+
return store.subscribe<T>().map((_) => find(offset: offset, limit: limit));
83125
}
84126

85127
/// Use this for Query Property
86128
Stream<Query<T>> get stream {
87-
_setup();
88-
return _Observable.controller.stream
89-
.where((e) => e == entityId)
90-
.map((_) => this);
129+
return store.subscribe<T>().map((_) => this);
91130
}
92131
}

lib/src/sync.dart

-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ class SyncClient {
100100
_cSync = nullptr;
101101
syncClientsStorage.remove(_store);
102102
StoreCloseObserver.removeListener(_store, this);
103-
syncOrObserversExclusive.unmark(_store);
104103
checkObx(err);
105104
}
106105

@@ -243,7 +242,6 @@ class Sync {
243242
if (syncClientsStorage.containsKey(store)) {
244243
throw Exception('Only one sync client can be active for a store');
245244
}
246-
syncOrObserversExclusive.mark(store);
247245
final client = SyncClient(store, serverUri, creds);
248246
syncClientsStorage[store] = client;
249247
StoreCloseObserver.addListener(store, client, client.close);

lib/src/util.dart

-20
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,3 @@ class StoreCloseObserver {
4242

4343
/// Global internal storage of sync clients - one client per store.
4444
final Map<Store, SyncClient> syncClientsStorage = {};
45-
46-
// Currently, either SyncClient or Observers can be used at the same time.
47-
// TODO: lift this condition after #142 is fixed.
48-
class SyncOrObserversExclusive {
49-
final _map = <Store, bool>{};
50-
51-
void mark(Store store) {
52-
if (_map.containsKey(store)) {
53-
throw Exception(
54-
'Using observers/query streams in combination with SyncClient is currently not supported');
55-
}
56-
_map[store] = true;
57-
}
58-
59-
void unmark(Store store) {
60-
_map.remove(store);
61-
}
62-
}
63-
64-
final syncOrObserversExclusive = SyncOrObserversExclusive();

0 commit comments

Comments
 (0)