Skip to content

fix(fcm): Optimize SendEachInBatch with worker pool #695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: dev
Choose a base branch
from
89 changes: 60 additions & 29 deletions messaging/messaging_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"mime/multipart"
"net/http"
"net/textproto"
"sync"

"firebase.google.com/go/v4/internal"
)
Expand Down Expand Up @@ -165,53 +164,85 @@ func (c *fcmClient) sendEachInBatch(ctx context.Context, messages []*Message, dr
return nil, fmt.Errorf("messages must not contain more than %d elements", maxMessages)
}

var responses []*SendResponse = make([]*SendResponse, len(messages))
var wg sync.WaitGroup

for idx, m := range messages {
if err := validateMessage(m); err != nil {
return nil, fmt.Errorf("invalid message at index %d: %v", idx, err)
}
wg.Add(1)
go func(idx int, m *Message, dryRun bool, responses []*SendResponse) {
defer wg.Done()
var resp string
var err error
if dryRun {
resp, err = c.SendDryRun(ctx, m)
} else {
resp, err = c.Send(ctx, m)
}
if err == nil {
responses[idx] = &SendResponse{
Success: true,
MessageID: resp,
}
} else {
responses[idx] = &SendResponse{
Success: false,
Error: err,
}
}
}(idx, m, dryRun, responses)
}
// Wait for all SendDryRun/Send calls to finish
wg.Wait()

const numWorkers = 50
jobs := make(chan job, len(messages))
results := make(chan result, len(messages))

responses := make([]*SendResponse, len(messages))

for w := 0; w < numWorkers; w++ {
go worker(ctx, c, dryRun, jobs, results)
}

for idx, m := range messages {
jobs <- job{message: m, index: idx}
}
close(jobs)

for i := 0; i < len(messages); i++ {
res := <-results
responses[res.index] = res.response
}

successCount := 0
failureCount := 0
for _, r := range responses {
if r.Success {
successCount++
} else {
failureCount++
}
}

return &BatchResponse{
Responses: responses,
SuccessCount: successCount,
FailureCount: len(responses) - successCount,
FailureCount: failureCount,
}, nil
}

type job struct {
message *Message
index int
}

type result struct {
response *SendResponse
index int
}

func worker(ctx context.Context, c *fcmClient, dryRun bool, jobs <-chan job, results chan<- result) {
for j := range jobs {
var respMsg string
var err error
if dryRun {
respMsg, err = c.SendDryRun(ctx, j.message)
} else {
respMsg, err = c.Send(ctx, j.message)
}

var sr *SendResponse
if err == nil {
sr = &SendResponse{
Success: true,
MessageID: respMsg,
}
} else {
sr = &SendResponse{
Success: false,
Error: err,
}
}
results <- result{response: sr, index: j.index}
}
}

// SendAll sends the messages in the given array via Firebase Cloud Messaging.
//
// The messages array may contain up to 500 messages. SendAll employs batching to send the entire
Expand Down
Loading
Loading