From 498d88fabc64798e8ff021c0c4514baafe11f5ca Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 6 Dec 2024 09:20:39 -0500 Subject: [PATCH 01/15] add a new feature-gated api/v1/query endpoint Signed-off-by: Joe Lanford --- catalogd/cmd/catalogd/main.go | 4 +- catalogd/internal/features/features.go | 8 +- catalogd/internal/storage/index.go | 136 +++++++++++ catalogd/internal/storage/localdir.go | 256 +++++++++++++++++---- catalogd/internal/storage/localdir_test.go | 70 +++--- 5 files changed, 387 insertions(+), 87 deletions(-) create mode 100644 catalogd/internal/storage/index.go diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go index 8ab76aa32..cae87717b 100644 --- a/catalogd/cmd/catalogd/main.go +++ b/catalogd/cmd/catalogd/main.go @@ -302,9 +302,9 @@ func main() { os.Exit(1) } - localStorage = storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} + localStorage = &storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} - // Config for the the catalogd web server + // Config for the catalogd web server catalogServerConfig := serverutil.CatalogServerConfig{ ExternalAddr: externalAddr, CatalogAddr: catalogServerAddr, diff --git a/catalogd/internal/features/features.go b/catalogd/internal/features/features.go index 8f67b1689..1ab490854 100644 --- a/catalogd/internal/features/features.go +++ b/catalogd/internal/features/features.go @@ -5,7 +5,13 @@ import ( "k8s.io/component-base/featuregate" ) -var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{} +const ( + APIV1QueryHandler = featuregate.Feature("APIV1QueryHandler") +) + +var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + APIV1QueryHandler: {Default: false, PreRelease: featuregate.Alpha}, +} var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate() diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go new file mode 100644 index 000000000..b80b2646a --- /dev/null +++ 
b/catalogd/internal/storage/index.go @@ -0,0 +1,136 @@ +package storage + +import ( + "cmp" + "encoding/json" + "errors" + "fmt" + "io" + "slices" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/operator-framework/operator-registry/alpha/declcfg" +) + +type index struct { + BySchema map[string][]section `json:"by_schema"` + ByPackage map[string][]section `json:"by_package"` + ByName map[string][]section `json:"by_name"` +} + +type section struct { + offset int64 + length int64 +} + +func (s *section) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`[%d,%d]`, s.offset, s.length)), nil +} + +func (s *section) UnmarshalJSON(b []byte) error { + vals := [2]int64{} + if err := json.Unmarshal(b, &vals); err != nil { + return err + } + s.offset = vals[0] + s.length = vals[1] + return nil +} + +func (i index) Size() int64 { + size := 0 + for k, v := range i.BySchema { + size += len(k) + len(v)*16 + } + for k, v := range i.ByPackage { + size += len(k) + len(v)*16 + } + for k, v := range i.ByName { + size += len(k) + len(v)*16 + } + return int64(size) +} + +func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { + sectionSet := i.getSectionSet(schema, packageName, name) + + sections := sectionSet.UnsortedList() + slices.SortFunc(sections, func(a, b section) int { + return cmp.Compare(a.offset, b.offset) + }) + + srs := make([]io.Reader, 0, len(sections)) + for _, s := range sections { + sr := io.NewSectionReader(r, s.offset, s.length) + srs = append(srs, sr) + } + return io.MultiReader(srs...), true +} + +func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { + if schema == "" { + if packageName == "" { + if name == "" { + sectionSet := sets.New[section]() + for _, s := range i.BySchema { + sectionSet.Insert(s...) + } + return sectionSet + } else { + return sets.New[section](i.ByName[name]...) + } + } else { + sectionSet := sets.New[section](i.ByPackage[packageName]...) 
+ if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } + } else { + sectionSet := sets.New[section](i.BySchema[schema]...) + if packageName == "" { + if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } else { + sectionSet = sectionSet.Intersection(sets.New[section](i.ByPackage[packageName]...)) + if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } + } +} + +func newIndex(r io.Reader) (*index, error) { + idx := &index{ + BySchema: make(map[string][]section), + ByPackage: make(map[string][]section), + ByName: make(map[string][]section), + } + var meta declcfg.Meta + dec := json.NewDecoder(r) + for { + i1 := dec.InputOffset() + if err := dec.Decode(&meta); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, err + } + i2 := dec.InputOffset() + start := i1 + length := i2 - i1 + + s := section{offset: start, length: length} + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } + return idx, nil +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index dd06729ea..7a0b67e99 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -2,16 +2,24 @@ package storage import ( "context" + "encoding/json" + "errors" "fmt" + "io" "io/fs" "net/http" "net/url" "os" "path/filepath" + "strings" + "sync" "github.com/klauspost/compress/gzhttp" + "golang.org/x/sync/errgroup" "github.com/operator-framework/operator-registry/alpha/declcfg" + + "github.com/operator-framework/catalogd/internal/features" ) // LocalDirV1 is a storage Instance. 
When Storing a new FBC contained in @@ -22,93 +30,241 @@ import ( type LocalDirV1 struct { RootDir string RootURL *url.URL + + m sync.RWMutex } -const ( - v1ApiPath = "api/v1" - v1ApiData = "all" -) +func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { + s.m.Lock() + defer s.m.Unlock() + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + return s.storeCatalogFileAndIndex(ctx, catalog, fsys) + } + return s.storeCatalogFile(ctx, catalog, fsys) +} -func (s LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { - fbcDir := filepath.Join(s.RootDir, catalog, v1ApiPath) - if err := os.MkdirAll(fbcDir, 0700); err != nil { +func (s *LocalDirV1) storeCatalogFile(ctx context.Context, catalog string, fsys fs.FS) error { + if err := os.MkdirAll(s.RootDir, 0700); err != nil { return err } - tempFile, err := os.CreateTemp(s.RootDir, fmt.Sprint(catalog)) + tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) if err != nil { return err } - defer os.Remove(tempFile.Name()) + defer os.Remove(tmpCatalogFile.Name()) + if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err } - _, err = tempFile.Write(meta.Blob) + _, err = tmpCatalogFile.Write(meta.Blob) return err }); err != nil { return fmt.Errorf("error walking FBC root: %w", err) } - fbcFile := filepath.Join(fbcDir, v1ApiData) - return os.Rename(tempFile.Name(), fbcFile) + + fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + return os.Rename(tmpCatalogFile.Name(), fbcFile) } -func (s LocalDirV1) Delete(catalog string) error { - return os.RemoveAll(filepath.Join(s.RootDir, catalog)) +func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog string, fsys fs.FS) error { + if err := os.MkdirAll(s.RootDir, 0700); err != nil { + return err + } + tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) 
+ if err != nil { + return err + } + defer os.Remove(tmpCatalogFile.Name()) + + tmpIndexFile, err := os.CreateTemp(s.RootDir, filepath.Base(fmt.Sprintf("%s.index.json", strings.TrimSuffix(tmpCatalogFile.Name(), ".jsonl")))) + if err != nil { + return err + } + defer os.Remove(tmpIndexFile.Name()) + + pr, pw := io.Pipe() + mw := io.MultiWriter(tmpCatalogFile, pw) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + _, err = mw.Write(meta.Blob) + if err != nil { + return pw.CloseWithError(err) + } + return nil + }, declcfg.WithConcurrency(1)); err != nil { + return fmt.Errorf("error walking FBC root: %w", err) + } + return pw.CloseWithError(tmpCatalogFile.Close()) + }) + eg.Go(func() error { + idx, err := newIndex(pr) + if err != nil { + return pr.CloseWithError(err) + } + if err := pr.Close(); err != nil { + return err + } + enc := json.NewEncoder(tmpIndexFile) + if err := enc.Encode(idx); err != nil { + return err + } + if err := tmpIndexFile.Close(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + return err + } + + fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + fbcIndexFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + return errors.Join( + os.Rename(tmpCatalogFile.Name(), fbcFile), + os.Rename(tmpIndexFile.Name(), fbcIndexFile), + ) } -func (s LocalDirV1) BaseURL(catalog string) string { +func (s *LocalDirV1) Delete(catalog string) error { + s.m.Lock() + defer s.m.Unlock() + + var errs []error + errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))) + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) + } + return errors.Join(errs...) 
+} + +func (s *LocalDirV1) BaseURL(catalog string) string { return s.RootURL.JoinPath(catalog).String() } -func (s LocalDirV1) StorageServerHandler() http.Handler { +func (s *LocalDirV1) StorageServerHandler() http.Handler { mux := http.NewServeMux() - fsHandler := http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})) - spHandler := http.StripPrefix(s.RootURL.Path, fsHandler) - gzHandler := gzhttp.GzipHandler(spHandler) + + v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path + mux.Handle(v1AllPath, s.v1AllHandler()) + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path + mux.Handle(v1QueryPath, s.v1QueryHandler()) + } + return mux +} + +func (s *LocalDirV1) v1AllHandler() http.Handler { + catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) + }) + gzHandler := gzhttp.GzipHandler(catalogHandler) typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/jsonl") gzHandler.ServeHTTP(w, r) }) - mux.Handle(s.RootURL.Path, typeHandler) - return mux + return typeHandler } -func (s LocalDirV1) ContentExists(catalog string) bool { - file, err := os.Stat(filepath.Join(s.RootDir, catalog, v1ApiPath, v1ApiData)) +func (s *LocalDirV1) v1QueryHandler() http.Handler { + catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") + + catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + catalogFileStat, err := os.Stat(catalogFilePath) + if err != nil { + if errors.Is(err, 
fs.ErrNotExist) { + http.Error(w, "Catalog not found", http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + modTime := catalogFileStat.ModTime().Format(http.TimeFormat) + if r.Header.Get("If-Modified-Since") == modTime { + w.WriteHeader(http.StatusNotModified) + return + } + + catalogFile, err := os.Open(catalogFilePath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + http.Error(w, "Catalog not found", http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer catalogFile.Close() + + indexFile, err := os.Open(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + http.Error(w, "No catalog contents found matching query", http.StatusNotFound) + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer indexFile.Close() + + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + queryReader, ok := idx.Get(catalogFile, schema, pkg, name) + if !ok { + http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) + return + } + w.Header().Add("Content-Type", "application/jsonl") + w.Header().Set("Last-Modified", modTime) + _, _ = io.Copy(w, queryReader) + }) + gzHandler := gzhttp.GzipHandler(catalogHandler) + return gzHandler +} + +func (s *LocalDirV1) ContentExists(catalog string) bool { + s.m.RLock() + defer s.m.RUnlock() + + catalogFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) if err != nil { return false } - if !file.Mode().IsRegular() { + if !catalogFileStat.Mode().IsRegular() { // path is not valid content return false } - return true -} -// filesOnlyFilesystem is a file system that can open only regular -// files from the underlying filesystem. 
All other file types result -// in os.ErrNotExists -type filesOnlyFilesystem struct { - FS fs.FS -} - -// Open opens a named file from the underlying filesystem. If the file -// is not a regular file, it return os.ErrNotExists. Callers are resposible -// for closing the file returned. -func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { - file, err := f.FS.Open(name) - if err != nil { - return nil, err - } - stat, err := file.Stat() - if err != nil { - _ = file.Close() - return nil, err - } - if !stat.Mode().IsRegular() { - _ = file.Close() - return nil, os.ErrNotExist + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + indexFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + if err != nil { + return false + } + if !indexFileStat.Mode().IsRegular() { + return false + } } - return file, nil + return true } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index c975c8fc9..3d7f7f8a0 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -69,14 +69,13 @@ var _ = Describe("LocalDir Storage Test", func() { Expect(err).To(Not(HaveOccurred())) }) It("should store the content in the RootDir correctly", func() { - fbcDir := filepath.Join(rootDir, catalog, v1ApiPath) - fbcFile := filepath.Join(fbcDir, v1ApiData) + fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) _, err := os.Stat(fbcFile) Expect(err).To(Not(HaveOccurred())) gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS) Expect(err).To(Not(HaveOccurred())) - storedConfig, err := declcfg.LoadFile(os.DirFS(fbcDir), v1ApiData) + storedConfig, err := declcfg.LoadFile(os.DirFS(filepath.Dir(fbcFile)), filepath.Base(fbcFile)) Expect(err).To(Not(HaveOccurred())) diff := cmp.Diff(gotConfig, storedConfig) Expect(diff).To(Equal("")) @@ -93,10 +92,15 @@ var _ = Describe("LocalDir Storage Test", func() { 
Expect(err).To(Not(HaveOccurred())) }) It("should delete the FBC from the cache directory", func() { - fbcFile := filepath.Join(rootDir, catalog) + fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) _, err := os.Stat(fbcFile) Expect(err).To(HaveOccurred()) Expect(os.IsNotExist(err)).To(BeTrue()) + + indexFile := filepath.Join(rootDir, fmt.Sprintf("%s.index.json", catalog)) + _, err = os.Stat(indexFile) + Expect(err).To(HaveOccurred()) + Expect(os.IsNotExist(err)).To(BeTrue()) }) It("should report content does not exist", func() { Expect(store.ContentExists(catalog)).To(BeFalse()) @@ -111,9 +115,7 @@ var _ = Describe("LocalDir Server Handler tests", func() { store LocalDirV1 ) BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - Expect(os.MkdirAll(filepath.Join(d, "test-catalog", v1ApiPath), 0700)).To(Succeed()) + d := GinkgoT().TempDir() store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}} testServer = httptest.NewServer(store.StorageServerHandler()) @@ -127,33 +129,32 @@ var _ = Describe("LocalDir Server Handler tests", func() { It("gets 404 for the path /catalogs/test-catalog/", func() { expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) }) - It("gets 404 for the path /test-catalog/foo.txt", func() { - // This ensures that even if the file exists, the URL must contain the /catalogs/ prefix - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed()) - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt")) + It("gets 404 for the path /catalogs/test-catalog/api", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api")) }) - It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt")) + It("gets 404 for the path 
/catalogs/test-catalog/api/v1", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1")) }) - It("gets 200 for the path /catalogs/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent) + It("gets 404 for the path /catalogs/test-catalog.jsonl", func() { + // This is actually how the file is stored, but we don't serve + // the filesystem, we serve an API. Hence, expect 404 not found + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), []byte("foobar"), 0600)).To(Succeed()) + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog.jsonl")) }) - It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent) + It("gets 200 for the path /catalogs/test-catalog/api/v1/all", func() { + expectedContent := []byte(`{"foo":"bar"}`) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) }) It("ignores accept-encoding for the path /catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) + expectedContent := []byte(`{"foo":"bar"}`) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), 
expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) }) It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() { expectedContent := []byte(testCompressableJSON) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, true) }) It("provides json-lines format for the served JSON catalog", func() { catalog := "test-catalog" @@ -165,9 +166,9 @@ var _ = Describe("LocalDir Server Handler tests", func() { expectedContent, err := generateJSONLines([]byte(testCompressableJSON)) Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) + path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) + expectFound(path, []byte(expectedContent), true) }) It("provides json-lines format for the served YAML catalog", func() { catalog := "test-catalog" @@ -181,9 +182,9 @@ var _ = Describe("LocalDir Server Handler tests", func() { expectedContent, err := generateJSONLines(yamlData) Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) + path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) + expectFound(path, []byte(expectedContent), true) }) AfterEach(func() { testServer.Close() @@ -197,7 +198,7 @@ func 
expectNotFound(url string) { Expect(resp.Body.Close()).To(Succeed()) } -func expectFound(url string, expectedContent []byte) { +func expectFound(url string, expectedContent []byte, expectCompression bool) { req, err := http.NewRequest(http.MethodGet, url, nil) Expect(err).To(Not(HaveOccurred())) req.Header.Set("Accept-Encoding", "gzip") @@ -206,15 +207,16 @@ func expectFound(url string, expectedContent []byte) { Expect(resp.StatusCode).To(Equal(http.StatusOK)) var actualContent []byte - switch resp.Header.Get("Content-Encoding") { - case "gzip": + if expectCompression { + Expect(resp.Header.Get("Content-Encoding")).To(Equal("gzip")) Expect(len(expectedContent)).To(BeNumerically(">", 1400), fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent))) gz, err := gzip.NewReader(resp.Body) Expect(err).To(Not(HaveOccurred())) actualContent, err = io.ReadAll(gz) Expect(err).To(Not(HaveOccurred())) - default: + } else { + Expect(resp.Header.Get("Content-Encoding")).To(BeEmpty()) actualContent, err = io.ReadAll(resp.Body) Expect(len(expectedContent)).To(BeNumerically("<", 1400), fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent)) From 6aad08af598cf40f02a21266ff99c52e0876e747 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 6 Dec 2024 16:36:04 -0500 Subject: [PATCH 02/15] add singleflight for shared index access for concurrent requests Signed-off-by: Joe Lanford --- catalogd/internal/serverutil/serverutil.go | 10 +++- catalogd/internal/storage/localdir.go | 70 +++++++++++++++++----- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 1dcaa9282..afda4cf8e 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ 
b/catalogd/internal/serverutil/serverutil.go @@ -1,6 +1,7 @@ package serverutil import ( + "context" "crypto/tls" "fmt" "io" @@ -12,7 +13,7 @@ import ( "github.com/gorilla/handlers" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/log" catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics" "github.com/operator-framework/operator-controller/catalogd/internal/storage" @@ -51,8 +52,11 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ - Addr: cfg.CatalogAddr, - Handler: handler, + Addr: cfg.CatalogAddr, + Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()), + BaseContext: func(_ net.Listener) context.Context { + return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) + }, ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 7a0b67e99..20bbb8614 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -13,9 +13,12 @@ import ( "path/filepath" "strings" "sync" + "time" + "github.com/go-logr/logr" "github.com/klauspost/compress/gzhttp" "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" "github.com/operator-framework/operator-registry/alpha/declcfg" @@ -31,7 +34,8 @@ type LocalDirV1 struct { RootDir string RootURL *url.URL - m sync.RWMutex + m sync.RWMutex + sf singleflight.Group } func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { @@ -175,7 +179,8 @@ func (s *LocalDirV1) v1AllHandler() http.Handler { w.Header().Add("Content-Type", "application/jsonl") gzHandler.ServeHTTP(w, r) }) - return 
typeHandler + + return newLoggingMiddleware(typeHandler) } func (s *LocalDirV1) v1QueryHandler() http.Handler { @@ -215,22 +220,11 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { } defer catalogFile.Close() - indexFile, err := os.Open(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + idx, err := s.getIndex(catalog) if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "No catalog contents found matching query", http.StatusNotFound) - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer indexFile.Close() - - var idx index - if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - queryReader, ok := idx.Get(catalogFile, schema, pkg, name) if !ok { http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) @@ -241,7 +235,7 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { _, _ = io.Copy(w, queryReader) }) gzHandler := gzhttp.GzipHandler(catalogHandler) - return gzHandler + return newLoggingMiddleware(gzHandler) } func (s *LocalDirV1) ContentExists(catalog string) bool { @@ -268,3 +262,49 @@ func (s *LocalDirV1) ContentExists(catalog string) bool { } return true } + +func (s *LocalDirV1) getIndex(catalog string) (*index, error) { + idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { + indexFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + fmt.Printf("opening index file %s\n", indexFilePath) + indexFile, err := os.Open(indexFilePath) + if err != nil { + return nil, err + } + defer indexFile.Close() + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + return nil, err + } + return &idx, nil + }) + return idx.(*index), err +} + +func newLoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := 
logr.FromContextOrDiscard(r.Context()) + + start := time.Now() + lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} + next.ServeHTTP(lrw, r) + + logger.WithValues( + "method", r.Method, + "url", r.URL.String(), + "status", lrw.statusCode, + "duration", time.Since(start), + "remoteAddr", r.RemoteAddr, + ).Info("HTTP request processed") + }) +} + +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *loggingResponseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) +} From 10c3634566b838b61a585a1740c5887642ef21f1 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 17 Jan 2025 16:28:42 -0500 Subject: [PATCH 03/15] a few improvements and optimizations Signed-off-by: Joe Lanford --- catalogd/cmd/catalogd/main.go | 6 ++- catalogd/internal/storage/index.go | 33 +++++++-------- catalogd/internal/storage/localdir.go | 48 ++++++++++++---------- catalogd/internal/storage/localdir_test.go | 2 +- catalogd/internal/storage/storage.go | 3 +- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go index cae87717b..bcabfa644 100644 --- a/catalogd/cmd/catalogd/main.go +++ b/catalogd/cmd/catalogd/main.go @@ -302,7 +302,11 @@ func main() { os.Exit(1) } - localStorage = &storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} + localStorage = &storage.LocalDirV1{ + RootDir: storeDir, + RootURL: baseStorageURL, + EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler), + } // Config for the catalogd web server catalogServerConfig := serverutil.CatalogServerConfig{ diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index b80b2646a..02f14d11b 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -3,7 +3,6 @@ package storage import ( "cmp" "encoding/json" - "errors" "fmt" "io" "slices" @@ -107,30 +106,28 @@ func 
(i *index) getSectionSet(schema, packageName, name string) sets.Set[section } } -func newIndex(r io.Reader) (*index, error) { +func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) { idx := &index{ BySchema: make(map[string][]section), ByPackage: make(map[string][]section), ByName: make(map[string][]section), } - var meta declcfg.Meta - dec := json.NewDecoder(r) - for { - i1 := dec.InputOffset() - if err := dec.Decode(&meta); err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, err - } - i2 := dec.InputOffset() - start := i1 - length := i2 - i1 + offset := int64(0) + for meta := range metasChan { + start := offset + length := int64(len(meta.Blob)) + offset += length s := section{offset: start, length: length} - idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) - idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) - idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + if meta.Schema != "" { + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + } + if meta.Package != "" { + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + } + if meta.Name != "" { + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } } return idx, nil } diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 20bbb8614..60e09fadb 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -31,8 +31,9 @@ import ( // done so that clients accessing the content stored in RootDir/catalogName have // atomic view of the content for a catalog. 
type LocalDirV1 struct { - RootDir string - RootURL *url.URL + RootDir string + RootURL *url.URL + EnableQueryHandler bool m sync.RWMutex sf singleflight.Group @@ -42,7 +43,7 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro s.m.Lock() defer s.m.Unlock() - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { return s.storeCatalogFileAndIndex(ctx, catalog, fsys) } return s.storeCatalogFile(ctx, catalog, fsys) @@ -88,30 +89,33 @@ func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog strin } defer os.Remove(tmpIndexFile.Name()) - pr, pw := io.Pipe() - mw := io.MultiWriter(tmpCatalogFile, pw) + metasChan := make(chan *declcfg.Meta) eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { + defer close(metasChan) if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err } - _, err = mw.Write(meta.Blob) + _, err = tmpCatalogFile.Write(meta.Blob) if err != nil { - return pw.CloseWithError(err) + return err } + select { + case <-egCtx.Done(): + return egCtx.Err() + case metasChan <- meta: + } + return nil }, declcfg.WithConcurrency(1)); err != nil { return fmt.Errorf("error walking FBC root: %w", err) } - return pw.CloseWithError(tmpCatalogFile.Close()) + return tmpCatalogFile.Close() }) eg.Go(func() error { - idx, err := newIndex(pr) + idx, err := newIndex(metasChan) if err != nil { - return pr.CloseWithError(err) - } - if err := pr.Close(); err != nil { return err } enc := json.NewEncoder(tmpIndexFile) @@ -142,7 +146,7 @@ func (s *LocalDirV1) Delete(catalog string) error { var errs []error errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))) - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) } return 
errors.Join(errs...) @@ -158,7 +162,7 @@ func (s *LocalDirV1) StorageServerHandler() http.Handler { v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path mux.Handle(v1AllPath, s.v1AllHandler()) - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path mux.Handle(v1QueryPath, s.v1QueryHandler()) } @@ -171,16 +175,11 @@ func (s *LocalDirV1) v1AllHandler() http.Handler { defer s.m.RUnlock() catalog := r.PathValue("catalog") + w.Header().Add("Content-Type", "application/jsonl") http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) }) gzHandler := gzhttp.GzipHandler(catalogHandler) - - typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/jsonl") - gzHandler.ServeHTTP(w, r) - }) - - return newLoggingMiddleware(typeHandler) + return newLoggingMiddleware(gzHandler) } func (s *LocalDirV1) v1QueryHandler() http.Handler { @@ -193,6 +192,13 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { pkg := r.URL.Query().Get("package") name := r.URL.Query().Get("name") + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) + if schema == "" && pkg == "" && name == "" { + w.Header().Add("Content-Type", "application/jsonl") + http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) + return + } + catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) catalogFileStat, err := os.Stat(catalogFilePath) if err != nil { diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index 3d7f7f8a0..0e84d3adc 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -56,7 +56,7 @@ var _ = Describe("LocalDir Storage Test", func() { rootDir = d baseURL = &url.URL{Scheme: "http", Host: 
"test-addr", Path: urlPrefix} - store = LocalDirV1{RootDir: rootDir, RootURL: baseURL} + store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL} unpackResultFS = &fstest.MapFS{ "bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm}, "package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm}, diff --git a/catalogd/internal/storage/storage.go b/catalogd/internal/storage/storage.go index 458ff040b..af78a669f 100644 --- a/catalogd/internal/storage/storage.go +++ b/catalogd/internal/storage/storage.go @@ -13,7 +13,8 @@ import ( type Instance interface { Store(ctx context.Context, catalog string, fsys fs.FS) error Delete(catalog string) error + ContentExists(catalog string) bool + BaseURL(catalog string) string StorageServerHandler() http.Handler - ContentExists(catalog string) bool } From 82dac601869dcc9681bc145a4657f2efc1f68428 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Sun, 19 Jan 2025 10:00:36 -0500 Subject: [PATCH 04/15] another round of refactoring improvement Signed-off-by: Joe Lanford --- catalogd/internal/serverutil/serverutil.go | 3 +- catalogd/internal/storage/index.go | 6 +- catalogd/internal/storage/localdir.go | 370 +++++++++---------- catalogd/internal/storage/multireadseeker.go | 118 ++++++ 4 files changed, 290 insertions(+), 207 deletions(-) create mode 100644 catalogd/internal/storage/multireadseeker.go diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index afda4cf8e..daec33057 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -14,6 +14,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics" "github.com/operator-framework/operator-controller/catalogd/internal/storage" @@ -53,7 
+54,7 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil OnlyServeWhenLeader: true, Server: &http.Server{ Addr: cfg.CatalogAddr, - Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()), + Handler: handler, BaseContext: func(_ net.Listener) context.Context { return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) }, diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 02f14d11b..20fe5f2ed 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -51,7 +51,7 @@ func (i index) Size() int64 { return int64(size) } -func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { +func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.ReadSeeker, bool) { sectionSet := i.getSectionSet(schema, packageName, name) sections := sectionSet.UnsortedList() @@ -59,12 +59,12 @@ func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, return cmp.Compare(a.offset, b.offset) }) - srs := make([]io.Reader, 0, len(sections)) + srs := make([]io.ReadSeeker, 0, len(sections)) for _, s := range sections { sr := io.NewSectionReader(r, s.offset, s.length) srs = append(srs, sr) } - return io.MultiReader(srs...), true + return newMultiReadSeeker(srs...), true } func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 60e09fadb..db642e8f5 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -11,18 +11,13 @@ import ( "net/url" "os" "path/filepath" - "strings" "sync" "time" - "github.com/go-logr/logr" - "github.com/klauspost/compress/gzhttp" "golang.org/x/sync/errgroup" "golang.org/x/sync/singleflight" "github.com/operator-framework/operator-registry/alpha/declcfg" - - 
"github.com/operator-framework/catalogd/internal/features" ) // LocalDirV1 is a storage Instance. When Storing a new FBC contained in @@ -43,99 +38,56 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro s.m.Lock() defer s.m.Unlock() - if s.EnableQueryHandler { - return s.storeCatalogFileAndIndex(ctx, catalog, fsys) - } - return s.storeCatalogFile(ctx, catalog, fsys) -} - -func (s *LocalDirV1) storeCatalogFile(ctx context.Context, catalog string, fsys fs.FS) error { if err := os.MkdirAll(s.RootDir, 0700); err != nil { return err } - tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) + tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog)) if err != nil { return err } - defer os.Remove(tmpCatalogFile.Name()) + defer os.RemoveAll(tmpCatalogDir) - if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - _, err = tmpCatalogFile.Write(meta.Blob) - return err - }); err != nil { - return fmt.Errorf("error walking FBC root: %w", err) - } - - fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - return os.Rename(tmpCatalogFile.Name(), fbcFile) -} - -func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog string, fsys fs.FS) error { - if err := os.MkdirAll(s.RootDir, 0700); err != nil { - return err - } - tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) - if err != nil { - return err + storeMetaFuncs := []storeMetasFunc{storeCatalogData} + if s.EnableQueryHandler { + storeMetaFuncs = append(storeMetaFuncs, storeIndexData) } - defer os.Remove(tmpCatalogFile.Name()) - tmpIndexFile, err := os.CreateTemp(s.RootDir, filepath.Base(fmt.Sprintf("%s.index.json", strings.TrimSuffix(tmpCatalogFile.Name(), ".jsonl")))) - if err != nil { - return err + var ( + eg, egCtx = errgroup.WithContext(ctx) + metaChans []chan *declcfg.Meta + ) + for i, f := range 
storeMetaFuncs { + metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) + eg.Go(func() error { return f(tmpCatalogDir, metaChans[i]) }) } - defer os.Remove(tmpIndexFile.Name()) - - metasChan := make(chan *declcfg.Meta) - eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { - defer close(metasChan) - if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - _, err = tmpCatalogFile.Write(meta.Blob) - if err != nil { - return err - } + err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + for _, ch := range metaChans { select { + case ch <- meta: case <-egCtx.Done(): return egCtx.Err() - case metasChan <- meta: } - - return nil - }, declcfg.WithConcurrency(1)); err != nil { - return fmt.Errorf("error walking FBC root: %w", err) - } - return tmpCatalogFile.Close() - }) - eg.Go(func() error { - idx, err := newIndex(metasChan) - if err != nil { - return err - } - enc := json.NewEncoder(tmpIndexFile) - if err := enc.Encode(idx); err != nil { - return err - } - if err := tmpIndexFile.Close(); err != nil { - return err } return nil - }) + }, declcfg.WithConcurrency(1)) + for _, ch := range metaChans { + close(ch) + } + if err != nil { + return fmt.Errorf("error walking FBC root: %w", err) + } + if err := eg.Wait(); err != nil { return err } - fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - fbcIndexFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + catalogDir := s.catalogDir(catalog) return errors.Join( - os.Rename(tmpCatalogFile.Name(), fbcFile), - os.Rename(tmpIndexFile.Name(), fbcIndexFile), + os.RemoveAll(catalogDir), + os.Rename(tmpCatalogDir, catalogDir), ) } @@ -143,13 +95,78 @@ func (s *LocalDirV1) Delete(catalog string) error { s.m.Lock() defer s.m.Unlock() - var errs []error - errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, 
fmt.Sprintf("%s.jsonl", catalog)))) + return os.RemoveAll(s.catalogDir(catalog)) +} + +func (s *LocalDirV1) ContentExists(catalog string) bool { + s.m.RLock() + defer s.m.RUnlock() + + catalogFileStat, err := os.Stat(catalogFilePath(s.catalogDir(catalog))) + if err != nil { + return false + } + if !catalogFileStat.Mode().IsRegular() { + // path is not valid content + return false + } if s.EnableQueryHandler { - errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) + indexFileStat, err := os.Stat(catalogIndexFilePath(s.catalogDir(catalog))) + if err != nil { + return false + } + if !indexFileStat.Mode().IsRegular() { + return false + } } - return errors.Join(errs...) + return true +} + +func (s *LocalDirV1) catalogDir(catalog string) string { + return filepath.Join(s.RootDir, catalog) +} + +func catalogFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "catalog.jsonl") +} + +func catalogIndexFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "index.json") +} + +type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error + +func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { + f, err := os.Create(catalogFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + for m := range metas { + if _, err := f.Write(m.Blob); err != nil { + return err + } + } + return nil +} + +func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { + idx, err := newIndex(metas) + if err != nil { + return err + } + + f, err := os.Create(catalogIndexFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + return enc.Encode(idx) } func (s *LocalDirV1) BaseURL(catalog string) string { @@ -159,121 +176,93 @@ func (s *LocalDirV1) BaseURL(catalog string) string { func (s *LocalDirV1) StorageServerHandler() http.Handler { mux := http.NewServeMux() - 
v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path - mux.Handle(v1AllPath, s.v1AllHandler()) - + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path, s.handleV1All) if s.EnableQueryHandler { - v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path - mux.Handle(v1QueryPath, s.v1QueryHandler()) + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path, s.handleV1Query) } return mux } -func (s *LocalDirV1) v1AllHandler() http.Handler { - catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() +func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() - catalog := r.PathValue("catalog") - w.Header().Add("Content-Type", "application/jsonl") - http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) - }) - gzHandler := gzhttp.GzipHandler(catalogHandler) - return newLoggingMiddleware(gzHandler) + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) } -func (s *LocalDirV1) v1QueryHandler() http.Handler { - catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() - - catalog := r.PathValue("catalog") - schema := r.URL.Query().Get("schema") - pkg := r.URL.Query().Get("package") - name := r.URL.Query().Get("name") - - // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) - if schema == "" && pkg == "" && name == "" { - w.Header().Add("Content-Type", "application/jsonl") - http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) - return - } +func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() - catalogFilePath := 
filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - catalogFileStat, err := os.Stat(catalogFilePath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "Catalog not found", http.StatusNotFound) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - modTime := catalogFileStat.ModTime().Format(http.TimeFormat) - if r.Header.Get("If-Modified-Since") == modTime { - w.WriteHeader(http.StatusNotModified) - return - } + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + defer catalogFile.Close() - catalogFile, err := os.Open(catalogFilePath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "Catalog not found", http.StatusNotFound) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer catalogFile.Close() + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") - idx, err := s.getIndex(catalog) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - queryReader, ok := idx.Get(catalogFile, schema, pkg, name) - if !ok { - http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) - return - } - w.Header().Add("Content-Type", "application/jsonl") - w.Header().Set("Last-Modified", modTime) - _, _ = io.Copy(w, queryReader) - }) - gzHandler := gzhttp.GzipHandler(catalogHandler) - return newLoggingMiddleware(gzHandler) + if schema == "" && pkg == "" && name == "" { + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) + serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) + return + } + idx, err := s.getIndex(catalog) + if err != nil { + httpError(w, err) + return + } + indexReader, ok := idx.Get(catalogFile, schema, pkg, name) + if !ok { + httpError(w, 
fs.ErrNotExist) + return + } + serveJsonLines(w, r, catalogStat.ModTime(), indexReader) } -func (s *LocalDirV1) ContentExists(catalog string) bool { - s.m.RLock() - defer s.m.RUnlock() - - catalogFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) +func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { + catalogFile, err := os.Open(catalogFilePath(s.catalogDir(catalog))) if err != nil { - return false + return nil, nil, err } - if !catalogFileStat.Mode().IsRegular() { - // path is not valid content - return false + catalogFileStat, err := catalogFile.Stat() + if err != nil { + return nil, nil, err } + return catalogFile, catalogFileStat, nil +} - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { - indexFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) - if err != nil { - return false - } - if !indexFileStat.Mode().IsRegular() { - return false - } +func httpError(w http.ResponseWriter, err error) { + var code int + switch { + case errors.Is(err, fs.ErrNotExist): + code = http.StatusNotFound + case errors.Is(err, fs.ErrPermission): + code = http.StatusForbidden + default: + code = http.StatusInternalServerError } - return true + http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) +} + +func serveJsonLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) { + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", modTime, rs) } func (s *LocalDirV1) getIndex(catalog string) (*index, error) { idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { - indexFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) - fmt.Printf("opening index file %s\n", indexFilePath) - indexFile, err := os.Open(indexFilePath) + indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) if err != nil { return nil, err } @@ -284,33 +273,8 @@ func (s *LocalDirV1) 
getIndex(catalog string) (*index, error) { } return &idx, nil }) - return idx.(*index), err -} - -func newLoggingMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logger := logr.FromContextOrDiscard(r.Context()) - - start := time.Now() - lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} - next.ServeHTTP(lrw, r) - - logger.WithValues( - "method", r.Method, - "url", r.URL.String(), - "status", lrw.statusCode, - "duration", time.Since(start), - "remoteAddr", r.RemoteAddr, - ).Info("HTTP request processed") - }) -} - -type loggingResponseWriter struct { - http.ResponseWriter - statusCode int -} - -func (w *loggingResponseWriter) WriteHeader(code int) { - w.statusCode = code - w.ResponseWriter.WriteHeader(code) + if err != nil { + return nil, err + } + return idx.(*index), nil } diff --git a/catalogd/internal/storage/multireadseeker.go b/catalogd/internal/storage/multireadseeker.go new file mode 100644 index 000000000..f1e3e1cd3 --- /dev/null +++ b/catalogd/internal/storage/multireadseeker.go @@ -0,0 +1,118 @@ +package storage + +import ( + "io" + "os" +) + +type doubleReadSeeker struct { + rs1, rs2 io.ReadSeeker + rs1len, rs2len int64 + second bool + pos int64 +} + +func (r *doubleReadSeeker) Seek(offset int64, whence int) (int64, error) { + var err error + switch whence { + case os.SEEK_SET: + if offset < r.rs1len { + r.second = false + r.pos, err = r.rs1.Seek(offset, os.SEEK_SET) + return r.pos, err + } else { + r.second = true + r.pos, err = r.rs2.Seek(offset-r.rs1len, os.SEEK_SET) + r.pos += r.rs1len + return r.pos, err + } + case os.SEEK_END: // negative offset + return r.Seek(r.rs1len+r.rs2len+offset-1, os.SEEK_SET) + default: // os.SEEK_CUR + return r.Seek(r.pos+offset, os.SEEK_SET) + } +} + +func (r *doubleReadSeeker) Read(p []byte) (n int, err error) { + switch { + case r.pos >= r.rs1len: // read only from the second reader + n, err := r.rs2.Read(p) + r.pos += 
int64(n) + return n, err + case r.pos+int64(len(p)) <= r.rs1len: // read only from the first reader + n, err := r.rs1.Read(p) + r.pos += int64(n) + return n, err + default: // read on the border - end of first reader and start of second reader + n1, err := r.rs1.Read(p) + r.pos += int64(n1) + if r.pos != r.rs1len || (err != nil && err != io.EOF) { + // Read() might not read all, return + // If error (but not EOF), return + return n1, err + } + _, err = r.rs2.Seek(0, os.SEEK_SET) + if err != nil { + return n1, err + } + r.second = true + n2, err := r.rs2.Read(p[n1:]) + r.pos += int64(n2) + return n1 + n2, err + } +} + +func multiReadSeeker(rs []io.ReadSeeker, leftmost bool) (io.ReadSeeker, int64, error) { + if len(rs) == 1 { + r := rs[0] + l, err := r.Seek(0, io.SeekEnd) + if err != nil { + return nil, 0, err + } + if leftmost { + _, err = r.Seek(0, io.SeekStart) + } + return r, l, err + } else { + rs1, l1, err := multiReadSeeker(rs[:len(rs)/2], leftmost) + if err != nil { + return nil, 0, err + } + rs2, l2, err := multiReadSeeker(rs[len(rs)/2:], false) + if err != nil { + return nil, 0, err + } + return &doubleReadSeeker{rs1, rs2, l1, l2, false, 0}, l1 + l2, nil + } +} + +type emptyReadSeeker struct{} + +func (r *emptyReadSeeker) Read(p []byte) (n int, err error) { + return 0, io.EOF +} + +func (r *emptyReadSeeker) Seek(offset int64, whence int) (int64, error) { + return 0, io.EOF +} + +// newMultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided +// input readseekers. After calling this method the initial position is set to the +// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances +// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. +// Seek can be used over the sum of lengths of all readseekers. 
+// +// When a newMultiReadSeeker is used, no Read and Seek operations should be made on +// its ReadSeeker components and the length of the readseekers should not change. +// Also, users should make no assumption on the state of individual readseekers +// while the newMultiReadSeeker is used. +func newMultiReadSeeker(rs ...io.ReadSeeker) io.ReadSeeker { + if len(rs) == 0 { + return &emptyReadSeeker{} + } + r, _, err := multiReadSeeker(rs, true) + if err != nil { + return &emptyReadSeeker{} + } + return r +} From ab587bc549fc6896bbbe48fdec923e28353496c9 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Thu, 16 Jan 2025 15:37:41 -0500 Subject: [PATCH 05/15] Tests --- catalogd/internal/serverutil/serverutil.go | 27 +- .../internal/serverutil/serverutil_test.go | 254 +++++ catalogd/internal/storage/localdir.go | 15 +- catalogd/internal/storage/localdir_test.go | 872 ++++++++++-------- go.mod | 2 +- 5 files changed, 773 insertions(+), 397 deletions(-) create mode 100644 catalogd/internal/serverutil/serverutil_test.go diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index daec33057..b87bd1c0a 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -1,7 +1,6 @@ package serverutil import ( - "context" "crypto/tls" "fmt" "io" @@ -11,9 +10,9 @@ import ( "github.com/go-logr/logr" "github.com/gorilla/handlers" + "github.com/klauspost/compress/gzhttp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics" @@ -44,20 +43,12 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil } shutdownTimeout := 30 * time.Second - - l := mgr.GetLogger().WithName("catalogd-http-server") - handler := 
catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()) - handler = logrLoggingHandler(l, handler) - catalogServer := manager.Server{ Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ - Addr: cfg.CatalogAddr, - Handler: handler, - BaseContext: func(_ net.Listener) context.Context { - return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) - }, + Addr: cfg.CatalogAddr, + Handler: storageServerHandlerWrapped(mgr, cfg), ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses @@ -102,3 +93,15 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size) }) } + +func storageServerHandlerWrapped(mgr ctrl.Manager, cfg CatalogServerConfig) http.Handler { + + handler := cfg.LocalStorage.StorageServerHandler() + handler = gzhttp.GzipHandler(handler) + handler = catalogdmetrics.AddMetricsToHandler(handler) + + l := mgr.GetLogger().WithName("catalogd-http-server") + handler = logrLoggingHandler(l, handler) + return handler + +} diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go new file mode 100644 index 000000000..89938d82a --- /dev/null +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -0,0 +1,254 @@ +package serverutil + +import ( + "compress/gzip" + "context" + "io" + "io/fs" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + ctrl "sigs.k8s.io/controller-runtime" +) + +func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { + tests := []struct { + name string + acceptEncoding string + responseContent string + expectCompressed bool + expectedStatus int + }{ + { + name: "compresses large response when client accepts 
gzip", + acceptEncoding: "gzip", + responseContent: testCompressableJSON, + expectCompressed: true, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress small response even when client accepts gzip", + acceptEncoding: "gzip", + responseContent: `{"foo":"bar"}`, + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress when client doesn't accept gzip", + acceptEncoding: "", + responseContent: testCompressableJSON, + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock storage instance that returns our test content + mockStorage := &mockStorageInstance{ + content: tt.responseContent, + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + require.NoError(t, err) + + cfg := CatalogServerConfig{ + LocalStorage: mockStorage, + } + handler := storageServerHandlerWrapped(mgr, cfg) + + // Create test request + req := httptest.NewRequest("GET", "/test", nil) + if tt.acceptEncoding != "" { + req.Header.Set("Accept-Encoding", tt.acceptEncoding) + } + + // Create response recorder + rec := httptest.NewRecorder() + + // Handle the request + handler.ServeHTTP(rec, req) + + // Check status code + require.Equal(t, tt.expectedStatus, rec.Code) + + // Check if response was compressed + wasCompressed := rec.Header().Get("Content-Encoding") == "gzip" + require.Equal(t, tt.expectCompressed, wasCompressed) + + // Get the response body + var responseBody []byte + if wasCompressed { + // Decompress the response + gzipReader, err := gzip.NewReader(rec.Body) + require.NoError(t, err) + responseBody, err = io.ReadAll(gzipReader) + require.NoError(t, err) + require.NoError(t, gzipReader.Close()) + } else { + responseBody = rec.Body.Bytes() + } + + // Verify the response content + require.Equal(t, tt.responseContent, string(responseBody)) + }) + } +} + +const testCompressableJSON = `{ + "defaultChannel": "stable-v6.x", + 
"name": "cockroachdb", + "schema": "olm.package" + } + { + "entries": [ + { + "name": "cockroachdb.v5.0.3" + }, + { + "name": "cockroachdb.v5.0.4", + "replaces": "cockroachdb.v5.0.3" + } + ], + "name": "stable-5.x", + "package": "cockroachdb", + "schema": "olm.channel" + } + { + "entries": [ + { + "name": "cockroachdb.v6.0.0", + "skipRange": "<6.0.0" + } + ], + "name": "stable-v6.x", + "package": "cockroachdb", + "schema": "olm.channel" + } + { + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", + "name": "cockroachdb.v5.0.3", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "5.0.3" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/helmoperators/cockroachdb:v5.0.3" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" + } + ], + "schema": "olm.bundle" + } + { + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", + "name": "cockroachdb.v5.0.4", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "5.0.4" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/helmoperators/cockroachdb:v5.0.4" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" + } + ], + "schema": "olm.bundle" + } + { + "image": 
"quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", + "name": "cockroachdb.v6.0.0", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "6.0.0" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" + } + ], + "schema": "olm.bundle" + } + ` + +// mockStorageInstance implements storage.Instance interface for testing +type mockStorageInstance struct { + content string +} + +func (m *mockStorageInstance) StorageServerHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(m.content)) + }) +} + +func (m *mockStorageInstance) Store(ctx context.Context, catalogName string, fs fs.FS) error { + return nil +} + +func (m *mockStorageInstance) Delete(catalogName string) error { + return nil +} + +func (m *mockStorageInstance) ContentExists(catalog string) bool { + return true +} +func (m *mockStorageInstance) BaseURL(catalog string) string { + return "" +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index db642e8f5..07bc13faf 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -34,6 +34,11 @@ type LocalDirV1 struct { sf singleflight.Group } +var ( + _ Instance = &LocalDirV1{} + ErrInvalidParams = errors.New("invalid parameters") +) + func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { s.m.Lock() defer s.m.Unlock() @@ -56,9 +61,13 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys 
fs.FS) erro eg, egCtx = errgroup.WithContext(ctx) metaChans []chan *declcfg.Meta ) - for i, f := range storeMetaFuncs { + for range storeMetaFuncs { metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) - eg.Go(func() error { return f(tmpCatalogDir, metaChans[i]) }) + } + for i, f := range storeMetaFuncs { + eg.Go(func() error { + return f(tmpCatalogDir, metaChans[i]) + }) } err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { @@ -249,6 +258,8 @@ func httpError(w http.ResponseWriter, err error) { code = http.StatusNotFound case errors.Is(err, fs.ErrPermission): code = http.StatusForbidden + case errors.Is(err, ErrInvalidParams): + code = http.StatusBadRequest default: code = http.StatusInternalServerError } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index 0e84d3adc..df7e9dcb0 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -1,10 +1,7 @@ package storage import ( - "bytes" - "compress/gzip" "context" - "encoding/json" "errors" "fmt" "io" @@ -13,221 +10,495 @@ import ( "net/http/httptest" "net/url" "os" - "path/filepath" "strings" + "sync" + "testing" "testing/fstest" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - "sigs.k8s.io/yaml" - - "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/stretchr/testify/require" ) const urlPrefix = "/catalogs/" -var ctx = context.Background() - -var _ = Describe("LocalDir Storage Test", func() { - var ( - catalog = "test-catalog" - store Instance - rootDir string - baseURL *url.URL - testBundleName = "bundle.v0.0.1" - testBundleImage = "quaydock.io/namespace/bundle:0.0.3" - testBundleRelatedImageName = "test" - testBundleRelatedImageImage = "testimage:latest" - testBundleObjectData = "dW5pbXBvcnRhbnQK" - testPackageDefaultChannel = "preview_test" - testPackageName = "webhook_operator_test" - testChannelName = "preview_test" - testPackage = fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) - testBundle = fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) - testChannel = fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) - - unpackResultFS fs.FS - ) - BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - rootDir = d - - baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix} - store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL} - unpackResultFS = &fstest.MapFS{ - "bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm}, - "package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm}, - "channel.yaml": &fstest.MapFile{Data: []byte(testChannel), Mode: os.ModePerm}, - } - }) - When("An unpacked FBC is stored using LocalDir", func() { - BeforeEach(func() { - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - }) - It("should store the content in the RootDir correctly", func() { - fbcFile := filepath.Join(rootDir, 
fmt.Sprintf("%s.jsonl", catalog)) - _, err := os.Stat(fbcFile) - Expect(err).To(Not(HaveOccurred())) - - gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - storedConfig, err := declcfg.LoadFile(os.DirFS(filepath.Dir(fbcFile)), filepath.Base(fbcFile)) - Expect(err).To(Not(HaveOccurred())) - diff := cmp.Diff(gotConfig, storedConfig) - Expect(diff).To(Equal("")) - }) - It("should form the content URL correctly", func() { - Expect(store.BaseURL(catalog)).To(Equal(baseURL.JoinPath(catalog).String())) +func TestLocalDirStoraget(t *testing.T) { + tests := []struct { + name string + setup func(*testing.T) (*LocalDirV1, fs.FS) + test func(*testing.T, *LocalDirV1, fs.FS) + cleanup func(*testing.T, *LocalDirV1) + }{ + { + name: "store and retrieve catalog content", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + + // Initially content should not exist + if s.ContentExists(catalog) { + t.Fatal("content should not exist before store") + } + + // Store the content + if err := s.Store(context.Background(), catalog, fsys); err != nil { + t.Fatal(err) + } + + // Verify content exists after store + if !s.ContentExists(catalog) { + t.Fatal("content should exist after store") + } + + // Delete the content + if err := s.Delete(catalog); err != nil { + t.Fatal(err) + } + + // Verify content no longer exists + if s.ContentExists(catalog) { + t.Fatal("content should not exist after delete") + } + }, + }, + { + name: "storing with query handler enabled should create indexes", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + EnableQueryHandler: true, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := 
s.Store(context.Background(), "test-catalog", fsys) + if err != nil { + t.Fatal(err) + } + + if !s.ContentExists("test-catalog") { + t.Error("content should exist after store") + } + + // Verify index file was created + indexPath := catalogIndexFilePath(s.catalogDir("test-catalog")) + if _, err := os.Stat(indexPath); err != nil { + t.Errorf("index file should exist: %v", err) + } + }, + }, + { + name: "concurrent reads during write should not cause data race", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + s := &LocalDirV1{RootDir: dir} + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + var wg sync.WaitGroup + + // Start multiple concurrent readers + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Add(-1) + for j := 0; j < 100; j++ { + s.ContentExists(catalog) + } + }() + } + + // Write while readers are active + err := s.Store(context.Background(), catalog, fsys) + if err != nil { + t.Fatal(err) + } + + wg.Wait() + }, + }, + { + name: "delete nonexistent catalog", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + return &LocalDirV1{RootDir: t.TempDir()}, nil + }, + test: func(t *testing.T, s *LocalDirV1, _ fs.FS) { + err := s.Delete("nonexistent") + if err != nil { + t.Errorf("expected no error deleting nonexistent catalog, got: %v", err) + } + }, + }, + { + name: "store with invalid permissions", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + // Set directory permissions to deny access + if err := os.Chmod(dir, 0000); err != nil { + t.Fatal(err) + } + return &LocalDirV1{RootDir: dir}, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := s.Store(context.Background(), "test-catalog", fsys) + if !errors.Is(err, fs.ErrPermission) { + t.Errorf("expected permission error, got: %v", err) + } + }, + cleanup: func(t *testing.T, s *LocalDirV1) { + // Restore permissions so cleanup can succeed + 
require.NoError(t, os.Chmod(s.RootDir, 0700)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, fsys := tt.setup(t) + tt.test(t, s, fsys) + if tt.cleanup != nil { + tt.cleanup(t, s) + } }) - It("should report content exists", func() { - Expect(store.ContentExists(catalog)).To(BeTrue()) + } +} + +func TestLocalDirServerHandler(t *testing.T) { + store := &LocalDirV1{RootDir: t.TempDir(), RootURL: &url.URL{Path: urlPrefix}} + testFS := fstest.MapFS{ + "meta.json": &fstest.MapFile{ + Data: []byte(`{"foo":"bar"}`), + }, + } + if store.Store(context.Background(), "test-catalog", testFS) != nil { + t.Fatal("failed to store test catalog and start server") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + for _, tc := range []struct { + name string + URLPath string + expectedStatusCode int + expectedContent string + }{ + { + name: "Server returns 404 when root URL is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "", + }, + { + name: "Server returns 404 when path '/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/", + }, + { + name: "Server returns 404 when path '/catalogs/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/", + }, + { + name: "Server returns 404 when path '/catalogs//' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/", + }, + { + name: "Server returns 404 when path '/catalogs//api/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/api/", + }, + { + name: "Serer return 404 when path '/catalogs//api/v1' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: 
"/catalogs/test-catalog/api/v1c", + }, + { + name: "Server return 404 when path '/catalogs//non-existent.txt' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/non-existent.txt", + }, + { + name: "Server returns 404 when path '/catalogs/.jsonl' is queried even if the file exists, since we don't serve the filesystem, and serve an API instead", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog.jsonl", + }, + { + name: "Server returns 404 when non-existent catalog is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 Not Found", + URLPath: "/catalogs/non-existent-catalog/api/v1/all", + }, + { + name: "Server returns 200 when path '/catalogs//api/v1/all' is queried, when catalog exists", + expectedStatusCode: http.StatusOK, + expectedContent: `{"foo":"bar"}`, + URLPath: "/catalogs/test-catalog/api/v1/all", + }, + } { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServer.URL, tc.URLPath), nil) + require.NoError(t, err) + req.Header.Set("Accept-Encoding", "gzip") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + var actualContent []byte + actualContent, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) + require.NoError(t, resp.Body.Close()) }) - When("The stored content is deleted", func() { - BeforeEach(func() { - err := store.Delete(catalog) - Expect(err).To(Not(HaveOccurred())) - }) - It("should delete the FBC from the cache directory", func() { - fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) - _, err := os.Stat(fbcFile) - Expect(err).To(HaveOccurred()) - Expect(os.IsNotExist(err)).To(BeTrue()) - - indexFile := filepath.Join(rootDir, 
fmt.Sprintf("%s.index.json", catalog)) - _, err = os.Stat(indexFile) - Expect(err).To(HaveOccurred()) - Expect(os.IsNotExist(err)).To(BeTrue()) - }) - It("should report content does not exist", func() { - Expect(store.ContentExists(catalog)).To(BeFalse()) - }) + } +} + +// Tests to verify the behavior of the query endpoint, as described in +// https://docs.google.com/document/d/1s6_9IFEKGQLNh3ueH7SF4Yrx4PW9NSiNFqFIJx0pU-8/edit?usp=sharing +func TestQueryEndpoint(t *testing.T) { + store := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + if store.Store(context.Background(), "test-catalog", createTestFS(t)) != nil { + t.Fatal("failed to store test catalog") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + + testCases := []struct { + name string + setupStore func() (*httptest.Server, error) + queryParams string + expectedStatusCode int + expectedContent string + }{ + { + name: "valid query with package schema", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with schema and name combination", + queryParams: "?schema=olm.package&name=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with channel schema and package name combination", + queryParams: "?schema=olm.channel&package=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, + }, + { + name: "query with all meta fields", + queryParams: "?schema=olm.bundle&package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: 
`{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + // { + // name: "valid query for package schema for a package that does not exist", + // queryParams: "?schema=olm.package&name=not-present", + // expectedStatusCode: http.StatusOK, + // expectedContent: "", + // }, + { + name: "valid query with package and name", + queryParams: "?package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + // { + // name: "invalid query with non-existent schema", + // queryParams: "?schema=non_existent_schema", + // expectedStatusCode: http.StatusNotFound, + // expectedContent: "400 Bad Request", + // }, + { + name: "cached response with If-Modified-Since", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusNotModified, + expectedContent: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query%s", testServer.URL, tc.queryParams), nil) + require.NoError(t, err) + + if strings.Contains(tc.name, "If-Modified-Since") { + // Do an initial request to get a Last-Modified timestamp + // for the actual request + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + req.Header.Set("If-Modified-Since", resp.Header.Get("Last-Modified")) + } + resp, err := http.DefaultClient.Do(req) + 
require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + actualContent, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) }) - }) -}) - -var _ = Describe("LocalDir Server Handler tests", func() { - var ( - testServer *httptest.Server - store LocalDirV1 - ) - BeforeEach(func() { - d := GinkgoT().TempDir() - store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}} - testServer = httptest.NewServer(store.StorageServerHandler()) - - }) - It("gets 404 for the path /", func() { - expectNotFound(testServer.URL) - }) - It("gets 404 for the path /catalogs/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/")) - }) - It("gets 404 for the path /catalogs/test-catalog/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) - }) - It("gets 404 for the path /catalogs/test-catalog/api", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api")) - }) - It("gets 404 for the path /catalogs/test-catalog/api/v1", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1")) - }) - It("gets 404 for the path /catalogs/test-catalog.jsonl", func() { - // This is actually how the file is stored, but we don't serve - // the filesystem, we serve an API. 
Hence, expect 404 not found - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), []byte("foobar"), 0600)).To(Succeed()) - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog.jsonl")) - }) - It("gets 200 for the path /catalogs/test-catalog/api/v1/all", func() { - expectedContent := []byte(`{"foo":"bar"}`) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) - }) - It("ignores accept-encoding for the path /catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() { - expectedContent := []byte(`{"foo":"bar"}`) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) - }) - It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() { - expectedContent := []byte(testCompressableJSON) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, true) - }) - It("provides json-lines format for the served JSON catalog", func() { - catalog := "test-catalog" - unpackResultFS := &fstest.MapFS{ - "catalog.json": &fstest.MapFile{Data: []byte(testCompressableJSON), Mode: os.ModePerm}, - } - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent), true) - }) - It("provides 
json-lines format for the served YAML catalog", func() { - catalog := "test-catalog" - yamlData, err := makeYAMLFromConcatenatedJSON([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - unpackResultFS := &fstest.MapFS{ - "catalog.yaml": &fstest.MapFile{Data: yamlData, Mode: os.ModePerm}, - } - err = store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines(yamlData) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent), true) - }) - AfterEach(func() { - testServer.Close() - }) -}) - -func expectNotFound(url string) { - resp, err := http.Get(url) //nolint:gosec - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) - Expect(resp.Body.Close()).To(Succeed()) + } } -func expectFound(url string, expectedContent []byte, expectCompression bool) { - req, err := http.NewRequest(http.MethodGet, url, nil) - Expect(err).To(Not(HaveOccurred())) - req.Header.Set("Accept-Encoding", "gzip") - resp, err := http.DefaultClient.Do(req) - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - var actualContent []byte - if expectCompression { - Expect(resp.Header.Get("Content-Encoding")).To(Equal("gzip")) - Expect(len(expectedContent)).To(BeNumerically(">", 1400), - fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent))) - gz, err := gzip.NewReader(resp.Body) - Expect(err).To(Not(HaveOccurred())) - actualContent, err = io.ReadAll(gz) - Expect(err).To(Not(HaveOccurred())) - } else { - Expect(resp.Header.Get("Content-Encoding")).To(BeEmpty()) - actualContent, err = io.ReadAll(resp.Body) - Expect(len(expectedContent)).To(BeNumerically("<", 1400), - 
fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent)) - Expect(err).To(Not(HaveOccurred())) +func TestServerLoadHandling(t *testing.T) { + store := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + + // Create large test data + largeFS := fstest.MapFS{} + for i := 0; i < 1000; i++ { + largeFS[fmt.Sprintf("meta_%d.json", i)] = &fstest.MapFile{ + Data: []byte(fmt.Sprintf(`{"schema":"olm.bundle","package":"test-op-%d","name":"test-op.v%d.0"}`, i, i)), + } + } + + if err := store.Store(context.Background(), "test-catalog", largeFS); err != nil { + t.Fatal("failed to store test catalog") } - Expect(actualContent).To(Equal(expectedContent)) - Expect(resp.Body.Close()).To(Succeed()) + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + tests := []struct { + name string + concurrent int + requests func(baseURL string) []*http.Request + validateFunc func(t *testing.T, responses []*http.Response, errs []error) + }{ + { + name: "concurrent identical queries", + concurrent: 100, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 100; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, resp.Header.Get("Content-Type"), "application/jsonl") + resp.Body.Close() + } + }, + }, + { + name: "concurrent different queries", + concurrent: 50, + requests: func(baseURL string) 
[]*http.Request { + var reqs []*http.Request + for i := 0; i < 50; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?package=test-op-%d", baseURL, i), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), "test-op-") + resp.Body.Close() + } + }, + }, + { + name: "mixed all and query endpoints", + concurrent: 40, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 20; i++ { + allReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/all", baseURL), + nil) + queryReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + allReq.Header.Set("Accept", "application/jsonl") + queryReq.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, allReq, queryReq) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ( + wg sync.WaitGroup + responses = make([]*http.Response, tt.concurrent) + errs = make([]error, tt.concurrent) + ) + + requests := tt.requests(testServer.URL) + for i := 0; i < tt.concurrent; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + resp, err := http.DefaultClient.Do(requests[idx]) + responses[idx] = resp + errs[idx] = err + }(i) + } + + wg.Wait() + tt.validateFunc(t, 
responses, errs) + }) + } } -const testBundleTemplate = `--- +func createTestFS(t *testing.T) fs.FS { + t.Helper() + testBundleTemplate := `--- image: %s name: %s schema: olm.bundle @@ -244,197 +515,34 @@ properties: data: arbitrary-info ` -const testPackageTemplate = `--- + testPackageTemplate := `--- defaultChannel: %s name: %s schema: olm.package ` -const testChannelTemplate = `--- + testChannelTemplate := `--- schema: olm.channel package: %s name: %s entries: - name: %s ` + testBundleName := "bundle.v0.0.1" + testBundleImage := "quaydock.io/namespace/bundle:0.0.3" + testBundleRelatedImageName := "test" + testBundleRelatedImageImage := "testimage:latest" + testBundleObjectData := "dW5pbXBvcnRhbnQK" + testPackageDefaultChannel := "preview_test" + testPackageName := "webhook_operator_test" + testChannelName := "preview_test" -// by default the compressor will only trigger for files larger than 1400 bytes -const testCompressableJSON = `{ - "defaultChannel": "stable-v6.x", - "name": "cockroachdb", - "schema": "olm.package" -} -{ - "entries": [ - { - "name": "cockroachdb.v5.0.3" - }, - { - "name": "cockroachdb.v5.0.4", - "replaces": "cockroachdb.v5.0.3" - } - ], - "name": "stable-5.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "entries": [ - { - "name": "cockroachdb.v6.0.0", - "skipRange": "<6.0.0" - } - ], - "name": "stable-v6.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", - "name": "cockroachdb.v5.0.3", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.3" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.3" - }, - { - "name": 
"", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", - "name": "cockroachdb.v5.0.4", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.4" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.4" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", - "name": "cockroachdb.v6.0.0", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "6.0.0" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" - } - ], - "schema": "olm.bundle" -} -` - -// makeYAMLFromConcatenatedJSON takes a byte slice of concatenated JSON objects and returns a byte slice of concatenated YAML objects. 
-func makeYAMLFromConcatenatedJSON(data []byte) ([]byte, error) { - var msg json.RawMessage - var delimiter = []byte("---\n") - var yamlData []byte - - yamlData = append(yamlData, delimiter...) - - dec := json.NewDecoder(bytes.NewReader(data)) - for { - err := dec.Decode(&msg) - if errors.Is(err, io.EOF) { - break - } - y, err := yaml.JSONToYAML(msg) - if err != nil { - return []byte{}, err - } - yamlData = append(yamlData, delimiter...) - yamlData = append(yamlData, y...) + testPackage := fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) + testBundle := fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) + testChannel := fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) + return &fstest.MapFS{ + "bundle.yaml": {Data: []byte(testBundle), Mode: os.ModePerm}, + "package.yaml": {Data: []byte(testPackage), Mode: os.ModePerm}, + "channel.yaml": {Data: []byte(testChannel), Mode: os.ModePerm}, } - return yamlData, nil -} - -// generateJSONLines takes a byte slice of concatenated JSON objects and returns a JSONlines-formatted string. 
-func generateJSONLines(in []byte) (string, error) { - var out strings.Builder - reader := bytes.NewReader(in) - - err := declcfg.WalkMetasReader(reader, func(meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - - if meta != nil && meta.Blob != nil { - if meta.Blob[len(meta.Blob)-1] != '\n' { - return fmt.Errorf("blob does not end with newline") - } - } - - _, err = out.Write(meta.Blob) - if err != nil { - return err - } - return nil - }) - return out.String(), err } diff --git a/go.mod b/go.mod index 3a0581966..67a43e9fc 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c + golang.org/x/sync v0.10.0 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.0 k8s.io/api v0.32.0 @@ -227,7 +228,6 @@ require ( golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect - golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect From 044d6cdf868d69153b4c38cd4b73fd50089052a4 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Wed, 29 Jan 2025 13:57:14 -0500 Subject: [PATCH 06/15] use io.MultiReader instead of manual implementation --- catalogd/internal/storage/index.go | 6 +- catalogd/internal/storage/localdir.go | 11 +- catalogd/internal/storage/multireadseeker.go | 118 ------------------- 3 files changed, 13 insertions(+), 122 deletions(-) delete mode 100644 catalogd/internal/storage/multireadseeker.go diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 20fe5f2ed..02f14d11b 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -51,7 +51,7 @@ func (i index) Size() int64 { return int64(size) } -func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.ReadSeeker, bool) { +func (i index) Get(r 
io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { sectionSet := i.getSectionSet(schema, packageName, name) sections := sectionSet.UnsortedList() @@ -59,12 +59,12 @@ func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.ReadSeek return cmp.Compare(a.offset, b.offset) }) - srs := make([]io.ReadSeeker, 0, len(sections)) + srs := make([]io.Reader, 0, len(sections)) for _, s := range sections { sr := io.NewSectionReader(r, s.offset, s.length) srs = append(srs, sr) } - return newMultiReadSeeker(srs...), true + return io.MultiReader(srs...), true } func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 07bc13faf..bfd45bab9 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -236,7 +236,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { httpError(w, fs.ErrNotExist) return } - serveJsonLines(w, r, catalogStat.ModTime(), indexReader) + serveJsonLinesQuery(w, indexReader) } func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { @@ -271,6 +271,15 @@ func serveJsonLines(w http.ResponseWriter, r *http.Request, modTime time.Time, r http.ServeContent(w, r, "", modTime, rs) } +func serveJsonLinesQuery(w http.ResponseWriter, rs io.Reader) { + w.Header().Add("Content-Type", "application/jsonl") + _, err := io.Copy(w, rs) + if err != nil { + httpError(w, err) + return + } +} + func (s *LocalDirV1) getIndex(catalog string) (*index, error) { idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) diff --git a/catalogd/internal/storage/multireadseeker.go b/catalogd/internal/storage/multireadseeker.go deleted file mode 100644 index f1e3e1cd3..000000000 --- a/catalogd/internal/storage/multireadseeker.go +++ /dev/null @@ -1,118 +0,0 @@ -package storage - 
-import ( - "io" - "os" -) - -type doubleReadSeeker struct { - rs1, rs2 io.ReadSeeker - rs1len, rs2len int64 - second bool - pos int64 -} - -func (r *doubleReadSeeker) Seek(offset int64, whence int) (int64, error) { - var err error - switch whence { - case os.SEEK_SET: - if offset < r.rs1len { - r.second = false - r.pos, err = r.rs1.Seek(offset, os.SEEK_SET) - return r.pos, err - } else { - r.second = true - r.pos, err = r.rs2.Seek(offset-r.rs1len, os.SEEK_SET) - r.pos += r.rs1len - return r.pos, err - } - case os.SEEK_END: // negative offset - return r.Seek(r.rs1len+r.rs2len+offset-1, os.SEEK_SET) - default: // os.SEEK_CUR - return r.Seek(r.pos+offset, os.SEEK_SET) - } -} - -func (r *doubleReadSeeker) Read(p []byte) (n int, err error) { - switch { - case r.pos >= r.rs1len: // read only from the second reader - n, err := r.rs2.Read(p) - r.pos += int64(n) - return n, err - case r.pos+int64(len(p)) <= r.rs1len: // read only from the first reader - n, err := r.rs1.Read(p) - r.pos += int64(n) - return n, err - default: // read on the border - end of first reader and start of second reader - n1, err := r.rs1.Read(p) - r.pos += int64(n1) - if r.pos != r.rs1len || (err != nil && err != io.EOF) { - // Read() might not read all, return - // If error (but not EOF), return - return n1, err - } - _, err = r.rs2.Seek(0, os.SEEK_SET) - if err != nil { - return n1, err - } - r.second = true - n2, err := r.rs2.Read(p[n1:]) - r.pos += int64(n2) - return n1 + n2, err - } -} - -func multiReadSeeker(rs []io.ReadSeeker, leftmost bool) (io.ReadSeeker, int64, error) { - if len(rs) == 1 { - r := rs[0] - l, err := r.Seek(0, io.SeekEnd) - if err != nil { - return nil, 0, err - } - if leftmost { - _, err = r.Seek(0, io.SeekStart) - } - return r, l, err - } else { - rs1, l1, err := multiReadSeeker(rs[:len(rs)/2], leftmost) - if err != nil { - return nil, 0, err - } - rs2, l2, err := multiReadSeeker(rs[len(rs)/2:], false) - if err != nil { - return nil, 0, err - } - return 
&doubleReadSeeker{rs1, rs2, l1, l2, false, 0}, l1 + l2, nil - } -} - -type emptyReadSeeker struct{} - -func (r *emptyReadSeeker) Read(p []byte) (n int, err error) { - return 0, io.EOF -} - -func (r *emptyReadSeeker) Seek(offset int64, whence int) (int64, error) { - return 0, io.EOF -} - -// newMultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided -// input readseekers. After calling this method the initial position is set to the -// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances -// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. -// Seek can be used over the sum of lengths of all readseekers. -// -// When a newMultiReadSeeker is used, no Read and Seek operations should be made on -// its ReadSeeker components and the length of the readseekers should not change. -// Also, users should make no assumption on the state of individual readseekers -// while the newMultiReadSeeker is used. -func newMultiReadSeeker(rs ...io.ReadSeeker) io.ReadSeeker { - if len(rs) == 0 { - return &emptyReadSeeker{} - } - r, _, err := multiReadSeeker(rs, true) - if err != nil { - return &emptyReadSeeker{} - } - return r -} From 625abb58e4bf914687b59834c46508298f1c61f0 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Wed, 29 Jan 2025 16:05:44 -0500 Subject: [PATCH 07/15] include checkPrecoditions check --- .../storage/http_precoditions_check.go | 224 ++++++++++++++++++ catalogd/internal/storage/localdir.go | 6 + 2 files changed, 230 insertions(+) create mode 100644 catalogd/internal/storage/http_precoditions_check.go diff --git a/catalogd/internal/storage/http_precoditions_check.go b/catalogd/internal/storage/http_precoditions_check.go new file mode 100644 index 000000000..9caba92dd --- /dev/null +++ b/catalogd/internal/storage/http_precoditions_check.go @@ -0,0 +1,224 @@ +// Copyright 2009 The Go Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// Source: Originally from Go's net/http/fs.go +// https://cs.opensource.google/go/go/+/master:src/net/http/fs.go + +package storage + +import ( + "net/http" + "net/textproto" + "strings" + "time" +) + +type condResult int + +const ( + condNone condResult = iota + condTrue + condFalse +) + +// checkPreconditions evaluates request preconditions and reports whether a precondition +// resulted in sending StatusNotModified or StatusPreconditionFailed. +func checkPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) (done bool) { + // This function carefully follows RFC 7232 section 6. + ch := checkIfMatch(r) + if ch == condNone { + ch = checkIfUnmodifiedSince(r, modtime) + } + if ch == condFalse { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + switch checkIfNoneMatch(r) { + case condFalse: + if r.Method == "GET" || r.Method == "HEAD" { + writeNotModified(w) + return true + } else { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + case condNone: + if checkIfModifiedSince(r, w, modtime) == condFalse { + writeNotModified(w) + return true + } + } + return false +} + +func checkIfModifiedSince(r *http.Request, w http.ResponseWriter, modtime time.Time) condResult { + ims := r.Header.Get("If-Modified-Since") + if ims == "" || isZeroTime(modtime) { + return condTrue + } + t, err := ParseTime(ims) + if err != nil { + httpError(w, err) + return condNone + } + // The Last-Modified header truncates sub-second precision so + // the modtime needs to be truncated too. 
+ modtime = modtime.Truncate(time.Second) + if modtime.Compare(t) <= 0 { + return condFalse + } + return condTrue +} + +func checkIfUnmodifiedSince(r *http.Request, modtime time.Time) condResult { + ius := r.Header.Get("If-Unmodified-Since") + if ius == "" || isZeroTime(modtime) { + return condNone + } + t, err := ParseTime(ius) + if err != nil { + return condNone + } + + // The Last-Modified header truncates sub-second precision so + // the modtime needs to be truncated too. + modtime = modtime.Truncate(time.Second) + if ret := modtime.Compare(t); ret <= 0 { + return condTrue + } + return condFalse +} + +// TimeFormat is the time format to use when generating times in HTTP +// headers. It is like [time.RFC1123] but hard-codes GMT as the time +// zone. The time being formatted must be in UTC for Format to +// generate the correct format. +// +// For parsing this time format, see [ParseTime]. +const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT" + +var ( + unixEpochTime = time.Unix(0, 0) + timeFormats = []string{ + TimeFormat, + time.RFC850, + time.ANSIC, + } +) + +// isZeroTime reports whether t is obviously unspecified (either zero or Unix()=0). +func isZeroTime(t time.Time) bool { + return t.IsZero() || t.Equal(unixEpochTime) +} + +func writeNotModified(w http.ResponseWriter) { + // RFC 7232 section 4.1: + // a sender SHOULD NOT generate representation metadata other than the + // above listed fields unless said metadata exists for the purpose of + // guiding cache updates (e.g., Last-Modified might be useful if the + // response does not have an ETag field). 
+ h := w.Header() + delete(h, "Content-Type") + delete(h, "Content-Length") + delete(h, "Content-Encoding") + if h.Get("Etag") != "" { + delete(h, "Last-Modified") + } + w.WriteHeader(http.StatusNotModified) +} + +func checkIfNoneMatch(r *http.Request) condResult { + inm := r.Header.Get("If-None-Match") + if inm == "" { + return condNone + } + buf := inm + for { + buf = textproto.TrimString(buf) + if len(buf) == 0 { + break + } + if buf[0] == ',' { + buf = buf[1:] + continue + } + if buf[0] == '*' { + return condFalse + } + etag, remain := scanETag(buf) + if etag == "" { + break + } + buf = remain + } + return condTrue +} + +// ParseTime parses a time header (such as the Date: header), +// trying each of the three formats allowed by HTTP/1.1: +// [TimeFormat], [time.RFC850], and [time.ANSIC]. +func ParseTime(text string) (t time.Time, err error) { + for _, layout := range timeFormats { + t, err = time.Parse(layout, text) + if err == nil { + return + } + } + return +} + +func checkIfMatch(r *http.Request) condResult { + im := r.Header.Get("If-Match") + if im == "" { + return condNone + } + for { + im = textproto.TrimString(im) + if len(im) == 0 { + break + } + if im[0] == ',' { + im = im[1:] + continue + } + if im[0] == '*' { + return condTrue + } + etag, remain := scanETag(im) + if etag == "" { + break + } + im = remain + } + + return condFalse +} + +// scanETag determines if a syntactically valid ETag is present at s. If so, +// the ETag and remaining text after consuming ETag is returned. Otherwise, +// it returns "", "". +func scanETag(s string) (etag string, remain string) { + s = textproto.TrimString(s) + start := 0 + if strings.HasPrefix(s, "W/") { + start = 2 + } + if len(s[start:]) < 2 || s[start] != '"' { + return "", "" + } + // ETag is either W/"text" or "text". + // See RFC 7232 2.3. + for i := start + 1; i < len(s); i++ { + c := s[i] + switch { + // Character values allowed in ETags. 
+ case c == 0x21 || c >= 0x23 && c <= 0x7E || c >= 0x80: + case c == '"': + return s[:i+1], s[i+1:] + default: + return "", "" + } + } + return "", "" +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index bfd45bab9..e4942053d 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -217,6 +217,12 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { } defer catalogFile.Close() + w.Header().Set("Last-Modified", catalogStat.ModTime().UTC().Format(TimeFormat)) + if checkPreconditions(w, r, catalogStat.ModTime()) { + w.WriteHeader(http.StatusNotModified) + return + } + schema := r.URL.Query().Get("schema") pkg := r.URL.Query().Get("package") name := r.URL.Query().Get("name") From cb0fc736992fac2875fc4b72d4673c592802c9a7 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 31 Jan 2025 15:48:42 -0500 Subject: [PATCH 08/15] add comments --- catalogd/internal/storage/index.go | 13 +++++++++++++ catalogd/internal/storage/localdir.go | 15 +++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 02f14d11b..2f5b74812 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -12,12 +12,25 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ) +// index is an index of sections of an FBC file used to look up FBC blobs that +// match any combination of their schema, package, and name fields. +// +// This index strikes a balance between space and performance. It indexes each field +// separately, and performs logical set intersections at lookup time in order to implement +// a multi-parameter query. +// +// Note: it is permissible to change the indexing algorithm later if it is necessary to +// tune the space / performance tradeoff.
However, care should be taken to ensure +// that the actual content returned by the index remains identical, as users of the index +// may be sensitive to differences introduced by index algorithm changes (e.g. if the +// order of the returned sections changes). type index struct { BySchema map[string][]section `json:"by_schema"` ByPackage map[string][]section `json:"by_package"` ByName map[string][]section `json:"by_name"` } +// A section is the byte offset and length of an FBC blob within the file. type section struct { offset int64 length int64 diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index e4942053d..d4145908e 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -22,15 +22,22 @@ import ( // LocalDirV1 is a storage Instance. When Storing a new FBC contained in // fs.FS, the content is first written to a temporary file, after which -// it is copied to its final destination in RootDir/catalogName/. This is -// done so that clients accessing the content stored in RootDir/catalogName have -// atomic view of the content for a catalog. +// it is copied to its final destination in RootDir/<catalogName>.jsonl. This is +// done so that clients accessing the content stored in RootDir/<catalogName>.jsonl +// have an atomic view of the content for a catalog. type LocalDirV1 struct { RootDir string RootURL *url.URL EnableQueryHandler bool - m sync.RWMutex + m sync.RWMutex + // this singleflight Group is used in `getIndex()` to handle concurrent HTTP requests + // optimally. With the use of this singleflight group, the index is loaded from disk + // once per concurrent group of HTTP requests being handled by the query handler. + // The single flight instance gives us a way to load the index from disk exactly once + // per concurrent group of callers, and then let every concurrent caller have access to + // the loaded index.
This avoids lots of unnecessary open/decode/close cycles when concurrent + // requests are being handled, which improves overall performance and decreases response latency. sf singleflight.Group } From eed3c39f80e6c2d4585f2a4ebcc16adc0861b668 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 31 Jan 2025 16:18:16 -0500 Subject: [PATCH 09/15] code cleanup --- catalogd/internal/serverutil/serverutil.go | 1 - .../internal/serverutil/serverutil_test.go | 5 +++- .../storage/http_precoditions_check.go | 2 +- catalogd/internal/storage/index.go | 4 +-- catalogd/internal/storage/localdir.go | 21 +++++++-------- catalogd/internal/storage/localdir_test.go | 27 ++++++++++--------- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index b87bd1c0a..4c3653266 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -95,7 +95,6 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { } func storageServerHandlerWrapped(mgr ctrl.Manager, cfg CatalogServerConfig) http.Handler { - handler := cfg.LocalStorage.StorageServerHandler() handler = gzhttp.GzipHandler(handler) handler = catalogdmetrics.AddMetricsToHandler(handler) diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go index 89938d82a..de5c2b03b 100644 --- a/catalogd/internal/serverutil/serverutil_test.go +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -234,7 +234,10 @@ type mockStorageInstance struct { func (m *mockStorageInstance) StorageServerHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(m.content)) + _, err := w.Write([]byte(m.content)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } }) } diff --git a/catalogd/internal/storage/http_precoditions_check.go 
b/catalogd/internal/storage/http_precoditions_check.go index 9caba92dd..1a3705f72 100644 --- a/catalogd/internal/storage/http_precoditions_check.go +++ b/catalogd/internal/storage/http_precoditions_check.go @@ -24,7 +24,7 @@ const ( // checkPreconditions evaluates request preconditions and reports whether a precondition // resulted in sending StatusNotModified or StatusPreconditionFailed. -func checkPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) (done bool) { +func checkPreconditions(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { // This function carefully follows RFC 7232 section 6. ch := checkIfMatch(r) if ch == condNone { diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 2f5b74812..2c854e7bd 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -119,7 +119,7 @@ func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section } } -func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) { +func newIndex(metasChan <-chan *declcfg.Meta) *index { idx := &index{ BySchema: make(map[string][]section), ByPackage: make(map[string][]section), @@ -142,5 +142,5 @@ func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) { idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) } } - return idx, nil + return idx } diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index d4145908e..04a616a6a 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -42,8 +42,8 @@ type LocalDirV1 struct { } var ( - _ Instance = &LocalDirV1{} - ErrInvalidParams = errors.New("invalid parameters") + _ Instance = (*LocalDirV1)(nil) + errInvalidParams = errors.New("invalid parameters") ) func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { @@ -169,10 +169,7 @@ func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { } 
func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { - idx, err := newIndex(metas) - if err != nil { - return err - } + idx := newIndex(metas) f, err := os.Create(catalogIndexFilePath(catalogDir)) if err != nil { @@ -209,7 +206,7 @@ func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { httpError(w, err) return } - serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) + serveJSONLines(w, r, catalogStat.ModTime(), catalogFile) } func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { @@ -236,7 +233,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { if schema == "" && pkg == "" && name == "" { // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) - serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) + serveJSONLines(w, r, catalogStat.ModTime(), catalogFile) return } idx, err := s.getIndex(catalog) @@ -249,7 +246,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { httpError(w, fs.ErrNotExist) return } - serveJsonLinesQuery(w, indexReader) + serveJSONLinesQuery(w, indexReader) } func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { @@ -271,7 +268,7 @@ func httpError(w http.ResponseWriter, err error) { code = http.StatusNotFound case errors.Is(err, fs.ErrPermission): code = http.StatusForbidden - case errors.Is(err, ErrInvalidParams): + case errors.Is(err, errInvalidParams): code = http.StatusBadRequest default: code = http.StatusInternalServerError @@ -279,12 +276,12 @@ func httpError(w http.ResponseWriter, err error) { http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) } -func serveJsonLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) { +func serveJSONLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) { w.Header().Add("Content-Type", "application/jsonl") http.ServeContent(w, r, "", 
modTime, rs) } -func serveJsonLinesQuery(w http.ResponseWriter, rs io.Reader) { +func serveJSONLinesQuery(w http.ResponseWriter, rs io.Reader) { w.Header().Add("Content-Type", "application/jsonl") _, err := io.Copy(w, rs) if err != nil { diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index df7e9dcb0..589a654fc 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -310,24 +310,24 @@ func TestQueryEndpoint(t *testing.T) { expectedStatusCode: http.StatusOK, expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, }, - // { - // name: "valid query for package schema for a package that does not exist", - // queryParams: "?schema=olm.package&name=not-present", - // expectedStatusCode: http.StatusOK, - // expectedContent: "", - // }, + { + name: "valid query for package schema for a package that does not exist", + queryParams: "?schema=olm.package&name=not-present", + expectedStatusCode: http.StatusOK, + expectedContent: "", + }, { name: "valid query with package and name", queryParams: "?package=webhook_operator_test&name=bundle.v0.0.1", expectedStatusCode: http.StatusOK, expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, }, - // { - // name: "invalid query with non-existent schema", - // queryParams: "?schema=non_existent_schema", - // expectedStatusCode: http.StatusNotFound, - // expectedContent: 
"400 Bad Request", - // }, + { + name: "query with non-existent schema", + queryParams: "?schema=non_existent_schema", + expectedStatusCode: http.StatusOK, + expectedContent: "", + }, { name: "cached response with If-Modified-Since", queryParams: "?schema=olm.package", @@ -346,6 +346,7 @@ func TestQueryEndpoint(t *testing.T) { // for the actual request resp, err := http.DefaultClient.Do(req) require.NoError(t, err) + resp.Body.Close() req.Header.Set("If-Modified-Since", resp.Header.Get("Last-Modified")) } resp, err := http.DefaultClient.Do(req) @@ -409,7 +410,7 @@ func TestServerLoadHandling(t *testing.T) { } for _, resp := range responses { require.Equal(t, http.StatusOK, resp.StatusCode) - require.Equal(t, resp.Header.Get("Content-Type"), "application/jsonl") + require.Equal(t, "application/jsonl", resp.Header.Get("Content-Type")) resp.Body.Close() } }, From ee38cbeec343f1dcf628889b6f244a1f2f58c74a Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 31 Jan 2025 16:36:43 -0500 Subject: [PATCH 10/15] cleanup index.Get's signature --- catalogd/internal/storage/index.go | 4 ++-- catalogd/internal/storage/localdir.go | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 2c854e7bd..8af3dc0b9 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -64,7 +64,7 @@ func (i index) Size() int64 { return int64(size) } -func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { +func (i index) Get(r io.ReaderAt, schema, packageName, name string) io.Reader { sectionSet := i.getSectionSet(schema, packageName, name) sections := sectionSet.UnsortedList() @@ -77,7 +77,7 @@ func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, sr := io.NewSectionReader(r, s.offset, s.length) srs = append(srs, sr) } - return io.MultiReader(srs...), true + return io.MultiReader(srs...) 
} func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 04a616a6a..6bffdfcdb 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -241,11 +241,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { httpError(w, err) return } - indexReader, ok := idx.Get(catalogFile, schema, pkg, name) - if !ok { - httpError(w, fs.ErrNotExist) - return - } + indexReader := idx.Get(catalogFile, schema, pkg, name) serveJSONLinesQuery(w, indexReader) } From f8b7a64a32e9447d4e711b6f0e89cca887097add Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 31 Jan 2025 17:17:02 -0500 Subject: [PATCH 11/15] only allow GET/HEAD methods --- catalogd/internal/storage/localdir.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 6bffdfcdb..6942d6fe2 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/util/sets" "github.com/operator-framework/operator-registry/alpha/declcfg" ) @@ -193,7 +194,17 @@ func (s *LocalDirV1) StorageServerHandler() http.Handler { if s.EnableQueryHandler { mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path, s.handleV1Query) } - return mux + allowedMethodsHandler := func(next http.Handler, allowedMethods ...string) http.Handler { + allowedMethodSet := sets.New[string](allowedMethods...) 
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !allowedMethodSet.Has(r.Method) { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + next.ServeHTTP(w, r) + }) + } + return allowedMethodsHandler(mux, http.MethodGet, http.MethodHead) } func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { @@ -242,7 +253,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { return } indexReader := idx.Get(catalogFile, schema, pkg, name) - serveJSONLinesQuery(w, indexReader) + serveJSONLinesQuery(w, r, indexReader) } func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { @@ -277,8 +288,13 @@ func serveJSONLines(w http.ResponseWriter, r *http.Request, modTime time.Time, r http.ServeContent(w, r, "", modTime, rs) } -func serveJSONLinesQuery(w http.ResponseWriter, rs io.Reader) { +func serveJSONLinesQuery(w http.ResponseWriter, r *http.Request, rs io.Reader) { w.Header().Add("Content-Type", "application/jsonl") + // Copy the content of the reader to the response writer + // only if it's a Get request + if r.Method == http.MethodHead { + return + } _, err := io.Copy(w, rs) if err != nil { httpError(w, err) From 55333ba4f3df0057d145c8ac605b21103d65e55c Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Fri, 31 Jan 2025 17:52:59 -0500 Subject: [PATCH 12/15] refractor serverJSONLines --- catalogd/internal/storage/localdir.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 6942d6fe2..6b426a1d8 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "sync" - "time" "golang.org/x/sync/errgroup" "golang.org/x/sync/singleflight" @@ -217,7 +216,8 @@ func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { 
httpError(w, err) return } - serveJSONLines(w, r, catalogStat.ModTime(), catalogFile) + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) } func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { @@ -244,7 +244,8 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { if schema == "" && pkg == "" && name == "" { // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) - serveJSONLines(w, r, catalogStat.ModTime(), catalogFile) + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile) return } idx, err := s.getIndex(catalog) @@ -253,7 +254,7 @@ func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { return } indexReader := idx.Get(catalogFile, schema, pkg, name) - serveJSONLinesQuery(w, r, indexReader) + serveJSONLines(w, r, indexReader) } func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { @@ -283,12 +284,7 @@ func httpError(w http.ResponseWriter, err error) { http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) } -func serveJSONLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) { - w.Header().Add("Content-Type", "application/jsonl") - http.ServeContent(w, r, "", modTime, rs) -} - -func serveJSONLinesQuery(w http.ResponseWriter, r *http.Request, rs io.Reader) { +func serveJSONLines(w http.ResponseWriter, r *http.Request, rs io.Reader) { w.Header().Add("Content-Type", "application/jsonl") // Copy the content of the reader to the response writer // only if it's a Get request From 6ae66cd0b51f04fdd362e899d5e06f049d14c360 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Mon, 3 Feb 2025 09:28:23 -0500 Subject: [PATCH 13/15] replace static test variable --- catalogd/internal/serverutil/serverutil.go | 1 - .../internal/serverutil/serverutil_test.go | 139 
+----------------- 2 files changed, 6 insertions(+), 134 deletions(-) diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 4c3653266..912bd6fbc 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -102,5 +102,4 @@ func storageServerHandlerWrapped(mgr ctrl.Manager, cfg CatalogServerConfig) http l := mgr.GetLogger().WithName("catalogd-http-server") handler = logrLoggingHandler(l, handler) return handler - } diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go index de5c2b03b..f4d224345 100644 --- a/catalogd/internal/serverutil/serverutil_test.go +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -7,6 +7,7 @@ import ( "io/fs" "net/http" "net/http/httptest" + "strings" "testing" "github.com/stretchr/testify/require" @@ -14,6 +15,9 @@ import ( ) func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { + var generatedJSON = func(size int) string { + return "{\"data\":\"" + strings.Repeat("test data ", size) + "\"}" + } tests := []struct { name string acceptEncoding string @@ -24,7 +28,7 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { { name: "compresses large response when client accepts gzip", acceptEncoding: "gzip", - responseContent: testCompressableJSON, + responseContent: generatedJSON(1000), expectCompressed: true, expectedStatus: http.StatusOK, }, @@ -38,7 +42,7 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { { name: "does not compress when client doesn't accept gzip", acceptEncoding: "", - responseContent: testCompressableJSON, + responseContent: generatedJSON(1000), expectCompressed: false, expectedStatus: http.StatusOK, }, @@ -96,137 +100,6 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { } } -const testCompressableJSON = `{ - "defaultChannel": "stable-v6.x", - "name": "cockroachdb", - "schema": "olm.package" - } - { - "entries": [ - { - "name": 
"cockroachdb.v5.0.3" - }, - { - "name": "cockroachdb.v5.0.4", - "replaces": "cockroachdb.v5.0.3" - } - ], - "name": "stable-5.x", - "package": "cockroachdb", - "schema": "olm.channel" - } - { - "entries": [ - { - "name": "cockroachdb.v6.0.0", - "skipRange": "<6.0.0" - } - ], - "name": "stable-v6.x", - "package": "cockroachdb", - "schema": "olm.channel" - } - { - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", - "name": "cockroachdb.v5.0.3", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.3" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.3" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" - } - ], - "schema": "olm.bundle" - } - { - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", - "name": "cockroachdb.v5.0.4", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.4" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.4" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" - } - ], - "schema": "olm.bundle" - } - { - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", - 
"name": "cockroachdb.v6.0.0", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "6.0.0" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" - } - ], - "schema": "olm.bundle" - } - ` - // mockStorageInstance implements storage.Instance interface for testing type mockStorageInstance struct { content string From 870f38fbe2ccb5073616b730c88a1447df7ea93a Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Mon, 3 Feb 2025 15:35:52 -0500 Subject: [PATCH 14/15] fix unit test --- catalogd/internal/serverutil/serverutil.go | 5 ++--- catalogd/internal/serverutil/serverutil_test.go | 6 ++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 912bd6fbc..2d84b46d1 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -48,7 +48,7 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil OnlyServeWhenLeader: true, Server: &http.Server{ Addr: cfg.CatalogAddr, - Handler: storageServerHandlerWrapped(mgr, cfg), + Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg), ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses @@ -94,12 +94,11 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { }) } -func storageServerHandlerWrapped(mgr ctrl.Manager, cfg CatalogServerConfig) http.Handler { +func storageServerHandlerWrapped(l logr.Logger, 
cfg CatalogServerConfig) http.Handler { handler := cfg.LocalStorage.StorageServerHandler() handler = gzhttp.GzipHandler(handler) handler = catalogdmetrics.AddMetricsToHandler(handler) - l := mgr.GetLogger().WithName("catalogd-http-server") handler = logrLoggingHandler(l, handler) return handler } diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go index f4d224345..183bf97f1 100644 --- a/catalogd/internal/serverutil/serverutil_test.go +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -10,8 +10,8 @@ import ( "strings" "testing" + "github.com/go-logr/logr" "github.com/stretchr/testify/require" - ctrl "sigs.k8s.io/controller-runtime" ) func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { @@ -54,13 +54,11 @@ func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { mockStorage := &mockStorageInstance{ content: tt.responseContent, } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) - require.NoError(t, err) cfg := CatalogServerConfig{ LocalStorage: mockStorage, } - handler := storageServerHandlerWrapped(mgr, cfg) + handler := storageServerHandlerWrapped(logr.Logger{}, cfg) // Create test request req := httptest.NewRequest("GET", "/test", nil) From 5b46ad1df41cc5c735deda21d5eab52512af575a Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Mon, 3 Feb 2025 16:34:04 -0500 Subject: [PATCH 15/15] refactor getIndex() (and other lint issues) --- .../storage/http_precoditions_check.go | 2 + catalogd/internal/storage/index.go | 51 +++++++------------ catalogd/internal/storage/localdir.go | 7 ++- catalogd/internal/storage/localdir_test.go | 2 + 4 files changed, 26 insertions(+), 36 deletions(-) diff --git a/catalogd/internal/storage/http_precoditions_check.go b/catalogd/internal/storage/http_precoditions_check.go index 1a3705f72..c4ee083ed 100644 --- a/catalogd/internal/storage/http_precoditions_check.go +++ b/catalogd/internal/storage/http_precoditions_check.go @@ -158,6 +158,7 @@ func
checkIfNoneMatch(r *http.Request) condResult { // ParseTime parses a time header (such as the Date: header), // trying each of the three formats allowed by HTTP/1.1: // [TimeFormat], [time.RFC850], and [time.ANSIC]. +// nolint:nonamedreturns func ParseTime(text string) (t time.Time, err error) { for _, layout := range timeFormats { t, err = time.Parse(layout, text) @@ -198,6 +199,7 @@ func checkIfMatch(r *http.Request) condResult { // scanETag determines if a syntactically valid ETag is present at s. If so, // the ETag and remaining text after consuming ETag is returned. Otherwise, // it returns "", "". +// nolint:nonamedreturns func scanETag(s string) (etag string, remain string) { s = textproto.TrimString(s) start := 0 diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 8af3dc0b9..c40ac4b3e 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -81,42 +81,29 @@ func (i index) Get(r io.ReaderAt, schema, packageName, name string) io.Reader { } func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { + // Initialize with all sections if no schema specified, otherwise use schema sections + sectionSet := sets.New[section]() if schema == "" { - if packageName == "" { - if name == "" { - sectionSet := sets.New[section]() - for _, s := range i.BySchema { - sectionSet.Insert(s...) - } - return sectionSet - } else { - return sets.New[section](i.ByName[name]...) - } - } else { - sectionSet := sets.New[section](i.ByPackage[packageName]...) - if name == "" { - return sectionSet - } else { - return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) - } + for _, s := range i.BySchema { + sectionSet.Insert(s...) } } else { - sectionSet := sets.New[section](i.BySchema[schema]...) 
- if packageName == "" { - if name == "" { - return sectionSet - } else { - return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) - } - } else { - sectionSet = sectionSet.Intersection(sets.New[section](i.ByPackage[packageName]...)) - if name == "" { - return sectionSet - } else { - return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) - } - } + sectionSet = sets.New[section](i.BySchema[schema]...) + } + + // Filter by package name if specified + if packageName != "" { + packageSections := sets.New[section](i.ByPackage[packageName]...) + sectionSet = sectionSet.Intersection(packageSections) } + + // Filter by name if specified + if name != "" { + nameSections := sets.New[section](i.ByName[name]...) + sectionSet = sectionSet.Intersection(nameSections) + } + + return sectionSet } func newIndex(metasChan <-chan *declcfg.Meta) *index { diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 6b426a1d8..b34428940 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -64,10 +64,9 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro storeMetaFuncs = append(storeMetaFuncs, storeIndexData) } - var ( - eg, egCtx = errgroup.WithContext(ctx) - metaChans []chan *declcfg.Meta - ) + eg, egCtx := errgroup.WithContext(ctx) + metaChans := []chan *declcfg.Meta{} + for range storeMetaFuncs { metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index 589a654fc..400d2236e 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -485,6 +485,8 @@ func TestServerLoadHandling(t *testing.T) { wg.Add(1) go func(idx int) { defer wg.Done() + // nolint:bodyclose + // the response body is closed in the validateFunc resp, err := http.DefaultClient.Do(requests[idx]) responses[idx] = resp errs[idx] = 
err