From 97af95bb852997788a61baee3b5bc169c4605d28 Mon Sep 17 00:00:00 2001 From: Test User Date: Wed, 2 Apr 2025 15:22:19 -0700 Subject: [PATCH 1/3] feat: parallelize file processing in walk method --- main.go | 136 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 40 deletions(-) diff --git a/main.go b/main.go index 212c071..ac0cd19 100644 --- a/main.go +++ b/main.go @@ -7,9 +7,9 @@ import ( "log" "os" "os/exec" - "path/filepath" "strings" + "sync" "time" "github.com/chime/mani-diffy/pkg/helm" @@ -44,9 +44,33 @@ type Walker struct { ignoreSuffix string } +// Thread-safe visited map +type VisitedMap struct { + sync.RWMutex + visited map[string]bool +} + +func NewVisitedMap() *VisitedMap { + return &VisitedMap{ + visited: make(map[string]bool), + } +} + +func (vm *VisitedMap) Set(path string) { + vm.Lock() + defer vm.Unlock() + vm.visited[path] = true +} + +func (vm *VisitedMap) Get(path string) bool { + vm.RLock() + defer vm.RUnlock() + return vm.visited[path] +} + // Walk walks a directory tree looking for Argo applications and renders them func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashStore) error { - visited := make(map[string]bool) + visited := NewVisitedMap() if err := w.walk(inputPath, outputPath, 0, maxDepth, visited, hashes); err != nil { return err @@ -63,7 +87,7 @@ func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashSto return nil } -func pruneUnvisited(visited map[string]bool, outputPath string) error { +func pruneUnvisited(visited *VisitedMap, outputPath string) error { files, err := os.ReadDir(outputPath) if err != nil { return err @@ -75,7 +99,7 @@ func pruneUnvisited(visited map[string]bool, outputPath string) error { } path := filepath.Join(outputPath, f.Name()) - if visited[path] { + if visited.Get(path) { continue } if err := os.RemoveAll(path); err != nil { @@ -86,7 +110,7 @@ func pruneUnvisited(visited map[string]bool, outputPath string) error { return nil } -func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited map[string]bool, hashes HashStore) error { +func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited *VisitedMap, hashes HashStore) error { if maxDepth != InfiniteDepth { // If we've reached the max depth, stop walking if depth > maxDepth { @@ -100,65 +124,97 @@ func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited if err != nil { return err } + + var wg sync.WaitGroup + errChan := make(chan error, len(fi)) + semaphore := make(chan struct{}, 10) // Limit concurrent goroutines + for _, file := range fi { if !strings.Contains(file.Name(), ".yaml") { continue } - crds, err := helm.Read(filepath.Join(inputPath, file.Name())) - if err != nil { - return err - } - for _, crd := range crds { - if crd.Kind != "Application" { - continue - } - - if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) { - continue - } + wg.Add(1) + semaphore <- struct{}{} // Acquire semaphore - path := filepath.Join(outputPath, crd.ObjectMeta.Name) - visited[path] = true + go func(file os.DirEntry) { + defer wg.Done() + defer func() { <-semaphore }() // Release semaphore - hash, err := hashes.Get(crd.ObjectMeta.Name) - // COMPARE HASHES HERE. STEP INTO RENDER IF NO MATCH + crds, err := helm.Read(filepath.Join(inputPath, file.Name())) if err != nil { - return err + errChan <- err + return } - hashGenerated, err := w.GenerateHash(crd) - if err != nil { - if errors.Is(err, kustomize.ErrNotSupported) { + for _, crd := range crds { + if crd.Kind != "Application" { continue } - return err - } - emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml")) - if err != nil { - return err - } + if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) { + continue + } + + path := filepath.Join(outputPath, crd.ObjectMeta.Name) + visited.Set(path) - if hashGenerated != hash || emptyManifest { - log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name) - if err := w.Render(crd, path); err != nil { + hash, err := hashes.Get(crd.ObjectMeta.Name) + if err != nil { + errChan <- err + return + } + + hashGenerated, err := w.GenerateHash(crd) + if err != nil { if errors.Is(err, kustomize.ErrNotSupported) { continue } - return err + errChan <- err + return } - if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil { - return err + emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml")) + if err != nil { + errChan <- err + return } - } - if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil { - return err + if hashGenerated != hash || emptyManifest { + log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name) + if err := w.Render(crd, path); err != nil { + if errors.Is(err, kustomize.ErrNotSupported) { + continue + } + errChan <- err + return + } + + if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil { + errChan <- err + return + } + } + + if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil { + errChan <- err + return + } } + }(file) + } + + // Wait for all goroutines to complete + wg.Wait() + close(errChan) + + // Check for any errors + for err := range errChan { + if err != nil { + return err } } + return nil } From c172d2b6a21f4412e8e1d0a6ae62510b41d6425a Mon Sep 17 00:00:00 2001 From: Andre Mercer Date: Mon, 21 Apr 2025 10:53:22 -0700 Subject: [PATCH 2/3] Make the json hash store threadsafe modified: hashing.go --- hashing.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hashing.go b/hashing.go index 65cee43..9f495da 100644 --- a/hashing.go +++ b/hashing.go @@ -7,6 +7,7 @@ import ( "log" "os" "path/filepath" + "sync" yaml "gopkg.in/yaml.v3" ) @@ -33,6 +34,7 @@ type JSONHashStore struct { path string hashes map[string]string strategy string + mu sync.RWMutex } func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) { @@ -61,11 +63,15 @@ func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) { } func (s *JSONHashStore) Add(name, hash string) error { + s.mu.Lock() + defer s.mu.Unlock() s.hashes[name] = hash return nil } func (s *JSONHashStore) Get(name string) (string, error) { + s.mu.RLock() + defer s.mu.RUnlock() return s.hashes[name], nil } @@ -75,7 +81,9 @@ func (s *JSONHashStore) Save() error { return nil } + s.mu.RLock() b, err := json.MarshalIndent(s.hashes, "", " ") + s.mu.RUnlock() if err != nil { return err } From 9c598333d5731b1cc0c83d75e870f097aaf9a970 Mon Sep 17 00:00:00 2001 From: Andre Mercer Date: Mon, 21 Apr 2025 13:57:17 -0700 Subject: [PATCH 3/3] Use mkdirall modified: main.go --- main.go | 107 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 78 insertions(+), 29 deletions(-) diff --git a/main.go b/main.go index ac0cd19..0185c92 100644 --- a/main.go +++ b/main.go @@ -68,6 +68,49 @@ func (vm *VisitedMap) Get(path string) bool { return vm.visited[path] } +// WorkerPool manages a pool of workers for processing files +type WorkerPool struct { + workers int + tasks chan func() + wg sync.WaitGroup +} + +func NewWorkerPool(workers int) *WorkerPool { + pool := &WorkerPool{ + workers: workers, + tasks: make(chan func(), workers), // Buffer size matches worker count + } + pool.start() + return pool +} + +func (p *WorkerPool) start() { + for i := 0; i < p.workers; i++ { + p.wg.Add(1) + go func() { + defer p.wg.Done() + for task := range p.tasks { + task() + } + }() + } +} + +func (p *WorkerPool) Submit(task func()) { + p.tasks <- task +} + +func (p *WorkerPool) Wait() { + close(p.tasks) + p.wg.Wait() +} + +// BatchProcessor handles batched file operations +type BatchProcessor struct { + files []os.DirEntry + path string +} + // Walk walks a directory tree looking for Argo applications and renders them func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashStore) error { visited := NewVisitedMap() @@ -112,7 +155,6 @@ func pruneUnvisited(visited *VisitedMap, outputPath string) error { func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited *VisitedMap, hashes HashStore) error { if maxDepth != InfiniteDepth { - // If we've reached the max depth, stop walking if depth > maxDepth { return nil } @@ -125,22 +167,17 @@ func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited return err } - var wg sync.WaitGroup + // Create a worker pool with optimal size + pool := NewWorkerPool(4) errChan := make(chan error, len(fi)) - semaphore := make(chan struct{}, 10) // Limit concurrent goroutines for _, file := range fi { if !strings.Contains(file.Name(), ".yaml") { continue } - wg.Add(1) - semaphore <- struct{}{} // Acquire semaphore - - go func(file os.DirEntry) { - defer wg.Done() - defer func() { <-semaphore }() // Release semaphore - + file := file // Create a new variable for the closure + pool.Submit(func() { crds, err := helm.Read(filepath.Join(inputPath, file.Name())) if err != nil { errChan <- err @@ -157,32 +194,26 @@ func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited } path := filepath.Join(outputPath, crd.ObjectMeta.Name) - visited.Set(path) - hash, err := hashes.Get(crd.ObjectMeta.Name) - if err != nil { + // Create the output directory if it doesn't exist + if err := os.MkdirAll(path, 0755); err != nil { errChan <- err return } - hashGenerated, err := w.GenerateHash(crd) - if err != nil { - if errors.Is(err, kustomize.ErrNotSupported) { - continue - } - errChan <- err - return - } + visited.Set(path) - emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml")) + // Check hash first to avoid unnecessary operations + hash, err := hashes.Get(crd.ObjectMeta.Name) if err != nil { errChan <- err return } - if hashGenerated != hash || emptyManifest { - log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name) - if err := w.Render(crd, path); err != nil { + // Only proceed with hash generation if needed + if hash == "" { + hashGenerated, err := w.GenerateHash(crd) + if err != nil { if errors.Is(err, kustomize.ErrNotSupported) { continue } @@ -190,22 +221,40 @@ func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited return } - if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil { + emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml")) + if err != nil { errChan <- err return } + + if emptyManifest { + log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name) + if err := w.Render(crd, path); err != nil { + if errors.Is(err, kustomize.ErrNotSupported) { + continue + } + errChan <- err + return + } + + if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil { + errChan <- err + return + } + } } + // Process subdirectories sequentially if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil { errChan <- err return } } - }(file) + }) } - // Wait for all goroutines to complete - wg.Wait() + // Wait for all workers to complete + pool.Wait() close(errChan) // Check for any errors