Skip to content

Commit 4bc2ca7

Browse files
authored
Replication APIjk (#69)
* Adding replication API * Test fixes
1 parent f1de45b commit 4bc2ca7

File tree

5 files changed

+353
-1
lines changed

5 files changed

+353
-1
lines changed

arangoasync/database.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
)
102102
from arangoasync.foxx import Foxx
103103
from arangoasync.graph import Graph
104+
from arangoasync.replication import Replication
104105
from arangoasync.request import Method, Request
105106
from arangoasync.response import Response
106107
from arangoasync.result import Result
@@ -234,6 +235,15 @@ def foxx(self) -> Foxx:
234235
"""
235236
return Foxx(self._executor)
236237

238+
@property
239+
def replication(self) -> Replication:
240+
"""Return Replication API wrapper.
241+
242+
Returns:
243+
Replication API wrapper.
244+
"""
245+
return Replication(self._executor)
246+
237247
async def properties(self) -> Result[DatabaseProperties]:
238248
"""Return database properties.
239249

arangoasync/exceptions.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,34 @@ class PermissionUpdateError(ArangoServerError):
551551
"""Failed to update user permission."""
552552

553553

554+
class ReplicationApplierConfigError(ArangoServerError):
555+
"""Failed to retrieve replication applier configuration."""
556+
557+
558+
class ReplicationApplierStateError(ArangoServerError):
559+
"""Failed to retrieve replication applier state."""
560+
561+
562+
class ReplicationClusterInventoryError(ArangoServerError):
563+
"""Failed to retrieve overview of collection and indexes in a cluster."""
564+
565+
566+
class ReplicationDumpError(ArangoServerError):
567+
"""Failed to retrieve collection content."""
568+
569+
570+
class ReplicationInventoryError(ArangoServerError):
571+
"""Failed to retrieve inventory of collection and indexes."""
572+
573+
574+
class ReplicationLoggerStateError(ArangoServerError):
575+
"""Failed to retrieve logger state."""
576+
577+
578+
class ReplicationServerIDError(ArangoServerError):
579+
"""Failed to retrieve server ID."""
580+
581+
554582
class SerializationError(ArangoClientError):
555583
"""Failed to serialize the request."""
556584

arangoasync/replication.py

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
__all__ = ["Replication"]
2+
3+
4+
from typing import Optional
5+
6+
from arangoasync.exceptions import (
7+
ReplicationApplierConfigError,
8+
ReplicationApplierStateError,
9+
ReplicationClusterInventoryError,
10+
ReplicationDumpError,
11+
ReplicationInventoryError,
12+
ReplicationLoggerStateError,
13+
ReplicationServerIDError,
14+
)
15+
from arangoasync.executor import ApiExecutor
16+
from arangoasync.request import Method, Request
17+
from arangoasync.response import Response
18+
from arangoasync.result import Result
19+
from arangoasync.serialization import Deserializer, Serializer
20+
from arangoasync.typings import Json, Jsons, Params
21+
22+
23+
class Replication:
24+
"""Replication API wrapper."""
25+
26+
def __init__(self, executor: ApiExecutor) -> None:
27+
self._executor = executor
28+
29+
@property
30+
def serializer(self) -> Serializer[Json]:
31+
"""Return the serializer."""
32+
return self._executor.serializer
33+
34+
@property
35+
def deserializer(self) -> Deserializer[Json, Jsons]:
36+
"""Return the deserializer."""
37+
return self._executor.deserializer
38+
39+
async def inventory(
40+
self,
41+
batch_id: str,
42+
include_system: Optional[bool] = None,
43+
all_databases: Optional[bool] = None,
44+
collection: Optional[bool] = None,
45+
db_server: Optional[str] = None,
46+
) -> Result[Json]:
47+
"""
48+
Return an overview of collections and indexes.
49+
50+
Args:
51+
batch_id (str): Batch ID.
52+
include_system (bool | None): Include system collections.
53+
all_databases (bool | None): Include all databases (only on "_system").
54+
collection (bool | None): If this parameter is set, the
55+
response will be restricted to a single collection (the one specified),
56+
and no views will be returned.
57+
db_server (str | None): On a Coordinator, this request must have a
58+
DBserver query parameter
59+
60+
Returns:
61+
dict: Overview of collections and indexes.
62+
63+
Raises:
64+
ReplicationInventoryError: If retrieval fails.
65+
66+
References:
67+
- `get-a-replication-inventory <https://docs.arangodb.com/stable/develop/http-api/replication/replication-dump/#get-a-replication-inventory>`__
68+
""" # noqa: E501
69+
params: Params = dict()
70+
params["batchId"] = batch_id
71+
if include_system is not None:
72+
params["includeSystem"] = include_system
73+
if all_databases is not None:
74+
params["global"] = all_databases
75+
if collection is not None:
76+
params["collection"] = collection
77+
if db_server is not None:
78+
params["DBServer"] = db_server
79+
80+
request = Request(
81+
method=Method.GET,
82+
endpoint="/_api/replication/inventory",
83+
params=params,
84+
)
85+
86+
def response_handler(resp: Response) -> Json:
87+
if not resp.is_success:
88+
raise ReplicationInventoryError(resp, request)
89+
result: Json = self.deserializer.loads(resp.raw_body)
90+
return result
91+
92+
return await self._executor.execute(request, response_handler)
93+
94+
async def dump(
95+
self,
96+
collection: str,
97+
batch_id: Optional[str] = None,
98+
chunk_size: Optional[int] = None,
99+
) -> Result[bytes]:
100+
"""Return the events data of one collection.
101+
102+
Args:
103+
collection (str): ID of the collection to dump.
104+
batch_id (str | None): Batch ID.
105+
chunk_size (int | None): Size of the result in bytes. This value is honored
106+
approximately only.
107+
108+
Returns:
109+
bytes: Collection events data.
110+
111+
Raises:
112+
ReplicationDumpError: If retrieval fails.
113+
114+
References:
115+
- `get-a-replication-dump <https://docs.arangodb.com/stable/develop/http-api/replication/replication-dump/#get-a-replication-dump>`__
116+
""" # noqa: E501
117+
params: Params = dict()
118+
params["collection"] = collection
119+
if batch_id is not None:
120+
params["batchId"] = batch_id
121+
if chunk_size is not None:
122+
params["chunkSize"] = chunk_size
123+
124+
request = Request(
125+
method=Method.GET,
126+
endpoint="/_api/replication/dump",
127+
params=params,
128+
)
129+
130+
def response_handler(resp: Response) -> bytes:
131+
if not resp.is_success:
132+
raise ReplicationDumpError(resp, request)
133+
return resp.raw_body
134+
135+
return await self._executor.execute(request, response_handler)
136+
137+
async def cluster_inventory(
138+
self, include_system: Optional[bool] = None
139+
) -> Result[Json]:
140+
"""Return an overview of collections and indexes in a cluster.
141+
142+
Args:
143+
include_system (bool | None): Include system collections.
144+
145+
Returns:
146+
dict: Overview of collections and indexes in the cluster.
147+
148+
Raises:
149+
ReplicationClusterInventoryError: If retrieval fails.
150+
151+
References:
152+
- `get-the-cluster-collections-and-indexes <https://docs.arangodb.com/stable/develop/http-api/replication/replication-dump/#get-the-cluster-collections-and-indexes>`__
153+
""" # noqa: E501
154+
params: Params = {}
155+
if include_system is not None:
156+
params["includeSystem"] = include_system
157+
158+
request = Request(
159+
method=Method.GET,
160+
endpoint="/_api/replication/clusterInventory",
161+
params=params,
162+
)
163+
164+
def response_handler(resp: Response) -> Json:
165+
if not resp.is_success:
166+
raise ReplicationClusterInventoryError(resp, request)
167+
result: Json = self.deserializer.loads(resp.raw_body)
168+
return result
169+
170+
return await self._executor.execute(request, response_handler)
171+
172+
async def logger_state(self) -> Result[Json]:
173+
"""Return the state of the replication logger.
174+
175+
Returns:
176+
dict: Logger state.
177+
178+
Raises:
179+
ReplicationLoggerStateError: If retrieval fails.
180+
181+
References:
182+
- `get-the-replication-logger-state <https://docs.arangodb.com/stable/develop/http-api/replication/replication-logger/#get-the-replication-logger-state>`__
183+
""" # noqa: E501
184+
request = Request(
185+
method=Method.GET,
186+
endpoint="/_api/replication/logger-state",
187+
)
188+
189+
def response_handler(resp: Response) -> Json:
190+
if not resp.is_success:
191+
raise ReplicationLoggerStateError(resp, request)
192+
result: Json = self.deserializer.loads(resp.raw_body)
193+
return result
194+
195+
return await self._executor.execute(request, response_handler)
196+
197+
async def applier_config(self) -> Result[Json]:
198+
"""Return the configuration of the replication applier.
199+
200+
Returns:
201+
dict: Configuration of the replication applier.
202+
203+
Raises:
204+
ReplicationApplierConfigError: If retrieval fails.
205+
206+
References:
207+
- `get-the-replication-applier-configuration <https://docs.arangodb.com/stable/develop/http-api/replication/replication-applier/#get-the-replication-applier-configuration>`__
208+
""" # noqa: E501
209+
request = Request(
210+
method=Method.GET,
211+
endpoint="/_api/replication/applier-config",
212+
)
213+
214+
def response_handler(resp: Response) -> Json:
215+
if not resp.is_success:
216+
raise ReplicationApplierConfigError(resp, request)
217+
result: Json = self.deserializer.loads(resp.raw_body)
218+
return result
219+
220+
return await self._executor.execute(request, response_handler)
221+
222+
async def applier_state(self) -> Result[Json]:
223+
"""Return the state of the replication applier.
224+
225+
Returns:
226+
dict: State of the replication applier.
227+
228+
Raises:
229+
ReplicationApplierStateError: If retrieval fails.
230+
231+
References:
232+
- `get-the-replication-applier-state <https://docs.arangodb.com/stable/develop/http-api/replication/replication-applier/#get-the-replication-applier-state>`__
233+
""" # noqa: E501
234+
request = Request(
235+
method=Method.GET,
236+
endpoint="/_api/replication/applier-state",
237+
)
238+
239+
def response_handler(resp: Response) -> Json:
240+
if not resp.is_success:
241+
raise ReplicationApplierStateError(resp, request)
242+
result: Json = self.deserializer.loads(resp.raw_body)
243+
return result
244+
245+
return await self._executor.execute(request, response_handler)
246+
247+
async def server_id(self) -> Result[str]:
248+
"""Return the current server's ID.
249+
250+
Returns:
251+
str: Server ID.
252+
253+
Raises:
254+
ReplicationServerIDError: If retrieval fails.
255+
256+
References:
257+
- `get-the-replication-server-id <https://docs.arangodb.com/stable/develop/http-api/replication/other-replication-commands/#get-the-replication-server-id>`__
258+
""" # noqa: E501
259+
request = Request(
260+
method=Method.GET,
261+
endpoint="/_api/replication/server-id",
262+
)
263+
264+
def response_handler(resp: Response) -> str:
265+
if not resp.is_success:
266+
raise ReplicationServerIDError(resp, request)
267+
result: Json = self.deserializer.loads(resp.raw_body)
268+
return str(result["serverId"])
269+
270+
return await self._executor.execute(request, response_handler)

docs/migration.rst

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ this is not always consistent.
5151
5252
The asynchronous driver, however, tries to stick to a simple rule:
5353

54-
* If the API returns a camel case key, it will be returned as is.
54+
* If the API returns a camel case key, it will be returned as is. The response is returned from the server as is.
5555
* Parameters passed from client to server use the snake case equivalent of the camel case keys required by the API
5656
(e.g. `userName` becomes `user_name`). This is done to ensure PEP8 compatibility.
5757

@@ -74,6 +74,13 @@ Serialization
7474
Check out the :ref:`Serialization` section to learn more about how to implement your own serializer/deserializer. The
7575
current driver makes use of generic types and allows for a higher degree of customization.
7676

77+
Replication
78+
===========
79+
80+
Although a minimal replication API is available for observability purposes, its use is not recommended.
81+
Most of these are internal APIs that are not meant to be used by the end user. If you need to make any changes
82+
to replication, please do so from the cluster web interface.
83+
7784
Mixing sync and async
7885
=====================
7986

0 commit comments

Comments
 (0)