
feat: add stactools-item-generator and stac-item-loader constructs #150


Merged
@hrodmn merged 8 commits into main from feat/item-gen-load on May 30, 2025

Conversation

@hrodmn (Contributor) commented May 28, 2025

⚠️ Checklist if your PR changes anything other than documentation

  • Posted the link to a successful manually triggered deployment workflow (including successful resource destruction)

Merge request description

There has been appetite for event-based STAC item generation/ingestion infrastructure in several recent projects (e.g. NASA MAAP). I put together a POC of an SNS/SQS-based system over in https://github.com/developmentseed/stactools-ingest but, in the interest of making it available via eoapi-cdk constructs, I thought I would move it over here for now.

This PR adds two new constructs:

  • StactoolsItemGenerator: an SNS/SQS/Lambda combo that can use any stactools package to generate a STAC item
  • StacItemLoader: an SNS/SQS/Lambda combo that loads batches of STAC items into a pgstac database

The StacItemLoader is designed to be useful for any pipeline that could publish STAC metadata to an SNS topic or upload STAC items to an S3 bucket (not just StacItemGenerator).
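For a sense of how a pipeline hands items to the loader, here is a minimal sketch of publishing a STAC item's JSON to the loader's SNS topic with boto3 (the topic ARN and the item below are placeholders, not values from this PR):

```python
import json

import boto3

sns = boto3.client("sns")

# Hypothetical ARN of the topic created by the StacItemLoader construct
LOADER_TOPIC_ARN = "arn:aws:sns:us-west-2:123456789012:stac-item-loader-topic"

# Illustrative STAC item; any pipeline that can produce item JSON can publish it
item = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "id": "example-item",
    "collection": "example-collection",
    "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
    "bbox": [0.0, 0.0, 0.0, 0.0],
    "properties": {"datetime": "2025-05-30T00:00:00Z"},
    "assets": {},
    "links": [],
}

# The loader batches messages off its SQS queue and loads them with pypgstac
sns.publish(TopicArn=LOADER_TOPIC_ARN, Message=json.dumps(item))
```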

Ref: developmentseed/eoAPI#211

@hrodmn force-pushed the feat/item-gen-load branch 2 times, most recently from 7663b85 to fe75c9f on May 28, 2025 at 19:47
@hrodmn force-pushed the feat/item-gen-load branch from fe75c9f to ced3f42 on May 28, 2025 at 19:52
@gadomski left a comment


Forgive me if I'm missing something, but it looks like the loader is set up to take from SNS or S3 notifications, but the generator just goes to SNS? Was there a pre-baked S3 path for the generator that I missed?

Comment on lines 215 to 225
* ## Supported Stactools Packages
*
* Any package available on PyPI that follows the stactools plugin pattern
* can be used. Examples include:
* - `stactools-glad-global-forest-change`
* - `stactools-glad-glclu2020`
* - `stactools-landsat`
* - `stactools-sentinel2`
*
* @see {@link https://github.com/stactools-packages} for available stactools packages
* @see {@link https://stactools.readthedocs.io/} for stactools documentation


😬 I know we've got a lot of stuff out there in the stactools space, but I'm not sold on its future (radiantearth/stac-spec#1331). I wonder if there's a "protocol" style we can adopt to generalize away from the stactools stack? I'm not sure exactly what that would look like, but it could be as simple as a "stac-in, stac-out" pattern: `def create_item(item_like: dict[str, Any]) -> pystac.Item` where `item_like` doesn't have to be a perfect STAC item, but still has `assets` pointing to hrefs and `properties` with any other stuff.

Much larger discussion than this PR, of course...
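As a rough illustration of that idea, a minimal "stac-in, stac-out" sketch using a typing.Protocol (the names here are placeholders, not anything from this PR):

```python
from typing import Any, Protocol

import pystac


class CreateItem(Protocol):
    """A hypothetical 'stac-in, stac-out' callable: item_like carries asset
    hrefs and any extra properties, and need not be a valid STAC item yet."""

    def __call__(self, item_like: dict[str, Any]) -> pystac.Item: ...


def run(create_item: CreateItem, item_like: dict[str, Any]) -> dict[str, Any]:
    """Run any conforming implementation and return a dict ready to publish
    to an item-load topic."""
    item = create_item(item_like)
    return item.to_dict()
```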

@hrodmn (Contributor, Author) replied:

I used the stactools workflow here selfishly because it has been my go-to method for generating STAC metadata from scratch so far, and the mostly consistent create-item API across stactools packages made this type of generic interface pretty easy to build. This is all heavily based on stactools-pipelines, but as a one-size-fits-all deployment rather than dataset-specific deployments.

I guess not very many users would use the StacItemGenerator as it exists right now (using uvx and stactools), but if we make it possible to provide a custom STAC-building Lambda function to the construct it could be a convenient base for a serverless, event-driven STAC generation pipeline that follows the SNS -> SQS -> Lambda -> ItemLoad SNS pattern.

I wonder if there's a "protocol" style we can adopt to generalize away from the stactools stack? I'm not sure exactly what that would look like, but could be as simple as a "stac-in, stac-out" pattern

I am very interested in discussing this idea more!

Comment on lines 235 to 252
public readonly queue: sqs.Queue;

/**
* Dead letter queue for failed item generation attempts.
*
* Messages that fail processing after 5 attempts are sent here for
* inspection and potential replay. This helps with debugging stactools
* package issues, network failures, or malformed requests.
*/
public readonly deadLetterQueue: sqs.Queue;

/**
* The SNS topic that receives item generation requests.
*
* External systems publish ItemRequest messages to this topic to trigger
* STAC item generation. The topic fans out to the SQS queue for processing.
*/
public readonly topic: sns.Topic;


This feels nice to me and quite reusable 👏🏼


FWIW this is pretty close to https://github.com/stac-utils/stac-task, but I like this better as it's more procedural. stac-task did the whole inheritance thing which I find pretty unmaintainable.

Comment on lines 40 to 77
def create_stac_item(request: ItemRequest) -> Item:
"""
Create a STAC item using a stactools package
"""
logger.info(f"Received request: {json.dumps(request.model_dump())}")

if not request.package_name:
raise ValueError("Missing required parameter: package_name")

command = [
"uvx",
"--with",
f"requests,{request.package_name}",
"--from",
"stactools",
"stac",
request.group_name,
"create-item",
*request.create_item_args,
]

for option, value in request.create_item_options.items():
command.extend([f"--{option}", value])

logger.info(f"Executing command: {' '.join(command)}")

with NamedTemporaryFile(suffix=".json") as output:
command.append(output.name)
result = subprocess.run(command, capture_output=True, text=True, check=True)

logger.info(f"Command output: {result.stdout}")
with open(output.name) as f:
item_dict = json.load(f)

if request.collection_id:
item_dict["collection"] = request.collection_id

return Item(**item_dict)


Yeah, as I noted above, I understand why we're doing the callout to the stactools command line, but I wonder if we could/should lean into a Python API "protocol" instead.
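For context on the request shape the generator consumes, a hypothetical ItemRequest payload matching the fields the handler above reads, published to the generator's topic with boto3 (the package, group, arguments, IDs, and ARN are illustrative only):

```python
import json

import boto3

# Illustrative request; field names mirror what create_stac_item() reads
request = {
    "package_name": "stactools-glad-glclu2020",      # any stactools package on PyPI
    "group_name": "gladglclu2020",                   # hypothetical CLI group name
    "create_item_args": ["https://example.com/data/some-granule.tif"],
    "create_item_options": {},                       # passed through as --option value pairs
    "collection_id": "glad-glclu2020",
}

# Hypothetical ARN of the StactoolsItemGenerator topic
GENERATOR_TOPIC_ARN = "arn:aws:sns:us-west-2:123456789012:stactools-item-generator-topic"

boto3.client("sns").publish(TopicArn=GENERATOR_TOPIC_ARN, Message=json.dumps(request))
```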

@hrodmn (Contributor, Author) commented May 29, 2025

Forgive me if I'm missing something, but it looks like the loader is set up to take from SNS or S3 notifications, but the generator just goes to SNS? Was there a pre-baked S3 path for the generator that I missed?

The dual functionality (SNS or S3 event notifications) was half-baked this morning. I just pushed a change to the loader so that it expects SNS/SQS messages containing either STAC item JSON in the message body OR an S3 event notification flagging that a STAC item JSON was uploaded to S3, since I know that is a pattern some users would prefer (@ceholden).

Right now the stac-item-generator construct will publish STAC items directly to SNS rather than uploading the JSON to S3.

My vision here is that the StacItemLoader construct could be used to process items from multiple different types of StacItemGenerator infrastructure for any given organization.
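A rough sketch, not the actual loader code, of how a handler might tell those two message shapes apart, assuming the SQS record body carries the standard SNS envelope and S3 notifications use the usual Records format:

```python
import json
from typing import Any
from urllib.parse import unquote_plus

import boto3

s3 = boto3.client("s3")


def extract_items(sqs_record: dict[str, Any]) -> list[dict[str, Any]]:
    """Return STAC item dicts from one SQS record, whether the SNS message
    body is the item JSON itself or an S3 event notification pointing at it."""
    sns_envelope = json.loads(sqs_record["body"])   # SNS -> SQS envelope
    payload = json.loads(sns_envelope["Message"])

    if "Records" in payload:  # S3 event notification shape
        items = []
        for record in payload["Records"]:
            bucket = record["s3"]["bucket"]["name"]
            key = unquote_plus(record["s3"]["object"]["key"])
            obj = s3.get_object(Bucket=bucket, Key=key)
            items.append(json.loads(obj["Body"].read()))
        return items

    return [payload]  # the message body is the STAC item itself
```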

@alukach (Member) commented May 29, 2025

(half-baked notes from sync)

"Known unknowns" for documentation:

  1. What happens with items in the DLQ is not solved
  2. Drift detection between S3 and STAC catalog to detect possible S3 -> SNS -> SQS hand-off failures
  3. "Fast failure" for synchronous validation of items or auth scenarios could take place in an API that would validate and write successful items to S3

@hrodmn (Contributor, Author) commented May 29, 2025

A few more notes:

  • maybe add some logic to drop bad items from a failed pypgstac load and retry the good ones, to avoid good items being re-driven alongside bad items and winding up in the DLQ; for now, catching invalid items and checking for the collection ID is enough
  • don't try to handle the DLQ yet - let users do that independently for now
  • the StacItemLoader is the most valuable feature; the StacItemGenerator could work for some use cases, but we should pursue something that is easier to build generic tooling around (protocol-based)

@hrodmn marked this pull request as ready for review on May 30, 2025 at 14:31
@hrodmn force-pushed the feat/item-gen-load branch from 985c51b to 24cd738 on May 30, 2025 at 14:53
@hrodmn changed the title from "feat: add stac-item-generator and stac-item-loader constructs" to "feat: add stactools-item-generator and stac-item-loader constructs" on May 30, 2025
@hrodmn (Contributor, Author) commented May 30, 2025

I have made a few changes in the PR today after testing some batch STAC item processing. My test was to upload ~1000 STAC items to an S3 bucket - with batchSize set to 500, all items were loaded into the database after ~7 minutes. This was spread out over several batches of varying size thanks to some SQS black box behavior.

  • Throttled the StacItemLoader Lambda and SQS batcher to a maximum concurrency of 2 to avoid running many pypgstac load commands simultaneously. I never hit a pgstac lock in my testing, but I assume it could happen if we don't limit concurrency, so limiting the parallelism a bit seems like a safe choice. This behavior can be controlled with the maxConcurrency prop in StacItemLoaderProps (see the sketch after this list) cc @sharkinsspatial
  • StacItemGenerator -> StactoolsItemGenerator (to leave room for some more generic solution down the road) cc @gadomski
  • Additional details about the DeadLetterQueue being a dead-end that users will want to manage somehow! cc @alukach
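A sketch of how that throttling maps onto CDK primitives, assuming aws-cdk-lib's SqsEventSource options as shown; the function and queue here are placeholders rather than the construct's actual wiring:

```python
from aws_cdk import Duration, aws_lambda, aws_lambda_event_sources, aws_sqs


def attach_loader_queue(loader_fn: aws_lambda.Function, queue: aws_sqs.Queue) -> None:
    """Wire the loader Lambda to its queue with capped parallelism so only a
    couple of pypgstac load commands run against pgstac at once."""
    loader_fn.add_event_source(
        aws_lambda_event_sources.SqsEventSource(
            queue,
            batch_size=500,                        # items handed to one pypgstac load
            max_batching_window=Duration.minutes(1),  # required for batch sizes > 10
            max_concurrency=2,                     # cap concurrent Lambda invocations
        )
    )
```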

@ceholden

@hrodmn for the limit on loader concurrency,

Throttled the StacItemLoader Lambda and SQS batcher to a maximum concurrency of 2 in order to avoid many concurrent pypgstac load commands being run simultaneously. I never hit this in my testing but I assume it could happen so it would be a safe choice to limit the parallelism a bit here.

+1, this seems like a good idea to me. In CSDA we had a similar setup that was running 10x concurrent Lambda functions for ingesting Items from one (1) STAC Collection. This caused a lot of table locks, which not only slowed down the ingesters but also resulted in failures for some of our user-facing PgSTAC consumers that relied on /search or Item Collections endpoints. I documented the signals (e.g., RDS PerfInsights showing lock contention), impact, and resolution here: https://github.com/NASA-IMPACT/csda-project/issues/1190

The potential caveat is that the concern about table lock contention wouldn't really apply as long as each Lambda is ingesting Items from different Collections. For very high throughput systems a global n=2 might be restrictive.

For this case we could expect different Collections to write to different buckets or different bucket prefixes, and then have separate SQS queues + Lambdas that subscribe to the bucket notifications. It's probably easiest to have a separate bucket for very large use cases, but if we wanted just one bucket we could write Items to separate prefixes and subscribe using non-overlapping SNS filters. For example, the forthcoming NISAR mission might have its own ingestion pathway that filters the SNS->SQS subscription to s3://stac-items/NISAR/. Every other Collection might share just one queue+ingester and subscribe using an "anything-but" filter on the bucket prefix to ignore the "NISAR" prefix. I have some examples of the filter rules in this part of an ADR.
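As a hedged illustration of that filtering idea (the exact payload-filter syntax should be verified against the SNS message-filtering docs; ARNs and prefixes are placeholders):

```python
import json

import boto3

sns = boto3.client("sns")

TOPIC_ARN = "arn:aws:sns:us-west-2:123456789012:stac-items-bucket-notifications"

# Dedicated NISAR queue: only S3 notifications for keys under the NISAR/ prefix
sns.subscribe(
    TopicArn=TOPIC_ARN,
    Protocol="sqs",
    Endpoint="arn:aws:sqs:us-west-2:123456789012:nisar-item-load-queue",
    Attributes={
        "FilterPolicyScope": "MessageBody",
        "FilterPolicy": json.dumps(
            {"Records": {"s3": {"object": {"key": [{"prefix": "NISAR/"}]}}}}
        ),
    },
)

# Shared queue for everything else: "anything-but" the NISAR/ prefix
sns.subscribe(
    TopicArn=TOPIC_ARN,
    Protocol="sqs",
    Endpoint="arn:aws:sqs:us-west-2:123456789012:shared-item-load-queue",
    Attributes={
        "FilterPolicyScope": "MessageBody",
        "FilterPolicy": json.dumps(
            {"Records": {"s3": {"object": {"key": [{"anything-but": {"prefix": "NISAR/"}}]}}}}
        ),
    },
)
```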

@hrodmn (Contributor, Author) commented May 30, 2025

The potential caveat is that the concern about table lock contention wouldn't really apply as long as each Lambda is ingesting Items from different Collections. For very high throughput systems a global n=2 might be restrictive.

I agree that n=2 is not ideal but I wouldn't mind as much if the item processing code were a bit faster. Right now it takes ~2.5 minutes to process a batch of 500 STAC items when the messages are delivered as S3 event notifications 😞. Maybe we could speed that up with some async logic in the loop that processes the messages.
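For example, if the sequential S3 fetches are the bottleneck, the per-batch downloads could be parallelized with a thread pool along these lines (a sketch with hypothetical function names, not code from this PR):

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any

import boto3

s3 = boto3.client("s3")


def fetch_item(bucket: str, key: str) -> dict[str, Any]:
    """Download one STAC item JSON from S3."""
    obj = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(obj["Body"].read())


def fetch_items(locations: list[tuple[str, str]]) -> list[dict[str, Any]]:
    """Fetch a batch of (bucket, key) pairs concurrently instead of one at a time."""
    with ThreadPoolExecutor(max_workers=16) as pool:
        return list(pool.map(lambda loc: fetch_item(*loc), locations))
```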

For this case we could expect different Collections to write to different buckets or different bucket prefixes, and then have separate SQS queues + Lambdas that subscribe to the bucket notifications. It's probably easiest to have a separate bucket for very large use cases, but if we wanted just one bucket we could write Items to separate prefixes and subscribe using non-overlapping SNS filters. For example, the forthcoming NISAR mission might have its own ingestion pathway that filters the SNS->SQS subscription to s3://stac-items/NISAR/. Every other Collection might share just one queue+ingester and subscribe using an "anything-but" filter on the bucket prefix to ignore the "NISAR" prefix. I have some examples of the filter rules in this part of an ADR.

@ceholden these are all really helpful suggestions. For high throughput systems it would make sense to deploy multiple StacItemLoaders that each handle items from a specific set of collections (or just one collection). I think that's the nice thing about having this construct in eoapi-cdk! It will be possible to deploy a fleet of these things each with their own special subscriptions.

@hrodmn merged commit 371f6c3 into main on May 30, 2025
8 checks passed
@hrodmn deleted the feat/item-gen-load branch on May 30, 2025 at 19:51
github-actions bot pushed a commit that referenced this pull request May 30, 2025
# [8.2.0](v8.1.1...v8.2.0) (2025-05-30)

### Features

* add stactools-item-generator and stac-item-loader constructs (#150) (371f6c3)