diff --git a/CHANGELOG.md b/CHANGELOG.md index b909041c..3afc86eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) +- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) - Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87) ### Changed diff --git a/README.md b/README.md index 896db23f..5f93608e 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file: | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional | | `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | -| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | +| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | +| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. 
| `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 5ddbb7d9..a8821273 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -676,21 +676,22 @@ class TransactionsClient(AsyncBaseTransactionsClient): @overrides async def create_item( self, collection_id: str, item: Union[Item, ItemCollection], **kwargs - ) -> Optional[stac_types.Item]: - """Create an item in the collection. + ) -> Union[stac_types.Item, str]: + """ + Create an item or a feature collection of items in the specified collection. Args: - collection_id (str): The id of the collection to add the item to. - item (stac_types.Item): The item to be added to the collection. - kwargs: Additional keyword arguments. + collection_id (str): The ID of the collection to add the item(s) to. + item (Union[Item, ItemCollection]): A single item or a collection of items to be added. + **kwargs: Additional keyword arguments, such as `request` and `refresh`. Returns: - stac_types.Item: The created item. + Union[stac_types.Item, str]: The created item if a single item is added, or a summary string + indicating the number of items successfully added and errors if a collection of items is added. Raises: - NotFound: If the specified collection is not found in the database. - ConflictError: If the item in the specified collection already exists. - + NotFoundError: If the specified collection is not found in the database. + ConflictError: If an item with the same ID already exists in the collection. 
""" item = item.model_dump(mode="json") base_url = str(kwargs["request"].base_url) @@ -706,14 +707,22 @@ async def create_item( ) for item in item["features"] ] - - await self.database.bulk_async( - collection_id, processed_items, refresh=kwargs.get("refresh", False) + attempted = len(processed_items) + success, errors = await self.database.bulk_async( + collection_id, + processed_items, + refresh=kwargs.get("refresh", False), ) + if errors: + logger.error(f"Bulk async operation encountered errors: {errors}") + else: + logger.info(f"Bulk async operation succeeded with {success} actions.") - return None + return f"Successfully added {success} Items. {attempted - success} errors occurred." else: - item = await self.database.prep_create_item(item=item, base_url=base_url) + item = await self.database.async_prep_create_item( + item=item, base_url=base_url + ) await self.database.create_item(item, refresh=kwargs.get("refresh", False)) return ItemSerializer.db_to_stac(item, base_url) @@ -875,7 +884,7 @@ def preprocess_item( The preprocessed item. """ exist_ok = method == BulkTransactionMethod.UPSERT - return self.database.sync_prep_create_item( + return self.database.bulk_sync_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) @@ -906,12 +915,18 @@ def bulk_item_insert( # not a great way to get the collection_id-- should be part of the method signature collection_id = processed_items[0]["collection"] - - self.database.bulk_sync( - collection_id, processed_items, refresh=kwargs.get("refresh", False) + attempted = len(processed_items) + success, errors = self.database.bulk_sync( + collection_id, + processed_items, + refresh=kwargs.get("refresh", False), ) + if errors: + logger.error(f"Bulk sync operation encountered errors: {errors}") + else: + logger.info(f"Bulk sync operation succeeded with {success} actions.") - return f"Successfully added {len(processed_items)} Items." + return f"Successfully added/updated {success} Items. 
{attempted - success} errors occurred." _DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 2044a4b2..37e1ba5b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -86,6 +86,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): @@ -106,6 +107,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index d32db777..2834a4de 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -128,8 +128,20 @@ async def delete_item_index(collection_id: str): class DatabaseLogic(BaseDatabaseLogic): """Database logic.""" - client = AsyncElasticsearchSettings().create_client - sync_client = SyncElasticsearchSettings().create_client + async_settings: AsyncElasticsearchSettings = attr.ib( + factory=AsyncElasticsearchSettings + ) + sync_settings: SyncElasticsearchSettings = attr.ib( + factory=SyncElasticsearchSettings + ) + + client = attr.ib(init=False) + sync_client = attr.ib(init=False) + + def 
__attrs_post_init__(self): + """Initialize clients after the class is instantiated.""" + self.client = self.async_settings.create_client + self.sync_client = self.sync_settings.create_client item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -699,7 +711,7 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item( + async def async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ @@ -729,42 +741,106 @@ async def prep_create_item( return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item( + async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ Prepare an item for insertion into the database. - This method performs pre-insertion preparation on the given `item`, - such as checking if the collection the item belongs to exists, - and optionally verifying that an item with the same ID does not already exist in the database. + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. Args: - item (Item): The item to be inserted into the database. - base_url (str): The base URL used for constructing URLs for the item. - exist_ok (bool): Indicates whether the item can exist already. + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. 
Returns: - Item: The item after preparation is done. + Item: The prepared item, serialized into a database-compatible format. Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. """ - item_id = item["id"] - collection_id = item["collection"] - if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): - raise NotFoundError(f"Collection {collection_id} does not exist") + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") - if not exist_ok and self.sync_client.exists( - index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + # Check if the collection exists + await self.check_collection_exists(collection_id=item["collection"]) + + # Check if the item already exists in the database + if not exist_ok and await self.client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), ): - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." ) + if self.async_settings.raise_on_bulk_error: + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + ) + + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item + + def bulk_sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Prepare an item for insertion into the database. 
- return self.item_serializer.stac_to_db(item, base_url) + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. + + Args: + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. + + Returns: + Item: The prepared item, serialized into a database-compatible format. + + Raises: + NotFoundError: If the collection that the item belongs to does not exist in the database. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. + """ + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") + + # Check if the collection exists + if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise NotFoundError(f"Collection {item['collection']} does not exist") + + # Check if the item already exists in the database + if not exist_ok and self.sync_client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), + ): + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." + ) + if self.sync_settings.raise_on_bulk_error: + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
+ ) + + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item async def create_item(self, item: Item, refresh: bool = False): """Database logic for creating one item. @@ -959,52 +1035,72 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database asynchronously. + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. 
+ The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. + The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, + the index is refreshed after the bulk insert. """ - await helpers.async_bulk( + raise_on_error = self.async_settings.raise_on_bulk_error + success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=raise_on_error, ) + return success, errors def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database synchronously. + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. + The insert is performed synchronously and blocking, meaning that the function does not return until the insert has completed. 
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + True, the index is refreshed after the bulk insert. """ - helpers.bulk( + raise_on_error = self.sync_settings.raise_on_bulk_error + success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=raise_on_error, ) + return success, errors # DANGER async def delete_items(self) -> None: diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 00498468..4c305fda 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -83,6 +83,7 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): @@ -103,6 +104,7 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 29bd6030..a555e3b0 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -143,8 +143,16 @@ async def delete_item_index(collection_id: str) -> None: class 
DatabaseLogic(BaseDatabaseLogic): """Database logic.""" - client = AsyncSearchSettings().create_client - sync_client = SyncSearchSettings().create_client + async_settings: AsyncSearchSettings = attr.ib(factory=AsyncSearchSettings) + sync_settings: SyncSearchSettings = attr.ib(factory=SyncSearchSettings) + + client = attr.ib(init=False) + sync_client = attr.ib(init=False) + + def __attrs_post_init__(self): + """Initialize clients after the class is instantiated.""" + self.client = self.async_settings.create_client + self.sync_client = self.sync_settings.create_client item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -723,7 +731,7 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item( + async def async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ @@ -753,42 +761,105 @@ async def prep_create_item( return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item( + async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ Prepare an item for insertion into the database. - This method performs pre-insertion preparation on the given `item`, - such as checking if the collection the item belongs to exists, - and optionally verifying that an item with the same ID does not already exist in the database. + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. Args: - item (Item): The item to be inserted into the database. 
- base_url (str): The base URL used for constructing URLs for the item. - exist_ok (bool): Indicates whether the item can exist already. + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. Returns: - Item: The item after preparation is done. + Item: The prepared item, serialized into a database-compatible format. Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. """ - item_id = item["id"] - collection_id = item["collection"] - if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): - raise NotFoundError(f"Collection {collection_id} does not exist") + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") - if not exist_ok and self.sync_client.exists( - index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + # Check if the collection exists + await self.check_collection_exists(collection_id=item["collection"]) + + # Check if the item already exists in the database + if not exist_ok and await self.client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), ): - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." ) + if self.async_settings.raise_on_bulk_error: + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
+ ) + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item + + def bulk_sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Prepare an item for insertion into the database. - return self.item_serializer.stac_to_db(item, base_url) + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. + + Args: + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. + + Returns: + Item: The prepared item, serialized into a database-compatible format. + + Raises: + NotFoundError: If the collection that the item belongs to does not exist in the database. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. + """ + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") + + # Check if the collection exists + if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise NotFoundError(f"Collection {item['collection']} does not exist") + + # Check if the item already exists in the database + if not exist_ok and self.sync_client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), + ): + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." 
+ ) + if self.sync_settings.raise_on_bulk_error: + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + ) + + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item async def create_item(self, item: Item, refresh: bool = False): """Database logic for creating one item. @@ -983,52 +1054,72 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database asynchronously. + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. 
+ This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. + The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. + The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, + the index is refreshed after the bulk insert. """ - await helpers.async_bulk( + raise_on_error = self.async_settings.raise_on_bulk_error + success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=raise_on_error, ) + return success, errors def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database synchronously. + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. 
+ The insert is performed synchronously and blocking, meaning that the function does not return until the insert has completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + True, the index is refreshed after the bulk insert. """ - helpers.bulk( + raise_on_error = self.sync_settings.raise_on_bulk_error + success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=raise_on_error, ) + return success, errors # DANGER async def delete_items(self) -> None: diff --git a/stac_fastapi/tests/api/test_api.py b/stac_fastapi/tests/api/test_api.py index 91c0e811..807da5e4 100644 --- a/stac_fastapi/tests/api/test_api.py +++ b/stac_fastapi/tests/api/test_api.py @@ -5,6 +5,8 @@ import pytest +from stac_fastapi.types.errors import ConflictError + from ..conftest import create_collection, create_item ROUTES = { @@ -635,53 +637,47 @@ async def test_search_line_string_intersects(app_client, ctx): @pytest.mark.parametrize( "value, expected", [ - (32767, 1), # Short Limit, + (32767, 1), # Short Limit (2147483647, 1), # Int Limit - (2147483647 + 5000, 1), # Above int Limit - (21474836470, 1), # Above int Limit - # This value still fails to return 1 - # Commenting out - # (9223372036854775807, 1), + (2147483647 + 5000, 1), # Above Int Limit + (21474836470, 1), # Above Int Limit ], ) async def test_big_int_eo_search( app_client, txn_client, test_item, test_collection, value, expected ): - - random_str = "".join(random.choice("abcdef") for i in range(random.randint(1, 5))) + random_str = "".join(random.choice("abcdef") for _ in range(5)) collection_id = f"test-collection-eo-{random_str}" - test_big_int_item = test_item - del test_big_int_item["properties"]["eo:bands"] - test_big_int_item["collection"] = collection_id - test_big_int_collection 
= test_collection - test_big_int_collection["id"] = collection_id - - # type number - attr = "eo:full_width_half_max" - - stac_extensions = [ - "https://stac-extensions.github.io/eo/v2.0.0/schema.json", + test_collection["id"] = collection_id + test_collection["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" ] - test_collection["stac_extensions"] = stac_extensions + test_item["collection"] = collection_id + test_item["stac_extensions"] = test_collection["stac_extensions"] - test_item["stac_extensions"] = stac_extensions + # Remove "eo:bands" to simplify the test + del test_item["properties"]["eo:bands"] - await create_collection(txn_client, test_collection) + # Attribute to test + attr = "eo:full_width_half_max" - for val in [ - value, - value + random.randint(10, 1010), - value - random.randint(10, 1010), - ]: + try: + await create_collection(txn_client, test_collection) + except ConflictError: + pass + + # Create items with deterministic offsets + for val in [value, value + 100, value - 100]: item = deepcopy(test_item) item["id"] = str(uuid.uuid4()) item["properties"][attr] = val await create_item(txn_client, item) + # Search for the exact value params = { - "collections": [item["collection"]], + "collections": [collection_id], "filter": { "args": [ { @@ -697,5 +693,8 @@ async def test_big_int_eo_search( } resp = await app_client.post("/search", json=params) resp_json = resp.json() - results = set([x["properties"][attr] for x in resp_json["features"]]) + + # Validate results + results = {x["properties"][attr] for x in resp_json["features"]} assert len(results) == expected + assert results == {value} diff --git a/stac_fastapi/tests/clients/test_elasticsearch.py b/stac_fastapi/tests/clients/test_es_os.py similarity index 83% rename from stac_fastapi/tests/clients/test_elasticsearch.py rename to stac_fastapi/tests/clients/test_es_os.py index a0867ad3..e913f11f 100644 --- a/stac_fastapi/tests/clients/test_elasticsearch.py +++ 
b/stac_fastapi/tests/clients/test_es_os.py @@ -1,3 +1,4 @@ +import os import uuid from copy import deepcopy from typing import Callable @@ -10,6 +11,13 @@ from ..conftest import MockRequest, create_item +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": + from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings +else: + from stac_fastapi.elasticsearch.config import ( + ElasticsearchSettings as SearchSettings, + ) + @pytest.mark.asyncio async def test_create_collection(app_client, ctx, core_client, txn_client): @@ -297,6 +305,51 @@ async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): # ) +@pytest.mark.asyncio +async def test_bulk_item_insert_with_raise_on_error( + ctx, core_client, txn_client, bulk_txn_client +): + """ + Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false. + + This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError + is raised for conflicting items. When set to false, the operation logs errors + and continues gracefully. 
+ """ + + # Insert an initial item to set up a conflict + initial_item = deepcopy(ctx.item) + initial_item["id"] = str(uuid.uuid4()) + await create_item(txn_client, initial_item) + + # Verify the initial item is inserted + fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) + assert len(fc["features"]) >= 1 + + # Create conflicting items (same ID as the initial item) + conflicting_items = {initial_item["id"]: deepcopy(initial_item)} + + # Test with RAISE_ON_BULK_ERROR set to true + os.environ["RAISE_ON_BULK_ERROR"] = "true" + bulk_txn_client.database.sync_settings = SearchSettings() + + with pytest.raises(ConflictError): + bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True) + + # Test with RAISE_ON_BULK_ERROR set to false + os.environ["RAISE_ON_BULK_ERROR"] = "false" + bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings + result = bulk_txn_client.bulk_item_insert( + Items(items=conflicting_items), refresh=True + ) + + # Validate the results + assert "Successfully added/updated 1 Items" in result + + # Clean up the inserted item + await txn_client.delete_item(initial_item["id"], ctx.item["collection"]) + + @pytest.mark.asyncio async def test_feature_collection_insert( core_client,