diff --git a/.errcheck-exclude b/.errcheck-exclude index d52dc39c00f..d3c9a0d7977 100644 --- a/.errcheck-exclude +++ b/.errcheck-exclude @@ -1,3 +1,5 @@ io/ioutil.WriteFile io/ioutil.ReadFile (github.com/go-kit/kit/log.Logger).Log +io.Copy +(github.com/opentracing/opentracing-go.Tracer).Inject \ No newline at end of file diff --git a/Makefile b/Makefile index 3b89c3f9306..0c3cfd01c44 100644 --- a/Makefile +++ b/Makefile @@ -122,7 +122,7 @@ protos: $(PROTO_GOS) lint: misspell -error docs - golangci-lint run --new-from-rev ed7c302fd968 --build-tags netgo --timeout=5m --enable golint --enable misspell --enable gofmt + golangci-lint run --build-tags netgo --timeout=5m --enable golint --enable misspell --enable gofmt # Validate Kubernetes spec files. Requires: # https://kubeval.instrumenta.dev diff --git a/cmd/cortex/main_test.go b/cmd/cortex/main_test.go index e6ee1f8ee8d..75ac3726044 100644 --- a/cmd/cortex/main_test.go +++ b/cmd/cortex/main_test.go @@ -161,13 +161,13 @@ func captureOutput(t *testing.T) *capturedOutput { co.wg.Add(1) go func() { defer co.wg.Done() - _, _ = io.Copy(&co.stdoutBuf, stdoutR) + io.Copy(&co.stdoutBuf, stdoutR) }() co.wg.Add(1) go func() { defer co.wg.Done() - _, _ = io.Copy(&co.stderrBuf, stderrR) + io.Copy(&co.stderrBuf, stderrR) }() return co diff --git a/cmd/test-exporter/main.go b/cmd/test-exporter/main.go index 23ae5a87941..dc9555649ce 100644 --- a/cmd/test-exporter/main.go +++ b/cmd/test-exporter/main.go @@ -52,5 +52,6 @@ func main() { })) prometheus.MustRegister(runner) - server.Run() + err = server.Run() + util.CheckFatal("running server", err) } diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index 943f0839a32..253f9371022 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -78,6 +78,8 @@ func init() { // from disk, we just ignore web-based reload signals. Config updates are // only applied externally via ApplyConfig(). case <-webReload: + default: + continue } } }() @@ -176,7 +178,7 @@ func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { // ApplyConfig applies a new configuration to an Alertmanager. func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config) error { - templateFiles := make([]string, len(conf.Templates), len(conf.Templates)) + templateFiles := make([]string, len(conf.Templates)) if len(conf.Templates) > 0 { for i, t := range conf.Templates { templateFiles[i] = filepath.Join(am.cfg.DataDir, "templates", userID, t) diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go index 0657b7ac27f..88d71bd84e3 100644 --- a/pkg/chunk/aws/dynamodb_storage_client.go +++ b/pkg/chunk/aws/dynamodb_storage_client.go @@ -245,7 +245,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) { logWriteRetry(ctx, requests) unprocessed.TakeReqs(requests, -1) - a.writeThrottle.WaitN(ctx, len(requests)) + _ = a.writeThrottle.WaitN(ctx, len(requests)) backoff.Wait() continue } else if ok && awsErr.Code() == validationException { @@ -269,7 +269,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems) if len(unprocessedItems) > 0 { logWriteRetry(ctx, unprocessedItems) - a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) + _ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) unprocessed.TakeReqs(unprocessedItems, -1) } diff --git a/pkg/chunk/aws/dynamodb_storage_client_test.go b/pkg/chunk/aws/dynamodb_storage_client_test.go index 00b08eacade..1607b354be1 100644 --- a/pkg/chunk/aws/dynamodb_storage_client_test.go +++ b/pkg/chunk/aws/dynamodb_storage_client_test.go @@ -17,7 +17,8 @@ const ( func TestChunksPartialError(t *testing.T) { fixture := dynamoDBFixture(0, 10, 20) - defer fixture.Teardown() + defer testutils.TeardownFixture(t, fixture) + _, client, err := testutils.Setup(fixture, tableName) require.NoError(t, err) diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go index 8a41ae4f705..84ae8c3c229 100644 --- a/pkg/chunk/aws/dynamodb_table_client.go +++ b/pkg/chunk/aws/dynamodb_table_client.go @@ -80,7 +80,7 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context. func (d callManager) backoffAndRetry(ctx context.Context, fn func(context.Context) error) error { if d.limiter != nil { // Tests will have a nil limiter. - d.limiter.Wait(ctx) + _ = d.limiter.Wait(ctx) } backoff := util.NewBackoff(ctx, d.backoffConfig) diff --git a/pkg/chunk/aws/mock.go b/pkg/chunk/aws/mock.go index 5b33748a0a5..84fdad28ecc 100644 --- a/pkg/chunk/aws/mock.go +++ b/pkg/chunk/aws/mock.go @@ -56,6 +56,7 @@ func (a dynamoDBStorageClient) setErrorParameters(provisionedErr, errAfter int) } } +//nolint:unused //Leaving this around in the case we need to create a table via mock this is useful. func (m *mockDynamoDBClient) createTable(name string) { m.mtx.Lock() defer m.mtx.Unlock() diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index 164d6c645b3..0b8a6b30407 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -228,7 +228,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) { index, ok := c.index[key] if ok { updated := c.entries[index].updated - if c.validity == 0 || time.Now().Sub(updated) < c.validity { + if c.validity == 0 || time.Since(updated) < c.validity { return c.entries[index].value, true } diff --git a/pkg/chunk/cache/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go index b3461689d27..7d533ad42aa 100644 --- a/pkg/chunk/cache/fifo_cache_test.go +++ b/pkg/chunk/cache/fifo_cache_test.go @@ -2,7 +2,6 @@ package cache import ( "context" - "fmt" "strconv" "testing" "time" @@ -88,10 +87,3 @@ func TestFifoCacheExpiry(t *testing.T) { _, ok = c.Get(ctx, strconv.Itoa(0)) require.False(t, ok) } - -func (c *FifoCache) print() { - fmt.Println("first", c.first, "last", c.last) - for i, entry := range c.entries { - fmt.Printf(" %d -> key: %s, value: %v, next: %d, prev: %d\n", i, entry.key, entry.value, entry.next, entry.prev) - } -} diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go index f425cf21f94..c5c43b21cec 100644 --- a/pkg/chunk/cache/instrumented.go +++ b/pkg/chunk/cache/instrumented.go @@ -75,7 +75,7 @@ func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]b } method := i.name + ".store" - instr.CollectedRequest(ctx, method, requestDuration, instr.ErrorCode, func(ctx context.Context) error { + _ = instr.CollectedRequest(ctx, method, requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys", len(keys))) i.Cache.Store(ctx, keys, bufs) @@ -91,7 +91,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, method = i.name + ".fetch" ) - instr.CollectedRequest(ctx, method, requestDuration, instr.ErrorCode, func(ctx context.Context) error { + _ = instr.CollectedRequest(ctx, method, requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys requested", len(keys))) diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index c8a4868f76e..b56a9206c1b 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -36,7 +36,7 @@ type observableVecCollector struct { func (observableVecCollector) Register() {} func (observableVecCollector) Before(method string, start time.Time) {} func (o observableVecCollector) After(method, statusCode string, start time.Time) { - o.v.WithLabelValues(method, statusCode).Observe(time.Now().Sub(start).Seconds()) + o.v.WithLabelValues(method, statusCode).Observe(time.Since(start).Seconds()) } // MemcachedConfig is config to make a Memcached @@ -135,7 +135,7 @@ func memcacheStatusCode(err error) string { // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { - instr.CollectedRequest(ctx, "Memcache.Get", c.requestDuration, memcacheStatusCode, func(ctx context.Context) error { + _ = instr.CollectedRequest(ctx, "Memcache.Get", c.requestDuration, memcacheStatusCode, func(ctx context.Context) error { if c.cfg.BatchSize == 0 { found, bufs, missed = c.fetch(ctx, keys) return nil @@ -149,7 +149,7 @@ func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, b func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { var items map[string]*memcache.Item - instr.CollectedRequest(ctx, "Memcache.GetMulti", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { + err := instr.CollectedRequest(ctx, "Memcache.GetMulti", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { sp := opentracing.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys requested", len(keys))) @@ -166,6 +166,10 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b return err }) + if err != nil { + return found, bufs, keys + } + for _, key := range keys { item, ok := items[key] if ok { @@ -248,7 +252,7 @@ func (c *Memcached) Stop() { // HashKey hashes key into something you can store in memcached. func HashKey(key string) string { hasher := fnv.New64a() - hasher.Write([]byte(key)) // This'll never error. + _, _ = hasher.Write([]byte(key)) // This'll never error. // Hex because memcache errors for the bytes produced by the hash. return hex.EncodeToString(hasher.Sum(nil)) diff --git a/pkg/chunk/cache/redis_cache_test.go b/pkg/chunk/cache/redis_cache_test.go index 7153a67f78a..1330cde97fc 100644 --- a/pkg/chunk/cache/redis_cache_test.go +++ b/pkg/chunk/cache/redis_cache_test.go @@ -19,9 +19,9 @@ func TestRedisCache(t *testing.T) { conn := redigomock.NewConn() conn.Clear() - pool := redis.NewPool(func() (redis.Conn, error) { + pool := &redis.Pool{Dial: func() (redis.Conn, error) { return conn, nil - }, 10) + }, MaxIdle: 10} keys := []string{"key1", "key2", "key3"} bufs := [][]byte{[]byte("data1"), []byte("data2"), []byte("data3")} diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index b9555a664b1..07c22e2a1fc 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -294,7 +294,6 @@ func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callb // readBatch represents a batch of rows read from Cassandra. type readBatch struct { - consumed bool rangeValue []byte value []byte } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index f27be924a08..2f9b95e78ae 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -118,6 +118,7 @@ func (c *store) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { + log, ctx := spanlogger.New(ctx, "ChunkStore.PutOne") chunks := []Chunk{chunk} err := c.storage.PutChunks(ctx, chunks) @@ -125,7 +126,9 @@ func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chun return err } - c.writeBackCache(ctx, chunks) + if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil { + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + } writeReqs, err := c.calculateIndexEntries(chunk.UserID, from, through, chunk) if err != nil { @@ -252,6 +255,7 @@ func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from } func (c *store) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) { + //nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange") defer log.Span.Finish() diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index f3faa766e9b..e391c9f06ac 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -711,7 +711,8 @@ func BenchmarkIndexCaching(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - store.Put(ctx, []Chunk{fooChunk1}) + err := store.Put(ctx, []Chunk{fooChunk1}) + require.NoError(b, err) } } @@ -813,7 +814,7 @@ func TestStoreMaxLookBack(t *testing.T) { chunks, err = storeWithLookBackLimit.Get(ctx, userID, now.Add(-time.Hour), now, matchers...) require.NoError(t, err) require.Equal(t, 1, len(chunks)) - chunks[0].Through.Equal(now) + require.Equal(t, now, chunks[0].Through) } func benchmarkParseIndexEntries(i int64, b *testing.B) { diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index d81a296ab94..d7b5570c6ab 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -268,7 +268,8 @@ func BenchmarkEncode(b *testing.B) { for i := 0; i < b.N; i++ { chunk.encoded = nil - chunk.Encode() + err := chunk.Encode() + require.NoError(b, err) } } diff --git a/pkg/chunk/composite_store_test.go b/pkg/chunk/composite_store_test.go index 60bd5eb0c74..4c35ed5d666 100644 --- a/pkg/chunk/composite_store_test.go +++ b/pkg/chunk/composite_store_test.go @@ -6,6 +6,8 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/require" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/test" @@ -180,7 +182,8 @@ func TestCompositeStore(t *testing.T) { } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { have := []result{} - tc.cs.forStores(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) + err := tc.cs.forStores(model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), collect(&have)) + require.NoError(t, err) if !reflect.DeepEqual(tc.want, have) { t.Fatalf("wrong stores - %s", test.Diff(tc.want, have)) } @@ -231,16 +234,12 @@ func TestCompositeStoreLabels(t *testing.T) { } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { have, err := cs.LabelNamesForMetricName(context.Background(), "", model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), "") - if err != nil { - t.Fatalf("err - %s", err) - } + require.NoError(t, err) if !reflect.DeepEqual(tc.want, have) { t.Fatalf("wrong label names - %s", test.Diff(tc.want, have)) } have, err = cs.LabelValuesForMetricName(context.Background(), "", model.TimeFromUnix(tc.from), model.TimeFromUnix(tc.through), "", "") - if err != nil { - t.Fatalf("err - %s", err) - } + require.NoError(t, err) if !reflect.DeepEqual(tc.want, have) { t.Fatalf("wrong label values - %s", test.Diff(tc.want, have)) } diff --git a/pkg/chunk/encoding/bigchunk_test.go b/pkg/chunk/encoding/bigchunk_test.go index b0c2db12d84..d52cc44e8eb 100644 --- a/pkg/chunk/encoding/bigchunk_test.go +++ b/pkg/chunk/encoding/bigchunk_test.go @@ -84,7 +84,7 @@ func BenchmarkBiggerChunkMemory(b *testing.B) { // printSize calculates various sizes of the chunk when encoded, and in memory. func (b *bigchunk) printSize() { var buf bytes.Buffer - b.Marshal(&buf) + _ = b.Marshal(&buf) var size, allocd int for _, c := range b.chunks { diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index f3038941d8f..f0815649611 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -117,6 +117,8 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) { bs1 := buf.Bytes() chunk, err = NewForEncoding(encoding) + require.NoError(t, err) + err = chunk.UnmarshalFromBuf(bs1) require.NoError(t, err) diff --git a/pkg/chunk/encoding/varbit.go b/pkg/chunk/encoding/varbit.go index 2df8abc4827..c9580214d2c 100644 --- a/pkg/chunk/encoding/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -13,7 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package encoding import ( diff --git a/pkg/chunk/encoding/varbit_helpers.go b/pkg/chunk/encoding/varbit_helpers.go index 9fe9c09feaf..31f13b16478 100644 --- a/pkg/chunk/encoding/varbit_helpers.go +++ b/pkg/chunk/encoding/varbit_helpers.go @@ -13,7 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package encoding import "github.com/prometheus/common/model" diff --git a/pkg/chunk/gcp/bigtable_index_client.go b/pkg/chunk/gcp/bigtable_index_client.go index 7b1f949b7cd..c163e529f1c 100644 --- a/pkg/chunk/gcp/bigtable_index_client.go +++ b/pkg/chunk/gcp/bigtable_index_client.go @@ -60,8 +60,6 @@ type storageClientColumnKey struct { schemaCfg chunk.SchemaConfig client *bigtable.Client keysFn keysFn - - distributeKeys bool } // storageClientV1 implements chunk.storageClient for GCP. diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 8ef17395f73..0141fe3d9aa 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -320,12 +320,6 @@ func (cfg *SchemaConfig) Load() error { return cfg.Validate() } -// PrintYaml dumps the yaml to stdout, to aid in migration -func (cfg SchemaConfig) PrintYaml() { - encoder := yaml.NewEncoder(os.Stdout) - encoder.Encode(cfg) -} - // Bucket describes a range of time with a tableName and hashKey type Bucket struct { from uint32 diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index 0906288e93f..ed13060615e 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -63,7 +63,7 @@ func buildRangeValue(extra int, ss ...[]byte) []byte { for _, s := range ss { length += len(s) + 1 } - output, i := make([]byte, length, length), 0 + output, i := make([]byte, length), 0 for _, s := range ss { i += copy(output[i:], s) + 1 } @@ -99,21 +99,21 @@ func decodeRangeKey(value []byte) [][]byte { func encodeBase64Bytes(bytes []byte) []byte { encodedLen := base64.RawStdEncoding.EncodedLen(len(bytes)) - encoded := make([]byte, encodedLen, encodedLen) + encoded := make([]byte, encodedLen) base64.RawStdEncoding.Encode(encoded, bytes) return encoded } func encodeBase64Value(value string) []byte { encodedLen := base64.RawStdEncoding.EncodedLen(len(value)) - encoded := make([]byte, encodedLen, encodedLen) + encoded := make([]byte, encodedLen) base64.RawStdEncoding.Encode(encoded, []byte(value)) return encoded } func decodeBase64Value(bs []byte) (model.LabelValue, error) { decodedLen := base64.RawStdEncoding.DecodedLen(len(bs)) - decoded := make([]byte, decodedLen, decodedLen) + decoded := make([]byte, decodedLen) if _, err := base64.RawStdEncoding.Decode(decoded, bs); err != nil { return "", err } @@ -123,19 +123,13 @@ func decodeBase64Value(bs []byte) (model.LabelValue, error) { func encodeTime(t uint32) []byte { // timestamps are hex encoded such that it doesn't contain null byte, // but is still lexicographically sortable. - throughBytes := make([]byte, 4, 4) + throughBytes := make([]byte, 4) binary.BigEndian.PutUint32(throughBytes, t) - encodedThroughBytes := make([]byte, 8, 8) + encodedThroughBytes := make([]byte, 8) hex.Encode(encodedThroughBytes, throughBytes) return encodedThroughBytes } -func decodeTime(bs []byte) uint32 { - buf := make([]byte, 4, 4) - hex.Decode(buf, bs) - return binary.BigEndian.Uint32(buf) -} - // parseMetricNameRangeValue returns the metric name stored in metric name // range values. Currently checks range value key and returns the value as the // metric name. diff --git a/pkg/chunk/schema_util_test.go b/pkg/chunk/schema_util_test.go index 3e5eb95a3d4..ea0f5fa4faa 100644 --- a/pkg/chunk/schema_util_test.go +++ b/pkg/chunk/schema_util_test.go @@ -3,6 +3,7 @@ package chunk import ( "bytes" "encoding/binary" + "encoding/hex" "encoding/json" "math" "math/rand" @@ -140,3 +141,9 @@ func TestParseSeriesRangeValue(t *testing.T) { assert.Equal(t, c.expMetric, metric) } } + +func decodeTime(bs []byte) uint32 { + buf := make([]byte, 4) + _, _ = hex.Decode(buf, bs) + return binary.BigEndian.Uint32(buf) +} diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 30eb6c8290b..16f5fb2fe33 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -457,6 +457,7 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { + log, ctx := spanlogger.New(ctx, "SeriesStore.PutOne") // If this chunk is in cache it must already be in the database so we don't need to write it again found, _, _ := c.cache.Fetch(ctx, []string{chunk.ExternalKey()}) if len(found) > 0 { @@ -483,7 +484,9 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun return err } } - c.writeBackCache(ctx, chunks) + if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil { + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + } bufs := make([][]byte, len(keysToCache)) c.writeDedupeCache.Store(ctx, keysToCache, bufs) diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go index 159e3688570..6f7b263dbe2 100644 --- a/pkg/chunk/storage/utils_test.go +++ b/pkg/chunk/storage/utils_test.go @@ -35,7 +35,7 @@ func forAllFixtures(t *testing.T, storageClientTest storageClientTest) { t.Run(fixture.Name(), func(t *testing.T) { indexClient, objectClient, err := testutils.Setup(fixture, tableName) require.NoError(t, err) - defer fixture.Teardown() + defer testutils.TeardownFixture(t, fixture) storageClientTest(t, indexClient, objectClient) }) diff --git a/pkg/chunk/strings.go b/pkg/chunk/strings.go index 1db67516021..a2be670a7a5 100644 --- a/pkg/chunk/strings.go +++ b/pkg/chunk/strings.go @@ -41,6 +41,7 @@ func intersectStrings(left, right []string) []string { return result } +//nolint:unused //Ignoring linting as this might be useful func nWayIntersectStrings(sets [][]string) []string { l := len(sets) switch l { diff --git a/pkg/chunk/table_manager_test.go b/pkg/chunk/table_manager_test.go index 7aab3d70c89..ed91ea46eae 100644 --- a/pkg/chunk/table_manager_test.go +++ b/pkg/chunk/table_manager_test.go @@ -679,8 +679,12 @@ func TestTableManagerRetentionOnly(t *testing.T) { // Verify that without RetentionDeletesEnabled no tables are removed tableManager.cfg.RetentionDeletesEnabled = false // Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test - client.CreateTable(nil, TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) - client.CreateTable(nil, TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + err = client.CreateTable(context.Background(), TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) + require.NoError(t, err) + + err = client.CreateTable(context.Background(), TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + require.NoError(t, err) + tmTest(t, client, tableManager, "Move forward by three table periods (no deletes)", baseTableStart.Add(tablePeriod*3), @@ -703,8 +707,12 @@ func TestTableManagerRetentionOnly(t *testing.T) { tableManager.cfg.RetentionPeriod = 0 tableManager.schemaCfg.Configs[0].From = DayTime{model.TimeFromUnix(baseTableStart.Add(tablePeriod).Unix())} // Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test - client.CreateTable(nil, TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) - client.CreateTable(nil, TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + err = client.CreateTable(context.Background(), TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) + require.NoError(t, err) + + err = client.CreateTable(context.Background(), TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + require.NoError(t, err) + tmTest(t, client, tableManager, "Move forward by three table periods (no deletes) and move From one table forward", baseTableStart.Add(tablePeriod*3), diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index 6aa3a746209..b539548f4ca 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -3,8 +3,11 @@ package testutils import ( "context" "strconv" + "testing" "time" + "github.com/stretchr/testify/require" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -75,7 +78,10 @@ func CreateChunks(startIndex, batchSize int, start model.Time) ([]string, []chun func dummyChunkFor(now model.Time, metric labels.Labels) chunk.Chunk { cs := promchunk.New() - cs.Add(model.SamplePair{Timestamp: now, Value: 0}) + _, err := cs.Add(model.SamplePair{Timestamp: now, Value: 0}) + if err != nil { + panic(err) + } chunk := chunk.NewChunk( userID, client.Fingerprint(metric), @@ -85,9 +91,13 @@ func dummyChunkFor(now model.Time, metric labels.Labels) chunk.Chunk { now, ) // Force checksum calculation. - err := chunk.Encode() + err = chunk.Encode() if err != nil { panic(err) } return chunk } + +func TeardownFixture(t *testing.T, fixture Fixture) { + require.NoError(t, fixture.Teardown()) +} diff --git a/pkg/configs/api/helpers_test.go b/pkg/configs/api/helpers_test.go index 582911d7324..0e0ff739e39 100644 --- a/pkg/configs/api/helpers_test.go +++ b/pkg/configs/api/helpers_test.go @@ -51,7 +51,8 @@ func requestAsUser(t *testing.T, userID string, method, urlStr string, body io.R r, err := http.NewRequest(method, urlStr, body) require.NoError(t, err) r = r.WithContext(user.InjectOrgID(r.Context(), userID)) - user.InjectOrgIDIntoHTTPRequest(r.Context(), r) + err = user.InjectOrgIDIntoHTTPRequest(r.Context(), r) + require.NoError(t, err) app.ServeHTTP(w, r) return w } diff --git a/pkg/configs/db/timed.go b/pkg/configs/db/timed.go index 17421e66fe5..f7bbd6d6cd4 100644 --- a/pkg/configs/db/timed.go +++ b/pkg/configs/db/timed.go @@ -27,15 +27,6 @@ type timed struct { d DB } -func (t timed) errorCode(err error) string { - switch err { - case nil: - return "200" - default: - return "500" - } -} - func (t timed) GetConfig(ctx context.Context, userID string) (configs.View, error) { var cfg configs.View err := instrument.CollectedRequest(ctx, "DB.GetConfigs", databaseRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { @@ -53,11 +44,9 @@ func (t timed) SetConfig(ctx context.Context, userID string, cfg configs.Config) } func (t timed) GetAllConfigs(ctx context.Context) (map[string]configs.View, error) { - var ( - cfgs map[string]configs.View - err error - ) - instrument.CollectedRequest(ctx, "DB.GetAllConfigs", databaseRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var cfgs map[string]configs.View + err := instrument.CollectedRequest(ctx, "DB.GetAllConfigs", databaseRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var err error cfgs, err = t.d.GetAllConfigs(ctx) return err }) @@ -66,9 +55,7 @@ func (t timed) GetAllConfigs(ctx context.Context) (map[string]configs.View, erro } func (t timed) GetConfigs(ctx context.Context, since configs.ID) (map[string]configs.View, error) { - var ( - cfgs map[string]configs.View - ) + var cfgs map[string]configs.View err := instrument.CollectedRequest(ctx, "DB.GetConfigs", databaseRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var err error cfgs, err = t.d.GetConfigs(ctx, since) diff --git a/pkg/configs/legacy_promql/ast.go b/pkg/configs/legacy_promql/ast.go index 0c8a3ad2abb..5e1f2989f72 100644 --- a/pkg/configs/legacy_promql/ast.go +++ b/pkg/configs/legacy_promql/ast.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package promql import ( diff --git a/pkg/configs/legacy_promql/engine.go b/pkg/configs/legacy_promql/engine.go index f62c019a939..f47ee739f64 100644 --- a/pkg/configs/legacy_promql/engine.go +++ b/pkg/configs/legacy_promql/engine.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package promql import ( diff --git a/pkg/configs/legacy_promql/functions_test.go b/pkg/configs/legacy_promql/functions_test.go index 4d09637fac9..c2c23bfa611 100644 --- a/pkg/configs/legacy_promql/functions_test.go +++ b/pkg/configs/legacy_promql/functions_test.go @@ -35,8 +35,11 @@ func TestDeriv(t *testing.T) { testutil.Ok(t, err) metric := labels.FromStrings("__name__", "foo") - a.Add(metric, 1493712816939, 1.0) - a.Add(metric, 1493712846939, 1.0) + _, err = a.Add(metric, 1493712816939, 1.0) + testutil.Ok(t, err) + + _, err = a.Add(metric, 1493712846939, 1.0) + testutil.Ok(t, err) err = a.Commit() testutil.Ok(t, err) diff --git a/pkg/configs/legacy_promql/parse.go b/pkg/configs/legacy_promql/parse.go index f370f783121..416af3c23ba 100644 --- a/pkg/configs/legacy_promql/parse.go +++ b/pkg/configs/legacy_promql/parse.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package promql import ( diff --git a/pkg/configs/legacy_promql/test.go b/pkg/configs/legacy_promql/test.go index 1fe43bd8bb5..cd74b7c9b0f 100644 --- a/pkg/configs/legacy_promql/test.go +++ b/pkg/configs/legacy_promql/test.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +//nolint //Since this was copied from Prometheus leave it as is package promql import ( diff --git a/pkg/distributor/billing.go b/pkg/distributor/billing.go index 2c31581cdbc..4e7dd9c0942 100644 --- a/pkg/distributor/billing.go +++ b/pkg/distributor/billing.go @@ -22,7 +22,7 @@ func (d *Distributor) emitBillingRecord(ctx context.Context, buf []byte, samples now := time.Now().UTC() hasher := sha256.New() - hasher.Write(buf) + _, _ = hasher.Write(buf) hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil)) amounts := billing.Amounts{ billing.Samples: samples, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d8d77d81051..82a5b8860ef 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -397,16 +397,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie continue } - metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) - samples := make([]client.Sample, 0, len(ts.Samples)) - for _, s := range ts.Samples { - if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil { - lastPartialErr = err - continue - } - samples = append(samples, s) - } - keys = append(keys, key) validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedSamples += len(ts.Samples) diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 664dfa89373..e76348df0e6 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -196,6 +196,7 @@ func TestCheckReplicaMultiCluster(t *testing.T) { UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Second, }) + assert.NoError(t, err) // Write the first time. err = c.checkReplica(context.Background(), "user", "c1", replica1) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 881c1348ea5..da7a450b7c5 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -136,7 +136,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() return nil, err } - defer stream.CloseSend() + defer stream.CloseSend() //nolint:errcheck var result []*ingester_client.QueryStreamResponse for { diff --git a/pkg/ingester/client/pool_test.go b/pkg/ingester/client/pool_test.go index 2842a7ed357..704e62ade9b 100644 --- a/pkg/ingester/client/pool_test.go +++ b/pkg/ingester/client/pool_test.go @@ -2,12 +2,13 @@ package client import ( "context" - "fmt" + fmt "fmt" "testing" "time" "github.com/go-kit/kit/log" "github.com/gogo/status" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" @@ -81,17 +82,20 @@ func TestIngesterCache(t *testing.T) { }, mockReadRing{}, factory, log.NewNopLogger()) defer pool.Stop() - pool.GetClientFor("1") + _, err := pool.GetClientFor("1") + require.NoError(t, err) if buildCount != 1 { t.Errorf("Did not create client") } - pool.GetClientFor("1") + _, err = pool.GetClientFor("1") + require.NoError(t, err) if buildCount != 1 { t.Errorf("Created client that should have been cached") } - pool.GetClientFor("2") + _, err = pool.GetClientFor("2") + require.NoError(t, err) if pool.Count() != 2 { t.Errorf("Expected Count() = 2, got %d", pool.Count()) } @@ -101,12 +105,13 @@ func TestIngesterCache(t *testing.T) { t.Errorf("Expected Count() = 1, got %d", pool.Count()) } - pool.GetClientFor("1") + _, err = pool.GetClientFor("1") + require.NoError(t, err) if buildCount != 3 || pool.Count() != 2 { t.Errorf("Did not re-create client correctly") } - _, err := pool.GetClientFor("bad") + _, err = pool.GetClientFor("bad") if err == nil { t.Errorf("Bad create should have thrown an error") } diff --git a/pkg/ingester/client/timeseries.go b/pkg/ingester/client/timeseries.go index ba1d4333509..165c17ab615 100644 --- a/pkg/ingester/client/timeseries.go +++ b/pkg/ingester/client/timeseries.go @@ -16,6 +16,11 @@ var ( expectedLabels = 20 expectedSamplesPerSeries = 10 + /* + We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf + and we don't have an option to make it a pointer. There is overhead here 24 bytes of garbage every time a PreallocTimeseries + is re-used. But since the slices are far far larger, we come out ahead. + */ slicePool = sync.Pool{ New: func() interface{} { return make([]PreallocTimeseries, 0, expectedTimeseries) @@ -265,7 +270,7 @@ func ReuseSlice(slice []PreallocTimeseries) { for i := range slice { ReuseTimeseries(slice[i].TimeSeries) } - slicePool.Put(slice[:0]) + slicePool.Put(slice[:0]) //nolint:staticcheck //see comment on slicePool for more details } // ReuseTimeseries puts the timeseries back into a sync.Pool for reuse. diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6e072e70335..1072142fc11 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -101,6 +101,7 @@ const cacheLineSize = 64 type indexShard struct { mtx sync.RWMutex idx unlockIndex + //nolint:structcheck,unused pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(unlockIndex{})]byte } @@ -267,12 +268,6 @@ func intersect(a, b []model.Fingerprint) []model.Fingerprint { return result } -type fingerprints []model.Fingerprint - -func (a fingerprints) Len() int { return len(a) } -func (a fingerprints) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a fingerprints) Less(i, j int) bool { return a[i] < a[j] } - func mergeStringSlices(ss [][]string) []string { switch len(ss) { case 0: diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8adb56f51e3..02971bdb04a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -158,7 +158,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c limits: limits, chunkStore: chunkStore, quit: make(chan struct{}), - flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes), + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), } var err error diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 00ea5b242b0..7f186f93c9b 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -520,7 +520,7 @@ func createUserStats(db *userTSDB) *client.UserStatsResponse { func (i *Ingester) getTSDB(userID string) *userTSDB { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - db, _ := i.TSDBState.dbs[userID] + db := i.TSDBState.dbs[userID] return db } diff --git a/pkg/ingester/locker.go b/pkg/ingester/locker.go index 495011b0ff3..3c97f38ba1b 100644 --- a/pkg/ingester/locker.go +++ b/pkg/ingester/locker.go @@ -16,6 +16,7 @@ const ( // Avoid false sharing when using array of mutexes. type paddedMutex struct { sync.Mutex + //nolint:structcheck,unused pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{})]byte } diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go index 4d8190c6cc2..baa6621b7d0 100644 --- a/pkg/ingester/query_test.go +++ b/pkg/ingester/query_test.go @@ -74,14 +74,14 @@ func BenchmarkQueryStream(b *testing.B) { l, err := net.Listen("tcp", "localhost:0") require.NoError(b, err) - go server.Serve(l) + go server.Serve(l) //nolint:errcheck b.ResetTimer() for iter := 0; iter < b.N; iter++ { b.Run("QueryStream", func(b *testing.B) { c, err := client.MakeIngesterClient(l.Addr().String(), clientCfg) require.NoError(b, err) - defer c.Close() + defer c.Close() //nolint:errcheck s, err := c.QueryStream(ctx, &client.QueryRequest{ StartTimestampMs: 0, diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index 62f46a50a41..dd4dbeacfdb 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -23,7 +23,7 @@ type seriesMap struct { type shard struct { mtx sync.Mutex m map[model.Fingerprint]*memorySeries - // Align this struct. + //nolint:structcheck,unused // Align this struct. pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte } diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 4e3eb3838af..92d9044bfb0 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -30,7 +30,8 @@ type testUserTSDB struct { func createTSDB(t *testing.T, dir string, users []*testUserTSDB) { for _, user := range users { - os.MkdirAll(filepath.Join(dir, user.userID), 0777) + err := os.MkdirAll(filepath.Join(dir, user.userID), 0777) + require.NoError(t, err) for i := 0; i < user.numBlocks; i++ { u, err := ulid.New(uint64(time.Now().Unix()*1000), rand.Reader) @@ -181,13 +182,14 @@ func TestTransferUser(t *testing.T) { var original []string var xferfiles []string - filepath.Walk(xfer, func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(xfer, func(path string, info os.FileInfo, err error) error { p, _ := filepath.Rel(xfer, path) xferfiles = append(xferfiles, p) return nil }) + require.NoError(t, err) - filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if info.Name() == "thanos.shipper.json" { return nil } @@ -195,6 +197,7 @@ func TestTransferUser(t *testing.T) { original = append(original, p) return nil }) + require.NoError(t, err) require.Equal(t, original, xferfiles) } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index bbe3b0843ab..d3082de5a20 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -79,6 +79,7 @@ func (us *userStates) cp() map[string]*userState { return states } +//nolint:unused func (us *userStates) gc() { us.states.Range(func(key, value interface{}) bool { state := value.(*userState) diff --git a/pkg/ingester/util.go b/pkg/ingester/util.go deleted file mode 100644 index 308f0c6596f..00000000000 --- a/pkg/ingester/util.go +++ /dev/null @@ -1,7 +0,0 @@ -package ingester - -type sortableUint32 []uint32 - -func (ts sortableUint32) Len() int { return len(ts) } -func (ts sortableUint32) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } -func (ts sortableUint32) Less(i, j int) bool { return ts[i] < ts[j] } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index 2790c484bf9..9a199b0a3d1 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -1,8 +1,6 @@ package batch import ( - "fmt" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" @@ -29,10 +27,6 @@ type iterator interface { Err() error } -func print(b promchunk.Batch) { - fmt.Println(" ", b.Timestamps, b.Index, b.Length) -} - // NewChunkMergeIterator returns a storage.SeriesIterator that merges chunks together. func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) storage.SeriesIterator { iter := newMergeIterator(chunks) diff --git a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go index eaf67856da4..67bf43dd69b 100644 --- a/pkg/querier/batch/non_overlapping.go +++ b/pkg/querier/batch/non_overlapping.go @@ -9,8 +9,6 @@ type nonOverlappingIterator struct { curr int chunks []chunk.Chunk iter chunkIterator - input batchStream - output batchStream } // newNonOverlappingIterator returns a single iterator over an slice of sorted, diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 6cbf121cfd3..66343b424c5 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -1,8 +1,6 @@ package batch import ( - "fmt" - promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) @@ -11,14 +9,6 @@ import ( // without allocations. type batchStream []promchunk.Batch -func (bs batchStream) print() { - fmt.Println("[") - for _, b := range bs { - print(b) - } - fmt.Println("]") -} - // reset, hasNext, next, atTime etc are all inlined in go1.11. func (bs *batchStream) reset() { diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 1d4bc241ad3..fb1dc716c84 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -89,7 +89,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin if err != nil { return nil, err } - go serv.Serve(l) + go serv.Serve(l) //nolint:errcheck cc, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) if err != nil { diff --git a/pkg/querier/correctness/runner.go b/pkg/querier/correctness/runner.go index 2c6885fdc1c..43760330e2b 100644 --- a/pkg/querier/correctness/runner.go +++ b/pkg/querier/correctness/runner.go @@ -112,7 +112,7 @@ func NewRunner(cfg RunnerConfig) (*Runner, error) { if cfg.userID != "" { apiCfg.RoundTripper = &nethttp.Transport{ RoundTripper: promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cfg.userID), req) + _ = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cfg.userID), req) return api.DefaultRoundTripper.RoundTrip(req) }), } diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 6e10756c195..10566ec3829 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" ) const ( @@ -37,7 +36,8 @@ const ( func TestFrontend(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("Hello World")) + _, err := w.Write([]byte("Hello World")) + require.NoError(t, err) }) test := func(addr string) { req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) @@ -71,7 +71,8 @@ func TestFrontendPropagateTrace(t *testing.T) { traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID()) observedTraceID <- traceID - w.Write([]byte(responseBody)) + _, err = w.Write([]byte(responseBody)) + require.NoError(t, err) })) test := func(addr string) { @@ -154,14 +155,6 @@ func TestFrontendCancelStatusCode(t *testing.T) { } } -func defaultOverrides(t *testing.T) *validation.Overrides { - var limits validation.Limits - flagext.DefaultValues(&limits) - overrides, err := validation.NewOverrides(limits, nil) - require.NoError(t, err) - return overrides -} - func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { logger := log.NewNopLogger() @@ -197,10 +190,10 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { middleware.Tracer{}, ).Wrap(frontend.Handler()), } - defer httpServer.Shutdown(context.Background()) + defer httpServer.Shutdown(context.Background()) //nolint:errcheck - go httpServer.Serve(httpListen) - go grpcServer.Serve(grpcListen) + go httpServer.Serve(httpListen) //nolint:errcheck + go grpcServer.Serve(grpcListen) //nolint:errcheck worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger) require.NoError(t, err) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 85e5e4318b0..f1a3b4a0808 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -57,7 +57,7 @@ type worker struct { ctx context.Context cancel context.CancelFunc - watcher naming.Watcher + watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. wg sync.WaitGroup } diff --git a/pkg/querier/queryrange/instrumentation.go b/pkg/querier/queryrange/instrumentation.go index f40fd971dbc..abf775003cb 100644 --- a/pkg/querier/queryrange/instrumentation.go +++ b/pkg/querier/queryrange/instrumentation.go @@ -34,7 +34,7 @@ func InstrumentMiddleware(name string) Middleware { return MiddlewareFunc(func(next Handler) Handler { return HandlerFunc(func(ctx context.Context, req Request) (Response, error) { var resp Response - err := instrument.TimeRequestHistogram(ctx, name, queryRangeDuration, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, name, instrument.NewHistogramCollector(queryRangeDuration), instrument.ErrorCode, func(ctx context.Context) error { var err error resp, err = next.Do(ctx, req) return err diff --git a/pkg/querier/queryrange/results_cache_test.go b/pkg/querier/queryrange/results_cache_test.go index d38bb84340d..65a2ed1022d 100644 --- a/pkg/querier/queryrange/results_cache_test.go +++ b/pkg/querier/queryrange/results_cache_test.go @@ -57,26 +57,6 @@ var ( } ) -var dummyResponse = &PrometheusResponse{ - Status: StatusSuccess, - Data: PrometheusData{ - ResultType: matrix, - Result: []SampleStream{ - { - Labels: []client.LabelAdapter{ - {Name: "foo", Value: "bar"}, - }, - Samples: []client.Sample{ - { - TimestampMs: 60, - Value: 60, - }, - }, - }, - }, - }, -} - func mkAPIResponse(start, end, step int64) *PrometheusResponse { var samples []client.Sample for i := start; i <= end; i += step { @@ -300,7 +280,7 @@ func TestResultsCache(t *testing.T) { // Doing request with new end time should do one more query. req := parsedRequest.WithStartEnd(parsedRequest.GetStart(), parsedRequest.GetEnd()+100) - resp, err = rc.Do(ctx, req) + _, err = rc.Do(ctx, req) require.NoError(t, err) require.Equal(t, 2, calls) } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 7e063ceea41..c8d9f74da7c 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -266,7 +266,7 @@ func TestSplitByDay(t *testing.T) { middleware.AuthenticateUser.Wrap( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&actualCount, 1) - w.Write([]byte(responseBody)) + _, _ = w.Write([]byte(responseBody)) }), ), ) diff --git a/pkg/querier/queryrange/step_align_test.go b/pkg/querier/queryrange/step_align_test.go index 5b676898ba4..e1eaa4eeebd 100644 --- a/pkg/querier/queryrange/step_align_test.go +++ b/pkg/querier/queryrange/step_align_test.go @@ -46,7 +46,8 @@ func TestStepAlign(t *testing.T) { return nil, nil }), } - s.Do(context.Background(), tc.input) + _, err := s.Do(context.Background(), tc.input) + require.NoError(t, err) require.Equal(t, tc.expected, result) }) } diff --git a/pkg/ring/kv/consul/client.go b/pkg/ring/kv/consul/client.go index d8cfb1ffe21..92457274b64 100644 --- a/pkg/ring/kv/consul/client.go +++ b/pkg/ring/kv/consul/client.go @@ -103,7 +103,6 @@ func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (ou var ( index = uint64(0) retries = 10 - retry = true ) for i := 0; i < retries; i++ { options := &consul.QueryOptions{ @@ -127,7 +126,7 @@ func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (ou intermediate = out } - intermediate, retry, err = f(intermediate) + intermediate, retry, err := f(intermediate) if err != nil { if !retry { return err diff --git a/pkg/ring/kv/consul/client_test.go b/pkg/ring/kv/consul/client_test.go index a83ad5d85fe..23ff9e49e90 100644 --- a/pkg/ring/kv/consul/client_test.go +++ b/pkg/ring/kv/consul/client_test.go @@ -156,10 +156,7 @@ func TestWatchKeyWithNoStartValue(t *testing.T) { reported := 0 c.WatchKey(ctx, key, func(i interface{}) bool { reported++ - if reported == 2 { - return false - } - return true + return reported != 2 }) // we should see both start and end values. diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 696cbf2ea07..f1b4d03135b 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -644,7 +644,7 @@ func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConn if portToConnect > 0 { _, err := kv.kv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", portToConnect)}) if err != nil { - t.Fatalf("%s failed to join the cluster: %v", name, err) + t.Errorf("%s failed to join the cluster: %v", name, err) return } } diff --git a/pkg/ring/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go index 2abf8949ac4..dc149078798 100644 --- a/pkg/ring/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -270,7 +270,7 @@ func (t *TCPTransport) handleConnection(conn *net.TCPConn) { expectedDigest := md5.Sum(buf) - if bytes.Compare(receivedDigest, expectedDigest[:]) != 0 { + if !bytes.Equal(receivedDigest, expectedDigest[:]) { t.receivedPacketsErrors.Inc() level.Warn(util.Logger).Log("msg", "TCPTransport: packet digest mismatch", "expected", fmt.Sprintf("%x", expectedDigest), "received", fmt.Sprintf("%x", receivedDigest)) } diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 37fb4fc9e04..c1f260dd185 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -52,14 +52,14 @@ func (m metrics) CAS(ctx context.Context, key string, f func(in interface{}) (ou } func (m metrics) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { - instrument.CollectedRequest(ctx, "WatchKey", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _ = instrument.CollectedRequest(ctx, "WatchKey", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { m.c.WatchKey(ctx, key, f) return nil }) } func (m metrics) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { - instrument.CollectedRequest(ctx, "WatchPrefix", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _ = instrument.CollectedRequest(ctx, "WatchPrefix", requestDuration, instrument.ErrorCode, func(ctx context.Context) error { m.c.WatchPrefix(ctx, prefix, f) return nil }) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 45b67b7d475..18aa365da03 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -207,7 +207,7 @@ func (i *Lifecycler) CheckReady(ctx context.Context) error { // Ingester always take at least minReadyDuration to become ready to work // around race conditions with ingesters exiting and updating the ring - if time.Now().Sub(i.startTime) < i.cfg.MinReadyDuration { + if time.Since(i.startTime) < i.cfg.MinReadyDuration { return fmt.Errorf("waiting for %v after startup", i.cfg.MinReadyDuration) } @@ -422,7 +422,10 @@ loop: } // Mark ourselved as Leaving so no more samples are send to us. - i.changeState(context.Background(), LEAVING) + err := i.changeState(context.Background(), LEAVING) + if err != nil { + level.Error(util.Logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err) + } // Do the transferring / flushing on a background goroutine so we can continue // to heartbeat to consul. diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 4b98a481912..0748ddf9f51 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -59,12 +59,6 @@ const ( Reporting // Special value for inquiring about health ) -type uint32s []uint32 - -func (x uint32s) Len() int { return len(x) } -func (x uint32s) Less(i, j int) bool { return x[i] < x[j] } -func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] } - // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. var ErrEmptyRing = errors.New("empty ring") diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 79c64c41450..9ff6dd2ced0 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -272,7 +272,7 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) { sp := ot.GlobalTracer().StartSpan("notify", ot.Tag{Key: "organization", Value: userID}) defer sp.Finish() ctx = ot.ContextWithSpan(ctx, sp) - ot.GlobalTracer().Inject(sp.Context(), ot.HTTPHeaders, ot.HTTPHeadersCarrier(req.Header)) + _ = ot.GlobalTracer().Inject(sp.Context(), ot.HTTPHeaders, ot.HTTPHeadersCarrier(req.Header)) return ctxhttp.Do(ctx, client, req) }, }, util.Logger) diff --git a/pkg/util/flagext/register.go b/pkg/util/flagext/register.go index fbbed7f63d3..1140843e047 100644 --- a/pkg/util/flagext/register.go +++ b/pkg/util/flagext/register.go @@ -20,5 +20,5 @@ func DefaultValues(rs ...Registerer) { for _, r := range rs { r.RegisterFlags(fs) } - fs.Parse([]string{}) + _ = fs.Parse([]string{}) } diff --git a/pkg/util/grpcclient/backoff_retry.go b/pkg/util/grpcclient/backoff_retry.go index 321ffd0924b..2c8cc5dcadb 100644 --- a/pkg/util/grpcclient/backoff_retry.go +++ b/pkg/util/grpcclient/backoff_retry.go @@ -5,6 +5,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/util" ) @@ -19,7 +20,7 @@ func NewBackoffRetry(cfg util.BackoffConfig) grpc.UnaryClientInterceptor { return nil } - if grpc.Code(err) != codes.ResourceExhausted { + if status.Code(err) != codes.ResourceExhausted { return err } diff --git a/pkg/util/grpcclient/ratelimit.go b/pkg/util/grpcclient/ratelimit.go index d2432bf5249..59ba3b7f08a 100644 --- a/pkg/util/grpcclient/ratelimit.go +++ b/pkg/util/grpcclient/ratelimit.go @@ -5,6 +5,8 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // NewRateLimiter creates a UnaryClientInterceptor for client side rate limiting. @@ -15,7 +17,10 @@ func NewRateLimiter(cfg *Config) grpc.UnaryClientInterceptor { } limiter := rate.NewLimiter(rate.Limit(cfg.RateLimit), burst) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - limiter.Wait(ctx) + err := limiter.Wait(ctx) + if err != nil { + return status.Error(codes.ResourceExhausted, err.Error()) + } return invoker(ctx, method, req, reply, cc, opts...) } } diff --git a/pkg/util/grpcclient/ratelimit_test.go b/pkg/util/grpcclient/ratelimit_test.go new file mode 100644 index 00000000000..6e7eb7a466d --- /dev/null +++ b/pkg/util/grpcclient/ratelimit_test.go @@ -0,0 +1,36 @@ +package grpcclient_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/util/grpcclient" +) + +func TestRateLimiterFailureResultsInResourceExhaustedError(t *testing.T) { + config := grpcclient.Config{ + RateLimitBurst: 0, + RateLimit: 0, + } + conn := grpc.ClientConn{} + invoker := func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error { + return nil + } + + limiter := grpcclient.NewRateLimiter(&config) + err := limiter(context.Background(), "methodName", "", "expectedReply", &conn, invoker) + + if se, ok := err.(interface { + GRPCStatus() *status.Status + }); ok { + assert.Equal(t, se.GRPCStatus().Code(), codes.ResourceExhausted) + assert.Equal(t, se.GRPCStatus().Message(), "rate: Wait(n=1) exceeds limiter's burst 0") + } else { + assert.Fail(t, "Could not convert error into expected Status type") + } +} diff --git a/pkg/util/http.go b/pkg/util/http.go index 452f993662a..a4afb0cb32e 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -129,9 +129,11 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi case NoCompression: case FramedSnappy: buf := bytes.Buffer{} - if _, err := snappy.NewWriter(&buf).Write(data); err != nil { + writer := snappy.NewBufferedWriter(&buf) + if _, err := writer.Write(data); err != nil { return err } + writer.Close() data = buf.Bytes() case RawSnappy: data = snappy.Encode(nil, data) diff --git a/pkg/util/middleware/grpc.go b/pkg/util/middleware/grpc.go index d9aa92ac4e4..b0d9d31ba74 100644 --- a/pkg/util/middleware/grpc.go +++ b/pkg/util/middleware/grpc.go @@ -16,7 +16,7 @@ func PrometheusGRPCUnaryInstrumentation(metric *prometheus.HistogramVec) grpc.Un return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, resp, cc, opts...) - metric.WithLabelValues(method, instrument.ErrorCode(err)).Observe(time.Now().Sub(start).Seconds()) + metric.WithLabelValues(method, instrument.ErrorCode(err)).Observe(time.Since(start).Seconds()) return err } } @@ -51,9 +51,9 @@ func (s *instrumentedClientStream) SendMsg(m interface{}) error { } if err == io.EOF { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Now().Sub(s.start).Seconds()) + s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds()) } else { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Now().Sub(s.start).Seconds()) + s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) } return err @@ -66,9 +66,9 @@ func (s *instrumentedClientStream) RecvMsg(m interface{}) error { } if err == io.EOF { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Now().Sub(s.start).Seconds()) + s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds()) } else { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Now().Sub(s.start).Seconds()) + s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) } return err @@ -77,7 +77,7 @@ func (s *instrumentedClientStream) RecvMsg(m interface{}) error { func (s *instrumentedClientStream) Header() (metadata.MD, error) { md, err := s.ClientStream.Header() if err != nil { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Now().Sub(s.start).Seconds()) + s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) } return md, err } diff --git a/pkg/util/priority_queue_test.go b/pkg/util/priority_queue_test.go index dd90b388823..79eaf2784fd 100644 --- a/pkg/util/priority_queue_test.go +++ b/pkg/util/priority_queue_test.go @@ -19,20 +19,6 @@ func (i simpleItem) Key() string { return strconv.FormatInt(int64(i), 10) } -type richItem struct { - priority int64 - key string - value string -} - -func (r richItem) Priority() int64 { - return r.priority -} - -func (r richItem) Key() string { - return r.key -} - func TestPriorityQueueBasic(t *testing.T) { queue := NewPriorityQueue(nil) assert.Equal(t, 0, queue.Length(), "Expected length = 0")