Add queueing for async actions #464

Closed
@mostafa

Description

Currently, asynchronous actions run as goroutines. This should change so that the outputs of async actions are queued and the actions are run separately by workers. This could be done with a separate command, like gatewayd worker, that runs a set of workers consuming from a message queue to which gatewayd run publishes messages.

flowchart LR
    GatewayD -- publishes outputs --> Queue
    Workers -- consumes outputs --> Queue
    Workers -- runs --> Action
    Workers -- publishes results --> Queue
    GatewayD -- consumes results --> Queue
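To make the worker side of this concrete, here is a minimal sketch that uses a buffered Go channel as a stand-in for the message queue. The `Message` type and the `StartWorkers` helper are hypothetical names for illustration only; a real setup would replace the channel with an actual queue backend and run the workers in their own process.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical envelope for the output of an async action.
type Message struct {
	Action string
	Data   map[string]any
}

// StartWorkers launches n workers that consume messages from queue and pass
// each one to handle. The returned WaitGroup is done once the queue has been
// closed and drained.
func StartWorkers(n int, queue <-chan Message, handle func(Message)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits when the queue is closed and empty.
			for msg := range queue {
				handle(msg)
			}
		}()
	}
	return &wg
}

func main() {
	// The channel plays the role of the message queue (assumption).
	queue := make(chan Message, 16)

	var mu sync.Mutex
	handled := 0
	wg := StartWorkers(3, queue, func(m Message) {
		mu.Lock()
		defer mu.Unlock()
		handled++
		fmt.Println("ran", m.Action)
	})

	// The producer side, i.e. what gatewayd run would do.
	for i := 0; i < 5; i++ {
		queue <- Message{Action: "log", Data: map[string]any{"n": i}}
	}
	close(queue)
	wg.Wait()
	fmt.Println("handled:", handled)
}
```

The producer never waits for an action to finish; it only blocks if the queue is full, which is where a real queue would apply its own backpressure or persistence policy.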

The idea is that an async action can be defined in a worker and registered in a registry with a pointer to that worker. The worker has a Run function, which executes the action. The data the Run function needs is published to the worker via a queue, and results or errors are sent back to the Act system through a separate queue. This does not require changing the implementation of the action. Instead, we can have a generic queue message publisher that implements ActionFunc and can be reused by async actions to publish the exported Action fields to the queue. The worker then picks up the message and executes the Run function.
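A rough sketch of the generic publisher and the registry could look like the following. Everything here is an assumption made for illustration: `ActionFunc` is a simplified stand-in rather than the real signature, `Registry`, `Worker`, `NewPublisher`, and `ProcessNext` are hypothetical names, and the worker runs in-process only to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// ActionFunc is a simplified, hypothetical stand-in for the Act function type.
type ActionFunc func(data map[string]any) (any, error)

// Message carries the data an async action needs to run.
type Message struct {
	Action string
	Data   map[string]any
}

// Worker holds the Run function that actually executes the action.
type Worker struct {
	Run func(data map[string]any) (any, error)
}

// NewPublisher returns a generic ActionFunc that publishes the action's data
// to the queue instead of executing the action itself.
func NewPublisher(name string, queue chan<- Message) ActionFunc {
	return func(data map[string]any) (any, error) {
		select {
		case queue <- Message{Action: name, Data: data}:
			return "queued", nil
		default:
			return nil, errors.New("queue is full")
		}
	}
}

// Registry maps each async action to the generic publisher and to its worker.
type Registry struct {
	queue   chan Message
	actions map[string]ActionFunc
	workers map[string]*Worker
}

func NewRegistry(size int) *Registry {
	return &Registry{
		queue:   make(chan Message, size),
		actions: map[string]ActionFunc{},
		workers: map[string]*Worker{},
	}
}

// RegisterAsync registers an action with a pointer to its worker.
func (r *Registry) RegisterAsync(name string, w *Worker) error {
	if _, exists := r.actions[name]; exists {
		return fmt.Errorf("action %q is already registered", name)
	}
	r.actions[name] = NewPublisher(name, r.queue)
	r.workers[name] = w
	return nil
}

// Run invokes the registered ActionFunc, which only publishes a message.
func (r *Registry) Run(name string, data map[string]any) (any, error) {
	action, ok := r.actions[name]
	if !ok {
		return nil, fmt.Errorf("action %q is not registered", name)
	}
	return action(data)
}

// ProcessNext is what a worker would do: take one message and call Run.
func (r *Registry) ProcessNext() (any, error) {
	select {
	case msg := <-r.queue:
		return r.workers[msg.Action].Run(msg.Data)
	default:
		return nil, errors.New("queue is empty")
	}
}

func main() {
	reg := NewRegistry(8)
	worker := &Worker{Run: func(data map[string]any) (any, error) {
		return fmt.Sprintf("notified %v", data["to"]), nil
	}}
	if err := reg.RegisterAsync("notify", worker); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	status, _ := reg.Run("notify", map[string]any{"to": "ops"})
	fmt.Println("publisher returned:", status)
	result, err := reg.ProcessNext()
	fmt.Println("worker returned:", result, err)
}
```

The point of the sketch is that the action's own logic lives only in `Worker.Run`; the registry hands callers the same publisher closure for every async action, so existing actions would not need to change.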

I have implemented a minimal queueing example in the act-poc repository using this channel, which is used in an action like this; the results are then logged to the terminal. It uses the awesome golang-queue project, which I recommend using as a high-level wrapper for our use case.

This is what I have in mind:

sequenceDiagram
    participant GatewayD
    participant Registry
    participant Queue
    participant Worker
    GatewayD->>Registry: Register async action
    Registry->>Registry: Register action with pointer to generic ActionFunc
    Registry->>GatewayD: Success or error
    GatewayD->>Registry: Run async action
    Registry->>Queue: Publish message
    Queue->>Worker: Consume message
    Worker->>Worker: Run action with the given data in the message
    Worker->>Queue: Return result or error
    Queue->>Registry: Return result or error
    Registry->>GatewayD: Here's the result or the error
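The return path in the sequence above (the worker publishes a result or an error, which is then consumed on the other side) can be sketched with two channels, one per direction. The `Request` and `Result` types, the `ID` field used to match a result to its request, and `RunWorker` are all hypothetical names chosen for this example, and channels again stand in for real queues.

```go
package main

import (
	"errors"
	"fmt"
)

// Request is a hypothetical message published for the worker.
type Request struct {
	ID     int
	Action string
	Data   map[string]any
}

// Result is a hypothetical message the worker publishes back. ID lets the
// consumer match it to the original request.
type Result struct {
	ID    int
	Value any
	Err   error
}

// RunWorker consumes requests, runs the matching action, and publishes either
// the result or the error on the results queue. It closes results once the
// requests queue is closed and drained.
func RunWorker(
	requests <-chan Request,
	results chan<- Result,
	actions map[string]func(map[string]any) (any, error),
) {
	for req := range requests {
		run, ok := actions[req.Action]
		if !ok {
			results <- Result{ID: req.ID, Err: errors.New("unknown action: " + req.Action)}
			continue
		}
		value, err := run(req.Data)
		results <- Result{ID: req.ID, Value: value, Err: err}
	}
	close(results)
}

func main() {
	requests := make(chan Request, 8)
	results := make(chan Result, 8)
	actions := map[string]func(map[string]any) (any, error){
		"echo": func(data map[string]any) (any, error) { return data["msg"], nil },
	}

	go RunWorker(requests, results, actions)

	// Publish two messages: one valid, one for an action that does not exist.
	requests <- Request{ID: 1, Action: "echo", Data: map[string]any{"msg": "hello"}}
	requests <- Request{ID: 2, Action: "missing"}
	close(requests)

	// Consume results; errors travel back the same way as successful values.
	for res := range results {
		if res.Err != nil {
			fmt.Printf("request %d failed: %v\n", res.ID, res.Err)
			continue
		}
		fmt.Printf("request %d returned: %v\n", res.ID, res.Value)
	}
}
```

Carrying the error inside the result message keeps a single return path, which matches the "Return result or error" steps in the diagram.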

Labels: enhancement (New feature or request)

Status: 🎉 Done