From ef48ab3d91c4d3eb18cbe7047140fcd20fb1f6d8 Mon Sep 17 00:00:00 2001
From: Luca Fabbri
Date: Thu, 11 May 2023 13:44:17 +0200
Subject: [PATCH] Made use of BulkTransactionsClient easily overridable

---
 CHANGES.md                              |  1 +
 stac_fastapi/sqlalchemy/transactions.py | 86 +++++++++++++------------
 2 files changed, 45 insertions(+), 42 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 70a7129..6a2ebe0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ As a part of this release, this repository was extracted from the main
 ### Changed

 * Default branch to **main** ([#544](https://github.com/stac-utils/stac-fastapi/pull/544))
+* Made `BulkTransactionsClient` usage easier to override ([#24](https://github.com/stac-utils/stac-fastapi-sqlalchemy/pull/24))

 ## [2.4.4] - 2023-03-09

diff --git a/stac_fastapi/sqlalchemy/transactions.py b/stac_fastapi/sqlalchemy/transactions.py
index 89d3374..43c0c91 100644
--- a/stac_fastapi/sqlalchemy/transactions.py
+++ b/stac_fastapi/sqlalchemy/transactions.py
@@ -21,6 +21,48 @@
 logger = logging.getLogger(__name__)


+@attr.s
+class BulkTransactionsClient(BaseBulkTransactionsClient):
+    """Postgres bulk transactions."""
+
+    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
+    debug: bool = attr.ib(default=False)
+    item_table: Type[database.Item] = attr.ib(default=database.Item)
+    item_serializer: Type[serializers.Serializer] = attr.ib(
+        default=serializers.ItemSerializer
+    )
+
+    def __attrs_post_init__(self):
+        """Create sqlalchemy engine."""
+        self.engine = self.session.writer.cached_engine
+
+    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
+        """Preprocess items to match data model.
+
+        # TODO: dedup with GetterDict logic (ref #58)
+        """
+        db_model = self.item_serializer.stac_to_db(item)
+        return self.item_serializer.row_to_dict(db_model)
+
+    def bulk_item_insert(
+        self, items: Items, chunk_size: Optional[int] = None, **kwargs
+    ) -> str:
+        """Bulk item insertion using sqlalchemy core.
+
+        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
+        """
+        # Use items.items because schemas.Items is a model with an items key
+        processed_items = [self._preprocess_item(item) for item in items]
+        return_msg = f"Successfully added {len(processed_items)} items."
+        if chunk_size:
+            for chunk in self._chunks(processed_items, chunk_size):
+                self.engine.execute(self.item_table.__table__.insert(), chunk)
+            return return_msg
+
+        self.engine.execute(self.item_table.__table__.insert(), processed_items)
+        return return_msg
+
+
 @attr.s
 class TransactionsClient(BaseTransactionsClient):
     """Transactions extension specific CRUD operations."""
@@ -34,6 +76,7 @@ class TransactionsClient(BaseTransactionsClient):
     collection_serializer: Type[serializers.Serializer] = attr.ib(
         default=serializers.CollectionSerializer
     )
+    bulk_client_cls = attr.ib(default=BulkTransactionsClient)

     def create_item(
         self,
@@ -46,7 +89,7 @@ def create_item(

         # If a feature collection is posted
         if item["type"] == "FeatureCollection":
-            bulk_client = BulkTransactionsClient(session=self.session)
+            bulk_client = self.bulk_client_cls(session=self.session)
             bulk_client.bulk_item_insert(items=item["features"])
             return None

@@ -158,44 +201,3 @@ def delete_collection(
         query.delete()

         return self.collection_serializer.db_to_stac(data, base_url=base_url)
-
-@attr.s
-class BulkTransactionsClient(BaseBulkTransactionsClient):
-    """Postgres bulk transactions."""
-
-    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
-    debug: bool = attr.ib(default=False)
-    item_table: Type[database.Item] = attr.ib(default=database.Item)
-    item_serializer: Type[serializers.Serializer] = attr.ib(
-        default=serializers.ItemSerializer
-    )
-
-    def __attrs_post_init__(self):
-        """Create sqlalchemy engine."""
-        self.engine = self.session.writer.cached_engine
-
-    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
-        """Preprocess items to match data model.
-
-        # TODO: dedup with GetterDict logic (ref #58)
-        """
-        db_model = self.item_serializer.stac_to_db(item)
-        return self.item_serializer.row_to_dict(db_model)
-
-    def bulk_item_insert(
-        self, items: Items, chunk_size: Optional[int] = None, **kwargs
-    ) -> str:
-        """Bulk item insertion using sqlalchemy core.
-
-        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
-        """
-        # Use items.items because schemas.Items is a model with an items key
-        processed_items = [self._preprocess_item(item) for item in items]
-        return_msg = f"Successfully added {len(processed_items)} items."
-        if chunk_size:
-            for chunk in self._chunks(processed_items, chunk_size):
-                self.engine.execute(self.item_table.__table__.insert(), chunk)
-            return return_msg
-
-        self.engine.execute(self.item_table.__table__.insert(), processed_items)
-        return return_msg
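
With `bulk_client_cls` in place, a downstream project can swap in its own bulk client without copying `create_item`. A minimal sketch of that usage, under the assumption this patch is applied; the `LoggingBulkTransactionsClient` subclass and its logging behavior are hypothetical, not part of this patch:

```python
import logging

from stac_fastapi.sqlalchemy.transactions import (
    BulkTransactionsClient,
    TransactionsClient,
)

logger = logging.getLogger(__name__)


class LoggingBulkTransactionsClient(BulkTransactionsClient):
    """Hypothetical override: log each bulk insert, then delegate to the base class."""

    def bulk_item_insert(self, items, chunk_size=None, **kwargs):
        logger.info("Running bulk insert (chunk_size=%s)", chunk_size)
        return super().bulk_item_insert(items, chunk_size=chunk_size, **kwargs)


# Because bulk_client_cls is an attrs field, it can be set at construction time;
# create_item will then instantiate the custom class for FeatureCollection posts.
client = TransactionsClient(bulk_client_cls=LoggingBulkTransactionsClient)
```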