diff --git a/pynamodb/async_util.py b/pynamodb/async_util.py
new file mode 100644
index 000000000..bf3b73b2a
--- /dev/null
+++ b/pynamodb/async_util.py
@@ -0,0 +1,19 @@
+import functools
+
+
+def run_secretly_sync_async_fn(async_fn, *args, **kwargs):
+    # From https://github.com/python-trio/hip/issues/1#issuecomment-322028457
+    coro = async_fn(*args, **kwargs)
+    try:
+        coro.send(None)
+    except StopIteration as exc:
+        return exc.value
+    else:
+        raise RuntimeError("you lied, this async function is not secretly synchronous")
+
+
+def wrap_secretly_sync_async_fn(async_fn):
+    @functools.wraps(async_fn)
+    def wrap(*args, **kwargs):
+        return run_secretly_sync_async_fn(async_fn, *args, **kwargs)
+    return wrap
diff --git a/pynamodb/connection/base.py b/pynamodb/connection/base.py
index f58394860..ceef9f829 100644
--- a/pynamodb/connection/base.py
+++ b/pynamodb/connection/base.py
@@ -1,6 +1,7 @@
 """
 Lowest level connection
 """
+import asyncio
 import json
 import logging
 import random
@@ -19,6 +20,14 @@
 from botocore.exceptions import BotoCoreError
 from botocore.session import get_session
 
+if True:
+    # TODO: handle optional dependency
+    import aiobotocore.session
+    from aiobotocore.client import AioBaseClient
+    from aiobotocore.session import get_session as get_async_session
+
+
+from pynamodb.async_util import wrap_secretly_sync_async_fn
 from pynamodb.constants import (
     RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES,
     RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES,
@@ -234,7 +243,18 @@ def get_exclusive_start_key_map(self, exclusive_start_key):
         }
 
 
-class Connection(object):
+class ConnectionMeta(type):
+    def __init__(self, name, bases, attrs):
+        super().__init__(name, bases, attrs)
+
+        for attr_name, attr_value in attrs.items():
+            if attr_name.endswith('_async') and asyncio.iscoroutinefunction(attr_value):
+                wrapped_fn = wrap_secretly_sync_async_fn(attr_value)
+                wrapped_fn.__name__ = wrapped_fn.__name__[:-len('_async')]
+                setattr(self, wrapped_fn.__name__, wrapped_fn)
+
+
+class Connection(metaclass=ConnectionMeta):
     """
     A higher level abstraction over botocore
     """
@@ -296,26 +316,29 @@ def _log_debug(self, operation: str, kwargs: str):
         """
         log.debug("Calling %s with arguments %s", operation, kwargs)
 
-    def _sign_request(self, request):
-        auth = self.client._request_signer.get_auth_instance(
-            self.client._request_signer.signing_name,
-            self.client._request_signer.region_name,
-            self.client._request_signer.signature_version)
+    async def _sign_request(self, client, request):
+        auth = client._request_signer.get_auth_instance(
+            client._request_signer.signing_name,
+            client._request_signer.region_name,
+            client._request_signer.signature_version)
+        if asyncio.iscoroutine(auth):
+            auth = await auth
         auth.add_auth(request)
 
-    def _create_prepared_request(
+    async def _create_prepared_request(
         self,
+        client,
         params: Dict,
         operation_model: Optional[Any],
     ) -> AWSPreparedRequest:
         request = create_request_object(params)
-        self._sign_request(request)
-        prepared_request = self.client._endpoint.prepare_request(request)
+        await self._sign_request(client, request)
+        prepared_request = client._endpoint.prepare_request(request)
         if self._extra_headers is not None:
             prepared_request.headers.update(self._extra_headers)
         return prepared_request
 
-    def dispatch(self, operation_name, operation_kwargs):
+    async def dispatch(self, operation_name, operation_kwargs):
         """
         Dispatches `operation_name` with arguments `operation_kwargs`
 
@@ -330,7 +353,11 @@ def dispatch(self, operation_name, operation_kwargs):
         req_uuid = uuid.uuid4()

         self.send_pre_boto_callback(operation_name, req_uuid, table_name)
-        data = self._make_api_call(operation_name, operation_kwargs)
+
+        data = await self._make_api_call(operation_name, operation_kwargs)
+        if asyncio.iscoroutine(data):
+            data = await data
+
         self.send_post_boto_callback(operation_name, req_uuid, table_name)

         if data and CONSUMED_CAPACITY in data:
@@ -352,7 +379,7 @@ def send_pre_boto_callback(self, operation_name, req_uuid, table_name):
         except Exception as e:
             log.exception("pre_boto callback threw an exception.")

-    def _make_api_call(self, operation_name, operation_kwargs):
+    async def _make_api_call(self, operation_name, operation_kwargs):
         """
         This private method is here for two reasons:
         1. It's faster to avoid using botocore's response parsing
@@ -379,7 +406,7 @@ def _make_api_call(self, operation_name, operation_kwargs):
                     prepared_request.reset_stream()

                 # Create a new request for each retry (including a new signature).
-                prepared_request = self._create_prepared_request(request_dict, operation_model)
+                prepared_request = await self._create_prepared_request(self.client, request_dict, operation_model)

                 # Implement the before-send event from botocore
                 event_name = 'before-send.dynamodb.{}'.format(operation_model.name)
@@ -387,16 +414,13 @@ def _make_api_call(self, operation_name, operation_kwargs):
                 event_response = first_non_none_response(event_responses)

                 if event_response is None:
+                    # TODO(async): This will need to be awaited
                     http_response = self.client._endpoint.http_session.send(prepared_request)
                 else:
                     http_response = event_response
                     is_last_attempt_for_exceptions = True  # don't retry if we have an event response

-                # json.loads accepts bytes in >= 3.6.0
-                if sys.version_info < (3, 6, 0):
-                    data = json.loads(http_response.text)
-                else:
-                    data = json.loads(http_response.content)
+                data = json.loads(http_response.content)
             except (ValueError, botocore.exceptions.HTTPClientError, botocore.exceptions.ConnectionError) as e:
                 if is_last_attempt_for_exceptions:
                     log.debug('Reached the maximum number of retry attempts: %s', attempt_number)
@@ -538,7 +563,7 @@ def client(self):
             self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config)
         return self._client

-    def get_meta_table(self, table_name: str, refresh: bool = False):
+    async def get_meta_table_async(self, table_name: str, refresh: bool = False):
         """
         Returns a MetaTable
         """
@@ -547,7 +572,7 @@ def get_meta_table(self, table_name: str, refresh: bool = False):
                 TABLE_NAME: table_name
             }
             try:
-                data = self.dispatch(DESCRIBE_TABLE, operation_kwargs)
+                data = await self.dispatch(DESCRIBE_TABLE, operation_kwargs)
                 self._tables[table_name] = MetaTable(data.get(TABLE_KEY))
             except BotoCoreError as e:
                 raise TableError("Unable to describe table: {}".format(e), e)
@@ -558,7 +583,7 @@ def get_meta_table(self, table_name: str, refresh: bool = False):
                 raise
         return self._tables[table_name]

-    def create_table(
+    async def create_table_async(
         self,
         table_name: str,
         attribute_definitions: Optional[Any] = None,
@@ -648,12 +673,12 @@ def create_table(
             ]

         try:
-            data = self.dispatch(CREATE_TABLE, operation_kwargs)
+            data = await self.dispatch(CREATE_TABLE, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TableError("Failed to create table: {}".format(e), e)
         return data

-    def update_time_to_live(self, table_name: str, ttl_attribute_name: str) -> Dict:
+    async def update_time_to_live_async(self, table_name: str, ttl_attribute_name: str) -> Dict:
         """
         Performs the UpdateTimeToLive operation
         """
@@ -665,11 +690,11 @@ def update_time_to_live(self, table_name: str, ttl_attribute_name: str) -> Dict:
             }
         }
         try:
-            return self.dispatch(UPDATE_TIME_TO_LIVE, operation_kwargs)
+            return await self.dispatch(UPDATE_TIME_TO_LIVE, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TableError("Failed to update TTL on table: {}".format(e), e)

-    def delete_table(self, table_name: str) -> Dict:
+    async def delete_table_async(self, table_name: str) -> Dict:
         """
         Performs the DeleteTable operation
         """
@@ -677,12 +702,12 @@ def delete_table(self, table_name: str) -> Dict:
             TABLE_NAME: table_name
         }
         try:
-            data = self.dispatch(DELETE_TABLE, operation_kwargs)
+            data = await self.dispatch(DELETE_TABLE, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TableError("Failed to delete table: {}".format(e), e)
         return data

-    def update_table(
+    async def update_table_async(
         self,
         table_name: str,
         read_capacity_units: Optional[int] = None,
@@ -716,11 +741,11 @@ def update_table(
                 })
             operation_kwargs[GLOBAL_SECONDARY_INDEX_UPDATES] = global_secondary_indexes_list
         try:
-            return self.dispatch(UPDATE_TABLE, operation_kwargs)
+            return await self.dispatch(UPDATE_TABLE, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TableError("Failed to update table: {}".format(e), e)

-    def list_tables(
+    async def list_tables_async(
        self,
         exclusive_start_table_name: Optional[str] = None,
         limit: Optional[int] = None,
@@ -738,16 +763,16 @@ def list_tables(
                 LIMIT: limit
             })
         try:
-            return self.dispatch(LIST_TABLES, operation_kwargs)
+            return await self.dispatch(LIST_TABLES, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TableError("Unable to list tables: {}".format(e), e)

-    def describe_table(self, table_name: str) -> Dict:
+    async def describe_table_async(self, table_name: str) -> Dict:
         """
         Performs the DescribeTable operation
         """
         try:
-            tbl = self.get_meta_table(table_name, refresh=True)
+            tbl = await self.get_meta_table_async(table_name, refresh=True)
             if tbl:
                 return tbl.data
         except ValueError:
@@ -934,7 +959,7 @@ def get_operation_kwargs(
             operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values
         return operation_kwargs

-    def delete_item(
+    async def delete_item_async(
         self,
         table_name: str,
         hash_key: str,
@@ -957,11 +982,11 @@ def delete_item(
             return_item_collection_metrics=return_item_collection_metrics
         )
         try:
-            return self.dispatch(DELETE_ITEM, operation_kwargs)
+            return await self.dispatch(DELETE_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise DeleteError("Failed to delete item: {}".format(e), e)

-    def update_item(
+    async def update_item_async(
         self,
         table_name: str,
         hash_key: str,
@@ -989,11 +1014,11 @@ def update_item(
             return_item_collection_metrics=return_item_collection_metrics
         )
         try:
-            return self.dispatch(UPDATE_ITEM, operation_kwargs)
+            return await self.dispatch(UPDATE_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise UpdateError("Failed to update item: {}".format(e), e)

-    def put_item(
+    async def put_item_async(
         self,
         table_name: str,
         hash_key: str,
@@ -1019,7 +1044,7 @@ def put_item(
             return_item_collection_metrics=return_item_collection_metrics
         )
         try:
-            return self.dispatch(PUT_ITEM, operation_kwargs)
+            return await self.dispatch(PUT_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise PutError("Failed to put item: {}".format(e), e)
@@ -1039,7 +1064,7 @@ def _get_transact_operation_kwargs(

         return operation_kwargs

-    def transact_write_items(
+    async def transact_write_items_async(
         self,
         condition_check_items: Sequence[Dict],
         delete_items: Sequence[Dict],
@@ -1074,11 +1099,11 @@ def transact_write_items(

         operation_kwargs[TRANSACT_ITEMS] = transact_items
         try:
-            return self.dispatch(TRANSACT_WRITE_ITEMS, operation_kwargs)
+            return await self.dispatch(TRANSACT_WRITE_ITEMS, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TransactWriteError("Failed to write transaction items", e)

-    def transact_get_items(
+    async def transact_get_items_async(
         self,
         get_items: Sequence[Dict],
         return_consumed_capacity: Optional[str] = None
@@ -1092,11 +1117,11 @@ def transact_get_items(
         ]

         try:
-            return self.dispatch(TRANSACT_GET_ITEMS, operation_kwargs)
+            return await self.dispatch(TRANSACT_GET_ITEMS, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise TransactGetError("Failed to get transaction items", e)

-    def batch_write_item(
+    async def batch_write_item_async(
         self,
         table_name: str,
         put_items: Optional[Any] = None,
@@ -1132,11 +1157,11 @@ def batch_write_item(
                 })
         operation_kwargs[REQUEST_ITEMS][table_name] = delete_items_list + put_items_list
         try:
-            return self.dispatch(BATCH_WRITE_ITEM, operation_kwargs)
+            return await self.dispatch(BATCH_WRITE_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise PutError("Failed to batch write items: {}".format(e), e)

-    def batch_get_item(
+    async def batch_get_item_async(
         self,
         table_name: str,
         keys: Sequence[str],
@@ -1173,11 +1198,11 @@ def batch_get_item(
             )
         operation_kwargs[REQUEST_ITEMS][table_name].update(keys_map)
         try:
-            return self.dispatch(BATCH_GET_ITEM, operation_kwargs)
+            return await self.dispatch(BATCH_GET_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise GetError("Failed to batch get items: {}".format(e), e)

-    def get_item(
+    async def get_item_async(
         self,
         table_name: str,
         hash_key: str,
@@ -1196,11 +1221,11 @@ def get_item(
             attributes_to_get=attributes_to_get
         )
         try:
-            return self.dispatch(GET_ITEM, operation_kwargs)
+            return await self.dispatch(GET_ITEM, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise GetError("Failed to get item: {}".format(e), e)

-    def scan(
+    async def scan_async(
         self,
         table_name: str,
         filter_condition: Optional[Any] = None,
@@ -1248,11 +1273,11 @@ def scan(
             operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

         try:
-            return self.dispatch(SCAN, operation_kwargs)
+            return await self.dispatch(SCAN, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise ScanError("Failed to scan table: {}".format(e), e)

-    def query(
+    async def query_async(
         self,
         table_name: str,
         hash_key: str,
@@ -1322,7 +1347,7 @@ def query(
             operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

         try:
-            return self.dispatch(QUERY, operation_kwargs)
+            return await self.dispatch(QUERY, operation_kwargs)
         except BOTOCORE_EXCEPTIONS as e:
             raise QueryError("Failed to query items: {}".format(e), e)
@@ -1336,6 +1361,129 @@ def _reverse_dict(d):
     return {v: k for k, v in d.items()}


+# Uses aiobotocore instead of urllib3/botocore
+class AsyncConnection(Connection):
+    @property
+    def session(self) -> aiobotocore.session.AioSession:
+        """
+        Returns a valid async aiobotocore session
+        """
+        # botocore client creation is not thread safe as of v1.2.5+ (see issue #153)
+        if getattr(self._local, 'async_session', None) is None:
+            self._local.async_session = get_async_session()
+        return self._local.async_session
+
+    @property
+    def client(self) -> AioBaseClient:
+        """
+        Returns an aiobotocore dynamodb client
+        """
+        return super().client
+
+    async def _make_api_call(self, operation_name, operation_kwargs):
+        # TODO: dedup with super
+        async with self.client as client:
+            operation_model = client._service_model.operation_model(operation_name)
+            request_dict = await client._convert_to_request_dict(
+                operation_kwargs,
+                operation_model,
+            )
+
+            for i in range(0, self._max_retry_attempts_exception + 1):
+                attempt_number = i + 1
+                is_last_attempt_for_exceptions = i == self._max_retry_attempts_exception
+
+                http_response = None
+                prepared_request = None
+                try:
+                    if prepared_request is not None:
+                        # If there is a stream associated with the request, we need
+                        # to reset it before attempting to send the request again.
+                        # This will ensure that we resend the entire contents of the
+                        # body.
+                        prepared_request.reset_stream()
+
+                    # Create a new request for each retry (including a new signature).
+                    prepared_request = await self._create_prepared_request(client, request_dict, operation_model)
+
+                    # Implement the before-send event from botocore
+                    event_name = 'before-send.dynamodb.{}'.format(operation_model.name)
+                    event_responses = await client._endpoint._event_emitter.emit(event_name, request=prepared_request)
+                    event_response = first_non_none_response(event_responses)
+
+                    if event_response is None:
+                        http_response = await client._endpoint._send(prepared_request)
+                    else:
+                        http_response = event_response
+                        is_last_attempt_for_exceptions = True  # don't retry if we have an event response
+
+                    data = json.loads(http_response.content)
+                except (ValueError, botocore.exceptions.HTTPClientError, botocore.exceptions.ConnectionError) as e:
+                    if is_last_attempt_for_exceptions:
+                        log.debug('Reached the maximum number of retry attempts: %s', attempt_number)
+                        if http_response:
+                            e.args += (http_response.text,)
+                        raise
+                    else:
+                        # No backoff for fast-fail exceptions that likely failed at the frontend
+                        log.debug(
+                            'Retry needed for (%s) after attempt %s, retryable %s caught: %s',
+                            operation_name,
+                            attempt_number,
+                            e.__class__.__name__,
+                            e
+                        )
+                        continue
+
+                status_code = http_response.status_code
+                headers = http_response.headers
+                if status_code >= 300:
+                    # Extract error code from __type
+                    code = data.get('__type', '')
+                    if '#' in code:
+                        code = code.rsplit('#', 1)[1]
+                    botocore_expected_format = {'Error': {'Message': data.get('message', ''), 'Code': code}}
+                    verbose_properties = {
+                        'request_id': headers.get('x-amzn-RequestId')
+                    }
+
+                    if 'RequestItems' in operation_kwargs:
+                        # Batch operations can hit multiple tables, report them comma separated
+                        verbose_properties['table_name'] = ','.join(operation_kwargs['RequestItems'])
+                    else:
+                        verbose_properties['table_name'] = operation_kwargs.get('TableName')
+
+                    try:
+                        raise VerboseClientError(botocore_expected_format, operation_name, verbose_properties)
+                    except VerboseClientError as e:
+                        if is_last_attempt_for_exceptions:
+                            log.debug('Reached the maximum number of retry attempts: %s', attempt_number)
+                            raise
+                        elif status_code < 500 and code not in RATE_LIMITING_ERROR_CODES:
+                            # We don't retry on a ConditionalCheckFailedException or other 4xx (except for
+                            # throughput related errors) because we assume they will fail in perpetuity.
+                            # Retrying when there is already contention could cause other problems
+                            # in part due to unnecessary consumption of throughput.
+                            raise
+                        else:
+                            # We use fully-jittered exponentially-backed-off retries:
+                            #  https://www.awsarchitectureblog.com/2015/03/backoff.html
+                            sleep_time_ms = random.randint(0, self._base_backoff_ms * (2 ** i))
+                            log.debug(
+                                'Retry with backoff needed for (%s) after attempt %s,'
+                                'sleeping for %s milliseconds, retryable %s caught: %s',
+                                operation_name,
+                                attempt_number,
+                                sleep_time_ms,
+                                e.__class__.__name__,
+                                e
+                            )
+                            await asyncio.sleep(sleep_time_ms / 1000.0)
+                            continue
+
+            return self._handle_binary_attributes(data)
+
+
 def _convert_binary(attr):
     if BINARY in attr:
         attr[BINARY] = b64decode(attr[BINARY].encode(DEFAULT_ENCODING))
diff --git a/pynamodb/connection/table.py b/pynamodb/connection/table.py
index 4eb218093..fe9ba35e3 100644
--- a/pynamodb/connection/table.py
+++ b/pynamodb/connection/table.py
@@ -3,15 +3,26 @@
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 """

+import asyncio
 from typing import Any, Dict, Mapping, Optional, Sequence

+from pynamodb.async_util import wrap_secretly_sync_async_fn
 from pynamodb.connection.base import Connection, MetaTable
 from pynamodb.constants import DEFAULT_BILLING_MODE, KEY
 from pynamodb.expressions.condition import Condition
 from pynamodb.expressions.update import Action


-class TableConnection:
+class TableMeta(type):
+    def __init__(self, name, bases, attrs):
+        super().__init__(name, bases, attrs)
+
+        for attr_name, attr_value in attrs.items():
+            if attr_name.endswith('_async') and asyncio.iscoroutinefunction(attr_value):
+                setattr(self, attr_name[:-len("_async")], wrap_secretly_sync_async_fn(attr_value))
+
+
+class TableConnection(metaclass=TableMeta):
     """
     A higher level abstraction over botocore
     """
@@ -32,6 +43,8 @@ def __init__(
         aws_session_token: Optional[str] = None,
     ) -> None:
         self.table_name = table_name
+
+        # TODO: optional async
         self.connection = Connection(region=region,
                                      host=host,
                                      connect_timeout_seconds=connect_timeout_seconds,
@@ -46,7 +59,7 @@ def __init__(
                                      aws_secret_access_key,
                                      aws_session_token)

-    def get_meta_table(self, refresh: bool = False) -> MetaTable:
+    async def get_meta_table_async(self, refresh: bool = False) -> MetaTable:
         """
         Returns a MetaTable
         """
@@ -259,11 +272,11 @@ def query(
                                      scan_index_forward=scan_index_forward,
                                      select=select)

-    def describe_table(self) -> Dict:
+    async def describe_table_async(self) -> Dict:
         """
         Performs the DescribeTable operation and returns the result
         """
-        return self.connection.describe_table(self.table_name)
+        return await self.connection.describe_table_async(self.table_name)

     def delete_table(self) -> Dict:
         """
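Reviewer sketch (not part of the diff): the sync entry points such as get_item and scan are generated from their _async counterparts by the metaclasses above, using the "secretly synchronous" trick from async_util.py. A minimal, self-contained illustration of how that machinery is expected to behave; the Demo class and its method are hypothetical stand-ins, not PynamoDB code.

import asyncio
import functools


def run_secretly_sync_async_fn(async_fn, *args, **kwargs):
    # Drive the coroutine one step; if it never actually suspends, it finishes
    # immediately and StopIteration carries its return value.
    coro = async_fn(*args, **kwargs)
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value
    else:
        raise RuntimeError("this async function is not secretly synchronous")


class DemoMeta(type):
    def __init__(cls, name, bases, attrs):
        super().__init__(name, bases, attrs)
        for attr_name, attr_value in attrs.items():
            if attr_name.endswith('_async') and asyncio.iscoroutinefunction(attr_value):
                # Strip the suffix by length, not with rstrip (which strips a character set).
                sync_name = attr_name[:-len('_async')]

                @functools.wraps(attr_value)
                def wrap(*args, _fn=attr_value, **kwargs):
                    return run_secretly_sync_async_fn(_fn, *args, **kwargs)

                wrap.__name__ = sync_name
                setattr(cls, sync_name, wrap)


class Demo(metaclass=DemoMeta):
    async def get_item_async(self, key):
        # Nothing here actually awaits, so the generated sync wrapper works.
        return {'key': key}


assert Demo().get_item('a') == {'key': 'a'}                     # generated sync alias
assert asyncio.run(Demo().get_item_async('a')) == {'key': 'a'}  # native coroutine

The same reasoning is why the diff uses slicing rather than rstrip when deriving the sync name, and why methods picked up by the metaclass must be declared async def and end in _async.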