
x/sync/errgroup: add TryGo and SetLimit to control concurrency #27837

Closed
@kurin

Description

The errgroup package will currently spawn a new goroutine for each invocation of Group.Go. This is usually fine, but extremely high cardinality fanout can exhaust memory or other resources. It would be neat if the errgroup interface allowed users to specify the maximum number of concurrent goroutines they want the errgroup to spawn.

Proposal

type Group struct {
  N int
  // contains etc
}

N would be copied to an unexported field on the first invocation of Go, so that subsequent modification has no effect. This preserves the validity and the behavior of the empty Group.

When calling Go, if N functions are already running, Go would block until the number running falls below N.

The behavior of Go is not otherwise modified: if a subtask returns an error, subsequent tasks are still executed, and (if WithContext was used) callers would rely on subtasks handling context cancellation so that execution falls through to the Wait() call and returns.

Alternatives considered

An alternative interface would have Go never block, but enqueue instead. That is an unbounded queue, and I'm not a fan of it.

Another alternative is to make the group context-aware, so that Go returns immediately if the group's context is cancelled. This requires that Group retain a reference to the context, which it currently does not.

Activity

added this to the Unreleased milestone on Sep 25, 2018
mdlayher (Member) commented on Sep 25, 2018

/cc @bcmills who recently was thinking about some changes to this package IIRC

kevinburke (Contributor) commented on Sep 25, 2018

In the meantime I'd suggest acquiring a slot in a buffered channel before calling group.Go() and releasing it when the function returns, or using a package like github.com/kevinburke/semaphore to acquire resources before starting a goroutine.
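The buffered-channel workaround above can be sketched as follows. To keep the sketch self-contained (no golang.org/x/sync dependency), a plain sync.WaitGroup stands in for errgroup.Group, and peakConcurrency is a hypothetical helper added purely to demonstrate that the limit holds:

```go
package main

import (
	"fmt"
	"sync"
)

// peakConcurrency runs `tasks` no-op workers, at most `limit` at a time,
// and reports the highest number observed running simultaneously.
func peakConcurrency(limit, tasks int) int {
	slots := make(chan struct{}, limit) // buffered channel as a counting semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < tasks; i++ {
		slots <- struct{}{} // acquire before spawning; blocks once the limit is reached
		wg.Add(1)
		go func() {
			defer func() {
				mu.Lock()
				running--
				mu.Unlock()
				<-slots // release when the function returns
				wg.Done()
			}()
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			// real work would go here
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak:", peakConcurrency(3, 50)) // peak never exceeds 3
}
```

Because the send into `slots` happens in the loop, before the goroutine is spawned, the number of live workers can never exceed the channel's capacity.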

bcmills (Contributor) commented on Jan 10, 2019

There is a draft API in slide 119 (in the backup slides) of my GopherCon 2018 talk, Rethinking Classical Concurrency Patterns.

I agree that the Go method should block until it can begin executing the function, not enqueue: enqueuing tasks to a bounded executor is much too prone to deadlocks.

I propose a new TryGo method as a non-blocking alternative. (A non-blocking variant is mostly useful for “concurrency-saturating” operations like tree or graph traversals, where you want to keep the number of concurrent workers as high as possible but can fall back to sequential operation when saturated.)

I would rather have a SetLimit method than an exported field: that way we can more easily enforce invariants like “the limit must not be modified while goroutines are running”.
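The blocking-Go / non-blocking-TryGo semantics described above can be sketched with a buffered channel. This is not the errgroup API: boundedGroup and its methods are hypothetical stand-ins for illustration, with error collection keeping only the first error, mirroring errgroup's behavior:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// boundedGroup is a hypothetical stand-in for an errgroup with a limit.
type boundedGroup struct {
	sem chan struct{} // capacity = concurrency limit
	wg  sync.WaitGroup
	mu  sync.Mutex
	err error // first error wins
}

func newBoundedGroup(limit int) *boundedGroup {
	return &boundedGroup{sem: make(chan struct{}, limit)}
}

// run starts f in a goroutine; the caller must already hold a slot.
func (g *boundedGroup) run(f func() error) {
	g.wg.Add(1)
	go func() {
		defer func() { <-g.sem; g.wg.Done() }()
		if err := f(); err != nil {
			g.mu.Lock()
			if g.err == nil {
				g.err = err
			}
			g.mu.Unlock()
		}
	}()
}

// Go blocks until a slot is free, then starts f.
func (g *boundedGroup) Go(f func() error) {
	g.sem <- struct{}{}
	g.run(f)
}

// TryGo starts f only if a slot is free right now, reporting whether it did.
// When it returns false, a saturated traversal can fall back to running f inline.
func (g *boundedGroup) TryGo(f func() error) bool {
	select {
	case g.sem <- struct{}{}:
		g.run(f)
		return true
	default:
		return false
	}
}

// Wait blocks until all started functions return, then reports the first error.
func (g *boundedGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

func main() {
	g := newBoundedGroup(1)
	block := make(chan struct{})
	g.Go(func() error { <-block; return errors.New("boom") })
	saturated := !g.TryGo(func() error { return nil }) // the one slot is taken
	close(block)
	fmt.Println("TryGo refused while saturated:", saturated) // true
	fmt.Println("Wait:", g.Wait())                           // Wait: boom
}
```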

added the NeedsInvestigation label (someone must examine and confirm this is a valid issue and not a duplicate of an existing one) on Jan 10, 2019
fatih (Member) commented on Jul 20, 2019

I also needed something similar and combined it with golang.org/x/sync/semaphore. Here is an example of how I'm using it. It limits the number of simultaneous executions to maxWorkers:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	const maxWorkers = 5
	sem := semaphore.NewWeighted(maxWorkers)

	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 50; i++ {
		i := i
		fmt.Printf("executing %d\n", i)

		g.Go(func() error {
			err := sem.Acquire(ctx, 1)
			if err != nil {
				return err
			}
			defer sem.Release(1)

			// do work
			time.Sleep(1 * time.Second)
			fmt.Printf("finished %+v\n", i)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("g.Wait() err = %+v\n", err)
	}

	fmt.Println("done!")
}

If anything in this approach is wrong, please let me know. It seems to work fine based on the debug statements.

alexaandru commented on Jul 25, 2019

@fatih I would personally put the Acquire() outside the goroutine, in front of it. The way you have it, it does NOT prevent the launching of 50 simultaneous goroutines; it only prevents more than maxWorkers of them from actually doing their work at a time.

Looked at another way: if instead of 50 you had 1M tasks, your code would launch 1M goroutines. Of them, maxWorkers goroutines will actually do the work (well, in this case sleep), while the remaining 1M - maxWorkers will ALL be attempting to acquire the lock (that sits behind the semaphore abstraction).

All the best!

fatih (Member) commented on Jul 25, 2019

@alexaandru thanks for the tip! You're right about that. I actually already fixed it on my end (https://twitter.com/fatih/status/1152991683870633985 and https://play.golang.org/p/h2yfBVC8IjB) but forgot to update it here.

alexaandru commented on Jul 25, 2019

You're most welcome @fatih ! Cheers! :)

tschaub commented on Apr 9, 2020

Another subtle issue that would ideally be solved by an errgroup with a limit is that it is very easy to write code using errgroup and semaphore that swallows significant errors and instead returns only context.Canceled.

For example, it might be non-obvious that the work function below returns context.Canceled instead of errors.New("important message here"):

package main

import (
	"context"
	"errors"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

const (
	maxWorkers = 10
	numTasks   = 1e6
)

func work() error {
	group, ctx := errgroup.WithContext(context.Background())
	sem := semaphore.NewWeighted(maxWorkers)

	for i := 0; i < numTasks; i++ {
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}

		group.Go(func() error {
			defer sem.Release(1)

			time.Sleep(1 * time.Second)
			if rand.Float64() > 0.5 {
				return errors.New("important message here")
			}

			return nil
		})
	}

	return group.Wait()
}

The code can be fixed with something like this, but it is easy to forget:

diff --git a/main.go b/main.go
index 7690b92..9f64dbc 100644
--- a/main.go
+++ b/main.go
@@ -21,6 +21,10 @@ func work() error {
        sem := semaphore.NewWeighted(maxWorkers)
 
        for i := 0; i < numTasks; i++ {
+               if ctx.Err() != nil {
+                       break
+               }
+
                if err := sem.Acquire(ctx, 1); err != nil {
                        return err
                }
bcmills (Contributor) commented on Apr 10, 2020

@tschaub, note that in general anything that may produce an error as a result of errgroup cancellation should be run within the errgroup itself.

So that example would probably be clearer as:

const (
	maxWorkers = 10
	numTasks   = 1e6
)

func work() error {
	group, ctx := errgroup.WithContext(context.Background())

	group.Go(func() error {
		sem := semaphore.NewWeighted(maxWorkers)

		for i := 0; i < numTasks; i++ {
			if err := sem.Acquire(ctx, 1); err != nil {
				return err
			}

			group.Go(func() error {
				defer sem.Release(1)

				time.Sleep(1 * time.Second)
				if rand.Float64() > 0.5 {
					return errors.New("important message here")
				}

				return nil
			})
		}

		return nil
	})

	return group.Wait()
}
smasher164 (Member) commented on May 25, 2020

We came across this use case today and used a semaphore channel instead of x/sync/semaphore. But since a context is heavily threaded through our code, we'll probably switch to x/sync/semaphore.

Regarding the proposed API: SetLimit makes sense with the existing errgroup API, but TryGo always succeeds when there is no limit. Would there be a clearer separation with a LimitGroup type, which is instantiated with WithContextLimit?

60 remaining items