Skip to content

Commit 4639d9a

Browse files
committed
Reload schema on changin of tarantool schema_id value.
* Reloading of schema on Tarantool schema error (closes gh-63) * Rewrite request using collections.Sequence * Delete RETRIES on completion status == 1 (no this completion status, anymore, also closes gh-45) * flush schema now, also, loads schema from server. * Modify/Simplify schema internals * Added absent constants * Updated errorcodes (closes gh-69) * Schema is now flushed, upon successfull authentication (closes gh-68) Proofed to work on py27, py35 and pypy
1 parent 49658d9 commit 4639d9a

File tree

9 files changed

+404
-227
lines changed

9 files changed

+404
-227
lines changed

tarantool/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
DatabaseError,
1717
NetworkError,
1818
NetworkWarning,
19-
RetryWarning
2019
)
2120

2221
from tarantool.schema import (
@@ -48,4 +47,4 @@ def connect(host="localhost", port=33013, user=None, password=None, encoding=ENC
4847

4948

5049
__all__ = ['connect', 'Connection', 'Schema', 'Error', 'DatabaseError',
51-
'NetworkError', 'NetworkWarning', 'RetryWarning', 'SchemaError']
50+
'NetworkError', 'NetworkWarning', 'SchemaError']

tarantool/connection.py

+28-19
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
SOCKET_TIMEOUT,
4242
RECONNECT_MAX_ATTEMPTS,
4343
RECONNECT_DELAY,
44-
RETRY_MAX_ATTEMPTS,
4544
REQUEST_TYPE_OK,
4645
REQUEST_TYPE_ERROR,
4746
IPROTO_GREETING_SIZE,
@@ -52,9 +51,8 @@
5251
from tarantool.error import (
5352
NetworkError,
5453
DatabaseError,
55-
warn,
56-
RetryWarning,
57-
NetworkWarning)
54+
NetworkWarning,
55+
SchemaReloadException)
5856

5957
from .schema import Schema
6058
from .utils import check_key, greeting_decode, version_id
@@ -108,6 +106,7 @@ def __init__(self, host, port,
108106
self.reconnect_delay = reconnect_delay
109107
self.reconnect_max_attempts = reconnect_max_attempts
110108
self.schema = Schema(self)
109+
self.schema_version = 1
111110
self._socket = None
112111
self.connected = False
113112
self.error = True
@@ -166,6 +165,7 @@ def connect(self):
166165
# the connection fails because the server is simply
167166
# not bound to port
168167
self._socket.settimeout(self.socket_timeout)
168+
self.load_schema()
169169
except socket.error as e:
170170
self.connected = False
171171
raise NetworkError(e)
@@ -210,18 +210,17 @@ def _send_request_wo_reconnect(self, request):
210210
'''
211211
assert isinstance(request, Request)
212212

213-
# Repeat request in a loop if the server returns completion_status == 1
214-
# (try again)
215-
for attempt in range(RETRY_MAX_ATTEMPTS): # pylint: disable=W0612
216-
self._socket.sendall(bytes(request))
217-
response = Response(self, self._read_response())
218-
219-
if response.completion_status != 1:
220-
return response
221-
warn(response.return_message, RetryWarning)
213+
response = None
214+
while True:
215+
try:
216+
self._socket.sendall(bytes(request))
217+
response = Response(self, self._read_response())
218+
break
219+
except SchemaReloadException as e:
220+
self.update_schema(e.schema_version)
221+
continue
222222

223-
# Raise an error if the maximum number of attempts have been made
224-
raise DatabaseError(response.return_code, response.return_message)
223+
return response
225224

226225
def _opt_reconnect(self):
227226
'''
@@ -294,13 +293,20 @@ def _send_request(self, request):
294293
assert isinstance(request, Request)
295294

296295
self._opt_reconnect()
297-
response = self._send_request_wo_reconnect(
298-
request)
299296

300-
return response
297+
return self._send_request_wo_reconnect(request)
298+
299+
def load_schema(self):
300+
self.schema.fetch_space_all()
301+
self.schema.fetch_index_all()
302+
303+
def update_schema(self, schema_version):
304+
self.schema_version = schema_version
305+
self.flush_schema()
301306

302307
def flush_schema(self):
303308
self.schema.flush()
309+
self.load_schema()
304310

305311
def call(self, func_name, *args):
306312
'''
@@ -378,7 +384,10 @@ def authenticate(self, user, password):
378384

379385
request = RequestAuthenticate(self, self._salt, self.user,
380386
self.password)
381-
return self._send_request_wo_reconnect(request)
387+
auth_response = self._send_request_wo_reconnect(request)
388+
if auth_response.return_code == 0:
389+
self.flush_schema()
390+
return auth_response
382391

383392
def _join_v16(self, server_uuid):
384393
request = RequestJoin(self, server_uuid)

tarantool/const.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,38 @@
33

44
import six
55

6+
#
67
IPROTO_CODE = 0x00
78
IPROTO_SYNC = 0x01
9+
# replication keys (header)
10+
IPROTO_SERVER_ID = 0x02
11+
IPROTO_LSN = 0x03
12+
IPROTO_TIMESTAMP = 0x04
13+
IPROTO_SCHEMA_ID = 0X05
14+
#
815
IPROTO_SPACE_ID = 0x10
916
IPROTO_INDEX_ID = 0x11
1017
IPROTO_LIMIT = 0x12
1118
IPROTO_OFFSET = 0x13
1219
IPROTO_ITERATOR = 0x14
20+
IPROTO_INDEX_BASE = 0x15
21+
#
1322
IPROTO_KEY = 0x20
1423
IPROTO_TUPLE = 0x21
1524
IPROTO_FUNCTION_NAME = 0x22
1625
IPROTO_USER_NAME = 0x23
26+
#
1727
IPROTO_SERVER_UUID = 0x24
1828
IPROTO_CLUSTER_UUID = 0x25
1929
IPROTO_VCLOCK = 0x26
2030
IPROTO_EXPR = 0x27
2131
IPROTO_OPS = 0x28
32+
#
2233
IPROTO_DATA = 0x30
2334
IPROTO_ERROR = 0x31
2435

2536
IPROTO_GREETING_SIZE = 128
37+
IPROTO_BODY_MAX_LEN = 2147483648
2638

2739
REQUEST_TYPE_OK = 0
2840
REQUEST_TYPE_SELECT = 1
@@ -74,9 +86,6 @@
7486
RECONNECT_MAX_ATTEMPTS = 10
7587
# Default delay between attempts to reconnect (seconds)
7688
RECONNECT_DELAY = 0.1
77-
# Number of reattempts in case of server
78-
# return completion_status == 1 (try again)
79-
RETRY_MAX_ATTEMPTS = 10
8089

8190
if six.PY2:
8291
ENCODING_DEFAULT = None

tarantool/error.py

+124-73
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ def os_strerror_patched(code):
108108
os.strerror = os_strerror_patched
109109
del os_strerror_patched
110110

111-
112111
class SchemaError(DatabaseError):
113112
def __init__(self, value):
114113
super(SchemaError, self).__init__(0, value)
@@ -117,6 +116,16 @@ def __init__(self, value):
117116
def __str__(self):
118117
return str(self.value)
119118

119+
class SchemaReloadException(DatabaseError):
120+
def __init__(self, message, schema_version):
121+
super(SchemaReloadException, self).__init__(109, message)
122+
self.code = 109
123+
self.message = message
124+
self.schema_version = schema_version
125+
126+
def __str__(self):
127+
return str(self.message)
128+
120129

121130
class NetworkError(DatabaseError):
122131

@@ -144,18 +153,8 @@ class NetworkWarning(UserWarning):
144153
pass
145154

146155

147-
class RetryWarning(UserWarning):
148-
149-
'''
150-
Warning is emited in case of server return completion_status == 1
151-
(try again)
152-
'''
153-
pass
154-
155-
156156
# always print this warnings
157157
warnings.filterwarnings("always", category=NetworkWarning)
158-
warnings.filterwarnings("always", category=RetryWarning)
159158

160159

161160
def warn(message, warning_class):
@@ -169,68 +168,120 @@ def warn(message, warning_class):
169168
warnings.warn_explicit(message, warning_class, module_name, line_no)
170169

171170
_strerror = {
172-
0: ("ER_OK", "OK"),
173-
1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"),
174-
2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"),
175-
3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index %u"),
176-
4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index %u"),
177-
5: ("ER_UNSUPPORTED", "%s does not support %s"),
178-
6: ("ER_NONMASTER",
179-
"Can't modify data on a replication slave. My master is: %s"),
180-
7: ("ER_SECONDARY",
181-
"Can't modify data upon a request on the secondary port."),
182-
8: ("ER_INJECTION", "Error injection '%s'"),
183-
9: ("ER_CREATE_SPACE", "Failed to create space %u: %s"),
184-
10: ("ER_SPACE_EXISTS", "Space %u already exists"),
185-
11: ("ER_DROP_SPACE", "Can't drop space %u: %s"),
186-
12: ("ER_ALTER_SPACE", "Can't modify space %u: %s"),
187-
13: ("ER_INDEX_TYPE",
188-
"Unsupported index type supplied for index %u in space %u"),
189-
14: ("ER_MODIFY_INDEX",
190-
"Can't create or modify index %u in space %u: %s"),
191-
15: ("ER_LAST_DROP",
192-
"Can't drop the primary key in a system space, space id %u"),
193-
16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"),
194-
17: ("ER_DROP_PRIMARY_KEY",
195-
"Can't drop primary key in space %u while secondary keys exist"),
196-
18: ("ER_KEY_FIELD_TYPE",
197-
("Supplied key type of part %u does not match index part type:"
198-
" expected %s")),
199-
19: ("ER_EXACT_MATCH",
200-
"Invalid key part count in an exact match (expected %u, got %u)"),
201-
20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"),
202-
21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"),
203-
22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"),
204-
23: ("ER_FIELD_TYPE",
205-
("Tuple field %u type does not match one required by operation:"
206-
" expected %s")),
207-
24: ("ER_FIELD_TYPE_MISMATCH",
208-
("Ambiguous field type in index %u, key part %u. Requested type"
209-
" is %s but the field has previously been defined as %s")),
210-
25: ("ER_SPLICE", "Field SPLICE error: %s"),
211-
26: ("ER_ARG_TYPE",
212-
("Argument type in operation on field %u does not match field type:"
213-
" expected a %s")),
214-
27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"),
215-
28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"),
216-
29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"),
217-
30: ("ER_FIBER_STACK",
218-
"Can not create a new fiber: recursion limit reached"),
219-
31: ("ER_KEY_PART_COUNT",
220-
"Invalid key part count (expected [0..%u], got %u)"),
221-
32: ("ER_PROC_LUA", "%s"),
222-
33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"),
223-
34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"),
224-
35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space %u"),
225-
36: ("ER_NO_SUCH_SPACE", "Space %u does not exist"),
226-
37: ("ER_NO_SUCH_FIELD", "Field %u was not found in the tuple"),
227-
38: ("ER_SPACE_ARITY",
228-
"Tuple field count %u does not match space %u arity %u"),
229-
39: ("ER_INDEX_ARITY",
230-
("Tuple field count %u is less than required by a defined index"
231-
" (expected %u)")),
232-
40: ("ER_WAL_IO", "Failed to write to disk"),
233-
41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found"),
171+
0: ("ER_UNKNOWN", "Unknown error"),
172+
1: ("ER_ILLEGAL_PARAMS", "Illegal parameters, %s"),
173+
2: ("ER_MEMORY_ISSUE", "Failed to allocate %u bytes in %s for %s"),
174+
3: ("ER_TUPLE_FOUND", "Duplicate key exists in unique index '%s' in space '%s'"),
175+
4: ("ER_TUPLE_NOT_FOUND", "Tuple doesn't exist in index '%s' in space '%s'"),
176+
5: ("ER_UNSUPPORTED", "%s does not support %s"),
177+
6: ("ER_NONMASTER", "Can't modify data on a replication slave. My master is: %s"),
178+
7: ("ER_READONLY", "Can't modify data because this server is in read-only mode."),
179+
8: ("ER_INJECTION", "Error injection '%s'"),
180+
9: ("ER_CREATE_SPACE", "Failed to create space '%s': %s"),
181+
10: ("ER_SPACE_EXISTS", "Space '%s' already exists"),
182+
11: ("ER_DROP_SPACE", "Can't drop space '%s': %s"),
183+
12: ("ER_ALTER_SPACE", "Can't modify space '%s': %s"),
184+
13: ("ER_INDEX_TYPE", "Unsupported index type supplied for index '%s' in space '%s'"),
185+
14: ("ER_MODIFY_INDEX", "Can't create or modify index '%s' in space '%s': %s"),
186+
15: ("ER_LAST_DROP", "Can't drop the primary key in a system space, space '%s'"),
187+
16: ("ER_TUPLE_FORMAT_LIMIT", "Tuple format limit reached: %u"),
188+
17: ("ER_DROP_PRIMARY_KEY", "Can't drop primary key in space '%s' while secondary keys exist"),
189+
18: ("ER_KEY_PART_TYPE", "Supplied key type of part %u does not match index part type: expected %s"),
190+
19: ("ER_EXACT_MATCH", "Invalid key part count in an exact match (expected %u, got %u)"),
191+
20: ("ER_INVALID_MSGPACK", "Invalid MsgPack - %s"),
192+
21: ("ER_PROC_RET", "msgpack.encode: can not encode Lua type '%s'"),
193+
22: ("ER_TUPLE_NOT_ARRAY", "Tuple/Key must be MsgPack array"),
194+
23: ("ER_FIELD_TYPE", "Tuple field %u type does not match one required by operation: expected %s"),
195+
24: ("ER_FIELD_TYPE_MISMATCH", "Ambiguous field type in index '%s', key part %u. Requested type is %s but the field has previously been defined as %s"),
196+
25: ("ER_SPLICE", "SPLICE error on field %u: %s"),
197+
26: ("ER_ARG_TYPE", "Argument type in operation '%c' on field %u does not match field type: expected a %s"),
198+
27: ("ER_TUPLE_IS_TOO_LONG", "Tuple is too long %u"),
199+
28: ("ER_UNKNOWN_UPDATE_OP", "Unknown UPDATE operation"),
200+
29: ("ER_UPDATE_FIELD", "Field %u UPDATE error: %s"),
201+
30: ("ER_FIBER_STACK", "Can not create a new fiber: recursion limit reached"),
202+
31: ("ER_KEY_PART_COUNT", "Invalid key part count (expected [0..%u], got %u)"),
203+
32: ("ER_PROC_LUA", "%s"),
204+
33: ("ER_NO_SUCH_PROC", "Procedure '%.*s' is not defined"),
205+
34: ("ER_NO_SUCH_TRIGGER", "Trigger is not found"),
206+
35: ("ER_NO_SUCH_INDEX", "No index #%u is defined in space '%s'"),
207+
36: ("ER_NO_SUCH_SPACE", "Space '%s' does not exist"),
208+
37: ("ER_NO_SUCH_FIELD", "Field %d was not found in the tuple"),
209+
38: ("ER_SPACE_FIELD_COUNT", "Tuple field count %u does not match space '%s' field count %u"),
210+
39: ("ER_INDEX_FIELD_COUNT", "Tuple field count %u is less than required by a defined index (expected %u)"),
211+
40: ("ER_WAL_IO", "Failed to write to disk"),
212+
41: ("ER_MORE_THAN_ONE_TUPLE", "More than one tuple found by get()"),
213+
42: ("ER_ACCESS_DENIED", "%s access on %s is denied for user '%s'"),
214+
43: ("ER_CREATE_USER", "Failed to create user '%s': %s"),
215+
44: ("ER_DROP_USER", "Failed to drop user or role '%s': %s"),
216+
45: ("ER_NO_SUCH_USER", "User '%s' is not found"),
217+
46: ("ER_USER_EXISTS", "User '%s' already exists"),
218+
47: ("ER_PASSWORD_MISMATCH", "Incorrect password supplied for user '%s'"),
219+
48: ("ER_UNKNOWN_REQUEST_TYPE", "Unknown request type %u"),
220+
49: ("ER_UNKNOWN_SCHEMA_OBJECT", "Unknown object type '%s'"),
221+
50: ("ER_CREATE_FUNCTION", "Failed to create function '%s': %s"),
222+
51: ("ER_NO_SUCH_FUNCTION", "Function '%s' does not exist"),
223+
52: ("ER_FUNCTION_EXISTS", "Function '%s' already exists"),
224+
53: ("ER_FUNCTION_ACCESS_DENIED", "%s access is denied for user '%s' to function '%s'"),
225+
54: ("ER_FUNCTION_MAX", "A limit on the total number of functions has been reached: %u"),
226+
55: ("ER_SPACE_ACCESS_DENIED", "%s access is denied for user '%s' to space '%s'"),
227+
56: ("ER_USER_MAX", "A limit on the total number of users has been reached: %u"),
228+
57: ("ER_NO_SUCH_ENGINE", "Space engine '%s' does not exist"),
229+
58: ("ER_RELOAD_CFG", "Can't set option '%s' dynamically"),
230+
59: ("ER_CFG", "Incorrect value for option '%s': %s"),
231+
60: ("ER_SOPHIA", "%s"),
232+
61: ("ER_LOCAL_SERVER_IS_NOT_ACTIVE", "Local server is not active"),
233+
62: ("ER_UNKNOWN_SERVER", "Server %s is not registered with the cluster"),
234+
63: ("ER_CLUSTER_ID_MISMATCH", "Cluster id of the replica %s doesn't match cluster id of the master %s"),
235+
64: ("ER_INVALID_UUID", "Invalid UUID: %s"),
236+
65: ("ER_CLUSTER_ID_IS_RO", "Can't reset cluster id: it is already assigned"),
237+
66: ("ER_RESERVED66", "Reserved66"),
238+
67: ("ER_SERVER_ID_IS_RESERVED", "Can't initialize server id with a reserved value %u"),
239+
68: ("ER_INVALID_ORDER", "Invalid LSN order for server %u: previous LSN = %llu, new lsn = %llu"),
240+
69: ("ER_MISSING_REQUEST_FIELD", "Missing mandatory field '%s' in request"),
241+
70: ("ER_IDENTIFIER", "Invalid identifier '%s' (expected letters, digits or an underscore)"),
242+
71: ("ER_DROP_FUNCTION", "Can't drop function %u: %s"),
243+
72: ("ER_ITERATOR_TYPE", "Unknown iterator type '%s'"),
244+
73: ("ER_REPLICA_MAX", "Replica count limit reached: %u"),
245+
74: ("ER_INVALID_XLOG", "Failed to read xlog: %lld"),
246+
75: ("ER_INVALID_XLOG_NAME", "Invalid xlog name: expected %lld got %lld"),
247+
76: ("ER_INVALID_XLOG_ORDER", "Invalid xlog order: %lld and %lld"),
248+
77: ("ER_NO_CONNECTION", "Connection is not established"),
249+
78: ("ER_TIMEOUT", "Timeout exceeded"),
250+
79: ("ER_ACTIVE_TRANSACTION", "Operation is not permitted when there is an active transaction "),
251+
80: ("ER_NO_ACTIVE_TRANSACTION", "Operation is not permitted when there is no active transaction "),
252+
81: ("ER_CROSS_ENGINE_TRANSACTION", "A multi-statement transaction can not use multiple storage engines"),
253+
82: ("ER_NO_SUCH_ROLE", "Role '%s' is not found"),
254+
83: ("ER_ROLE_EXISTS", "Role '%s' already exists"),
255+
84: ("ER_CREATE_ROLE", "Failed to create role '%s': %s"),
256+
85: ("ER_INDEX_EXISTS", "Index '%s' already exists"),
257+
86: ("ER_TUPLE_REF_OVERFLOW", "Tuple reference counter overflow"),
258+
87: ("ER_ROLE_LOOP", "Granting role '%s' to role '%s' would create a loop"),
259+
88: ("ER_GRANT", "Incorrect grant arguments: %s"),
260+
89: ("ER_PRIV_GRANTED", "User '%s' already has %s access on %s '%s'"),
261+
90: ("ER_ROLE_GRANTED", "User '%s' already has role '%s'"),
262+
91: ("ER_PRIV_NOT_GRANTED", "User '%s' does not have %s access on %s '%s'"),
263+
92: ("ER_ROLE_NOT_GRANTED", "User '%s' does not have role '%s'"),
264+
93: ("ER_MISSING_SNAPSHOT", "Can't find snapshot"),
265+
94: ("ER_CANT_UPDATE_PRIMARY_KEY", "Attempt to modify a tuple field which is part of index '%s' in space '%s'"),
266+
95: ("ER_UPDATE_INTEGER_OVERFLOW", "Integer overflow when performing '%c' operation on field %u"),
267+
96: ("ER_GUEST_USER_PASSWORD", "Setting password for guest user has no effect"),
268+
97: ("ER_TRANSACTION_CONFLICT", "Transaction has been aborted by conflict"),
269+
98: ("ER_UNSUPPORTED_ROLE_PRIV", "Unsupported role privilege '%s'"),
270+
99: ("ER_LOAD_FUNCTION", "Failed to dynamically load function '%s': %s"),
271+
100: ("ER_FUNCTION_LANGUAGE", "Unsupported language '%s' specified for function '%s'"),
272+
101: ("ER_RTREE_RECT", "RTree: %s must be an array with %u (point) or %u (rectangle/box) numeric coordinates"),
273+
102: ("ER_PROC_C", "%s"),
274+
103: ("ER_UNKNOWN_RTREE_INDEX_DISTANCE_TYPE", "Unknown RTREE index distance type %s"),
275+
104: ("ER_PROTOCOL", "%s"),
276+
105: ("ER_UPSERT_UNIQUE_SECONDARY_KEY", "Space %s has a unique secondary index and does not support UPSERT"),
277+
106: ("ER_WRONG_INDEX_RECORD", "Wrong record in _index space: got {%s}, expected {%s}"),
278+
107: ("ER_WRONG_INDEX_PARTS", "Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ..."),
279+
108: ("ER_WRONG_INDEX_OPTIONS", "Wrong index options (field %u): %s"),
280+
109: ("ER_WRONG_SCHEMA_VERSION", "Wrong schema version, current: %d, in request: %u"),
281+
110: ("ER_SLAB_ALLOC_MAX", "Failed to allocate %u bytes for tuple in the slab allocator: tuple is too large. Check 'slab_alloc_maximal' configuration option."),
282+
111: ("ER_WRONG_SPACE_OPTIONS", "Wrong space options (field %u): %s"),
283+
112: ("ER_UNSUPPORTED_INDEX_FEATURE", "Index '%s' (%s) of space '%s' (%s) does not support %s"),
284+
113: ("ER_VIEW_IS_RO", "View '%s' is read-only"),
234285
}
235286

236287

0 commit comments

Comments
 (0)