Skip to content

Livequery: combine both implementations #433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/parse_server_sdk.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ import 'dart:math';
import 'dart:typed_data';
import 'dart:ui' as ui;

import 'package:connectivity/connectivity.dart';
import 'package:flutter/widgets.dart';
import 'package:http/http.dart';
import 'package:http/io_client.dart';
import 'package:meta/meta.dart';
import 'package:package_info/package_info.dart';
import 'package:parse_server_sdk/src/network/parse_websocket.dart'
as parse_web_socket;
import 'package:path/path.dart' as path;
import 'package:path_provider/path_provider.dart';
import 'package:sembast/sembast.dart';
import 'package:sembast/sembast_io.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:uuid/uuid.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:xxtea/xxtea.dart';

export 'src/network/parse_live_query.dart'
if (dart.library.js) 'src/network/parse_live_query_web.dart';
export 'src/utils/parse_live_list.dart';

part 'package:parse_server_sdk/src/data/core_store.dart';
Expand All @@ -37,6 +40,7 @@ part 'src/base/parse_constants.dart';
part 'src/data/parse_core_data.dart';
part 'src/enums/parse_enum_api_rq.dart';
part 'src/network/parse_http_client.dart';
part 'src/network/parse_live_query.dart';
part 'src/network/parse_query.dart';
part 'src/objects/parse_acl.dart';
part 'src/objects/parse_base.dart';
Expand Down
76 changes: 32 additions & 44 deletions lib/src/network/parse_live_query.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:connectivity/connectivity.dart';
import 'package:flutter/widgets.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../../parse_server_sdk.dart';
part of flutter_parse_sdk;

enum LiveQueryEvent { create, enter, update, leave, delete, error }

Expand All @@ -31,7 +22,6 @@ class Subscription<T extends ParseObject> {
'error'
];
Map<String, Function> eventCallbacks = <String, Function>{};

void on(LiveQueryEvent op, Function callback) {
eventCallbacks[_liveQueryEvent[op.index]] = callback;
}
Expand All @@ -46,9 +36,13 @@ enum LiveQueryClientEvent { CONNECTED, DISCONNECTED, USER_DISCONNECTED }
class LiveQueryReconnectingController with WidgetsBindingObserver {
LiveQueryReconnectingController(
this._reconnect, this._eventStream, this.debug) {
Connectivity().checkConnectivity().then(_connectivityChanged);
Connectivity().onConnectivityChanged.listen(_connectivityChanged);

//Connectivity works differently on web
if (!parseIsWeb) {
Connectivity().checkConnectivity().then(_connectivityChanged);
Connectivity().onConnectivityChanged.listen(_connectivityChanged);
} else {
_connectivityChanged(ConnectivityResult.wifi);
}
_eventStream.listen((LiveQueryClientEvent event) {
switch (event) {
case LiveQueryClientEvent.CONNECTED:
Expand Down Expand Up @@ -132,10 +126,9 @@ class LiveQueryReconnectingController with WidgetsBindingObserver {
}
}

class Client {
factory Client() => _getInstance();

Client._internal(
class LiveQueryClient {
factory LiveQueryClient() => _getInstance();
LiveQueryClient._internal(
{bool debug, ParseHTTPClient client, bool autoSendSessionId}) {
_clientEventStreamController = StreamController<LiveQueryClientEvent>();
_clientEventStream =
Expand All @@ -160,13 +153,11 @@ class Client {
reconnectingController = LiveQueryReconnectingController(
() => reconnect(userInitialized: false), getClientEventStream, _debug);
}

static Client get instance => _getInstance();
static Client _instance;

static Client _getInstance(
static LiveQueryClient get instance => _getInstance();
static LiveQueryClient _instance;
static LiveQueryClient _getInstance(
{bool debug, ParseHTTPClient client, bool autoSendSessionId}) {
_instance ??= Client._internal(
_instance ??= LiveQueryClient._internal(
debug: debug, client: client, autoSendSessionId: autoSendSessionId);
return _instance;
}
Expand All @@ -175,7 +166,7 @@ class Client {
return _clientEventStream;
}

WebSocket _webSocket;
parse_web_socket.WebSocket _webSocket;
ParseHTTPClient _client;
bool _debug;
bool _sendSessionId;
Expand All @@ -186,7 +177,6 @@ class Client {
Stream<LiveQueryClientEvent> _clientEventStream;
LiveQueryReconnectingController reconnectingController;

// ignore: always_specify_types
final Map<int, Subscription> _requestSubScription = <int, Subscription>{};

Future<void> reconnect({bool userInitialized = false}) async {
Expand All @@ -198,11 +188,12 @@ class Client {
if (_webSocket != null) {
return _webSocket.readyState;
}
return WebSocket.connecting;
return parse_web_socket.WebSocket.CONNECTING;
}

Future<dynamic> disconnect({bool userInitialized = false}) async {
if (_webSocket != null && _webSocket.readyState == WebSocket.open) {
if (_webSocket != null &&
_webSocket.readyState == parse_web_socket.WebSocket.OPEN) {
if (_debug) {
print('$_printConstLiveQuery: Socket closed');
}
Expand All @@ -216,7 +207,6 @@ class Client {
await _channel.sink.close();
_channel = null;
}
// ignore: always_specify_types
_requestSubScription.values.toList().forEach((Subscription subscription) {
subscription._enabled = false;
});
Expand Down Expand Up @@ -274,9 +264,10 @@ class Client {
_connecting = true;

try {
_webSocket = await WebSocket.connect(_liveQueryURL);
_webSocket = await parse_web_socket.WebSocket.connect(_liveQueryURL);
_connecting = false;
if (_webSocket != null && _webSocket.readyState == WebSocket.open) {
if (_webSocket != null &&
_webSocket.readyState == parse_web_socket.WebSocket.OPEN) {
if (_debug) {
print('$_printConstLiveQuery: Socket opened');
}
Expand All @@ -286,7 +277,7 @@ class Client {
}
return Future<void>.value(null);
}
_channel = IOWebSocketChannel(_webSocket);
_channel = _webSocket.createWebSocketChannel();
_channel.stream.listen((dynamic message) {
_handleMessage(message);
}, onDone: () {
Expand All @@ -302,8 +293,11 @@ class Client {
print(
'$_printConstLiveQuery: Error: ${error.runtimeType.toString()}');
}
return Future<ParseResponse>.value(handleException(Exception(error),
ParseApiRQ.liveQuery, _debug, 'IOWebSocketChannel'));
return Future<ParseResponse>.value(handleException(
Exception(error),
ParseApiRQ.liveQuery,
_debug,
!parseIsWeb ? 'IOWebSocketChannel' : 'HtmlWebSocketChannel'));
});
} on Exception catch (e) {
_connecting = false;
Expand Down Expand Up @@ -341,13 +335,11 @@ class Client {
_channel.sink.add(jsonEncode(connectMessage));
}

// ignore: always_specify_types
void _subscribeLiveQuery(Subscription subscription) {
if (subscription._enabled) {
return;
}
subscription._enabled = true;
// ignore: always_specify_types
final QueryBuilder query = subscription.query;
final List<String> keysToReturn = query.limiters['keys']?.split(',');
query.limiters.clear(); //Remove limits in LiveQuery
Expand Down Expand Up @@ -386,11 +378,11 @@ class Client {
}

final Map<String, dynamic> actionData = jsonDecode(message);
// ignore: always_specify_types

Subscription subscription;
if (actionData.containsKey('op') && actionData['op'] == 'connected') {
print('ReSubScription:$_requestSubScription');
// ignore: always_specify_types

_requestSubScription.values.toList().forEach((Subscription subcription) {
_subscribeLiveQuery(subcription);
});
Expand Down Expand Up @@ -436,21 +428,17 @@ class LiveQuery {
_debug = isDebugEnabled(objectLevelDebug: debug);
_sendSessionId =
autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true;
this.client = Client._getInstance(
this.client = LiveQueryClient._getInstance(
client: _client, debug: _debug, autoSendSessionId: _sendSessionId);
}

ParseHTTPClient _client;
bool _debug;
bool _sendSessionId;

// ignore: always_specify_types
Subscription _latestSubscription;
Client client;
LiveQueryClient client;

// ignore: always_specify_types
@deprecated
// ignore: always_specify_types
Future<dynamic> subscribe(QueryBuilder query) async {
_latestSubscription = await client.subscribe(query);
return _latestSubscription;
Expand Down
Loading