diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 9771d5ef6ca..ff8730f5d65 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -2,7 +2,6 @@ package querier import ( "context" - "io" "math" "sort" @@ -16,7 +15,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/logging" "github.com/weaveworks/common/user" - "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -74,23 +72,19 @@ func (b *BlockQueryable) Querier(ctx context.Context, mint, maxt int64) (storage } return &blocksQuerier{ - ctx: ctx, - client: b.us.client, - mint: mint, - maxt: maxt, - userID: userID, + ctx: ctx, + mint: mint, + maxt: maxt, + userID: userID, + userStores: b.us, }, nil } type blocksQuerier struct { ctx context.Context - client storepb.StoreClient mint, maxt int64 userID string -} - -func (b *blocksQuerier) addUserToContext(ctx context.Context) context.Context { - return metadata.AppendToOutgoingContext(ctx, "user", b.userID) + userStores *UserStore } func (b *blocksQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { @@ -107,9 +101,10 @@ func (b *blocksQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labe } converted := convertMatchersToLabelMatcher(matchers) - ctx = b.addUserToContext(ctx) - // returned series are sorted - seriesClient, err := b.client.Series(ctx, &storepb.SeriesRequest{ + // Returned series are sorted. + // No processing of responses is done here. Dealing with multiple responses + // for the same series and overlapping chunks is done in blockQuerierSeriesSet. + series, warnings, err := b.userStores.Series(ctx, b.userID, &storepb.SeriesRequest{ MinTime: mint, MaxTime: maxt, Matchers: converted, @@ -119,33 +114,6 @@ func (b *blocksQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labe return nil, nil, promql.ErrStorage{Err: err} } - series := []*storepb.Series(nil) - warnings := storage.Warnings(nil) - - // only very basic processing of responses is done here. Dealing with multiple responses - // for the same series and overlapping chunks is done in blockQuerierSeriesSet. - for { - resp, err := seriesClient.Recv() - if err == io.EOF { - break - } - if err != nil { - return nil, nil, promql.ErrStorage{Err: err} - } - - // response may either contain series or warning. If it's warning, we get nil here. - s := resp.GetSeries() - if s != nil { - series = append(series, s) - } - - // collect and return warnings too - w := resp.GetWarning() - if w != "" { - warnings = append(warnings, errors.New(w)) - } - } - return &blockQuerierSeriesSet{ series: series, }, warnings, nil diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 76499f6be56..3443a38767a 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -2,9 +2,7 @@ package querier import ( "context" - "fmt" "io" - "net" "path/filepath" "strings" "sync" @@ -15,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" @@ -22,8 +21,6 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/logging" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" @@ -38,7 +35,6 @@ type UserStore struct { logger log.Logger cfg tsdb.Config bucket objstore.Bucket - client storepb.StoreClient logLevel logging.Level bucketStoreMetrics *tsdbBucketStoreMetrics indexCacheMetrics prometheus.Collector @@ -50,8 +46,6 @@ type UserStore struct { storesMu sync.RWMutex stores map[string]*store.BucketStore - serv *grpc.Server - // Metrics. syncTimes prometheus.Histogram } @@ -85,31 +79,11 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin registerer.MustRegister(u.bucketStoreMetrics, u.indexCacheMetrics) } - u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, u.stopping) + u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, nil) return u, nil } func (u *UserStore) starting(ctx context.Context) error { - u.serv = grpc.NewServer() - storepb.RegisterStoreServer(u.serv, u) - l, err := net.Listen("tcp", "") - if err != nil { - return err - } - go func() { - err := u.serv.Serve(l) - if err != nil { - level.Error(u.logger).Log("msg", "block store grpc server failed", "err", err) - } - }() - - cc, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) - if err != nil { - return err - } - - u.client = storepb.NewStoreClient(cc) - if u.cfg.BucketStore.SyncInterval > 0 { // Run an initial blocks sync, required in order to be able to serve queries. if err := u.initialSync(ctx); err != nil { @@ -120,11 +94,6 @@ func (u *UserStore) starting(ctx context.Context) error { return nil } -func (u *UserStore) stopping(_ error) error { - u.serv.Stop() - return nil -} - // initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them func (u *UserStore) initialSync(ctx context.Context) error { level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") @@ -241,96 +210,23 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, return err } -// Info makes an info request to the underlying user store -func (u *UserStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storepb.InfoResponse, error) { - log, ctx := spanlogger.New(ctx, "UserStore.Info") - defer log.Span.Finish() - - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, fmt.Errorf("no metadata") - } - - v := md.Get("user") - if len(v) == 0 { - return nil, fmt.Errorf("no userID") - } - - store := u.getStore(v[0]) - if store == nil { - return nil, nil - } - - return store.Info(ctx, req) -} - -// Series makes a series request to the underlying user store -func (u *UserStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - log, ctx := spanlogger.New(srv.Context(), "UserStore.Series") +// Series makes a series request to the underlying user store. +func (u *UserStore) Series(ctx context.Context, userID string, req *storepb.SeriesRequest) ([]*storepb.Series, storage.Warnings, error) { + log, ctx := spanlogger.New(ctx, "UserStore.Series") defer log.Span.Finish() - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return fmt.Errorf("no metadata") - } - - v := md.Get("user") - if len(v) == 0 { - return fmt.Errorf("no userID") - } - - store := u.getStore(v[0]) - if store == nil { - return nil - } - - return store.Series(req, srv) -} - -// LabelNames makes a labelnames request to the underlying user store -func (u *UserStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - log, ctx := spanlogger.New(ctx, "UserStore.LabelNames") - defer log.Span.Finish() - - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, fmt.Errorf("no metadata") - } - - v := md.Get("user") - if len(v) == 0 { - return nil, fmt.Errorf("no userID") - } - - store := u.getStore(v[0]) + store := u.getStore(userID) if store == nil { - return nil, nil + return nil, nil, nil } - return store.LabelNames(ctx, req) -} - -// LabelValues makes a labelvalues request to the underlying user store -func (u *UserStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - log, ctx := spanlogger.New(ctx, "UserStore.LabelValues") - defer log.Span.Finish() - - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, fmt.Errorf("no metadata") - } - - v := md.Get("user") - if len(v) == 0 { - return nil, fmt.Errorf("no userID") - } - - store := u.getStore(v[0]) - if store == nil { - return nil, nil + srv := newBucketStoreSeriesServer(ctx) + err := store.Series(req, srv) + if err != nil { + return nil, nil, err } - return store.LabelValues(ctx, req) + return srv.SeriesSet, srv.Warnings, nil } func (u *UserStore) getStore(userID string) *store.BucketStore { diff --git a/pkg/querier/blocks_bucket_store_inmemory_server.go b/pkg/querier/blocks_bucket_store_inmemory_server.go new file mode 100644 index 00000000000..cb4862739bc --- /dev/null +++ b/pkg/querier/blocks_bucket_store_inmemory_server.go @@ -0,0 +1,42 @@ +package querier + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// bucketStoreSeriesServer is an fake in-memory gRPC server used to +// call Thanos BucketStore.Series() without having to go through the +// gRPC networking stack. +type bucketStoreSeriesServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesServer + + ctx context.Context + + SeriesSet []*storepb.Series + Warnings storage.Warnings +} + +func newBucketStoreSeriesServer(ctx context.Context) *bucketStoreSeriesServer { + return &bucketStoreSeriesServer{ctx: ctx} +} + +func (s *bucketStoreSeriesServer) Send(r *storepb.SeriesResponse) error { + if r.GetWarning() != "" { + s.Warnings = append(s.Warnings, errors.New(r.GetWarning())) + } + + if r.GetSeries() != nil { + s.SeriesSet = append(s.SeriesSet, r.GetSeries()) + } + + return nil +} + +func (s *bucketStoreSeriesServer) Context() context.Context { + return s.ctx +}