Skip to content

Commit 27c7ca8

Browse files
add purpose label to metric being used for tracing cassandra session and relevant tests
Signed-off-by: Sandeep Sukhani <[email protected]>
1 parent dce0c72 commit 27c7ca8

File tree

8 files changed

+167
-29
lines changed

8 files changed

+167
-29
lines changed

pkg/chunk/cassandra/fixtures.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,19 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
3434

3535
// Get a SchemaConfig with the defaults.
3636
schemaConfig := testutils.DefaultSchemaConfig("cassandra")
37+
schemaStart := schemaConfig.Configs[0].From.String()
3738

38-
storageClient, err := NewStorageClient(cfg, schemaConfig)
39+
storageClient, err := NewStorageClient(cfg, schemaConfig, schemaStart, nil)
3940
if err != nil {
4041
return nil, nil, nil, schemaConfig, nil, err
4142
}
4243

43-
objectClient, err := NewObjectClient(cfg, schemaConfig)
44+
objectClient, err := NewObjectClient(cfg, schemaConfig, schemaStart, nil)
4445
if err != nil {
4546
return nil, nil, nil, schemaConfig, nil, err
4647
}
4748

48-
tableClient, err := NewTableClient(context.Background(), cfg)
49+
tableClient, err := NewTableClient(context.Background(), cfg, nil)
4950
if err != nil {
5051
return nil, nil, nil, schemaConfig, nil, err
5152
}

pkg/chunk/cassandra/storage_client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (cfg *Config) Validate() error {
8989
return nil
9090
}
9191

92-
func (cfg *Config) session(name string) (*gocql.Session, error) {
92+
func (cfg *Config) session(name, purpose string, registerer prometheus.Registerer) (*gocql.Session, error) {
9393
consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency)
9494
if err != nil {
9595
return nil, errors.WithStack(err)
@@ -107,7 +107,7 @@ func (cfg *Config) session(name string) (*gocql.Session, error) {
107107
cluster.NumConns = cfg.NumConnections
108108
cluster.Logger = log.With(pkgutil.Logger, "module", "gocql", "client", name)
109109
cluster.Registerer = prometheus.WrapRegistererWith(
110-
prometheus.Labels{"client": name}, prometheus.DefaultRegisterer)
110+
prometheus.Labels{"client": name, "purpose": purpose}, registerer)
111111
if cfg.Retries > 0 {
112112
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{
113113
NumRetries: cfg.Retries,
@@ -222,15 +222,15 @@ type StorageClient struct {
222222
}
223223

224224
// NewStorageClient returns a new StorageClient.
225-
func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient, error) {
225+
func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*StorageClient, error) {
226226
pkgutil.WarnExperimentalUse("Cassandra Backend")
227227

228-
readSession, err := cfg.session("index-read")
228+
readSession, err := cfg.session("index-read", purpose, registerer)
229229
if err != nil {
230230
return nil, errors.WithStack(err)
231231
}
232232

233-
writeSession, err := cfg.session("index-write")
233+
writeSession, err := cfg.session("index-write", purpose, registerer)
234234
if err != nil {
235235
return nil, errors.WithStack(err)
236236
}
@@ -407,15 +407,15 @@ type ObjectClient struct {
407407
}
408408

409409
// NewObjectClient returns a new ObjectClient.
410-
func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig) (*ObjectClient, error) {
410+
func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*ObjectClient, error) {
411411
pkgutil.WarnExperimentalUse("Cassandra Backend")
412412

413-
readSession, err := cfg.session("chunks-read")
413+
readSession, err := cfg.session("chunks-read", purpose, registerer)
414414
if err != nil {
415415
return nil, errors.WithStack(err)
416416
}
417417

418-
writeSession, err := cfg.session("chunks-write")
418+
writeSession, err := cfg.session("chunks-write", purpose, registerer)
419419
if err != nil {
420420
return nil, errors.WithStack(err)
421421
}

pkg/chunk/cassandra/storage_client_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package cassandra
22

33
import (
4+
"os"
45
"testing"
56

67
"github.com/gocql/gocql"
8+
"github.com/prometheus/client_golang/prometheus"
79
"github.com/stretchr/testify/assert"
810
"github.com/stretchr/testify/require"
911

@@ -79,3 +81,86 @@ func TestConfig_setClusterConfig_authWithPasswordAndPasswordFile(t *testing.T) {
7981
}
8082
assert.Error(t, cfg.Validate())
8183
}
84+
85+
type sessionDetails struct {
86+
name, purpose string
87+
}
88+
89+
func TestConfig_Session(t *testing.T) {
90+
addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES")
91+
if addresses == "" {
92+
return
93+
}
94+
95+
var cfg Config
96+
flagext.DefaultValues(&cfg)
97+
cfg.Addresses = addresses
98+
cfg.Keyspace = "test"
99+
cfg.Consistency = "QUORUM"
100+
cfg.ReplicationFactor = 1
101+
102+
for _, tc := range []struct {
103+
name string
104+
sessionDetails1 sessionDetails
105+
sessionDetails2 sessionDetails
106+
panicsWithErr string
107+
}{
108+
{
109+
name: "same name and purpose",
110+
sessionDetails1: sessionDetails{
111+
name: "session-test",
112+
purpose: "foo",
113+
},
114+
sessionDetails2: sessionDetails{
115+
name: "session-test",
116+
purpose: "foo",
117+
},
118+
panicsWithErr: "duplicate metrics collector registration attempted",
119+
},
120+
{
121+
name: "same name and different purposes",
122+
sessionDetails1: sessionDetails{
123+
name: "session-test",
124+
purpose: "foo",
125+
},
126+
sessionDetails2: sessionDetails{
127+
name: "session-test",
128+
purpose: "bar",
129+
},
130+
},
131+
{
132+
name: "same name and empty purpose",
133+
sessionDetails1: sessionDetails{
134+
name: "session-test",
135+
},
136+
sessionDetails2: sessionDetails{
137+
name: "session-test",
138+
},
139+
panicsWithErr: "duplicate metrics collector registration attempted",
140+
},
141+
} {
142+
t.Run(tc.name, func(t *testing.T) {
143+
registerer := prometheus.NewRegistry()
144+
145+
session1, err := cfg.session(tc.sessionDetails1.name, tc.sessionDetails1.purpose, registerer)
146+
require.NoError(t, err)
147+
148+
defer func() {
149+
session1.Close()
150+
}()
151+
152+
if tc.panicsWithErr != "" {
153+
require.PanicsWithError(t, tc.panicsWithErr, func() {
154+
_, _ = cfg.session(tc.sessionDetails2.name, tc.sessionDetails2.purpose, registerer)
155+
})
156+
} else {
157+
session2, err := cfg.session(tc.sessionDetails2.name, tc.sessionDetails2.purpose, registerer)
158+
require.NoError(t, err)
159+
160+
defer func() {
161+
session2.Close()
162+
}()
163+
}
164+
})
165+
}
166+
}

pkg/chunk/cassandra/table_client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/gocql/gocql"
88
"github.com/pkg/errors"
9+
"github.com/prometheus/client_golang/prometheus"
910

1011
"github.com/cortexproject/cortex/pkg/chunk"
1112
)
@@ -16,8 +17,8 @@ type tableClient struct {
1617
}
1718

1819
// NewTableClient returns a new TableClient.
19-
func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) {
20-
session, err := cfg.session("table-manager")
20+
func NewTableClient(ctx context.Context, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) {
21+
session, err := cfg.session("table-manager", "", registerer)
2122
if err != nil {
2223
return nil, errors.WithStack(err)
2324
}

pkg/chunk/schema_config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type DayTime struct {
4949

5050
// MarshalYAML implements yaml.Marshaller.
5151
func (d DayTime) MarshalYAML() (interface{}, error) {
52-
return d.Time.Time().Format("2006-01-02"), nil
52+
return d.String(), nil
5353
}
5454

5555
// UnmarshalYAML implements yaml.Unmarshaller.
@@ -66,6 +66,10 @@ func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error {
6666
return nil
6767
}
6868

69+
func (d *DayTime) String() string {
70+
return d.Time.Time().Format("2006-01-02")
71+
}
72+
6973
// SchemaConfig contains the config for our chunk index schemas
7074
type SchemaConfig struct {
7175
Configs []PeriodConfig `yaml:"configs"`

pkg/chunk/storage/factory.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
153153
stores := chunk.NewCompositeStore(cacheGenNumLoader)
154154

155155
for _, s := range schemaCfg.Configs {
156-
index, err := NewIndexClient(s.IndexType, cfg, schemaCfg)
156+
index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, s.From.String(), reg)
157157
if err != nil {
158158
return nil, errors.Wrap(err, "error creating index client")
159159
}
@@ -163,7 +163,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
163163
if objectStoreType == "" {
164164
objectStoreType = s.IndexType
165165
}
166-
chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg)
166+
chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, s.From.String(), reg)
167167
if err != nil {
168168
return nil, errors.Wrap(err, "error creating object client")
169169
}
@@ -180,7 +180,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
180180
}
181181

182182
// NewIndexClient makes a new index client of the desired type.
183-
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
183+
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.IndexClient, error) {
184184
if indexClientFactory, ok := customIndexStores[name]; ok {
185185
if indexClientFactory.indexClientFactoryFunc != nil {
186186
return indexClientFactory.indexClientFactoryFunc()
@@ -208,7 +208,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun
208208
cfg.GCPStorageConfig.DistributeKeys = true
209209
return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg)
210210
case "cassandra":
211-
return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg)
211+
return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer)
212212
case "boltdb":
213213
return local.NewBoltDBIndexClient(cfg.BoltDBConfig)
214214
case "grpc-store":
@@ -219,7 +219,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun
219219
}
220220

221221
// NewChunkClient makes a new chunk.Client of the desired types.
222-
func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.Client, error) {
222+
func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.Client, error) {
223223
switch name {
224224
case "inmemory":
225225
return chunk.NewMockStorage(), nil
@@ -245,7 +245,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun
245245
case "swift":
246246
return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, chunk.DirDelim))
247247
case "cassandra":
248-
return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg)
248+
return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer)
249249
case "filesystem":
250250
store, err := local.NewFSObjectClient(cfg.FSConfig)
251251
if err != nil {
@@ -267,7 +267,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client,
267267
}
268268

269269
// NewTableClient makes a new table client based on the configuration.
270-
func NewTableClient(name string, cfg Config) (chunk.TableClient, error) {
270+
func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) {
271271
if indexClientFactory, ok := customIndexStores[name]; ok {
272272
if indexClientFactory.tableClientFactoryFunc != nil {
273273
return indexClientFactory.tableClientFactoryFunc()
@@ -289,7 +289,7 @@ func NewTableClient(name string, cfg Config) (chunk.TableClient, error) {
289289
case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed":
290290
return gcp.NewTableClient(context.Background(), cfg.GCPStorageConfig)
291291
case "cassandra":
292-
return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig)
292+
return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig, registerer)
293293
case "boltdb":
294294
return local.NewTableClient(cfg.BoltDBConfig.Directory)
295295
case "grpc-store":

pkg/chunk/storage/factory_test.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55
"os"
66
"reflect"
77
"testing"
8+
"time"
89

10+
"github.com/prometheus/client_golang/prometheus"
911
"github.com/prometheus/common/model"
1012
"github.com/stretchr/testify/require"
1113

1214
"github.com/cortexproject/cortex/pkg/chunk"
15+
"github.com/cortexproject/cortex/pkg/chunk/cassandra"
1316
"github.com/cortexproject/cortex/pkg/chunk/local"
1417
"github.com/cortexproject/cortex/pkg/util/flagext"
1518
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -135,15 +138,15 @@ func TestCustomIndexClient(t *testing.T) {
135138
RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, tc.indexClientFactories.tableClientFactoryFunc)
136139
}
137140

138-
indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg)
141+
indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, "test", nil)
139142
if tc.errorExpected {
140143
require.Error(t, err)
141144
} else {
142145
require.NoError(t, err)
143146
require.Equal(t, tc.expectedIndexClientType, reflect.TypeOf(indexClient))
144147
}
145148

146-
tableClient, err := NewTableClient(tc.indexClientName, cfg)
149+
tableClient, err := NewTableClient(tc.indexClientName, cfg, nil)
147150
if tc.errorExpected {
148151
require.Error(t, err)
149152
} else {
@@ -154,6 +157,45 @@ func TestCustomIndexClient(t *testing.T) {
154157
}
155158
}
156159

160+
func TestCassandraInMultipleSchemas(t *testing.T) {
161+
addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES")
162+
if addresses == "" {
163+
return
164+
}
165+
166+
// cassandra config
167+
var cassandraCfg cassandra.Config
168+
flagext.DefaultValues(&cassandraCfg)
169+
cassandraCfg.Addresses = addresses
170+
cassandraCfg.Keyspace = "test"
171+
cassandraCfg.Consistency = "QUORUM"
172+
cassandraCfg.ReplicationFactor = 1
173+
174+
// build schema with cassandra in multiple periodic configs
175+
schemaCfg := chunk.DefaultSchemaConfig("cassandra", "v1", model.Now().Add(-7*24*time.Hour))
176+
newSchemaCfg := schemaCfg.Configs[0]
177+
newSchemaCfg.Schema = "v2"
178+
newSchemaCfg.From = chunk.DayTime{Time: model.Now()}
179+
180+
schemaCfg.Configs = append(schemaCfg.Configs, newSchemaCfg)
181+
182+
var (
183+
cfg Config
184+
storeConfig chunk.StoreConfig
185+
defaults validation.Limits
186+
)
187+
flagext.DefaultValues(&cfg, &storeConfig, &defaults)
188+
cfg.CassandraStorageConfig = cassandraCfg
189+
190+
limits, err := validation.NewOverrides(defaults, nil)
191+
require.NoError(t, err)
192+
193+
store, err := NewStore(cfg, storeConfig, schemaCfg, limits, prometheus.NewRegistry(), nil)
194+
require.NoError(t, err)
195+
196+
store.Stop()
197+
}
198+
157199
// useful for cleaning up state after tests
158200
func unregisterAllCustomIndexStores() {
159201
customIndexStores = map[string]indexStoreFactories{}

0 commit comments

Comments
 (0)