Skip to content

LiveList: sync subObjects after reconnect #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,6 @@ To activate listening for updates on all included objects, add `listenOnAllSubIt
If you want ParseLiveList to listen for updates on only some sub-objects, use `listeningIncludes: const <String>[/*all the included sub-objects*/]` instead.
Just as QueryBuilder, ParseLiveList supports nested sub-objects too.

**NOTE:** Currently ParseLiveList wont update your sub-objects after your client reconnects to the web.

**NOTE:** To use this features you have to enable [Live Queries](#live-queries) first.

## Users
Expand Down
198 changes: 133 additions & 65 deletions lib/src/utils/parse_live_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,48 +186,59 @@ class ParseLiveList<T extends ParseObject> {
.getClientEventStream
.listen((LiveQueryClientEvent event) async {
if (event == LiveQueryClientEvent.CONNECTED) {
final ParseResponse parseResponse = await _runQuery();
if (parseResponse.success) {
final List<T> newList = parseResponse.results ?? List<T>();

//update List
for (int i = 0; i < _list.length; i++) {
final ParseObject currentObject = _list[i].object;
final String currentObjectId =
currentObject.get<String>(keyVarObjectId);

bool stillInList = false;

for (int j = 0; j < newList.length; j++) {
if (newList[j].get<String>(keyVarObjectId) == currentObjectId) {
stillInList = true;
if (newList[j]
.get<DateTime>(keyVarUpdatedAt)
.isAfter(currentObject.get<DateTime>(keyVarUpdatedAt))) {
final QueryBuilder<T> queryBuilder =
QueryBuilder<T>.copy(_query)
..whereEqualTo(keyVarObjectId, currentObjectId);
queryBuilder.query<T>().then((ParseResponse result) {
if (result.success && result.results != null) {
_objectUpdated(result.results.first);
}
});
_updateQueue.whenComplete(() async {
List<Future<void>> tasks = <Future<void>>[];
final ParseResponse parseResponse = await _runQuery();
if (parseResponse.success) {
final List<T> newList = parseResponse.results ?? List<T>();

//update List
for (int i = 0; i < _list.length; i++) {
final ParseObject currentObject = _list[i].object;
final String currentObjectId =
currentObject.get<String>(keyVarObjectId);

bool stillInList = false;

for (int j = 0; j < newList.length; j++) {
if (newList[j].get<String>(keyVarObjectId) == currentObjectId) {
stillInList = true;
if (newList[j]
.get<DateTime>(keyVarUpdatedAt)
.isAfter(currentObject.get<DateTime>(keyVarUpdatedAt))) {
final QueryBuilder<T> queryBuilder =
QueryBuilder<T>.copy(_query)
..whereEqualTo(keyVarObjectId, currentObjectId);
tasks.add(queryBuilder
.query<T>()
.then((ParseResponse result) async {
if (result.success && result.results != null) {
await _objectUpdated(result.results.first);
}
}));
}
newList.removeAt(j);
j--;
break;
}
newList.removeAt(j);
j--;
break;
}
if (!stillInList) {
_objectDeleted(currentObject);
i--;
}
}
if (!stillInList) {
_objectDeleted(currentObject);
i--;

for (int i = 0; i < newList.length; i++) {
tasks.add(_objectAdded(newList[i], loaded: false));
}
}

for (int i = 0; i < newList.length; i++) {
_objectAdded(newList[i], loaded: false);
await Future.wait(tasks);
tasks = <Future<void>>[];
for (ParseLiveListElement<T> element in _list) {
tasks.add(element.reconnected());
}
}
await Future.wait(tasks);
});
}
});
}
Expand Down Expand Up @@ -351,7 +362,7 @@ class ParseLiveList<T extends ParseObject> {
_list.removeAt(i).dispose();
_eventStreamController.sink.add(ParseLiveListDeleteEvent<T>(
i, object?.clone(object?.toJson(full: true))));
_objectAdded(object?.clone(object?.toJson(full: true)),
await _objectAdded(object?.clone(object?.toJson(full: true)),
fetchedIncludes: true);
}
break;
Expand Down Expand Up @@ -450,7 +461,7 @@ class ParseLiveListElement<T extends ParseObject> {
final StreamController<T> _streamController = StreamController<T>.broadcast();
T _object;
bool _loaded = false;
Map<MapEntry<String, Subscription<ParseObject>>, dynamic> _updatedSubItems;
Map<PathKey, dynamic> _updatedSubItems;
LiveQuery _liveQuery;
final Future<void> _subscriptionQueue = Future<void>.value();

Expand All @@ -459,21 +470,17 @@ class ParseLiveListElement<T extends ParseObject> {
// ignore: invalid_use_of_protected_member
T get object => _object?.clone(_object?.toJson(full: true));

Map<MapEntry<String, Subscription<ParseObject>>, dynamic> _toSubscriptionMap(
Map<String, dynamic> map) {
Map<MapEntry<String, Subscription<ParseObject>>, dynamic> result =
Map<MapEntry<String, Subscription<ParseObject>>, dynamic>();
Map<PathKey, dynamic> _toSubscriptionMap(Map<String, dynamic> map) {
final Map<PathKey, dynamic> result = Map<PathKey, dynamic>();
for (String key in map.keys) {
result.putIfAbsent(MapEntry<String, Subscription<ParseObject>>(key, null),
() => _toSubscriptionMap(map[key]));
result.putIfAbsent(PathKey(key), () => _toSubscriptionMap(map[key]));
}
return result;
}

Map<String, dynamic> _toKeyMap(
Map<MapEntry<String, Subscription<ParseObject>>, dynamic> map) {
Map<String, dynamic> _toKeyMap(Map<PathKey, dynamic> map) {
final Map<String, dynamic> result = Map<String, dynamic>();
for (MapEntry<String, Subscription<ParseObject>> key in map.keys) {
for (PathKey key in map.keys) {
result.putIfAbsent(key.key, () => _toKeyMap(map[key]));
}
return result;
Expand All @@ -483,8 +490,7 @@ class ParseLiveListElement<T extends ParseObject> {
_subscriptionQueue.whenComplete(() async {
if (_updatedSubItems.isNotEmpty && _object != null) {
final List<Future<void>> tasks = <Future<void>>[];
for (MapEntry<String, Subscription<ParseObject>> key
in _updatedSubItems.keys) {
for (PathKey key in _updatedSubItems.keys) {
tasks.add(_subscribeSubItem(_object, key,
_object.get<ParseObject>(key.key), _updatedSubItems[key]));
}
Expand All @@ -493,24 +499,21 @@ class ParseLiveListElement<T extends ParseObject> {
});
}

void _unsubscribe(
Map<MapEntry<String, Subscription<ParseObject>>, dynamic> subscriptions) {
for (MapEntry<String, Subscription<ParseObject>> key
in subscriptions.keys) {
if (_liveQuery != null && key.value != null)
_liveQuery.client.unSubscribe(key.value);
void _unsubscribe(Map<PathKey, dynamic> subscriptions) {
for (PathKey key in subscriptions.keys) {
if (_liveQuery != null && key.subscription != null) {
_liveQuery.client.unSubscribe(key.subscription);
key.subscription = null;
}
_unsubscribe(subscriptions[key]);
}
}

Future<void> _subscribeSubItem(
ParseObject parentObject,
MapEntry<String, Subscription<ParseObject>> currentKey,
ParseObject subObject,
Map<MapEntry<String, Subscription<ParseObject>>, dynamic> path) async {
Future<void> _subscribeSubItem(ParseObject parentObject, PathKey currentKey,
ParseObject subObject, Map<PathKey, dynamic> path) async {
if (_liveQuery != null && subObject != null) {
final List<Future<void>> tasks = <Future<void>>[];
for (MapEntry<String, Subscription<ParseObject>> key in path.keys) {
for (PathKey key in path.keys) {
tasks.add(_subscribeSubItem(
subObject, key, subObject.get<ParseObject>(key.key), path[key]));
}
Expand All @@ -521,6 +524,7 @@ class ParseLiveListElement<T extends ParseObject> {
tasks.add(_liveQuery.client
.subscribe(queryBuilder)
.then((Subscription<ParseObject> subscription) {
currentKey.subscription = subscription;
subscription.on(LiveQueryEvent.update, (ParseObject newObject) async {
_subscriptionQueue.whenComplete(() async {
await ParseLiveList._loadIncludes(newObject,
Expand All @@ -530,11 +534,11 @@ class ParseLiveListElement<T extends ParseObject> {
_streamController
?.add(_object?.clone(_object?.toJson(full: true)));
//Resubscribe subitems
// TODO(any): only resubscribe on changed pointers
_unsubscribe(path);
for (MapEntry<String, Subscription<ParseObject>> key
in path.keys) {
tasks.add(_subscribeSubItem(subObject, key,
subObject.get<ParseObject>(key.key), path[key]));
for (PathKey key in path.keys) {
tasks.add(_subscribeSubItem(newObject, key,
newObject.get<ParseObject>(key.key), path[key]));
}
}
await Future.wait(tasks);
Expand All @@ -560,6 +564,70 @@ class ParseLiveListElement<T extends ParseObject> {
_unsubscribe(_updatedSubItems);
_streamController.close();
}

Future<void> reconnected() async {
if (loaded) {
_subscriptionQueue.whenComplete(() async {
await _updateSubItems(_object, _updatedSubItems);
// _streamController?.add(_object?.clone(_object?.toJson(full: true)));
});
}
}

List<String> _getIncludeList(Map<PathKey, dynamic> path) {
final List<String> includes = <String>[];
for (PathKey key in path.keys) {
includes.add(key.key);
includes.addAll(
_getIncludeList(path[key]).map((String e) => '${key.key}.$e'));
}
return includes;
}

Future<void> _updateSubItems(
ParseObject root, Map<PathKey, dynamic> path) async {
final List<Future<void>> tasks = <Future<void>>[];
for (PathKey key in path.keys) {
ParseObject subObject = root.get<ParseObject>(key.key);
if (subObject?.containsKey(keyVarUpdatedAt) == true) {
final QueryBuilder<ParseObject> queryBuilder =
QueryBuilder<ParseObject>(subObject)
..keysToReturn([keyVarUpdatedAt])
..whereEqualTo(keyVarObjectId, subObject.objectId);
ParseResponse parseResponse = await queryBuilder.query();
if (parseResponse.success &&
(parseResponse.results.first as ParseObject).updatedAt !=
subObject.updatedAt) {
queryBuilder.limiters.remove("keys");
queryBuilder.includeObject(_getIncludeList(path[key]));
ParseResponse parseResponse = await queryBuilder.query();
if (parseResponse.success) {
subObject = parseResponse.result.first;
// root.getObjectData()[key.key] = subObject;
if (key.subscription?.eventCallbacks?.containsKey("update") ==
true) {
key.subscription.eventCallbacks["update"](subObject);
}
// key.subscription.eventCallbacks["update"](subObject);
break;
}
}
}
tasks.add(_updateSubItems(subObject, path[key]));
}
await Future.wait(tasks);
}
}

class PathKey {
PathKey(this.key, {this.subscription});

final String key;
Subscription<ParseObject> subscription;
@override
String toString() {
return 'PathKey(key: $key, subscription: ${subscription?.requestId})';
}
}

abstract class ParseLiveListEvent<T extends ParseObject> {
Expand Down