@@ -4,15 +4,17 @@ import (
4
4
"context"
5
5
6
6
"github.com/go-kit/log"
7
+ "github.com/go-kit/log/level"
7
8
"github.com/thanos-io/thanos/pkg/querysharding"
8
9
"github.com/thanos-io/thanos/pkg/store/storepb"
9
10
10
11
cquerysharding "github.com/cortexproject/cortex/pkg/querysharding"
12
+ util_log "github.com/cortexproject/cortex/pkg/util/log"
11
13
)
12
14
13
15
func ShardByMiddleware (logger log.Logger , limits Limits , merger Merger , numShards int ) Middleware {
14
16
return MiddlewareFunc (func (next Handler ) Handler {
15
- return shardQueryByMiddleware {
17
+ return shardBy {
16
18
next : next ,
17
19
limits : limits ,
18
20
merger : merger ,
@@ -22,7 +24,7 @@ func ShardByMiddleware(logger log.Logger, limits Limits, merger Merger, numShard
22
24
})
23
25
}
24
26
25
- type shardQueryByMiddleware struct {
27
+ type shardBy struct {
26
28
next Handler
27
29
limits Limits
28
30
logger log.Logger
@@ -31,17 +33,18 @@ type shardQueryByMiddleware struct {
31
33
numShards int
32
34
}
33
35
34
- func (s shardQueryByMiddleware ) Do (ctx context.Context , r Request ) (Response , error ) {
36
+ func (s shardBy ) Do (ctx context.Context , r Request ) (Response , error ) {
37
+ logger := util_log .WithContext (ctx , s .logger )
35
38
analysis , err := s .queryAnalyzer .Analyze (r .GetQuery ())
36
39
if err != nil {
37
- return nil , err
40
+ level . Warn ( logger ). Log ( "msg" , "error sharding query" , "q" , r . GetQuery (), " err" , err )
38
41
}
39
42
40
- if ! analysis .IsShardable () {
43
+ if err != nil || ! analysis .IsShardable () {
41
44
return s .next .Do (ctx , r )
42
45
}
43
46
44
- reqs := s .shardQuery (r , analysis )
47
+ reqs := s .shardQuery (logger , r , analysis )
45
48
46
49
reqResps , err := DoRequests (ctx , s .next , reqs , s .limits )
47
50
if err != nil {
@@ -60,7 +63,7 @@ func (s shardQueryByMiddleware) Do(ctx context.Context, r Request) (Response, er
60
63
return response , nil
61
64
}
62
65
63
- func (s shardQueryByMiddleware ) shardQuery (r Request , analysis querysharding.QueryAnalysis ) []Request {
66
+ func (s shardBy ) shardQuery (l log. Logger , r Request , analysis querysharding.QueryAnalysis ) []Request {
64
67
reqs := make ([]Request , s .numShards )
65
68
for i := 0 ; i < s .numShards ; i ++ {
66
69
q , err := cquerysharding .InjectShardingInfo (r .GetQuery (), & storepb.ShardInfo {
@@ -72,7 +75,7 @@ func (s shardQueryByMiddleware) shardQuery(r Request, analysis querysharding.Que
72
75
reqs [i ] = r .WithQuery (q )
73
76
74
77
if err != nil {
75
- s . logger .Log ("error sharding query" , "q" , r .GetQuery ())
78
+ level . Warn ( l ) .Log ("msg" , " error sharding query" , "q" , r .GetQuery (), "err" , err )
76
79
return []Request {r }
77
80
}
78
81
}
0 commit comments