From 5b57470eef12a4f344aa58481a24b949de9aff05 Mon Sep 17 00:00:00 2001
From: Luca Fabbri
Date: Wed, 5 Apr 2023 16:16:18 +0200
Subject: [PATCH] Make BulkTransactionsClient easily overridable

TransactionsClient now exposes a bulk_client_cls attribute that
subclasses can override to supply a custom bulk transactions client.
---
 .../stac_fastapi/sqlalchemy/transactions.py | 89 ++++++++++---------
 1 file changed, 46 insertions(+), 43 deletions(-)

diff --git a/stac_fastapi/sqlalchemy/stac_fastapi/sqlalchemy/transactions.py b/stac_fastapi/sqlalchemy/stac_fastapi/sqlalchemy/transactions.py
index 644b82f2d..e7baa3017 100644
--- a/stac_fastapi/sqlalchemy/stac_fastapi/sqlalchemy/transactions.py
+++ b/stac_fastapi/sqlalchemy/stac_fastapi/sqlalchemy/transactions.py
@@ -21,6 +21,48 @@
 logger = logging.getLogger(__name__)
 
 
+@attr.s
+class BulkTransactionsClient(BaseBulkTransactionsClient):
+    """Postgres bulk transactions."""
+
+    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
+    debug: bool = attr.ib(default=False)
+    item_table: Type[database.Item] = attr.ib(default=database.Item)
+    item_serializer: Type[serializers.Serializer] = attr.ib(
+        default=serializers.ItemSerializer
+    )
+
+    def __attrs_post_init__(self):
+        """Create sqlalchemy engine."""
+        self.engine = self.session.writer.cached_engine
+
+    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
+        """Preprocess items to match data model.
+
+        # TODO: dedup with GetterDict logic (ref #58)
+        """
+        db_model = self.item_serializer.stac_to_db(item)
+        return self.item_serializer.row_to_dict(db_model)
+
+    def bulk_item_insert(
+        self, items: Items, chunk_size: Optional[int] = None, **kwargs
+    ) -> str:
+        """Bulk item insertion using sqlalchemy core.
+
+        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
+        """
+        # Use items.items because schemas.Items is a model with an items key
+        processed_items = [self._preprocess_item(item) for item in items]
+        return_msg = f"Successfully added {len(processed_items)} items."
+        if chunk_size:
+            for chunk in self._chunks(processed_items, chunk_size):
+                self.engine.execute(self.item_table.__table__.insert(), chunk)
+            return return_msg
+
+        self.engine.execute(self.item_table.__table__.insert(), processed_items)
+        return return_msg
+
+
 @attr.s
 class TransactionsClient(BaseTransactionsClient):
     """Transactions extension specific CRUD operations."""
@@ -34,6 +76,9 @@ class TransactionsClient(BaseTransactionsClient):
     collection_serializer: Type[serializers.Serializer] = attr.ib(
         default=serializers.CollectionSerializer
     )
+    bulk_client_cls: Type[BaseBulkTransactionsClient] = attr.ib(
+        default=BulkTransactionsClient
+    )
 
     def create_item(
         self,
@@ -46,7 +91,7 @@ def create_item(
 
         # If a feature collection is posted
         if item["type"] == "FeatureCollection":
-            bulk_client = BulkTransactionsClient(session=self.session)
+            bulk_client = self.bulk_client_cls(session=self.session)
             bulk_client.bulk_item_insert(items=item["features"])
             return None
 
@@ -157,45 +202,3 @@ def delete_collection(
             raise NotFoundError(f"Collection {collection_id} not found")
         query.delete()
         return self.collection_serializer.db_to_stac(data, base_url=base_url)
-
-
-@attr.s
-class BulkTransactionsClient(BaseBulkTransactionsClient):
-    """Postgres bulk transactions."""
-
-    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
-    debug: bool = attr.ib(default=False)
-    item_table: Type[database.Item] = attr.ib(default=database.Item)
-    item_serializer: Type[serializers.Serializer] = attr.ib(
-        default=serializers.ItemSerializer
-    )
-
-    def __attrs_post_init__(self):
-        """Create sqlalchemy engine."""
-        self.engine = self.session.writer.cached_engine
-
-    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
-        """Preprocess items to match data model.
-
-        # TODO: dedup with GetterDict logic (ref #58)
-        """
-        db_model = self.item_serializer.stac_to_db(item)
-        return self.item_serializer.row_to_dict(db_model)
-
-    def bulk_item_insert(
-        self, items: Items, chunk_size: Optional[int] = None, **kwargs
-    ) -> str:
-        """Bulk item insertion using sqlalchemy core.
-
-        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
-        """
-        # Use items.items because schemas.Items is a model with an items key
-        processed_items = [self._preprocess_item(item) for item in items]
-        return_msg = f"Successfully added {len(processed_items)} items."
-        if chunk_size:
-            for chunk in self._chunks(processed_items, chunk_size):
-                self.engine.execute(self.item_table.__table__.insert(), chunk)
-            return return_msg
-
-        self.engine.execute(self.item_table.__table__.insert(), processed_items)
-        return return_msg
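
Usage sketch (illustrative only, not part of the patch): a downstream
project can now swap in its own bulk client by overriding the new
bulk_client_cls attribute. The ChunkLoggingBulkClient and
CustomTransactionsClient names below are hypothetical, invented for
illustration; any BaseBulkTransactionsClient subclass would do.

    import attr

    from stac_fastapi.sqlalchemy.transactions import (
        BulkTransactionsClient,
        TransactionsClient,
    )


    @attr.s
    class ChunkLoggingBulkClient(BulkTransactionsClient):
        """Hypothetical bulk client that logs before inserting."""

        def bulk_item_insert(self, items, chunk_size=None, **kwargs):
            # Custom pre/post-processing can happen here before
            # delegating to the stock sqlalchemy-core insert.
            print(f"bulk inserting {len(items)} items")
            return super().bulk_item_insert(
                items, chunk_size=chunk_size, **kwargs
            )


    @attr.s
    class CustomTransactionsClient(TransactionsClient):
        """Transactions client wired to the custom bulk client."""

        # Redefining the attrs attribute is enough; no method
        # override is needed.
        bulk_client_cls = attr.ib(default=ChunkLoggingBulkClient)

With this in place, POSTing a FeatureCollection through
CustomTransactionsClient.create_item routes the bulk insert through
the custom class instead of the stock BulkTransactionsClient.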