diff --git a/lib/parse_server_sdk.dart b/lib/parse_server_sdk.dart index 45b4b42dd..681143501 100644 --- a/lib/parse_server_sdk.dart +++ b/lib/parse_server_sdk.dart @@ -7,20 +7,23 @@ import 'dart:math'; import 'dart:typed_data'; import 'dart:ui' as ui; +import 'package:connectivity/connectivity.dart'; +import 'package:flutter/widgets.dart'; import 'package:http/http.dart'; import 'package:http/io_client.dart'; import 'package:meta/meta.dart'; import 'package:package_info/package_info.dart'; +import 'package:parse_server_sdk/src/network/parse_websocket.dart' + as parse_web_socket; import 'package:path/path.dart' as path; import 'package:path_provider/path_provider.dart'; import 'package:sembast/sembast.dart'; import 'package:sembast/sembast_io.dart'; import 'package:shared_preferences/shared_preferences.dart'; import 'package:uuid/uuid.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:xxtea/xxtea.dart'; -export 'src/network/parse_live_query.dart' - if (dart.library.js) 'src/network/parse_live_query_web.dart'; export 'src/utils/parse_live_list.dart'; part 'package:parse_server_sdk/src/data/core_store.dart'; @@ -37,6 +40,7 @@ part 'src/base/parse_constants.dart'; part 'src/data/parse_core_data.dart'; part 'src/enums/parse_enum_api_rq.dart'; part 'src/network/parse_http_client.dart'; +part 'src/network/parse_live_query.dart'; part 'src/network/parse_query.dart'; part 'src/objects/parse_acl.dart'; part 'src/objects/parse_base.dart'; diff --git a/lib/src/network/parse_live_query.dart b/lib/src/network/parse_live_query.dart index ca90110c0..05e6be9a4 100644 --- a/lib/src/network/parse_live_query.dart +++ b/lib/src/network/parse_live_query.dart @@ -1,13 +1,4 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:connectivity/connectivity.dart'; -import 'package:flutter/widgets.dart'; -import 'package:web_socket_channel/io.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; - -import '../../parse_server_sdk.dart'; +part of flutter_parse_sdk; enum LiveQueryEvent { create, enter, update, leave, delete, error } @@ -31,7 +22,6 @@ class Subscription { 'error' ]; Map eventCallbacks = {}; - void on(LiveQueryEvent op, Function callback) { eventCallbacks[_liveQueryEvent[op.index]] = callback; } @@ -46,9 +36,13 @@ enum LiveQueryClientEvent { CONNECTED, DISCONNECTED, USER_DISCONNECTED } class LiveQueryReconnectingController with WidgetsBindingObserver { LiveQueryReconnectingController( this._reconnect, this._eventStream, this.debug) { - Connectivity().checkConnectivity().then(_connectivityChanged); - Connectivity().onConnectivityChanged.listen(_connectivityChanged); - + //Connectivity works differently on web + if (!parseIsWeb) { + Connectivity().checkConnectivity().then(_connectivityChanged); + Connectivity().onConnectivityChanged.listen(_connectivityChanged); + } else { + _connectivityChanged(ConnectivityResult.wifi); + } _eventStream.listen((LiveQueryClientEvent event) { switch (event) { case LiveQueryClientEvent.CONNECTED: @@ -132,10 +126,9 @@ class LiveQueryReconnectingController with WidgetsBindingObserver { } } -class Client { - factory Client() => _getInstance(); - - Client._internal( +class LiveQueryClient { + factory LiveQueryClient() => _getInstance(); + LiveQueryClient._internal( {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { _clientEventStreamController = StreamController(); _clientEventStream = @@ -160,13 +153,11 @@ class Client { reconnectingController = LiveQueryReconnectingController( () => reconnect(userInitialized: false), getClientEventStream, _debug); } - - static Client get instance => _getInstance(); - static Client _instance; - - static Client _getInstance( + static LiveQueryClient get instance => _getInstance(); + static LiveQueryClient _instance; + static LiveQueryClient _getInstance( {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { - _instance ??= Client._internal( + _instance ??= LiveQueryClient._internal( debug: debug, client: client, autoSendSessionId: autoSendSessionId); return _instance; } @@ -175,7 +166,7 @@ class Client { return _clientEventStream; } - WebSocket _webSocket; + parse_web_socket.WebSocket _webSocket; ParseHTTPClient _client; bool _debug; bool _sendSessionId; @@ -186,7 +177,6 @@ class Client { Stream _clientEventStream; LiveQueryReconnectingController reconnectingController; - // ignore: always_specify_types final Map _requestSubScription = {}; Future reconnect({bool userInitialized = false}) async { @@ -198,11 +188,12 @@ class Client { if (_webSocket != null) { return _webSocket.readyState; } - return WebSocket.connecting; + return parse_web_socket.WebSocket.CONNECTING; } Future disconnect({bool userInitialized = false}) async { - if (_webSocket != null && _webSocket.readyState == WebSocket.open) { + if (_webSocket != null && + _webSocket.readyState == parse_web_socket.WebSocket.OPEN) { if (_debug) { print('$_printConstLiveQuery: Socket closed'); } @@ -216,7 +207,6 @@ class Client { await _channel.sink.close(); _channel = null; } - // ignore: always_specify_types _requestSubScription.values.toList().forEach((Subscription subscription) { subscription._enabled = false; }); @@ -274,9 +264,10 @@ class Client { _connecting = true; try { - _webSocket = await WebSocket.connect(_liveQueryURL); + _webSocket = await parse_web_socket.WebSocket.connect(_liveQueryURL); _connecting = false; - if (_webSocket != null && _webSocket.readyState == WebSocket.open) { + if (_webSocket != null && + _webSocket.readyState == parse_web_socket.WebSocket.OPEN) { if (_debug) { print('$_printConstLiveQuery: Socket opened'); } @@ -286,7 +277,7 @@ class Client { } return Future.value(null); } - _channel = IOWebSocketChannel(_webSocket); + _channel = _webSocket.createWebSocketChannel(); _channel.stream.listen((dynamic message) { _handleMessage(message); }, onDone: () { @@ -302,8 +293,11 @@ class Client { print( '$_printConstLiveQuery: Error: ${error.runtimeType.toString()}'); } - return Future.value(handleException(Exception(error), - ParseApiRQ.liveQuery, _debug, 'IOWebSocketChannel')); + return Future.value(handleException( + Exception(error), + ParseApiRQ.liveQuery, + _debug, + !parseIsWeb ? 'IOWebSocketChannel' : 'HtmlWebSocketChannel')); }); } on Exception catch (e) { _connecting = false; @@ -341,13 +335,11 @@ class Client { _channel.sink.add(jsonEncode(connectMessage)); } - // ignore: always_specify_types void _subscribeLiveQuery(Subscription subscription) { if (subscription._enabled) { return; } subscription._enabled = true; - // ignore: always_specify_types final QueryBuilder query = subscription.query; final List keysToReturn = query.limiters['keys']?.split(','); query.limiters.clear(); //Remove limits in LiveQuery @@ -386,11 +378,11 @@ class Client { } final Map actionData = jsonDecode(message); - // ignore: always_specify_types + Subscription subscription; if (actionData.containsKey('op') && actionData['op'] == 'connected') { print('ReSubScription:$_requestSubScription'); - // ignore: always_specify_types + _requestSubScription.values.toList().forEach((Subscription subcription) { _subscribeLiveQuery(subcription); }); @@ -436,21 +428,17 @@ class LiveQuery { _debug = isDebugEnabled(objectLevelDebug: debug); _sendSessionId = autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true; - this.client = Client._getInstance( + this.client = LiveQueryClient._getInstance( client: _client, debug: _debug, autoSendSessionId: _sendSessionId); } ParseHTTPClient _client; bool _debug; bool _sendSessionId; - - // ignore: always_specify_types Subscription _latestSubscription; - Client client; + LiveQueryClient client; - // ignore: always_specify_types @deprecated - // ignore: always_specify_types Future subscribe(QueryBuilder query) async { _latestSubscription = await client.subscribe(query); return _latestSubscription; diff --git a/lib/src/network/parse_live_query_web.dart b/lib/src/network/parse_live_query_web.dart deleted file mode 100644 index 6d9012785..000000000 --- a/lib/src/network/parse_live_query_web.dart +++ /dev/null @@ -1,462 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:html' as html; - -import 'package:connectivity/connectivity.dart'; -import 'package:flutter/widgets.dart'; - -import '../../parse_server_sdk.dart'; - -enum LiveQueryEvent { create, enter, update, leave, delete, error } - -const String _printConstLiveQuery = 'LiveQuery: '; - -// // ignore_for_file: always_specify_types -class Subscription { - Subscription(this.query, this.requestId, {T copyObject}) { - _copyObject = copyObject; - } - QueryBuilder query; - T _copyObject; - int requestId; - bool _enabled = false; - final List _liveQueryEvent = [ - 'create', - 'enter', - 'update', - 'leave', - 'delete', - 'error' - ]; - Map eventCallbacks = {}; - void on(LiveQueryEvent op, Function callback) { - eventCallbacks[_liveQueryEvent[op.index]] = callback; - } - - T get copyObject { - return _copyObject; - } -} - -enum LiveQueryClientEvent { CONNECTED, DISCONNECTED, USER_DISCONNECTED } - -class LiveQueryReconnectingController with WidgetsBindingObserver { - LiveQueryReconnectingController( - this._reconnect, this._eventStream, this.debug) { - _connectivityChanged(ConnectivityResult.wifi); - _eventStream.listen((LiveQueryClientEvent event) { - switch (event) { - case LiveQueryClientEvent.CONNECTED: - _isConnected = true; - _retryState = 0; - _userDisconnected = false; - break; - case LiveQueryClientEvent.DISCONNECTED: - _isConnected = false; - _setReconnect(); - break; - case LiveQueryClientEvent.USER_DISCONNECTED: - _userDisconnected = true; - if (_currentTimer != null) { - _currentTimer.cancel(); - _currentTimer = null; - } - break; - } - - if (debug) { - print('$DEBUG_TAG: $event'); - } - }); - WidgetsBinding.instance.addObserver(this); - } - - static List get retryInterval => ParseCoreData().liveListRetryIntervals; - static const String DEBUG_TAG = 'LiveQueryReconnectingController'; - - final Function _reconnect; - final Stream _eventStream; - final bool debug; - - int _retryState = 0; - bool _isOnline = false; - bool _isConnected = false; - bool _userDisconnected = false; - - Timer _currentTimer; - - void _connectivityChanged(ConnectivityResult state) { - if (!_isOnline && state != ConnectivityResult.none) { - _retryState = 0; - } - _isOnline = state != ConnectivityResult.none; - if (debug) { - print('$DEBUG_TAG: $state'); - } - _setReconnect(); - } - - @override - void didChangeAppLifecycleState(AppLifecycleState state) { - switch (state) { - case AppLifecycleState.resumed: - _setReconnect(); - break; - default: - break; - } - } - - void _setReconnect() { - if (_isOnline && - !_isConnected && - _currentTimer == null && - !_userDisconnected && - retryInterval[_retryState] >= 0) { - _currentTimer = - Timer(Duration(milliseconds: retryInterval[_retryState]), () { - _currentTimer = null; - _reconnect(); - }); - if (debug) - print('$DEBUG_TAG: Retrytimer set to ${retryInterval[_retryState]}ms'); - if (_retryState < retryInterval.length - 1) { - _retryState++; - } - } - } -} - -class Client { - factory Client() => _getInstance(); - Client._internal( - {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { - _clientEventStreamController = StreamController(); - _clientEventStream = - _clientEventStreamController.stream.asBroadcastStream(); - - _client = client ?? - ParseHTTPClient( - sendSessionId: - autoSendSessionId ?? ParseCoreData().autoSendSessionId, - securityContext: ParseCoreData().securityContext); - - _debug = isDebugEnabled(objectLevelDebug: debug); - _sendSessionId = - autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true; - _liveQueryURL = _client.data.liveQueryURL; - if (_liveQueryURL.contains('https')) { - _liveQueryURL = _liveQueryURL.replaceAll('https', 'wss'); - } else if (_liveQueryURL.contains('http')) { - _liveQueryURL = _liveQueryURL.replaceAll('http', 'ws'); - } - - reconnectingController = LiveQueryReconnectingController( - () => reconnect(userInitialized: false), getClientEventStream, _debug); - } - static Client get instance => _getInstance(); - static Client _instance; - static Client _getInstance( - {bool debug, ParseHTTPClient client, bool autoSendSessionId}) { - _instance ??= Client._internal( - debug: debug, client: client, autoSendSessionId: autoSendSessionId); - return _instance; - } - - Stream get getClientEventStream { - return _clientEventStream; - } - - html.WebSocket _webSocket; - ParseHTTPClient _client; - bool _debug; - bool _sendSessionId; - Stream _stream; - String _liveQueryURL; - bool _connecting = false; - StreamController _clientEventStreamController; - Stream _clientEventStream; - LiveQueryReconnectingController reconnectingController; - - final Map _requestSubScription = {}; - - Future reconnect({bool userInitialized = false}) async { - await _connect(userInitialized: userInitialized); - _connectLiveQuery(); - } - - int readyState() { - if (_webSocket != null) { - return _webSocket.readyState; - } - return html.WebSocket.CONNECTING; - } - - Future disconnect({bool userInitialized = false}) async { - if (_webSocket != null && _webSocket.readyState == html.WebSocket.OPEN) { - if (_debug) { - print('$_printConstLiveQuery: Socket closed'); - } - _webSocket.close(); - _webSocket = null; - } - if (_webSocket != null) { - if (_debug) { - print('$_printConstLiveQuery: close'); - } - _webSocket.close(); - _webSocket = null; - _stream = null; - } - - _requestSubScription.values.toList().forEach((Subscription subscription) { - subscription._enabled = false; - }); - _connecting = false; - if (userInitialized) - _clientEventStreamController.sink - .add(LiveQueryClientEvent.USER_DISCONNECTED); - } - - Future> subscribe( - QueryBuilder query, - {T copyObject}) async { - if (_webSocket == null) { - await _clientEventStream.any((LiveQueryClientEvent event) => - event == LiveQueryClientEvent.CONNECTED); - } - final int requestId = _requestIdGenerator(); - final Subscription subscription = - Subscription(query, requestId, copyObject: copyObject); - _requestSubScription[requestId] = subscription; - //After a client connects to the LiveQuery server, - //it can send a subscribe message to subscribe a ParseQuery. - _subscribeLiveQuery(subscription); - return subscription; - } - - void unSubscribe(Subscription subscription) { - //Mount message for Unsubscribe - final Map unsubscribeMessage = { - 'op': 'unsubscribe', - 'requestId': subscription.requestId, - }; - if (_webSocket != null) { - if (_debug) { - print('$_printConstLiveQuery: UnsubscribeMessage: $unsubscribeMessage'); - } - _webSocket.send(jsonEncode(unsubscribeMessage)); -// _channel.sink.add(jsonEncode(unsubscribeMessage)); - subscription._enabled = false; - _requestSubScription.remove(subscription.requestId); - } - } - - static int _requestIdCount = 1; - - int _requestIdGenerator() { - return _requestIdCount++; - } - - Future _connect({bool userInitialized = false}) async { - if (_connecting) { - print('already connecting'); - return Future.value(null); - } - await disconnect(userInitialized: userInitialized); - _connecting = true; - - try { - _webSocket = html.WebSocket(_liveQueryURL); - await _webSocket.onOpen.first; - - _connecting = false; - if (_webSocket != null && _webSocket.readyState == html.WebSocket.OPEN) { - if (_debug) { - print('$_printConstLiveQuery: Socket opened'); - } - } else { - if (_debug) { - print('$_printConstLiveQuery: Error when connection client'); - } - return Future.value(null); - } - _stream = _webSocket.onMessage; - - _stream.listen((html.MessageEvent event) { - final dynamic message = event.data; - _handleMessage(message); - }, onDone: () { - _clientEventStreamController.sink - .add(LiveQueryClientEvent.DISCONNECTED); - if (_debug) { - print('$_printConstLiveQuery: Done'); - } - }, onError: (Object error) { - _clientEventStreamController.sink - .add(LiveQueryClientEvent.DISCONNECTED); - if (_debug) { - print( - '$_printConstLiveQuery: Error: ${error.runtimeType.toString()}'); - } - return Future.value(handleException( - Exception(error), ParseApiRQ.liveQuery, _debug, 'HtmlWebSocket')); - }); - } on Exception catch (e) { - _connecting = false; - _clientEventStreamController.sink.add(LiveQueryClientEvent.DISCONNECTED); - if (_debug) { - print('$_printConstLiveQuery: Error: ${e.toString()}'); - } - return handleException(e, ParseApiRQ.liveQuery, _debug, 'LiveQuery'); - } - } - - void _connectLiveQuery() { - if (_webSocket == null) { - return; - } - //The connect message is sent from a client to the LiveQuery server. - //It should be the first message sent from a client after the WebSocket connection is established. - final Map connectMessage = { - 'op': 'connect', - 'applicationId': _client.data.applicationId - }; - - if (_sendSessionId && _client.data.sessionId != null) { - connectMessage['sessionToken'] = _client.data.sessionId; - } - - if (_client.data.clientKey != null) - connectMessage['clientKey'] = _client.data.clientKey; - if (_client.data.masterKey != null) - connectMessage['masterKey'] = _client.data.masterKey; - - if (_debug) { - print('$_printConstLiveQuery: ConnectMessage: $connectMessage'); - } - _webSocket.send(jsonEncode(connectMessage)); -// _channel.sink.add(jsonEncode(connectMessage)); - } - - void _subscribeLiveQuery(Subscription subscription) { - if (subscription._enabled) { - return; - } - subscription._enabled = true; - - final QueryBuilder query = subscription.query; - final List keysToReturn = query.limiters['keys']?.split(','); - query.limiters.clear(); //Remove limits in LiveQuery - final String _where = query.buildQuery().replaceAll('where=', ''); - - //Convert where condition to Map - Map _whereMap = Map(); - if (_where != '') { - _whereMap = json.decode(_where); - } - - final Map subscribeMessage = { - 'op': 'subscribe', - 'requestId': subscription.requestId, - 'query': { - 'className': query.object.parseClassName, - 'where': _whereMap, - if (keysToReturn != null && keysToReturn.isNotEmpty) - 'fields': keysToReturn - } - }; - if (_sendSessionId && _client.data.sessionId != null) { - subscribeMessage['sessionToken'] = _client.data.sessionId; - } - - if (_debug) { - print('$_printConstLiveQuery: SubscribeMessage: $subscribeMessage'); - } - - _webSocket.send(jsonEncode(subscribeMessage)); -// _channel.sink.add(jsonEncode(subscribeMessage)); - } - - void _handleMessage(String message) { - if (_debug) { - print('$_printConstLiveQuery: Listen: $message'); - } - - final Map actionData = jsonDecode(message); - - Subscription subscription; - if (actionData.containsKey('op') && actionData['op'] == 'connected') { - print('ReSubScription:$_requestSubScription'); - - _requestSubScription.values.toList().forEach((Subscription subcription) { - _subscribeLiveQuery(subcription); - }); - _clientEventStreamController.sink.add(LiveQueryClientEvent.CONNECTED); - return; - } - if (actionData.containsKey('requestId')) { - subscription = _requestSubScription[actionData['requestId']]; - } - if (subscription == null) { - return; - } - if (subscription.eventCallbacks.containsKey(actionData['op'])) { - if (actionData.containsKey('object')) { - final Map map = actionData['object']; - final String className = map['className']; - if (className == keyClassUser) { - subscription.eventCallbacks[actionData['op']]( - (subscription.copyObject ?? - ParseCoreData.instance.createParseUser(null, null, null)) - .fromJson(map)); - } else { - subscription.eventCallbacks[actionData['op']]( - (subscription.copyObject ?? - ParseCoreData.instance.createObject(className)) - .fromJson(map)); - } - } else { - subscription.eventCallbacks[actionData['op']](actionData); - } - } - } -} - -class LiveQuery { - LiveQuery({bool debug, ParseHTTPClient client, bool autoSendSessionId}) { - _client = client ?? - ParseHTTPClient( - sendSessionId: - autoSendSessionId ?? ParseCoreData().autoSendSessionId, - securityContext: ParseCoreData().securityContext); - - _debug = isDebugEnabled(objectLevelDebug: debug); - _sendSessionId = - autoSendSessionId ?? ParseCoreData().autoSendSessionId ?? true; - this.client = Client._getInstance( - client: _client, debug: _debug, autoSendSessionId: _sendSessionId); - } - - ParseHTTPClient _client; - bool _debug; - bool _sendSessionId; - Subscription _latestSubscription; - Client client; - - @deprecated - Future subscribe(QueryBuilder query) async { - _latestSubscription = await client.subscribe(query); - return _latestSubscription; - } - - @deprecated - Future unSubscribe() async { - client.unSubscribe(_latestSubscription); - } - - @deprecated - void on(LiveQueryEvent op, Function callback) { - _latestSubscription.on(op, callback); - } -} diff --git a/lib/src/network/parse_websocket.dart b/lib/src/network/parse_websocket.dart new file mode 100644 index 000000000..4ec07c900 --- /dev/null +++ b/lib/src/network/parse_websocket.dart @@ -0,0 +1,2 @@ +export 'parse_websocket_io.dart' + if (dart.library.js) 'parse_websocket_html.dart'; diff --git a/lib/src/network/parse_websocket_html.dart b/lib/src/network/parse_websocket_html.dart new file mode 100644 index 000000000..aeab1d27c --- /dev/null +++ b/lib/src/network/parse_websocket_html.dart @@ -0,0 +1,33 @@ +/// If you change this file, you should apply the same changes to the 'parse_websocket_io.dart' file + +import 'dart:html' as html; + +import 'package:web_socket_channel/html.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +class WebSocket { + WebSocket._(this._webSocket); + + static const int CONNECTING = 0; + static const int OPEN = 1; + static const int CLOSING = 2; + static const int CLOSED = 3; + + final html.WebSocket _webSocket; + + static Future connect(String liveQueryURL) async { + final html.WebSocket webSocket = html.WebSocket(liveQueryURL); + await webSocket.onOpen.first; + return WebSocket._(webSocket); + } + + int get readyState => _webSocket.readyState; + + Future close() async { + return _webSocket.close(); + } + + WebSocketChannel createWebSocketChannel() { + return HtmlWebSocketChannel(_webSocket); + } +} diff --git a/lib/src/network/parse_websocket_io.dart b/lib/src/network/parse_websocket_io.dart new file mode 100644 index 000000000..37206dd49 --- /dev/null +++ b/lib/src/network/parse_websocket_io.dart @@ -0,0 +1,31 @@ +/// If you change this file, you should apply the same changes to the 'parse_websocket_html.dart' file + +import 'dart:io' as io; + +import 'package:web_socket_channel/io.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +class WebSocket { + WebSocket._(this._webSocket); + + static const int CONNECTING = 0; + static const int OPEN = 1; + static const int CLOSING = 2; + static const int CLOSED = 3; + + final io.WebSocket _webSocket; + + static Future connect(String liveQueryURL) async { + return WebSocket._(await io.WebSocket.connect(liveQueryURL)); + } + + int get readyState => _webSocket.readyState; + + Future close() { + return _webSocket.close(); + } + + WebSocketChannel createWebSocketChannel() { + return IOWebSocketChannel(_webSocket); + } +}