Skip to content

Removed gRPC server to communicate between querier and BucketStore #2324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 10 additions & 42 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package querier

import (
"context"
"io"
"math"
"sort"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
Expand Down Expand Up @@ -74,23 +72,19 @@ func (b *BlockQueryable) Querier(ctx context.Context, mint, maxt int64) (storage
}

return &blocksQuerier{
ctx: ctx,
client: b.us.client,
mint: mint,
maxt: maxt,
userID: userID,
ctx: ctx,
mint: mint,
maxt: maxt,
userID: userID,
userStores: b.us,
}, nil
}

type blocksQuerier struct {
ctx context.Context
client storepb.StoreClient
mint, maxt int64
userID string
}

func (b *blocksQuerier) addUserToContext(ctx context.Context) context.Context {
return metadata.AppendToOutgoingContext(ctx, "user", b.userID)
userStores *UserStore
}

func (b *blocksQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
Expand All @@ -107,9 +101,10 @@ func (b *blocksQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labe
}
converted := convertMatchersToLabelMatcher(matchers)

ctx = b.addUserToContext(ctx)
// returned series are sorted
seriesClient, err := b.client.Series(ctx, &storepb.SeriesRequest{
// Returned series are sorted.
// No processing of responses is done here. Dealing with multiple responses
// for the same series and overlapping chunks is done in blockQuerierSeriesSet.
series, warnings, err := b.userStores.Series(ctx, b.userID, &storepb.SeriesRequest{
MinTime: mint,
MaxTime: maxt,
Matchers: converted,
Expand All @@ -119,33 +114,6 @@ func (b *blocksQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labe
return nil, nil, promql.ErrStorage{Err: err}
}

series := []*storepb.Series(nil)
warnings := storage.Warnings(nil)

// only very basic processing of responses is done here. Dealing with multiple responses
// for the same series and overlapping chunks is done in blockQuerierSeriesSet.
for {
resp, err := seriesClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, promql.ErrStorage{Err: err}
}

// response may either contain series or warning. If it's warning, we get nil here.
s := resp.GetSeries()
if s != nil {
series = append(series, s)
}

// collect and return warnings too
w := resp.GetWarning()
if w != "" {
warnings = append(warnings, errors.New(w))
}
}

return &blockQuerierSeriesSet{
series: series,
}, warnings, nil
Expand Down
128 changes: 12 additions & 116 deletions pkg/querier/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package querier

import (
"context"
"fmt"
"io"
"net"
"path/filepath"
"strings"
"sync"
Expand All @@ -15,15 +13,14 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -38,7 +35,6 @@ type UserStore struct {
logger log.Logger
cfg tsdb.Config
bucket objstore.Bucket
client storepb.StoreClient
logLevel logging.Level
bucketStoreMetrics *tsdbBucketStoreMetrics
indexCacheMetrics prometheus.Collector
Expand All @@ -50,8 +46,6 @@ type UserStore struct {
storesMu sync.RWMutex
stores map[string]*store.BucketStore

serv *grpc.Server

// Metrics.
syncTimes prometheus.Histogram
}
Expand Down Expand Up @@ -85,31 +79,11 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
registerer.MustRegister(u.bucketStoreMetrics, u.indexCacheMetrics)
}

u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, u.stopping)
u.Service = services.NewBasicService(u.starting, u.syncStoresLoop, nil)
return u, nil
}

func (u *UserStore) starting(ctx context.Context) error {
u.serv = grpc.NewServer()
storepb.RegisterStoreServer(u.serv, u)
l, err := net.Listen("tcp", "")
if err != nil {
return err
}
go func() {
err := u.serv.Serve(l)
if err != nil {
level.Error(u.logger).Log("msg", "block store grpc server failed", "err", err)
}
}()

cc, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure())
if err != nil {
return err
}

u.client = storepb.NewStoreClient(cc)

if u.cfg.BucketStore.SyncInterval > 0 {
// Run an initial blocks sync, required in order to be able to serve queries.
if err := u.initialSync(ctx); err != nil {
Expand All @@ -120,11 +94,6 @@ func (u *UserStore) starting(ctx context.Context) error {
return nil
}

func (u *UserStore) stopping(_ error) error {
u.serv.Stop()
return nil
}

// initialSync iterates over the storage bucket creating user bucket stores, and calling initialSync on each of them
func (u *UserStore) initialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
Expand Down Expand Up @@ -241,96 +210,23 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
return err
}

// Info makes an info request to the underlying user store
func (u *UserStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storepb.InfoResponse, error) {
log, ctx := spanlogger.New(ctx, "UserStore.Info")
defer log.Span.Finish()

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata")
}

v := md.Get("user")
if len(v) == 0 {
return nil, fmt.Errorf("no userID")
}

store := u.getStore(v[0])
if store == nil {
return nil, nil
}

return store.Info(ctx, req)
}

// Series makes a series request to the underlying user store
func (u *UserStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
log, ctx := spanlogger.New(srv.Context(), "UserStore.Series")
// Series makes a series request to the underlying user store.
func (u *UserStore) Series(ctx context.Context, userID string, req *storepb.SeriesRequest) ([]*storepb.Series, storage.Warnings, error) {
log, ctx := spanlogger.New(ctx, "UserStore.Series")
defer log.Span.Finish()

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("no metadata")
}

v := md.Get("user")
if len(v) == 0 {
return fmt.Errorf("no userID")
}

store := u.getStore(v[0])
if store == nil {
return nil
}

return store.Series(req, srv)
}

// LabelNames makes a labelnames request to the underlying user store
func (u *UserStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
log, ctx := spanlogger.New(ctx, "UserStore.LabelNames")
defer log.Span.Finish()

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata")
}

v := md.Get("user")
if len(v) == 0 {
return nil, fmt.Errorf("no userID")
}

store := u.getStore(v[0])
store := u.getStore(userID)
if store == nil {
return nil, nil
return nil, nil, nil
}

return store.LabelNames(ctx, req)
}

// LabelValues makes a labelvalues request to the underlying user store
func (u *UserStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
log, ctx := spanlogger.New(ctx, "UserStore.LabelValues")
defer log.Span.Finish()

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata")
}

v := md.Get("user")
if len(v) == 0 {
return nil, fmt.Errorf("no userID")
}

store := u.getStore(v[0])
if store == nil {
return nil, nil
srv := newBucketStoreSeriesServer(ctx)
err := store.Series(req, srv)
if err != nil {
return nil, nil, err
}

return store.LabelValues(ctx, req)
return srv.SeriesSet, srv.Warnings, nil
}

func (u *UserStore) getStore(userID string) *store.BucketStore {
Expand Down
42 changes: 42 additions & 0 deletions pkg/querier/blocks_bucket_store_inmemory_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package querier

import (
"context"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

// bucketStoreSeriesServer is an fake in-memory gRPC server used to
// call Thanos BucketStore.Series() without having to go through the
// gRPC networking stack.
type bucketStoreSeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.Store_SeriesServer

ctx context.Context

SeriesSet []*storepb.Series
Warnings storage.Warnings
}

func newBucketStoreSeriesServer(ctx context.Context) *bucketStoreSeriesServer {
return &bucketStoreSeriesServer{ctx: ctx}
}

func (s *bucketStoreSeriesServer) Send(r *storepb.SeriesResponse) error {
if r.GetWarning() != "" {
s.Warnings = append(s.Warnings, errors.New(r.GetWarning()))
}

if r.GetSeries() != nil {
s.SeriesSet = append(s.SeriesSet, r.GetSeries())
}

return nil
}

func (s *bucketStoreSeriesServer) Context() context.Context {
return s.ctx
}