Skip to content

Commit 570d2cc

Browse files
committed
Signing the other fields of the requests
Signed-off-by: Alan Protasio <[email protected]>
1 parent 0aeaf46 commit 570d2cc

File tree

9 files changed

+198
-40
lines changed

9 files changed

+198
-40
lines changed

docs/configuration/config-file-reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2193,7 +2193,7 @@ ha_tracker:
21932193
21942194
# EXPERIMENTAL: If enabled, sign the write request between distributors and
21952195
# ingesters.
2196-
# CLI flag: -distributor.sign_write_requests
2196+
# CLI flag: -distributor.sign-write-requests
21972197
[sign_write_requests: <boolean> | default = false]
21982198
21992199
ring:

pkg/cortex/cortex.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,8 @@ func (t *Cortex) setupGRPCHeaderForwarding() {
383383
}
384384

385385
func (t *Cortex) setupRequestSigning() {
386-
if t.Cfg.Distributor.EnableSignWriteRequests {
387-
util_log.WarnExperimentalUse("Distributor EnableSignWriteRequests")
386+
if t.Cfg.Distributor.SignWriteRequestsEnabled {
387+
util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")
388388
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor)
389389
}
390390
}

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {
203203
func (t *Cortex) initDistributorService() (serv services.Service, err error) {
204204
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
205205
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
206-
t.Cfg.IngesterClient.GRPCClientConfig.EnableSignRequests = t.Cfg.Distributor.EnableSignWriteRequests
206+
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled
207207

208208
// Check whether the distributor can join the distributors ring, which is
209209
// whenever it's not running as an internal dependency (ie. querier or

pkg/cortexpb/extensions.go

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cortexpb
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"sync"
78

89
"github.com/cespare/xxhash/v2"
@@ -13,13 +14,56 @@ import (
1314
const maxBufferSize = 1024
1415
const signVersion = "v1"
1516

16-
var byteSlicePool = sync.Pool{
17+
var signerPool = sync.Pool{
1718
New: func() interface{} {
18-
b := make([]byte, 0, maxBufferSize)
19-
return &b
19+
return newSigner()
2020
},
2121
}
2222

23+
type signer struct {
24+
h *xxhash.Digest
25+
b []byte
26+
optimized bool
27+
}
28+
29+
func newSigner() *signer {
30+
s := &signer{
31+
h: xxhash.New(),
32+
b: make([]byte, 0, maxBufferSize),
33+
}
34+
s.Reset()
35+
return s
36+
}
37+
38+
func (s *signer) Reset() {
39+
s.h.Reset()
40+
s.b = s.b[:0]
41+
s.optimized = true
42+
}
43+
44+
func (s *signer) WriteString(val string) {
45+
switch {
46+
case !s.optimized:
47+
s.h.WriteString(val)
48+
case len(s.b)+len(val) > cap(s.b):
49+
// If labels val does not fit in the []byte we fall back to not allocate the whole entry.
50+
s.h.Write(s.b)
51+
s.h.WriteString(val)
52+
s.optimized = false
53+
default:
54+
// Use xxhash.Sum64(b) for fast path as it's faster.
55+
s.b = append(s.b, val...)
56+
}
57+
}
58+
59+
func (s *signer) Sum64() uint64 {
60+
if s.optimized {
61+
return xxhash.Sum64(s.b)
62+
}
63+
64+
return s.h.Sum64()
65+
}
66+
2367
func (w *WriteRequest) VerifySign(ctx context.Context, signature string) (bool, error) {
2468
s, err := w.Sign(ctx)
2569
return s == signature, err
@@ -31,29 +75,33 @@ func (w *WriteRequest) Sign(ctx context.Context) (string, error) {
3175
return "", err
3276
}
3377

34-
// Use xxhash.Sum64(b) for fast path as it's faster.
35-
bp := byteSlicePool.Get().(*[]byte)
36-
b := (*bp)[:0]
37-
defer byteSlicePool.Put(bp)
38-
b = append(b, u...)
39-
40-
for _, s := range w.Timeseries {
41-
for i, v := range s.Labels {
42-
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
43-
// If labels entry is {maxBufferSize}+ do not allocate whole entry.
44-
h := xxhash.New()
45-
_, _ = h.Write(b)
46-
for _, v := range s.Labels[i:] {
47-
_, _ = h.WriteString(v.Name)
48-
_, _ = h.WriteString(v.Value)
49-
}
50-
return fmt.Sprintf("%v/%v", signVersion, h.Sum64()), nil
51-
}
78+
s := signerPool.Get().(*signer)
79+
defer func() {
80+
s.Reset()
81+
signerPool.Put(s)
82+
}()
83+
s.WriteString(u)
5284

53-
b = append(b, v.Name...)
54-
b = append(b, v.Value...)
85+
for _, md := range w.Metadata {
86+
s.WriteString(strconv.Itoa(int(md.Type)))
87+
s.WriteString(md.MetricFamilyName)
88+
s.WriteString(md.Help)
89+
s.WriteString(md.Unit)
90+
}
91+
92+
for _, ts := range w.Timeseries {
93+
for _, lbl := range ts.Labels {
94+
s.WriteString(lbl.Name)
95+
s.WriteString(lbl.Value)
96+
}
97+
98+
for _, ex := range ts.Exemplars {
99+
for _, lbl := range ex.Labels {
100+
s.WriteString(lbl.Name)
101+
s.WriteString(lbl.Value)
102+
}
55103
}
56104
}
57105

58-
return fmt.Sprintf("%v/%v", signVersion, xxhash.Sum64(b)), nil
106+
return fmt.Sprintf("%v/%v", signVersion, s.Sum64()), nil
59107
}

pkg/cortexpb/extensions_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package cortexpb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
"github.com/weaveworks/common/user"
11+
)
12+
13+
func TestWriteRequest_Sign(t *testing.T) {
14+
ctx := context.Background()
15+
ctx = user.InjectOrgID(ctx, "user-1")
16+
17+
tests := map[string]struct {
18+
w *WriteRequest
19+
expectedSign string
20+
}{
21+
"small write with exemplar": {
22+
w: createWriteRequest(10, true, "family1", "help1", "unit"),
23+
expectedSign: "v1/9125893422459502203",
24+
},
25+
"small write with exemplar and changed md": {
26+
w: createWriteRequest(10, true, "family2", "help1", "unit"),
27+
expectedSign: "v1/18044786562323437562",
28+
},
29+
"small write without exemplar": {
30+
w: createWriteRequest(10, false, "family1", "help1", "unit"),
31+
expectedSign: "v1/7697478040597284323",
32+
},
33+
"big write with exemplar": {
34+
w: createWriteRequest(10000, true, "family1", "help1", "unit"),
35+
expectedSign: "v1/18402783317092766507",
36+
},
37+
"big write without exemplar": {
38+
w: createWriteRequest(10000, false, "family1", "help1", "unit"),
39+
expectedSign: "v1/14973071954515615892",
40+
},
41+
}
42+
43+
for name, tc := range tests {
44+
t.Run(name, func(t *testing.T) {
45+
// running multiple times in parallel to make sure no race
46+
itNumber := 1000
47+
wg := sync.WaitGroup{}
48+
wg.Add(itNumber)
49+
for i := 0; i < itNumber; i++ {
50+
go func() {
51+
defer wg.Done()
52+
s, err := tc.w.Sign(ctx)
53+
require.NoError(t, err)
54+
// Make sure this sign doesn't change
55+
require.Equal(t, tc.expectedSign, s)
56+
}()
57+
}
58+
wg.Wait()
59+
})
60+
}
61+
}
62+
63+
func createWriteRequest(numTs int, exemplar bool, family string, help string, unit string) *WriteRequest {
64+
w := &WriteRequest{}
65+
w.Metadata = []*MetricMetadata{
66+
{
67+
MetricFamilyName: family,
68+
Help: help,
69+
Unit: unit,
70+
},
71+
}
72+
73+
for i := 0; i < numTs; i++ {
74+
w.Timeseries = append(w.Timeseries, PreallocTimeseries{
75+
TimeSeries: &TimeSeries{
76+
Labels: []LabelAdapter{
77+
{
78+
Name: fmt.Sprintf("Name-%v", i),
79+
Value: fmt.Sprintf("Value-%v", i),
80+
},
81+
},
82+
},
83+
})
84+
85+
if exemplar {
86+
w.Timeseries[i].Exemplars = []Exemplar{
87+
{
88+
Labels: []LabelAdapter{
89+
{
90+
Name: fmt.Sprintf("Ex-Name-%v", i),
91+
Value: fmt.Sprintf("Ex-Value-%v", i),
92+
},
93+
},
94+
},
95+
}
96+
}
97+
}
98+
99+
return w
100+
}

pkg/distributor/distributor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ type Config struct {
127127
RemoteTimeout time.Duration `yaml:"remote_timeout"`
128128
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
129129

130-
ShardingStrategy string `yaml:"sharding_strategy"`
131-
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
132-
ExtendWrites bool `yaml:"extend_writes"`
133-
EnableSignWriteRequests bool `yaml:"sign_write_requests"`
130+
ShardingStrategy string `yaml:"sharding_strategy"`
131+
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
132+
ExtendWrites bool `yaml:"extend_writes"`
133+
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
134134

135135
// Distributors ring
136136
DistributorRing RingConfig `yaml:"ring"`
@@ -164,7 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
164164
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
165165
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
166166
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
167-
f.BoolVar(&cfg.EnableSignWriteRequests, "distributor.sign_write_requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
167+
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
168168
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
169169
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
170170

pkg/util/grpcclient/grpcclient.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ type Config struct {
2929
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
3030
BackoffConfig backoff.Config `yaml:"backoff_config"`
3131

32-
TLSEnabled bool `yaml:"tls_enabled"`
33-
TLS tls.ClientConfig `yaml:",inline"`
34-
EnableSignRequests bool `yaml:"-"`
32+
TLSEnabled bool `yaml:"tls_enabled"`
33+
TLS tls.ClientConfig `yaml:",inline"`
34+
SignWriteRequestsEnabled bool `yaml:"-"`
3535
}
3636

3737
// RegisterFlags registers flags.
@@ -92,7 +92,7 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
9292
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
9393
}
9494

95-
if cfg.EnableSignRequests {
95+
if cfg.SignWriteRequestsEnabled {
9696
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)
9797
}
9898

pkg/util/grpcclient/signing_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ var (
1414

1515
const (
1616
ErrDifferentSignaturePresent = errors.Error("different signature already present")
17+
ErrMultipleSignaturePresent = errors.Error("multiples signature present")
1718
ErrSignatureNotPresent = errors.Error("signature not present")
1819
ErrSignatureMismatch = errors.Error("signature mismatch")
1920
)
@@ -83,7 +84,7 @@ func UnarySigningClientInterceptor(ctx context.Context, method string, req, repl
8384
return ErrDifferentSignaturePresent
8485
}
8586
} else {
86-
return ErrDifferentSignaturePresent
87+
return ErrMultipleSignaturePresent
8788
}
8889
} else {
8990
md = md.Copy()

pkg/util/grpcclient/signing_handler_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestUnarySigningHandler(t *testing.T) {
4545
return nil, nil
4646
})
4747

48-
require.ErrorIs(t, ErrSignatureMismatch, err)
48+
require.ErrorIs(t, err, ErrSignatureMismatch)
4949

5050
// Return error when signature is not present
5151
ctx = user.InjectOrgID(context.Background(), "user-")
@@ -54,5 +54,14 @@ func TestUnarySigningHandler(t *testing.T) {
5454
return nil, nil
5555
})
5656

57-
require.ErrorIs(t, ErrSignatureNotPresent, err)
57+
require.ErrorIs(t, err, ErrSignatureNotPresent)
58+
59+
// Return error when multiples signatures are present
60+
md[reqSignHeaderName] = append(md[reqSignHeaderName], "sig1", "sig2")
61+
ctx = metadata.NewOutgoingContext(ctx, md)
62+
err = UnarySigningClientInterceptor(ctx, "", w, w, nil, func(c context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
63+
ctx = c
64+
return nil
65+
})
66+
require.ErrorIs(t, err, ErrMultipleSignaturePresent)
5867
}

0 commit comments

Comments
 (0)