From 6154804613cf6dbf858dc6b8276fcf9da2d85281 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 12:47:35 +0100 Subject: [PATCH 01/10] Make the Mirror Queue a queue Convert the Mirror syncing queue to a modules/queue instead of the old simple queue. Signed-off-by: Andrew Thornton --- custom/conf/app.example.ini | 4 +- .../doc/advanced/config-cheat-sheet.en-us.md | 24 ++++- modules/queue/unique_queue_channel.go | 34 +++++-- modules/setting/queue.go | 12 +++ services/mirror/mirror.go | 99 ++++++++++++------- 5 files changed, 126 insertions(+), 47 deletions(-) diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index bdc42480e443e..1753ed2330706 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -769,10 +769,10 @@ PATH = ;; Global limit of repositories per user, applied at creation time. -1 means no limit ;MAX_CREATION_LIMIT = -1 ;; -;; Mirror sync queue length, increase if mirror syncing starts hanging +;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead) ;MIRROR_QUEUE_LENGTH = 1000 ;; -;; Patch test queue length, increase if pull request patch testing starts hanging +;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead) ;PULL_REQUEST_QUEUE_LENGTH = 1000 ;; ;; Preferred Licenses to place at the top of the List diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 251f6bd51a9df..6421457332e31 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create. - `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user, `-1` means no limit. -- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it +- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`. as large as possible. Use caution when editing this value. - `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch - testing starts hanging. + testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`. - `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at the top of the list. Name must match file name in options/license or custom/options/license. - `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the @@ -382,6 +382,8 @@ relation to port exhaustion. ## Queue (`queue` and `queue.*`) +Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`. + - `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.) - `LENGTH`: **20**: Maximal queue size before channel queues block @@ -400,6 +402,22 @@ relation to port exhaustion. - `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long. - `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost. +Gitea creates the following non-unique queues: + +- `code_indexer` +- `issue_indexer` +- `notification-service` +- `task` +- `mail` +- `push_update` + +And the following unique queues: + +- `repo_stats_update` +- `repo-archive` +- `mirror` +- `pr_patch_checker` + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled @@ -588,7 +606,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type command or full path). - `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments. - `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail -- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. +- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]` ## Cache (`cache`) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d355c..3c8896cba644e 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,14 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { - queue.lock.Lock() - delete(queue.table, datum) - queue.lock.Unlock() + bs, err := json.Marshal(datum) + if err != nil { + log.Error("unable to marshal data: %v", datum) + } else { + queue.lock.Lock() + delete(queue.table, string(bs)) + queue.lock.Unlock() + } handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +100,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +112,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +133,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 76b7dc1faf70c..2e54fece415a1 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -158,4 +158,16 @@ func NewQueueService() { if _, ok := sectionMap["LENGTH"]; !ok { _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) } + + // Handle the old mirror queue configuration + // Please note this will be a unique queue + section = Cfg.Section("queue.mirror") + sectionMap = map[string]bool{} + for _, key := range section.Keys() { + sectionMap[key.Name()] = true + } + if _, ok := sectionMap["LENGTH"]; !ok { + _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength)) + } + } diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 7a3e37d993597..3373b7f0a148a 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -7,18 +7,43 @@ package mirror import ( "context" "fmt" - "strconv" - "strings" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" ) -// mirrorQueue holds an UniqueQueue object of the mirror -var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength) +var mirrorQueue queue.UniqueQueue + +// RequestType type of mirror request +type RequestType int + +const ( + // PullRequestType for pull mirrors + PullRequestType RequestType = iota + // PushRequestType for push mirrors + PushRequestType +) + +// Request for the mirror queue +type Request struct { + Type RequestType + RepoID int64 +} + +// doMirror causes this request to mirror itself +func doMirror(ctx context.Context, req *Request) { + switch req.Type { + case PushRequestType: + _ = SyncPushMirror(ctx, req.RepoID) + case PullRequestType: + _ = SyncPullMirror(ctx, req.RepoID) + default: + log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) + } +} // Update checks and updates mirror repositories. func Update(ctx context.Context) error { @@ -29,19 +54,25 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item string + var item Request if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("pull %d", m.RepoID) + item = Request{ + Type: PullRequestType, + RepoID: m.RepoID, + } } else if m, ok := bean.(*models.PushMirror); ok { if m.Repo == nil { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("push %d", m.ID) + item = Request{ + Type: PushRequestType, + RepoID: m.RepoID, + } } else { log.Error("Unknown bean: %v", bean) return nil @@ -51,8 +82,7 @@ func Update(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("Aborted") default: - mirrorQueue.Add(item) - return nil + return mirrorQueue.Push(&item) } } @@ -68,26 +98,10 @@ func Update(ctx context.Context) error { return nil } -// syncMirrors checks and syncs mirrors. -// FIXME: graceful: this should be a persistable queue -func syncMirrors(ctx context.Context) { - // Start listening on new sync requests. - for { - select { - case <-ctx.Done(): - mirrorQueue.Close() - return - case item := <-mirrorQueue.Queue(): - id, _ := strconv.ParseInt(item[5:], 10, 64) - if strings.HasPrefix(item, "pull") { - _ = SyncPullMirror(ctx, id) - } else if strings.HasPrefix(item, "push") { - _ = SyncPushMirror(ctx, id) - } else { - log.Error("Unknown item in queue: %v", item) - } - mirrorQueue.Remove(item) - } +func queueHandle(data ...queue.Data) { + for _, datum := range data { + req := datum.(*Request) + doMirror(graceful.GetManager().ShutdownContext(), req) } } @@ -96,7 +110,9 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - go graceful.GetManager().RunWithShutdownContext(syncMirrors) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request)) + + go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } // StartToMirror adds repoID to mirror queue @@ -104,7 +120,15 @@ func StartToMirror(repoID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID)) + go func() { + err := mirrorQueue.Push(&Request{ + Type: PushRequestType, + RepoID: repoID, + }) + if err != nil { + log.Error("Unable to push push mirror request to the queue for repo[%d]: Error: %v", repoID, err) + } + }() } // AddPushMirrorToQueue adds the push mirror to the queue @@ -112,5 +136,14 @@ func AddPushMirrorToQueue(mirrorID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID)) + go func() { + + err := mirrorQueue.Push(&Request{ + Type: PullRequestType, + RepoID: mirrorID, + }) + if err != nil { + log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", mirrorID, err) + } + }() } From 293c806ab26378c36f5008c1b0407cfe1805e86a Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 13:50:45 +0100 Subject: [PATCH 02/10] Remove MailService.QueueLength Signed-off-by: Andrew Thornton --- modules/setting/mailer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go index a2228e938ba0d..d2fac440ac08c 100644 --- a/modules/setting/mailer.go +++ b/modules/setting/mailer.go @@ -16,7 +16,6 @@ import ( // Mailer represents mail service. type Mailer struct { // Mailer - QueueLength int Name string From string FromName string @@ -54,7 +53,6 @@ func newMailService() { } MailService = &Mailer{ - QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100), Name: sec.Key("NAME").MustString(AppName), SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false), MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}), From 0518c4bc66b9e801aa6101852e2fc814556d6a91 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 16 Oct 2021 16:08:56 +0100 Subject: [PATCH 03/10] Update modules/queue/unique_queue_channel.go --- modules/queue/unique_queue_channel.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 3c8896cba644e..f617595c04380 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -66,14 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { - bs, err := json.Marshal(datum) - if err != nil { - log.Error("unable to marshal data: %v", datum) - } else { - queue.lock.Lock() - delete(queue.table, string(bs)) - queue.lock.Unlock() - } + // No error is possible here because PushFunc ensures that this can be marshalled + bs, _ := json.Marshal(datum) + + queue.lock.Lock() + delete(queue.table, string(bs)) + queue.lock.Unlock() + handle(datum) } }, config.WorkerPoolConfiguration) From a30c432b6c7c01ba153b471d5c52896e0dd2be74 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 18:44:12 +0100 Subject: [PATCH 04/10] as per delvh Signed-off-by: Andrew Thornton --- modules/setting/queue.go | 65 +++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 2e54fece415a1..5fa0451bb9435 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -7,9 +7,11 @@ package setting import ( "fmt" "path/filepath" + "strconv" "time" "code.gitea.io/gitea/modules/log" + ini "gopkg.in/ini.v1" ) // QueueSettings represent the settings for a queue from the ini @@ -106,11 +108,8 @@ func NewQueueService() { // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") - sectionMap := map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" { + directlySet := toDirectlySetKeysMap(section) + if !directlySet["TYPE"] && defaultType == "" { switch Indexer.IssueQueueType { case LevelQueueType: _, _ = section.NewKey("TYPE", "level") @@ -125,49 +124,53 @@ func NewQueueService() { Indexer.IssueQueueType) } } - if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 { + if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 { _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) } - if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 { + if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 { _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) } - if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" { + if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" { _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) } - if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" { + if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" { _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr) } // Handle the old mailer configuration - section = Cfg.Section("queue.mailer") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) - } + handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)) // Handle the old test pull requests configuration // Please note this will be a unique queue - section = Cfg.Section("queue.pr_patch_checker") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) - } + handleOldLengthConfiguration("pr_patch_checker", Repository.PullRequestQueueLength) // Handle the old mirror queue configuration // Please note this will be a unique queue - section = Cfg.Section("queue.mirror") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true + handleOldLengthConfiguration("mirror", Repository.MirrorQueueLength) +} + +// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but +// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0) +func handleOldLengthConfiguration(queueName string, value int) { + // Don't override with 0 + if value <= 0 { + return } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength)) + + section := Cfg.Section("queue." + queueName) + directlySet := toDirectlySetKeysMap(section) + if !directlySet["LENGTH"] { + _, _ = section.NewKey("LENGTH", strconv.Itoa(value)) } +} +// toDirectlySetKeysMap returns a bool map of keys directly set by this section +// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key +// but this section does not. +func toDirectlySetKeysMap(section *ini.Section) map[string]bool { + sectionMap := map[string]bool{} + for _, key := range section.Keys() { + sectionMap[key.Name()] = true + } + return sectionMap } From 07a304a802cb67d742ca937bda2251f02ed28fc1 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 18:49:35 +0100 Subject: [PATCH 05/10] as per delvh Signed-off-by: Andrew Thornton --- services/mirror/mirror.go | 54 +++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 3373b7f0a148a..9dd6c82ce34dc 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -17,28 +17,28 @@ import ( var mirrorQueue queue.UniqueQueue -// RequestType type of mirror request -type RequestType int +// SyncType type of sync request +type SyncType int const ( - // PullRequestType for pull mirrors - PullRequestType RequestType = iota - // PushRequestType for push mirrors - PushRequestType + // PullMirrorType for pull mirrors + PullMirrorType SyncType = iota + // PushMirrorType for push mirrors + PushMirrorType ) -// Request for the mirror queue -type Request struct { - Type RequestType +// SyncRequest for the mirror queue +type SyncRequest struct { + Type SyncType RepoID int64 } -// doMirror causes this request to mirror itself -func doMirror(ctx context.Context, req *Request) { +// doMirrorSync causes this request to mirror itself +func doMirrorSync(ctx context.Context, req *SyncRequest) { switch req.Type { - case PushRequestType: + case PushMirrorType: _ = SyncPushMirror(ctx, req.RepoID) - case PullRequestType: + case PullMirrorType: _ = SyncPullMirror(ctx, req.RepoID) default: log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) @@ -54,14 +54,14 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item Request + var item SyncRequest if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = Request{ - Type: PullRequestType, + item = SyncRequest{ + Type: PullMirrorType, RepoID: m.RepoID, } } else if m, ok := bean.(*models.PushMirror); ok { @@ -69,8 +69,8 @@ func Update(ctx context.Context) error { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = Request{ - Type: PushRequestType, + item = SyncRequest{ + Type: PushMirrorType, RepoID: m.RepoID, } } else { @@ -100,8 +100,8 @@ func Update(ctx context.Context) error { func queueHandle(data ...queue.Data) { for _, datum := range data { - req := datum.(*Request) - doMirror(graceful.GetManager().ShutdownContext(), req) + req := datum.(*SyncRequest) + doMirrorSync(graceful.GetManager().ShutdownContext(), req) } } @@ -110,7 +110,7 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request)) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest)) go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } @@ -121,12 +121,12 @@ func StartToMirror(repoID int64) { return } go func() { - err := mirrorQueue.Push(&Request{ - Type: PushRequestType, + err := mirrorQueue.Push(&SyncRequest{ + Type: PushMirrorType, RepoID: repoID, }) if err != nil { - log.Error("Unable to push push mirror request to the queue for repo[%d]: Error: %v", repoID, err) + log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err) } }() } @@ -138,12 +138,12 @@ func AddPushMirrorToQueue(mirrorID int64) { } go func() { - err := mirrorQueue.Push(&Request{ - Type: PullRequestType, + err := mirrorQueue.Push(&SyncRequest{ + Type: PullMirrorType, RepoID: mirrorID, }) if err != nil { - log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", mirrorID, err) + log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err) } }() } From aee49c3b0c549c35da9c8c5c8eb880e18c664662 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 19:22:47 +0100 Subject: [PATCH 06/10] remove unnecessary line Signed-off-by: Andrew Thornton --- services/mirror/mirror.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 9dd6c82ce34dc..bbf04cf9849b2 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -137,7 +137,6 @@ func AddPushMirrorToQueue(mirrorID int64) { return } go func() { - err := mirrorQueue.Push(&SyncRequest{ Type: PullMirrorType, RepoID: mirrorID, From 6109ee02ed6d5eb3ee6da2da7d20b387c4ff0d3f Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 19:53:10 +0100 Subject: [PATCH 07/10] Add clarification to queues configuration Signed-off-by: Andrew Thornton --- .../doc/advanced/config-cheat-sheet.en-us.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 6421457332e31..91c62dbec34ae 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -382,7 +382,7 @@ relation to port exhaustion. ## Queue (`queue` and `queue.*`) -Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`. +Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.) - `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.) @@ -418,6 +418,21 @@ And the following unique queues: - `mirror` - `pr_patch_checker` +Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration): + +- `[queue.issue_indexer]` + - `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set. + - `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set. + - `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set. + - `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set. + - `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set. +- `[queue.mailer]` + - `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is. +- `[queue.pr_patch_checker]` + - `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is. +- `[queue.mirror]` + - `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is. + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled From 54d8bff6269276f3b1c2d26f9cf8a836a742d4ab Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 19:57:48 +0100 Subject: [PATCH 08/10] Remove PullRequestQueueLength and MirrorQueueLength Signed-off-by: Andrew Thornton --- modules/setting/queue.go | 4 ++-- modules/setting/repository.go | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 5fa0451bb9435..225fc5d9be36d 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -142,11 +142,11 @@ func NewQueueService() { // Handle the old test pull requests configuration // Please note this will be a unique queue - handleOldLengthConfiguration("pr_patch_checker", Repository.PullRequestQueueLength) + handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000)) // Handle the old mirror queue configuration // Please note this will be a unique queue - handleOldLengthConfiguration("mirror", Repository.MirrorQueueLength) + handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000)) } // handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but diff --git a/modules/setting/repository.go b/modules/setting/repository.go index de57eb91401a6..0791602efb6bb 100644 --- a/modules/setting/repository.go +++ b/modules/setting/repository.go @@ -29,8 +29,6 @@ var ( DefaultPrivate string DefaultPushCreatePrivate bool MaxCreationLimit int - MirrorQueueLength int - PullRequestQueueLength int PreferredLicenses []string DisableHTTPGit bool AccessControlAllowOrigin string @@ -142,8 +140,6 @@ var ( DefaultPrivate: RepoCreatingLastUserVisibility, DefaultPushCreatePrivate: true, MaxCreationLimit: -1, - MirrorQueueLength: 1000, - PullRequestQueueLength: 1000, PreferredLicenses: []string{"Apache License 2.0", "MIT License"}, DisableHTTPGit: false, AccessControlAllowOrigin: "", From 128036d1b070ae16d8f420f7b81f6bc9f4a467ba Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Oct 2021 20:02:52 +0100 Subject: [PATCH 09/10] use strconv too Signed-off-by: Andrew Thornton --- modules/setting/queue.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 225fc5d9be36d..1668cc63a34db 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -5,7 +5,6 @@ package setting import ( - "fmt" "path/filepath" "strconv" "time" @@ -125,10 +124,10 @@ func NewQueueService() { } } if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength)) } if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 { - _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber)) } if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" { _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) From 6d20582b4d2601486ee5715cf7b127d91bf71337 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 17 Oct 2021 10:25:29 +0100 Subject: [PATCH 10/10] oops Signed-off-by: Andrew Thornton --- services/mirror/mirror.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index bbf04cf9849b2..eb37639beff8e 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -122,7 +122,7 @@ func StartToMirror(repoID int64) { } go func() { err := mirrorQueue.Push(&SyncRequest{ - Type: PushMirrorType, + Type: PullMirrorType, RepoID: repoID, }) if err != nil { @@ -138,7 +138,7 @@ func AddPushMirrorToQueue(mirrorID int64) { } go func() { err := mirrorQueue.Push(&SyncRequest{ - Type: PullMirrorType, + Type: PushMirrorType, RepoID: mirrorID, }) if err != nil {