8
8
"github.com/go-kit/log"
9
9
"github.com/go-kit/log/level"
10
10
lru "github.com/hashicorp/golang-lru/v2"
11
+ "github.com/opentracing/opentracing-go"
11
12
"github.com/parquet-go/parquet-go"
12
13
"github.com/pkg/errors"
13
14
"github.com/prometheus-community/parquet-common/schema"
@@ -30,6 +31,7 @@ import (
30
31
util_log "github.com/cortexproject/cortex/pkg/util/log"
31
32
"github.com/cortexproject/cortex/pkg/util/multierror"
32
33
"github.com/cortexproject/cortex/pkg/util/services"
34
+ "github.com/cortexproject/cortex/pkg/util/spanlogger"
33
35
"github.com/cortexproject/cortex/pkg/util/validation"
34
36
)
35
37
@@ -146,6 +148,9 @@ func NewParquetQueryable(
146
148
shards := make ([]* parquet_storage.ParquetShard , len (blocks ))
147
149
errGroup := & errgroup.Group {}
148
150
151
+ log , ctx := spanlogger .New (ctx , "parquetQuerierWithFallback.OpenShards" )
152
+ defer log .Span .Finish ()
153
+
149
154
for i , block := range blocks {
150
155
errGroup .Go (func () error {
151
156
cacheKey := fmt .Sprintf ("%v-%v" , userID , block .ID )
@@ -165,7 +170,7 @@ func NewParquetQueryable(
165
170
parquet_storage .WithOptimisticReader (true ),
166
171
)
167
172
if err != nil {
168
- return err
173
+ return errors . Wrapf ( err , "failed to open parquet shard. block: %v" , block . ID . String ())
169
174
}
170
175
cache .Set (cacheKey , shard )
171
176
}
@@ -266,6 +271,9 @@ type parquetQuerierWithFallback struct {
266
271
}
267
272
268
273
func (q * parquetQuerierWithFallback ) LabelValues (ctx context.Context , name string , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
274
+ log , ctx := spanlogger .New (ctx , "parquetQuerierWithFallback.LabelValues" )
275
+ defer log .Span .Finish ()
276
+
269
277
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
270
278
defer q .incrementOpsMetric ("LabelValues" , remaining , parquet )
271
279
if err != nil {
@@ -312,6 +320,9 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
312
320
}
313
321
314
322
func (q * parquetQuerierWithFallback ) LabelNames (ctx context.Context , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
323
+ log , ctx := spanlogger .New (ctx , "parquetQuerierWithFallback.LabelNames" )
324
+ defer log .Span .Finish ()
325
+
315
326
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
316
327
defer q .incrementOpsMetric ("LabelNames" , remaining , parquet )
317
328
if err != nil {
@@ -359,6 +370,9 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
359
370
}
360
371
361
372
func (q * parquetQuerierWithFallback ) Select (ctx context.Context , sortSeries bool , h * storage.SelectHints , matchers ... * labels.Matcher ) storage.SeriesSet {
373
+ log , ctx := spanlogger .New (ctx , "parquetQuerierWithFallback.Select" )
374
+ defer log .Span .Finish ()
375
+
362
376
userID , err := tenant .TenantID (ctx )
363
377
if err != nil {
364
378
storage .ErrSeriesSet (err )
@@ -408,6 +422,8 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
408
422
p := make (chan storage.SeriesSet , 1 )
409
423
promises = append (promises , p )
410
424
go func () {
425
+ span , _ := opentracing .StartSpanFromContext (ctx , "parquetQuerier.Select" )
426
+ defer span .Finish ()
411
427
p <- q .parquetQuerier .Select (InjectBlocksIntoContext (ctx , parquet ... ), sortSeries , & hints , matchers ... )
412
428
}()
413
429
}
0 commit comments