diff --git a/Makefile b/Makefile index ddc792c293c..bd70d0e3f58 100644 --- a/Makefile +++ b/Makefile @@ -41,7 +41,8 @@ endef $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe)))) # Manually declared dependancies And what goes into each exe -%.pb.go: %.proto +cortex.pb.go: cortex.proto +ring/ring.pb.go: ring/ring.proto all: $(UPTODATE_FILES) test: $(PROTO_GOS) diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index e3dae25bd53..20efc9b8e5c 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -13,7 +13,6 @@ import ( "github.com/weaveworks/cortex" "github.com/weaveworks/cortex/chunk" "github.com/weaveworks/cortex/ingester" - "github.com/weaveworks/cortex/ring" "github.com/weaveworks/cortex/util" ) @@ -25,44 +24,33 @@ func main() { middleware.ServerUserHeaderInterceptor, }, } - ingesterRegistrationConfig ring.IngesterRegistrationConfig - chunkStoreConfig chunk.StoreConfig - ingesterConfig ingester.Config + chunkStoreConfig chunk.StoreConfig + ingesterConfig ingester.Config ) - // IngesterRegistrator needs to know our gRPC listen port - ingesterRegistrationConfig.ListenPort = &serverConfig.GRPCListenPort - util.RegisterFlags(&serverConfig, &ingesterRegistrationConfig, &chunkStoreConfig, &ingesterConfig) + // Ingester needs to know our gRPC listen port. + ingesterConfig.ListenPort = &serverConfig.GRPCListenPort + util.RegisterFlags(&serverConfig, &chunkStoreConfig, &ingesterConfig) flag.Parse() - registration, err := ring.RegisterIngester(ingesterRegistrationConfig) + chunkStore, err := chunk.NewStore(chunkStoreConfig) if err != nil { - log.Fatalf("Could not register ingester: %v", err) + log.Fatal(err) } - defer registration.Ring.Stop() - chunkStore, err := chunk.NewStore(chunkStoreConfig) + server, err := server.New(serverConfig) if err != nil { - log.Fatal(err) + log.Fatalf("Error initializing server: %v", err) } + defer server.Shutdown() - ingester, err := ingester.New(ingesterConfig, chunkStore, registration.Ring) + ingester, err := ingester.New(ingesterConfig, chunkStore) if err != nil { log.Fatal(err) } prometheus.MustRegister(ingester) + defer ingester.Shutdown() - server, err := server.New(serverConfig) - if err != nil { - log.Fatalf("Error initializing server: %v", err) - } cortex.RegisterIngesterServer(server.GRPC, ingester) - server.HTTP.Handle("/ring", registration.Ring) server.HTTP.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler)) server.Run() - - // Shutdown order is important! - registration.ChangeState(ring.LEAVING) - ingester.Stop() - registration.Unregister() - server.Shutdown() } diff --git a/cortex.proto b/cortex.proto index b76d6d9b217..4d925baaf92 100644 --- a/cortex.proto +++ b/cortex.proto @@ -13,6 +13,9 @@ service Ingester { rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse) {}; rpc UserStats(UserStatsRequest) returns (UserStatsResponse) {}; rpc MetricsForLabelMatchers(MetricsForLabelMatchersRequest) returns (MetricsForLabelMatchersResponse) {}; + + // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). 
+ rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}; } message WriteRequest { @@ -56,6 +59,22 @@ message MetricsForLabelMatchersResponse { repeated Metric metric = 1; } +message TimeSeriesChunk { + string from_ingester_id = 1; + string user_id = 2; + repeated LabelPair labels = 3 [(gogoproto.nullable) = false]; + repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; +} + +message Chunk { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + int32 encoding = 3; + bytes data = 4; +} + +message TransferChunksResponse { +} message TimeSeries { repeated LabelPair labels = 1 [(gogoproto.nullable) = false]; diff --git a/distributor/distributor.go b/distributor/distributor.go index ec5b7009e34..d9ccc2f6657 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -5,16 +5,13 @@ import ( "flag" "fmt" "hash/fnv" + "io" "sync" "sync/atomic" "time" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" - "github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "golang.org/x/time/rate" - "google.golang.org/grpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" @@ -22,9 +19,9 @@ import ( "github.com/prometheus/prometheus/storage/metric" "github.com/weaveworks/common/instrument" - "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex" + ingester_client "github.com/weaveworks/cortex/ingester/client" "github.com/weaveworks/cortex/ring" "github.com/weaveworks/cortex/util" ) @@ -46,7 +43,7 @@ type Distributor struct { cfg Config ring ReadRing clientsMtx sync.RWMutex - clients map[string]ingesterClient + clients map[string]cortex.IngesterClient quit chan struct{} done chan struct{} @@ -63,11 +60,6 @@ type Distributor struct { ingesterQueryFailures *prometheus.CounterVec } -type ingesterClient struct { - cortex.IngesterClient - conn *grpc.ClientConn -} - // ReadRing represents the read inferface to the ring. type ReadRing interface { prometheus.Collector @@ -88,7 +80,7 @@ type Config struct { IngestionBurstSize int // for testing - ingesterClientFactory func(string) cortex.IngesterClient + ingesterClientFactory func(addr string, timeout time.Duration) (cortex.IngesterClient, error) } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -106,10 +98,14 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } + if cfg.ingesterClientFactory == nil { + cfg.ingesterClientFactory = ingester_client.MakeIngesterClient + } + d := &Distributor{ cfg: cfg, ring: ring, - clients: map[string]ingesterClient{}, + clients: map[string]cortex.IngesterClient{}, quit: make(chan struct{}), done: make(chan struct{}), ingestLimiters: map[string]*rate.Limiter{}, @@ -193,11 +189,11 @@ func (d *Distributor) removeStaleIngesterClients() { // Do the gRPC closing in the background since it might take a while and // we're holding a mutex. 
- go func(addr string, conn *grpc.ClientConn) { - if err := conn.Close(); err != nil { + go func(addr string, closer io.Closer) { + if err := closer.Close(); err != nil { log.Errorf("Error closing connection to ingester %q: %v", addr, err) } - }(addr, client.conn) + }(addr, client.(io.Closer)) } } @@ -216,27 +212,9 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester return client, nil } - if d.cfg.ingesterClientFactory != nil { - client = ingesterClient{ - IngesterClient: d.cfg.ingesterClientFactory(ingester.Addr), - } - } else { - conn, err := grpc.Dial( - ingester.Addr, - grpc.WithTimeout(d.cfg.RemoteTimeout), - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - )), - ) - if err != nil { - return nil, err - } - client = ingesterClient{ - IngesterClient: cortex.NewIngesterClient(conn), - conn: conn, - } + client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout) + if err != nil { + return nil, err } d.clients[ingester.Addr] = client return client, nil diff --git a/distributor/distributor_test.go b/distributor/distributor_test.go index 48076f05177..3d4550acf78 100644 --- a/distributor/distributor_test.go +++ b/distributor/distributor_test.go @@ -40,6 +40,7 @@ func (r mockRing) GetAll() []*ring.IngesterDesc { } type mockIngester struct { + cortex.IngesterClient happy bool } @@ -78,18 +79,6 @@ func (i mockIngester) Query(ctx context.Context, in *cortex.QueryRequest, opts . }, nil } -func (i mockIngester) LabelValues(ctx context.Context, in *cortex.LabelValuesRequest, opts ...grpc.CallOption) (*cortex.LabelValuesResponse, error) { - return nil, nil -} - -func (i mockIngester) UserStats(ctx context.Context, in *cortex.UserStatsRequest, opts ...grpc.CallOption) (*cortex.UserStatsResponse, error) { - return nil, nil -} - -func (i mockIngester) MetricsForLabelMatchers(ctx context.Context, in *cortex.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*cortex.MetricsForLabelMatchersResponse, error) { - return nil, nil -} - func TestDistributorPush(t *testing.T) { ctx := user.Inject(context.Background(), "user") for i, tc := range []struct { @@ -106,22 +95,34 @@ func TestDistributorPush(t *testing.T) { // A push to 3 happy ingesters should succeed { - samples: 10, - ingesters: []mockIngester{{true}, {true}, {true}}, + samples: 10, + ingesters: []mockIngester{ + {happy: true}, + {happy: true}, + {happy: true}, + }, expectedResponse: &cortex.WriteResponse{}, }, // A push to 2 happy ingesters should succeed { - samples: 10, - ingesters: []mockIngester{{}, {true}, {true}}, + samples: 10, + ingesters: []mockIngester{ + {}, + {happy: true}, + {happy: true}, + }, expectedResponse: &cortex.WriteResponse{}, }, // A push to 1 happy ingesters should fail { - samples: 10, - ingesters: []mockIngester{{}, {}, {true}}, + samples: 10, + ingesters: []mockIngester{ + {}, + {}, + {happy: true}, + }, expectedError: fmt.Errorf("Fail"), }, @@ -159,8 +160,8 @@ func TestDistributorPush(t *testing.T) { IngestionRateLimit: 10000, IngestionBurstSize: 10000, - ingesterClientFactory: func(addr string) cortex.IngesterClient { - return ingesters[addr] + ingesterClientFactory: func(addr string, _ time.Duration) (cortex.IngesterClient, error) { + return ingesters[addr], nil }, }, ring) if err != nil { @@ -219,25 +220,41 @@ func TestDistributorQuery(t *testing.T) { }{ // A query to 3 happy ingesters should succeed { - ingesters: 
[]mockIngester{{true}, {true}, {true}}, + ingesters: []mockIngester{ + {happy: true}, + {happy: true}, + {happy: true}, + }, expectedResponse: expectedResponse(0, 2), }, // A query to 2 happy ingesters should succeed { - ingesters: []mockIngester{{}, {true}, {true}}, + ingesters: []mockIngester{ + {happy: false}, + {happy: true}, + {happy: true}, + }, expectedResponse: expectedResponse(0, 2), }, // A query to 1 happy ingesters should fail { - ingesters: []mockIngester{{}, {}, {true}}, + ingesters: []mockIngester{ + {happy: false}, + {happy: false}, + {happy: true}, + }, expectedError: fmt.Errorf("Fail"), }, // A query to 0 happy ingesters should succeed { - ingesters: []mockIngester{{}, {}, {}}, + ingesters: []mockIngester{ + {happy: false}, + {happy: false}, + {happy: false}, + }, expectedError: fmt.Errorf("Fail"), }, } { @@ -268,8 +285,8 @@ func TestDistributorQuery(t *testing.T) { IngestionRateLimit: 10000, IngestionBurstSize: 10000, - ingesterClientFactory: func(addr string) cortex.IngesterClient { - return ingesters[addr] + ingesterClientFactory: func(addr string, _ time.Duration) (cortex.IngesterClient, error) { + return ingesters[addr], nil }, }, ring) if err != nil { diff --git a/ingester/client/client.go b/ingester/client/client.go new file mode 100644 index 00000000000..458b46431c7 --- /dev/null +++ b/ingester/client/client.go @@ -0,0 +1,42 @@ +package client + +import ( + "time" + + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" + + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/cortex" +) + +type ingesterClient struct { + cortex.IngesterClient + conn *grpc.ClientConn +} + +// MakeIngesterClient makes a new cortex.IngesterClient +func MakeIngesterClient(addr string, timeout time.Duration) (cortex.IngesterClient, error) { + conn, err := grpc.Dial( + addr, + grpc.WithTimeout(timeout), + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + if err != nil { + return nil, err + } + return &ingesterClient{ + IngesterClient: cortex.NewIngesterClient(conn), + conn: conn, + }, nil +} + +func (c *ingesterClient) Close() error { + return c.conn.Close() +} diff --git a/ingester/ingester.go b/ingester/ingester.go index 730b176aea9..131815a3b77 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -3,10 +3,11 @@ package ingester import ( "flag" "fmt" - "net/http" + "os" "sync" "time" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -15,7 +16,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/metric" - "golang.org/x/net/context" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex" @@ -38,12 +38,6 @@ const ( DefaultMaxSeriesPerUser = 5000000 // DefaultMaxSeriesPerMetric is the maximum number of series in one metric (of a single user). DefaultMaxSeriesPerMetric = 50000 - - minReadyDuration = 1 * time.Minute - - // Backoff for retrying 'immediate' flushes. Only counts for queue - // position, not wallclock time. - flushBackoff = 1 * time.Second ) var ( @@ -73,19 +67,90 @@ var ( ErrDuplicateSampleForTimestamp = fmt.Errorf("sample with repeated timestamp but different value") ) +// Config for an Ingester. 
+type Config struct { + ringConfig ring.Config + userStatesConfig UserStatesConfig + + // Config for the ingester lifecycle control + ListenPort *int + NumTokens int + HeartbeatPeriod time.Duration + JoinAfter time.Duration + SearchPendingFor time.Duration + ClaimOnRollout bool + + // Config for chunk flushing + FlushCheckPeriod time.Duration + MaxChunkIdle time.Duration + MaxChunkAge time.Duration + ConcurrentFlushes int + ChunkEncoding string + + // For testing, you can override the address and ID of this ingester + addr string + id string + skipUnregister bool +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.ringConfig.RegisterFlags(f) + cfg.userStatesConfig.RegisterFlags(f) + + f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") + f.DurationVar(&cfg.HeartbeatPeriod, "ingester.heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") + f.DurationVar(&cfg.JoinAfter, "ingester.join-after", 0*time.Second, "Period to wait for a claim from another ingester; will join automatically after this.") + f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.") + f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.") + + f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") + f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.") + f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") + f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.") + f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", "1", "Encoding version to use for chunks.") + + addr, err := util.GetFirstAddressOf(infName) + if err != nil { + log.Fatalf("Failed to get address of %s: %v", infName, err) + } + + hostname, err := os.Hostname() + if err != nil { + log.Fatalf("Failed to get hostname: %v", err) + } + + f.StringVar(&cfg.addr, "ingester.addr", addr, "IP address to register into consul.") + f.StringVar(&cfg.id, "ingester.id", hostname, "ID to register into consul.") +} + // Ingester deals with "in flight" chunks. // Its like MemorySeriesStorage, but simpler. type Ingester struct { cfg Config chunkStore ChunkStore - userStates *userStates - ring *ring.Ring + consul ring.ConsulClient + + userStatesMtx sync.RWMutex + userStates *userStates + + // These values are initialised at startup, and never change + id string + addr string - stopLock sync.RWMutex - stopped bool - quit chan struct{} - done sync.WaitGroup + // Controls the lifecycle of the ingester + stopLock sync.RWMutex + stopped bool + quit chan struct{} + done sync.WaitGroup + actorChan chan func() + // We need to remember the ingester state just in case consul goes away and comes + // back empty. And it changes during the lifecycle of the ingester. + state ring.IngesterState + tokens []uint32 + + // Controls the ready-reporting readyLock sync.Mutex startTime time.Time ready bool @@ -108,45 +173,8 @@ type ChunkStore interface { Put(ctx context.Context, chunks []cortex_chunk.Chunk) error } -// Config configures an Ingester.
-type Config struct { - FlushCheckPeriod time.Duration - MaxChunkIdle time.Duration - MaxChunkAge time.Duration - ConcurrentFlushes int - ChunkEncoding string - UserStatesConfig UserStatesConfig -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") - f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.") - f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") - f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.") - f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", "1", "Encoding version to use for chunks.") - f.DurationVar(&cfg.UserStatesConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") - f.IntVar(&cfg.UserStatesConfig.MaxSeriesPerUser, "ingester.max-series-per-user", DefaultMaxSeriesPerUser, "Maximum number of active series per user.") - f.IntVar(&cfg.UserStatesConfig.MaxSeriesPerMetric, "ingester.max-series-per-metric", DefaultMaxSeriesPerMetric, "Maximum number of active series per metric name.") -} - -type flushOp struct { - from model.Time - userID string - fp model.Fingerprint - immediate bool -} - -func (o *flushOp) Key() string { - return fmt.Sprintf("%s-%d-%v", o.userID, o.fp, o.immediate) -} - -func (o *flushOp) Priority() int64 { - return -int64(o.from) -} - // New constructs a new Ingester. -func New(cfg Config, chunkStore ChunkStore, ring *ring.Ring) (*Ingester, error) { +func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) { if cfg.FlushCheckPeriod == 0 { cfg.FlushCheckPeriod = 1 * time.Minute } @@ -159,29 +187,40 @@ func New(cfg Config, chunkStore ChunkStore, ring *ring.Ring) (*Ingester, error) if cfg.ChunkEncoding == "" { cfg.ChunkEncoding = "1" } - if cfg.UserStatesConfig.RateUpdatePeriod == 0 { - cfg.UserStatesConfig.RateUpdatePeriod = 15 * time.Second + if cfg.userStatesConfig.RateUpdatePeriod == 0 { + cfg.userStatesConfig.RateUpdatePeriod = 15 * time.Second } - if cfg.UserStatesConfig.MaxSeriesPerUser <= 0 { - cfg.UserStatesConfig.MaxSeriesPerUser = DefaultMaxSeriesPerUser + if cfg.userStatesConfig.MaxSeriesPerUser <= 0 { + cfg.userStatesConfig.MaxSeriesPerUser = DefaultMaxSeriesPerUser } - if cfg.UserStatesConfig.MaxSeriesPerMetric <= 0 { - cfg.UserStatesConfig.MaxSeriesPerMetric = DefaultMaxSeriesPerMetric + if cfg.userStatesConfig.MaxSeriesPerMetric <= 0 { + cfg.userStatesConfig.MaxSeriesPerMetric = DefaultMaxSeriesPerMetric } if err := chunk.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil { return nil, err } + codec := ring.ProtoCodec{Factory: ring.ProtoDescFactory} + consul, err := ring.NewConsulClient(cfg.ringConfig.ConsulConfig, codec) + if err != nil { + return nil, err + } + i := &Ingester{ cfg: cfg, + consul: consul, chunkStore: chunkStore, - quit: make(chan struct{}), - ring: ring, + userStates: newUserStates(&cfg.userStatesConfig), + addr: fmt.Sprintf("%s:%d", cfg.addr, *cfg.ListenPort), + id: cfg.id, + + quit: make(chan struct{}), + actorChan: make(chan func()), + state: ring.PENDING, startTime: time.Now(), - userStates: newUserStates(&cfg.UserStatesConfig), flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, 
cfg.ConcurrentFlushes), ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{ @@ -225,36 +264,8 @@ func New(cfg Config, chunkStore ChunkStore, ring *ring.Ring) (*Ingester, error) i.done.Add(1) go i.loop() - return i, nil -} - -// ReadinessHandler is used to indicate to k8s when the ingesters are ready for -// the addition removal of another ingester. Returns 204 when the ingester is -// ready, 500 otherwise. -func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { - if i.isReady() { - w.WriteHeader(http.StatusNoContent) - } else { - w.WriteHeader(http.StatusInternalServerError) - } -} - -func (i *Ingester) isReady() bool { - i.readyLock.Lock() - defer i.readyLock.Unlock() - if i.ready { - return true - } - - // Ingester always take at least minReadyDuration to become ready to work - // around race conditions with ingesters exiting and updating the ring - if time.Now().Sub(i.startTime) < minReadyDuration { - return false - } - - i.ready = i.ready || i.ring.Ready() - return i.ready + return i, nil } // Push implements cortex.IngesterServer @@ -293,6 +304,8 @@ func (i *Ingester) append(ctx context.Context, sample *model.Sample) error { return fmt.Errorf("ingester stopping") } + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() state, fp, series, err := i.userStates.getOrCreateSeries(ctx, sample.Metric) if err != nil { return err @@ -334,6 +347,8 @@ func (i *Ingester) Query(ctx context.Context, req *cortex.QueryRequest) (*cortex func (i *Ingester) query(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher) (model.Matrix, error) { i.queries.Inc() + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() state, err := i.userStates.getOrCreate(ctx) if err != nil { return nil, err @@ -360,6 +375,8 @@ func (i *Ingester) query(ctx context.Context, from, through model.Time, matchers // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *cortex.LabelValuesRequest) (*cortex.LabelValuesResponse, error) { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() state, err := i.userStates.getOrCreate(ctx) if err != nil { return nil, err @@ -375,6 +392,8 @@ func (i *Ingester) LabelValues(ctx context.Context, req *cortex.LabelValuesReque // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *cortex.MetricsForLabelMatchersRequest) (*cortex.MetricsForLabelMatchersResponse, error) { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() state, err := i.userStates.getOrCreate(ctx) if err != nil { return nil, err @@ -408,6 +427,8 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *cortex.Metr // UserStats returns ingestion statistics for the current user. func (i *Ingester) UserStats(ctx context.Context, req *cortex.UserStatsRequest) (*cortex.UserStatsResponse, error) { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() state, err := i.userStates.getOrCreate(ctx) if err != nil { return nil, err @@ -419,204 +440,6 @@ func (i *Ingester) UserStats(ctx context.Context, req *cortex.UserStatsRequest) }, nil } -// Stop stops the Ingester. -func (i *Ingester) Stop() { - i.stopLock.Lock() - i.stopped = true - i.stopLock.Unlock() - - // Closing i.quit triggers i.loop() to exit; i.loop() exiting - // will trigger i.flushLoop()s to exit. 
- close(i.quit) - - i.done.Wait() -} - -func (i *Ingester) loop() { - defer func() { - i.sweepUsers(true) - - // We close flush queue here to ensure the flushLoops pick - // up all the flushes triggered by the last run - for _, flushQueue := range i.flushQueues { - flushQueue.Close() - } - - log.Infof("Ingester.loop() exited gracefully") - i.done.Done() - }() - - flushTick := time.Tick(i.cfg.FlushCheckPeriod) - rateUpdateTick := time.Tick(i.cfg.UserStatesConfig.RateUpdatePeriod) - for { - select { - case <-flushTick: - i.sweepUsers(false) - case <-rateUpdateTick: - i.userStates.updateRates() - case <-i.quit: - return - } - } -} - -// sweepUsers periodically schedules series for flushing and garbage collects users with no series -func (i *Ingester) sweepUsers(immediate bool) { - if i.chunkStore == nil { - return - } - - for id, state := range i.userStates.cp() { - for pair := range state.fpToSeries.iter() { - state.fpLocker.Lock(pair.fp) - i.sweepSeries(id, pair.fp, pair.series, immediate) - state.fpLocker.Unlock(pair.fp) - } - } -} - -// sweepSeries schedules a series for flushing based on a set of criteria -// -// NB we don't close the head chunk here, as the series could wait in the queue -// for some time, and we want to encourage chunks to be as full as possible. -func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) { - if len(series.chunkDescs) <= 0 { - return - } - - firstTime := series.firstTime() - flush := i.shouldFlushSeries(series, immediate) - - if flush { - flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) - i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) - } -} - -func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool { - // Series should be scheduled for flushing if they have more than one chunk - if immediate || len(series.chunkDescs) > 1 { - return true - } - - // Or if the only existing chunk need flushing - if len(series.chunkDescs) > 0 { - return i.shouldFlushChunk(series.chunkDescs[0]) - } - - return false -} - -func (i *Ingester) shouldFlushChunk(c *desc) bool { - // Chunks should be flushed if their oldest entry is older than MaxChunkAge - if model.Now().Sub(c.FirstTime) > i.cfg.MaxChunkAge { - return true - } - - // Chunk should be flushed if their last entry is older then MaxChunkIdle - if model.Now().Sub(c.LastTime) > i.cfg.MaxChunkIdle { - return true - } - - return false -} - -func (i *Ingester) flushLoop(j int) { - defer func() { - log.Info("Ingester.flushLoop() exited") - i.done.Done() - }() - - for { - o := i.flushQueues[j].Dequeue() - if o == nil { - return - } - op := o.(*flushOp) - - err := i.flushUserSeries(op.userID, op.fp, op.immediate) - if err != nil { - log.Errorf("Failed to flush user %v: %v", op.userID, err) - } - - // If we're exiting & we failed to flush, put the failed operation - // back in the queue at a later point. - if op.immediate && err != nil { - op.from = op.from.Add(flushBackoff) - i.flushQueues[j].Enqueue(op) - } - } -} - -func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { - userState, ok := i.userStates.get(userID) - if !ok { - return nil - } - - series, ok := userState.fpToSeries.get(fp) - if !ok { - return nil - } - - userState.fpLocker.Lock(fp) - if !i.shouldFlushSeries(series, immediate) { - userState.fpLocker.Unlock(fp) - return nil - } - - // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. 
- chunks := series.chunkDescs - if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head())) { - series.closeHead() - } else { - chunks = chunks[:len(chunks)-1] - } - userState.fpLocker.Unlock(fp) - - if len(chunks) == 0 { - return nil - } - - // flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs. - ctx := user.Inject(context.Background(), userID) - err := i.flushChunks(ctx, fp, series.metric, chunks) - if err != nil { - return err - } - - // now remove the chunks - userState.fpLocker.Lock(fp) - series.chunkDescs = series.chunkDescs[len(chunks):] - i.memoryChunks.Sub(float64(len(chunks))) - if len(series.chunkDescs) == 0 { - userState.removeSeries(fp, series.metric) - } - userState.fpLocker.Unlock(fp) - return nil -} - -func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric model.Metric, chunkDescs []*desc) error { - wireChunks := make([]cortex_chunk.Chunk, 0, len(chunkDescs)) - for _, chunkDesc := range chunkDescs { - wireChunks = append(wireChunks, cortex_chunk.NewChunk(fp, metric, chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime)) - } - - err := i.chunkStore.Put(ctx, wireChunks) - if err != nil { - return err - } - - // Record statistsics only when actual put request did not return error. - for _, chunkDesc := range chunkDescs { - i.chunkUtilization.Observe(chunkDesc.C.Utilization()) - i.chunkLength.Observe(float64(chunkDesc.C.Len())) - i.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) - } - - return nil -} - // Describe implements prometheus.Collector. func (i *Ingester) Describe(ch chan<- *prometheus.Desc) { ch <- memorySeriesDesc @@ -633,6 +456,8 @@ func (i *Ingester) Describe(ch chan<- *prometheus.Desc) { // Collect implements prometheus.Collector. func (i *Ingester) Collect(ch chan<- prometheus.Metric) { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() numUsers := i.userStates.numUsers() numSeries := i.userStates.numSeries() diff --git a/ingester/ingester_claim.go b/ingester/ingester_claim.go new file mode 100644 index 00000000000..4a7b22c3f39 --- /dev/null +++ b/ingester/ingester_claim.go @@ -0,0 +1,138 @@ +package ingester + +import ( + "io" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex" + + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/util" +) + +var ( + sentChunks = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_sent_chunks", + Help: "The total number of chunks sent by this ingester whilst leaving.", + }) + receivedChunks = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_received_chunks", + Help: "The total number of chunks received by this ingester whilst joining", + }) +) + +func init() { + prometheus.MustRegister(sentChunks) + prometheus.MustRegister(receivedChunks) +} + +// TransferChunks receives all the chunks from another ingester. 
+func (i *Ingester) TransferChunks(stream cortex.Ingester_TransferChunksServer) error { + // Enter JOINING state (only valid from PENDING) + if err := i.ChangeState(ring.JOINING); err != nil { + return err + } + + userStates := newUserStates(&i.cfg.userStatesConfig) + fromIngesterID := "" + + for { + wireSeries, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + + // We can't send "extra" fields with a streaming call, so we repeat + // wireSeries.FromIngesterId and assume it is the same every time + // round this loop. + fromIngesterID = wireSeries.FromIngesterId + metric := util.FromLabelPairs(wireSeries.Labels) + userCtx := user.Inject(stream.Context(), wireSeries.UserId) + descs, err := fromWireChunks(wireSeries.Chunks) + if err != nil { + return err + } + + state, fp, series, err := userStates.getOrCreateSeries(userCtx, metric) + if err != nil { + return err + } + prevNumChunks := len(series.chunkDescs) + + err = series.setChunks(descs) + state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries + if err != nil { + return err + } + + i.memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) + receivedChunks.Add(float64(len(descs))) + } + + if err := stream.SendAndClose(&cortex.TransferChunksResponse{}); err != nil { + return err + } + + if err := i.ClaimTokensFor(fromIngesterID); err != nil { + return err + } + + i.userStatesMtx.Lock() + defer i.userStatesMtx.Unlock() + + if err := i.ChangeState(ring.ACTIVE); err != nil { + return err + } + i.userStates = userStates + + return nil +} + +func toWireChunks(descs []*desc) ([]cortex.Chunk, error) { + wireChunks := make([]cortex.Chunk, 0, len(descs)) + for _, d := range descs { + wireChunk := cortex.Chunk{ + StartTimestampMs: int64(d.FirstTime), + EndTimestampMs: int64(d.LastTime), + Encoding: int32(d.C.Encoding()), + Data: make([]byte, chunk.ChunkLen, chunk.ChunkLen), + } + + if err := d.C.MarshalToBuf(wireChunk.Data); err != nil { + return nil, err + } + + wireChunks = append(wireChunks, wireChunk) + } + return wireChunks, nil +} + +func fromWireChunks(wireChunks []cortex.Chunk) ([]*desc, error) { + descs := make([]*desc, 0, len(wireChunks)) + for _, c := range wireChunks { + desc := &desc{ + FirstTime: model.Time(c.StartTimestampMs), + LastTime: model.Time(c.EndTimestampMs), + } + + var err error + desc.C, err = chunk.NewForEncoding(chunk.Encoding(byte(c.Encoding))) + if err != nil { + return nil, err + } + + if err := desc.C.UnmarshalFromBuf(c.Data); err != nil { + return nil, err + } + + descs = append(descs, desc) + } + return descs, nil +} diff --git a/ingester/ingester_flush.go b/ingester/ingester_flush.go new file mode 100644 index 00000000000..a9e3480200e --- /dev/null +++ b/ingester/ingester_flush.go @@ -0,0 +1,192 @@ +package ingester + +import ( + "fmt" + "time" + + "golang.org/x/net/context" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/chunk" +) + +const ( + // Backoff for retrying 'immediate' flushes. Only counts for queue + // position, not wallclock time.
+ flushBackoff = 1 * time.Second +) + +type flushOp struct { + from model.Time + userID string + fp model.Fingerprint + immediate bool +} + +func (o *flushOp) Key() string { + return fmt.Sprintf("%s-%d-%v", o.userID, o.fp, o.immediate) +} + +func (o *flushOp) Priority() int64 { + return -int64(o.from) +} + +// sweepUsers periodically schedules series for flushing and garbage collects users with no series +func (i *Ingester) sweepUsers(immediate bool) { + if i.chunkStore == nil { + return + } + + for id, state := range i.userStates.cp() { + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) + i.sweepSeries(id, pair.fp, pair.series, immediate) + state.fpLocker.Unlock(pair.fp) + } + } +} + +// sweepSeries schedules a series for flushing based on a set of criteria +// +// NB we don't close the head chunk here, as the series could wait in the queue +// for some time, and we want to encourage chunks to be as full as possible. +func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) { + if len(series.chunkDescs) <= 0 { + return + } + + firstTime := series.firstTime() + flush := i.shouldFlushSeries(series, immediate) + + if flush { + flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) + i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) + } +} + +func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool { + // Series should be scheduled for flushing if they have more than one chunk + if immediate || len(series.chunkDescs) > 1 { + return true + } + + // Or if the only existing chunk need flushing + if len(series.chunkDescs) > 0 { + return i.shouldFlushChunk(series.chunkDescs[0]) + } + + return false +} + +func (i *Ingester) shouldFlushChunk(c *desc) bool { + // Chunks should be flushed if their oldest entry is older than MaxChunkAge + if model.Now().Sub(c.FirstTime) > i.cfg.MaxChunkAge { + return true + } + + // Chunk should be flushed if their last entry is older then MaxChunkIdle + if model.Now().Sub(c.LastTime) > i.cfg.MaxChunkIdle { + return true + } + + return false +} + +func (i *Ingester) flushLoop(j int) { + defer func() { + log.Info("Ingester.flushLoop() exited") + i.done.Done() + }() + + for { + o := i.flushQueues[j].Dequeue() + if o == nil { + return + } + op := o.(*flushOp) + + err := i.flushUserSeries(op.userID, op.fp, op.immediate) + if err != nil { + log.Errorf("Failed to flush user %v: %v", op.userID, err) + } + + // If we're exiting & we failed to flush, put the failed operation + // back in the queue at a later point. + if op.immediate && err != nil { + op.from = op.from.Add(flushBackoff) + i.flushQueues[j].Enqueue(op) + } + } +} + +func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { + userState, ok := i.userStates.get(userID) + if !ok { + return nil + } + + series, ok := userState.fpToSeries.get(fp) + if !ok { + return nil + } + + userState.fpLocker.Lock(fp) + if !i.shouldFlushSeries(series, immediate) { + userState.fpLocker.Unlock(fp) + return nil + } + + // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. 
+ chunks := series.chunkDescs + if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head())) { + series.closeHead() + } else { + chunks = chunks[:len(chunks)-1] + } + userState.fpLocker.Unlock(fp) + + if len(chunks) == 0 { + return nil + } + + // flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs. + ctx := user.Inject(context.Background(), userID) + err := i.flushChunks(ctx, fp, series.metric, chunks) + if err != nil { + return err + } + + // now remove the chunks + userState.fpLocker.Lock(fp) + series.chunkDescs = series.chunkDescs[len(chunks):] + i.memoryChunks.Sub(float64(len(chunks))) + if len(series.chunkDescs) == 0 { + userState.removeSeries(fp, series.metric) + } + userState.fpLocker.Unlock(fp) + return nil +} + +func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric model.Metric, chunkDescs []*desc) error { + wireChunks := make([]chunk.Chunk, 0, len(chunkDescs)) + for _, chunkDesc := range chunkDescs { + wireChunks = append(wireChunks, chunk.NewChunk(fp, metric, chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime)) + } + + err := i.chunkStore.Put(ctx, wireChunks) + if err != nil { + return err + } + + // Record statistsics only when actual put request did not return error. + for _, chunkDesc := range chunkDescs { + i.chunkUtilization.Observe(chunkDesc.C.Utilization()) + i.chunkLength.Observe(float64(chunkDesc.C.Len())) + i.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) + } + + return nil +} diff --git a/ingester/ingester_lifecycle.go b/ingester/ingester_lifecycle.go new file mode 100644 index 00000000000..511831751dc --- /dev/null +++ b/ingester/ingester_lifecycle.go @@ -0,0 +1,407 @@ +// Responsible for managing the ingester lifecycle. + +package ingester + +import ( + "fmt" + "io" + "net/http" + "sort" + "time" + + "golang.org/x/net/context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex" + "github.com/weaveworks/cortex/ingester/client" + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/util" +) + +const ( + infName = "eth0" + minReadyDuration = 1 * time.Minute + pendingSearchIterations = 10 +) + +var ( + consulHeartbeats = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_consul_heartbeats_total", + Help: "The total number of heartbeats sent to consul.", + }) +) + +func init() { + prometheus.MustRegister(consulHeartbeats) +} + +// ReadinessHandler is used to indicate to k8s when the ingesters are ready for +// the addition removal of another ingester. Returns 204 when the ingester is +// ready, 500 otherwise. 
+func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { + if i.isReady() { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusInternalServerError) + } +} + +func (i *Ingester) isReady() bool { + i.readyLock.Lock() + defer i.readyLock.Unlock() + + if i.ready { + return true + } + + // Ingesters always take at least minReadyDuration to become ready, to work + // around race conditions with ingesters exiting and updating the ring + if time.Now().Sub(i.startTime) < minReadyDuration { + return false + } + + ringDesc, err := i.consul.Get(ring.ConsulKey) + if err != nil { + log.Errorf("Error talking to consul: %v", err) + return false + } + + i.ready = i.ready || ringDesc.(*ring.Desc).Ready(i.cfg.ringConfig.HeartbeatTimeout) + return i.ready +} + +// ChangeState changes the state of the ingester; for use from outside the loop() goroutine. +func (i *Ingester) ChangeState(state ring.IngesterState) error { + err := make(chan error) + i.actorChan <- func() { + err <- i.changeState(state) + } + return <-err +} + +// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. +func (i *Ingester) ClaimTokensFor(ingesterID string) error { + err := make(chan error) + + i.actorChan <- func() { + var tokens []uint32 + + claimTokens := func(in interface{}) (out interface{}, retry bool, err error) { + ringDesc, ok := in.(*ring.Desc) + if !ok || ringDesc == nil { + return nil, false, fmt.Errorf("Cannot claim tokens in an empty ring") + } + + tokens = ringDesc.ClaimTokens(ingesterID, i.id) + return ringDesc, true, nil + } + + if err := i.consul.CAS(ring.ConsulKey, claimTokens); err != nil { + log.Errorf("Failed to write to consul: %v", err) + } + + i.tokens = tokens + err <- nil + } + + return <-err +} + +// Shutdown stops the ingester. It will: +// - send chunks to another ingester, if it can. +// - otherwise, flush chunks to the chunk store. +// - remove config from Consul. +// - block until we've successfully shut down. +func (i *Ingester) Shutdown() { + // This will prevent us accepting any more samples + i.stopLock.Lock() + i.stopped = true + i.stopLock.Unlock() + + // closing i.quit triggers loop() to exit, which in turn will trigger + // the removal of our tokens etc + close(i.quit) + + i.done.Wait() +} + +func (i *Ingester) loop() { + defer func() { + log.Infof("Ingester.loop() exited gracefully") + i.done.Done() + }() + + // First, see if we exist in the cluster, update our state to match if we do, + // and add ourselves (without tokens) if we don't. + if err := i.initRing(); err != nil { + log.Fatalf("Failed to join consul: %v", err) + } + + // We do various periodic tasks + autoJoinAfter := time.After(i.cfg.JoinAfter) + + heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod) + defer heartbeatTicker.Stop() + + flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod) + defer flushTicker.Stop() + + rateUpdateTicker := time.NewTicker(i.cfg.userStatesConfig.RateUpdatePeriod) + defer rateUpdateTicker.Stop() + +loop: + for { + select { + case <-autoJoinAfter: + // Will only fire once, after auto join timeout. If we haven't entered "JOINING" state, + // then pick some tokens and enter ACTIVE state.
+ if i.state == ring.PENDING { + log.Infof("Auto-joining cluster after timeout.") + if err := i.autoJoin(); err != nil { + log.Fatalf("failed to pick tokens in consul: %v", err) + } + } + + case <-heartbeatTicker.C: + consulHeartbeats.Inc() + if err := i.updateConsul(); err != nil { + log.Errorf("Failed to write to consul, sleeping: %v", err) + } + + case <-flushTicker.C: + i.sweepUsers(false) + + case <-rateUpdateTicker.C: + i.userStates.updateRates() + + case f := <-i.actorChan: + f() + + case <-i.quit: + break loop + } + } + + // Mark ourselves as LEAVING so no more samples are sent to us. + i.changeState(ring.LEAVING) + + flushRequired := true + if i.cfg.ClaimOnRollout { + if err := i.transferChunks(); err != nil { + log.Fatalf("Failed to transfer chunks to another ingester: %v", err) + } + flushRequired = false + } + if flushRequired { + i.flushAllChunks() + } + + // Close the flush queues, will wait for chunks to be flushed. + for _, flushQueue := range i.flushQueues { + flushQueue.Close() + } + + if !i.cfg.skipUnregister { + if err := i.unregister(); err != nil { + log.Fatalf("Failed to unregister from consul: %v", err) + } + log.Infof("Ingester removed from consul") + } +} + +// initRing is the first thing we do when we start. It: +// - adds an ingester entry to the ring +// - copies out our state and tokens if they exist +func (i *Ingester) initRing() error { + return i.consul.CAS(ring.ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { + var ringDesc *ring.Desc + if in == nil { + ringDesc = ring.NewDesc() + } else { + ringDesc = in.(*ring.Desc) + } + + ingesterDesc, ok := ringDesc.Ingesters[i.id] + if !ok { + // Either we are a new ingester, or consul must have restarted + log.Infof("Entry not found in ring, adding with no tokens.") + ringDesc.AddIngester(i.id, i.addr, []uint32{}, i.state) + return ringDesc, true, nil + } + + // We exist in the ring, so assume the ring is right and copy our tokens & state out of it. + i.state = ingesterDesc.State + i.tokens, _ = ringDesc.TokensFor(i.id) + + log.Infof("Existing entry found in ring with state=%s, tokens=%v.", i.state, i.tokens) + return ringDesc, true, nil + }) +} + +// autoJoin selects random tokens & moves state to ACTIVE +func (i *Ingester) autoJoin() error { + return i.consul.CAS(ring.ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { + var ringDesc *ring.Desc + if in == nil { + ringDesc = ring.NewDesc() + } else { + ringDesc = in.(*ring.Desc) + } + + // At this point, we should not have any tokens, and we should be in PENDING state. + myTokens, takenTokens := ringDesc.TokensFor(i.id) + if len(myTokens) > 0 { + log.Errorf("%d tokens already exist for this ingester - wasn't expecting any!", len(myTokens)) + } + + newTokens := ring.GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens) + i.state = ring.ACTIVE + ringDesc.AddIngester(i.id, i.addr, newTokens, i.state) + + tokens := append(myTokens, newTokens...) + sort.Sort(sortableUint32(tokens)) + i.tokens = tokens + + return ringDesc, true, nil + }) +} + +// updateConsul updates our entries in consul, heartbeating and dealing with +// consul restarts.
+func (i *Ingester) updateConsul() error { + return i.consul.CAS(ring.ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { + var ringDesc *ring.Desc + if in == nil { + ringDesc = ring.NewDesc() + } else { + ringDesc = in.(*ring.Desc) + } + + ingesterDesc, ok := ringDesc.Ingesters[i.id] + if !ok { + // consul must have restarted + log.Infof("Found empty ring, inserting tokens!") + ringDesc.AddIngester(i.id, i.addr, i.tokens, i.state) + } else { + ingesterDesc.Timestamp = time.Now().Unix() + ingesterDesc.State = i.state + ingesterDesc.Addr = i.addr + ringDesc.Ingesters[i.id] = ingesterDesc + } + + return ringDesc, true, nil + }) +} + +// changeState updates consul with state transitions for us. NB this must be +// called from loop()! Use ChangeState for calls from outside of loop(). +func (i *Ingester) changeState(state ring.IngesterState) error { + // Only the following state transitions can be triggered externally + if !((i.state == ring.PENDING && state == ring.JOINING) || // triggered by ClaimStart + (i.state == ring.PENDING && state == ring.ACTIVE) || // triggered by autoJoin + (i.state == ring.JOINING && state == ring.ACTIVE) || // triggered by ClaimFinish + (i.state == ring.ACTIVE && state == ring.LEAVING)) { // triggered by shutdown + return fmt.Errorf("Changing ingester state from %v -> %v is disallowed", i.state, state) + } + + log.Infof("Changing ingester state from %v -> %v", i.state, state) + i.state = state + return i.updateConsul() +} + +// transferChunks finds an ingester in PENDING state and transfers our chunks +// to it. +func (i *Ingester) transferChunks() error { + targetIngester := i.findTargetIngester() + if targetIngester == nil { + return fmt.Errorf("cannot find ingester to transfer chunks to") + } + + log.Infof("Sending chunks to %v", targetIngester.Addr) + client, err := client.MakeIngesterClient(targetIngester.Addr, i.cfg.SearchPendingFor) + if err != nil { + return err + } + defer client.(io.Closer).Close() + + ctx := user.Inject(context.Background(), "-1") + stream, err := client.TransferChunks(ctx) + if err != nil { + return err + } + + for userID, state := range i.userStates.cp() { + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) + + chunks, err := toWireChunks(pair.series.chunkDescs) + if err != nil { + state.fpLocker.Unlock(pair.fp) + return err + } + + err = stream.Send(&cortex.TimeSeriesChunk{ + FromIngesterId: i.id, + UserId: userID, + Labels: util.ToLabelPairs(pair.series.metric), + Chunks: chunks, + }) + state.fpLocker.Unlock(pair.fp) + if err != nil { + return err + } + + sentChunks.Add(float64(len(chunks))) + } + } + + _, err = stream.CloseAndRecv() + if err != nil { + return err + } + + return nil +} + +// findTargetIngester finds an ingester in PENDING state. +func (i *Ingester) findTargetIngester() *ring.IngesterDesc { + for j := 0; j < pendingSearchIterations; j++ { + ringDesc, err := i.consul.Get(ring.ConsulKey) + if err != nil { + log.Errorf("Error talking to consul: %v", err) + time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations) + continue + } + + ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING) + if len(ingesters) <= 0 { + log.Warnf("No pending ingesters found...") + time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations) + continue + } + + return ingesters[0] + } + return nil +} + +// flushChunks writes all remaining chunks to the chunkStore, +func (i *Ingester) flushAllChunks() { + i.sweepUsers(true) +} + +// unregister removes our entry from consul. 
+func (i *Ingester) unregister() error { + return i.consul.CAS(ring.ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*ring.Desc) + ringDesc.RemoveIngester(i.id) + return ringDesc, true, nil + }) +} diff --git a/ingester/ingester_lifecycle_test.go b/ingester/ingester_lifecycle_test.go new file mode 100644 index 00000000000..cebaefeeeaf --- /dev/null +++ b/ingester/ingester_lifecycle_test.go @@ -0,0 +1,97 @@ +package ingester + +import ( + "reflect" + "runtime" + "testing" + "time" + + "github.com/prometheus/common/log" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/ring" +) + +func defaultIngesterTestConfig() Config { + consul := ring.NewMockConsulClient() + return Config{ + ringConfig: ring.Config{ + ConsulConfig: ring.ConsulConfig{ + Mock: consul, + }, + }, + + NumTokens: 1, + HeartbeatPeriod: 5 * time.Second, + ListenPort: func(i int) *int { return &i }(0), + addr: "localhost", + id: "localhost", + + FlushCheckPeriod: 99999 * time.Hour, + MaxChunkIdle: 99999 * time.Hour, + } +} + +// TestIngesterRestart tests a restarting ingester doesn't keep adding more tokens. +func TestIngesterRestart(t *testing.T) { + config := defaultIngesterTestConfig() + config.skipUnregister = true + + { + ingester, err := New(config, nil) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + ingester.Shutdown() // doesn't actually unregister due to skipUnregister: true + } + + poll(t, 100*time.Millisecond, 1, func() interface{} { + return numTokens(config.ringConfig.ConsulConfig.Mock, "localhost") + }) + + { + ingester, err := New(config, nil) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + ingester.Shutdown() // doesn't actually unregister due to skipUnregister: true + } + + time.Sleep(200 * time.Millisecond) + + poll(t, 100*time.Millisecond, 1, func() interface{} { + return numTokens(config.ringConfig.ConsulConfig.Mock, "localhost") + }) +} + +func numTokens(c ring.ConsulClient, name string) int { + ringDesc, err := c.Get(ring.ConsulKey) + if err != nil { + log.Errorf("Error reading consul: %v", err) + return 0 + } + + count := 0 + for _, token := range ringDesc.(*ring.Desc).Tokens { + if token.Ingester == name { + count++ + } + } + return count +} + +// poll repeatedly evaluates condition until we either timeout, or it succeeds. 
+func poll(t *testing.T, d time.Duration, want interface{}, have func() interface{}) { + deadline := time.Now().Add(d) + for { + if time.Now().After(deadline) { + break + } + if reflect.DeepEqual(want, have()) { + return + } + time.Sleep(d / 10) + } + h := have() + if !reflect.DeepEqual(want, h) { + _, file, line, _ := runtime.Caller(1) + t.Fatalf("%s:%d: %v != %v", file, line, want, h) + } +} diff --git a/ingester/ingester_test.go b/ingester/ingester_test.go index 885b24043c3..00ba0aa6b5b 100644 --- a/ingester/ingester_test.go +++ b/ingester/ingester_test.go @@ -6,7 +6,6 @@ import ( "sort" "sync" "testing" - "time" "google.golang.org/grpc" @@ -76,14 +75,11 @@ func matrixToSamples(m model.Matrix) []model.Sample { } func TestIngesterAppend(t *testing.T) { - cfg := Config{ - FlushCheckPeriod: 99999 * time.Hour, - MaxChunkIdle: 99999 * time.Hour, - } + cfg := defaultIngesterTestConfig() store := &testStore{ chunks: map[string][]chunk.Chunk{}, } - ing, err := New(cfg, store, nil) + ing, err := New(cfg, store) if err != nil { t.Fatal(err) } @@ -131,7 +127,7 @@ func TestIngesterAppend(t *testing.T) { } // Read samples back via chunk store. - ing.Stop() + ing.Shutdown() for _, userID := range userIDs { res, err := chunk.ChunksToMatrix(store.chunks[userID]) if err != nil { @@ -146,17 +142,15 @@ func TestIngesterAppend(t *testing.T) { } func TestIngesterUserSeriesLimitExceeded(t *testing.T) { - cfg := Config{ - FlushCheckPeriod: 99999 * time.Hour, - MaxChunkIdle: 99999 * time.Hour, - UserStatesConfig: UserStatesConfig{ - MaxSeriesPerUser: 1, - }, + cfg := defaultIngesterTestConfig() + cfg.userStatesConfig = UserStatesConfig{ + MaxSeriesPerUser: 1, } + store := &testStore{ chunks: map[string][]chunk.Chunk{}, } - ing, err := New(cfg, store, nil) + ing, err := New(cfg, store) if err != nil { t.Fatal(err) } @@ -232,17 +226,15 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) { } func TestIngesterMetricSeriesLimitExceeded(t *testing.T) { - cfg := Config{ - FlushCheckPeriod: 99999 * time.Hour, - MaxChunkIdle: 99999 * time.Hour, - UserStatesConfig: UserStatesConfig{ - MaxSeriesPerMetric: 1, - }, + cfg := defaultIngesterTestConfig() + cfg.userStatesConfig = UserStatesConfig{ + MaxSeriesPerMetric: 1, } + store := &testStore{ chunks: map[string][]chunk.Chunk{}, } - ing, err := New(cfg, store, nil) + ing, err := New(cfg, store) if err != nil { t.Fatal(err) } diff --git a/ingester/series.go b/ingester/series.go index 32cd2df5bd5..f9245c3e10d 100644 --- a/ingester/series.go +++ b/ingester/series.go @@ -1,6 +1,7 @@ package ingester import ( + "fmt" "sort" "github.com/prometheus/client_golang/prometheus" @@ -157,6 +158,17 @@ func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.Sample return values, nil } +func (s *memorySeries) setChunks(descs []*desc) error { + if len(s.chunkDescs) != 0 { + return fmt.Errorf("series already has chunks") + } + + s.chunkDescs = descs + s.lastSampleValueSet = true + s.lastTime = descs[len(descs)-1].LastTime + return nil +} + type desc struct { C chunk.Chunk // nil if chunk is evicted. FirstTime model.Time // Populated at creation. Immutable. 
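The round trip between the wire helpers added in ingester_claim.go and the new setChunks above is the core of the handover: the joining ingester decodes each cortex.Chunk back into a desc and installs the whole slice on a fresh series. A rough, test-style sketch of that round trip; the helper names come from this patch, but the chunk construction (chunk.NewForEncoding, Add, model.SamplePair) is an assumption about the Prometheus local chunk API, not code from the patch:

package ingester

import (
	"testing"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/local/chunk"
)

// Encode one chunk with toWireChunks, decode it with fromWireChunks, and
// check the desc metadata survives the trip.
func TestWireChunkRoundTrip(t *testing.T) {
	c, err := chunk.NewForEncoding(chunk.DefaultEncoding)
	if err != nil {
		t.Fatal(err)
	}
	// Assumed Add signature; any overflow chunks are ignored for a single sample.
	if _, err := c.Add(model.SamplePair{Timestamp: 1234, Value: 5}); err != nil {
		t.Fatal(err)
	}

	in := []*desc{{C: c, FirstTime: 1234, LastTime: 1234}}
	wire, err := toWireChunks(in)
	if err != nil {
		t.Fatal(err)
	}

	out, err := fromWireChunks(wire)
	if err != nil {
		t.Fatal(err)
	}
	if len(out) != 1 || out[0].FirstTime != in[0].FirstTime || out[0].LastTime != in[0].LastTime {
		t.Fatalf("round trip mismatch: got %+v, want %+v", out, in)
	}
}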
diff --git a/ingester/user_state.go b/ingester/user_state.go index 93edc9bfa81..7525c799baa 100644 --- a/ingester/user_state.go +++ b/ingester/user_state.go @@ -1,6 +1,7 @@ package ingester import ( + "flag" "fmt" "sync" "time" @@ -38,6 +39,13 @@ type UserStatesConfig struct { MaxSeriesPerMetric int } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *UserStatesConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") + f.IntVar(&cfg.MaxSeriesPerUser, "ingester.max-series-per-user", DefaultMaxSeriesPerUser, "Maximum number of active series per user.") + f.IntVar(&cfg.MaxSeriesPerMetric, "ingester.max-series-per-metric", DefaultMaxSeriesPerMetric, "Maximum number of active series per metric name.") +} + func newUserStates(cfg *UserStatesConfig) *userStates { return &userStates{ states: map[string]*userState{}, diff --git a/ingester/util.go b/ingester/util.go new file mode 100644 index 00000000000..308f0c6596f --- /dev/null +++ b/ingester/util.go @@ -0,0 +1,7 @@ +package ingester + +type sortableUint32 []uint32 + +func (ts sortableUint32) Len() int { return len(ts) } +func (ts sortableUint32) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +func (ts sortableUint32) Less(i, j int) bool { return ts[i] < ts[j] } diff --git a/k8s/alertmanager-dep.yaml b/k8s/alertmanager-dep.yaml index e7a4808ebcc..f1ca535b5c2 100644 --- a/k8s/alertmanager-dep.yaml +++ b/k8s/alertmanager-dep.yaml @@ -16,7 +16,6 @@ spec: imagePullPolicy: IfNotPresent args: - -log.level=debug - - -server.log-success=true - -server.http-listen-port=80 - -alertmanager.configs.url=http://configs.default.svc.cluster.local:80 - -alertmanager.web.external-url=/api/prom/alertmanager diff --git a/k8s/distributor-dep.yaml b/k8s/distributor-dep.yaml index faf4d049383..91502a8244e 100644 --- a/k8s/distributor-dep.yaml +++ b/k8s/distributor-dep.yaml @@ -16,7 +16,6 @@ spec: imagePullPolicy: IfNotPresent args: - -log.level=debug - - -server.log-success=true - -server.http-listen-port=80 - -consul.hostname=consul.default.svc.cluster.local:8500 - -distributor.replication-factor=1 diff --git a/k8s/ingester-dep.yaml b/k8s/ingester-dep.yaml index 47f16f1fc0c..16beb719991 100644 --- a/k8s/ingester-dep.yaml +++ b/k8s/ingester-dep.yaml @@ -37,8 +37,8 @@ spec: image: quay.io/weaveworks/cortex-ingester imagePullPolicy: IfNotPresent args: - - -server.log-success=true - - -server.http-listen-port=80 + - -ingester.join-after=30s + - -ingester.claim-on-rollout=true - -consul.hostname=consul.default.svc.cluster.local:8500 - -s3.url=s3://abc:123@s3.default.svc.cluster.local:4569 - -dynamodb.url=dynamodb://user:pass@dynamodb.default.svc.cluster.local:8000/cortex diff --git a/k8s/memcached-dep.yaml b/k8s/memcached-dep.yaml index 32f42bc808e..185287cc65d 100644 --- a/k8s/memcached-dep.yaml +++ b/k8s/memcached-dep.yaml @@ -10,7 +10,7 @@ spec: labels: name: memcached annotations: - prometheus.io.port: "9150" + prometheus.io.scrape: "false" spec: containers: - name: memcached @@ -22,6 +22,3 @@ spec: ports: - name: clients containerPort: 11211 - ports: - - name: prom - containerPort: 9150 diff --git a/k8s/nginx-dep.yaml b/k8s/nginx-dep.yaml index bed500cfaa1..56085f3abe1 100644 --- a/k8s/nginx-dep.yaml +++ b/k8s/nginx-dep.yaml @@ -9,6 +9,8 @@ spec: metadata: labels: name: nginx + annotations: + prometheus.io.scrape: "false" spec: containers: - name: nginx diff --git a/k8s/querier-dep.yaml 
b/k8s/querier-dep.yaml index ba9f638e143..e9d34eeb1ee 100644 --- a/k8s/querier-dep.yaml +++ b/k8s/querier-dep.yaml @@ -15,7 +15,6 @@ spec: image: quay.io/weaveworks/cortex-querier imagePullPolicy: IfNotPresent args: - - -server.log-success=true - -server.http-listen-port=80 - -consul.hostname=consul.default.svc.cluster.local:8500 - -s3.url=s3://abc:123@s3.default.svc.cluster.local:4569 diff --git a/k8s/retrieval-dep.yaml b/k8s/retrieval-dep.yaml index add9439dabb..4148174136c 100644 --- a/k8s/retrieval-dep.yaml +++ b/k8s/retrieval-dep.yaml @@ -17,7 +17,6 @@ spec: args: - -config.file=/etc/prometheus/prometheus.yml - -web.listen-address=:80 - - -storage.local.engine=none ports: - containerPort: 80 volumeMounts: diff --git a/k8s/ruler-dep.yaml b/k8s/ruler-dep.yaml index 6072a697c7e..7763be55e9a 100644 --- a/k8s/ruler-dep.yaml +++ b/k8s/ruler-dep.yaml @@ -16,7 +16,6 @@ spec: imagePullPolicy: IfNotPresent args: - -log.level=debug - - -server.log-success=true - -server.http-listen-port=80 - -ruler.configs.url=http://configs.default.svc.cluster.local:80 - -ruler.alertmanager-url=http://alertmanager.default.svc.cluster.local/ diff --git a/k8s/table-manager-dep.yaml b/k8s/table-manager-dep.yaml index a8215f5ac23..da5bc6feaa9 100644 --- a/k8s/table-manager-dep.yaml +++ b/k8s/table-manager-dep.yaml @@ -15,7 +15,6 @@ spec: image: quay.io/weaveworks/cortex-table-manager imagePullPolicy: IfNotPresent args: - - -server.log-success=true - -server.http-listen-port=80 - -dynamodb.url=dynamodb://user:pass@dynamodb.default.svc.cluster.local:8000/cortex - -dynamodb.periodic-table.prefix=cortex_weekly_ diff --git a/ring/consul_client.go b/ring/consul_client.go index 51aa2bad118..7e27f2ce9d1 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -20,7 +20,7 @@ type ConsulConfig struct { Host string Prefix string - mock ConsulClient + Mock ConsulClient } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -36,6 +36,7 @@ type ConsulClient interface { CAS(key string, f CASCallback) error WatchPrefix(path string, done <-chan struct{}, f func(string, interface{}) bool) WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) + Get(key string) (interface{}, error) PutBytes(key string, buf []byte) error } @@ -62,8 +63,8 @@ type consulClient struct { // NewConsulClient returns a new ConsulClient. 
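The new Get method on the ConsulClient interface returns the stored value already decoded by the client's codec. A minimal read sketch, assuming it sits inside package ring, that the client was built with the ring's ProtoCodec, that the ring has already been written under ConsulKey, and that fmt is imported; readRingOnce is a hypothetical helper name:

    func readRingOnce(client ConsulClient) (*Desc, error) {
        value, err := client.Get(ConsulKey)
        if err != nil {
            return nil, err
        }
        // Under the assumptions above, the decoded value is a *Desc.
        desc, ok := value.(*Desc)
        if !ok {
            return nil, fmt.Errorf("unexpected value of type %T under %q", value, ConsulKey)
        }
        return desc, nil
    }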
func NewConsulClient(cfg ConsulConfig, codec Codec) (ConsulClient, error) { - if cfg.mock != nil { - return cfg.mock, nil + if cfg.Mock != nil { + return cfg.Mock, nil } client, err := consul.NewClient(&consul.Config{ @@ -327,6 +328,14 @@ func (c *consulClient) PutBytes(key string, buf []byte) error { return err } +func (c *consulClient) Get(key string) (interface{}, error) { + kvp, _, err := c.kv.Get(key, &consul.QueryOptions{}) + if err != nil { + return nil, err + } + return c.codec.Decode(kvp.Value) +} + type prefixedConsulClient struct { prefix string consul ConsulClient @@ -357,3 +366,7 @@ func (c *prefixedConsulClient) WatchKey(key string, done <-chan struct{}, f func func (c *prefixedConsulClient) PutBytes(key string, buf []byte) error { return c.consul.PutBytes(c.prefix+key, buf) } + +func (c *prefixedConsulClient) Get(key string) (interface{}, error) { + return c.consul.Get(c.prefix + key) +} diff --git a/ring/consul_client_test.go b/ring/consul_client_mock.go similarity index 97% rename from ring/consul_client_test.go rename to ring/consul_client_mock.go index 75e13d3a721..e538db48aa1 100644 --- a/ring/consul_client_test.go +++ b/ring/consul_client_mock.go @@ -15,7 +15,8 @@ type mockKV struct { current uint64 // the current 'index in the log' } -func newMockConsulClient() ConsulClient { +// NewMockConsulClient makes a new mock consul client. +func NewMockConsulClient() ConsulClient { m := mockKV{ kvps: map[string]*consul.KVPair{}, } diff --git a/ring/http.go b/ring/http.go index dcf256fd432..bec6a693845 100644 --- a/ring/http.go +++ b/ring/http.go @@ -71,10 +71,10 @@ func (r *Ring) forget(id string) error { } ringDesc := in.(*Desc) - ringDesc.removeIngester(id) + ringDesc.RemoveIngester(id) return ringDesc, true, nil } - return r.consul.CAS(consulKey, unregister) + return r.consul.CAS(ConsulKey, unregister) } func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go deleted file mode 100644 index 577c2df75ca..00000000000 --- a/ring/ingester_lifecycle.go +++ /dev/null @@ -1,306 +0,0 @@ -// Responsible for managing the ingester lifecycle. - -package ring - -import ( - "flag" - "fmt" - "math/rand" - "net" - "os" - "sort" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/log" -) - -const ( - infName = "eth0" - consulKey = "ring" - heartbeatInterval = 5 * time.Second -) - -var ( - consulHeartbeats = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_consul_heartbeats_total", - Help: "The total number of heartbeats sent to consul.", - }) -) - -func init() { - prometheus.MustRegister(consulHeartbeats) -} - -// IngesterRegistrationConfig is the config for an IngesterRegistration -type IngesterRegistrationConfig struct { - Config - - ListenPort *int - NumTokens int - - // For testing - Addr string - Hostname string - skipUnregister bool - mock *Ring -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *IngesterRegistrationConfig) RegisterFlags(f *flag.FlagSet) { - cfg.Config.RegisterFlags(f) - f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") -} - -// IngesterRegistration manages the connection between the ingester and Consul. 
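Exporting ConsulConfig.Mock and moving the mock into consul_client_mock.go (with the exported NewMockConsulClient) lets tests outside this file wire a Ring to an in-memory consul, the same way the benchmark in ring_test.go below does. A sketch of that wiring from inside a test in package ring (external callers would qualify the names with ring.); the error handling is illustrative:

    consul := NewMockConsulClient()
    r, err := New(Config{
        ConsulConfig: ConsulConfig{
            Mock: consul,
        },
    })
    if err != nil {
        t.Fatal(err)
    }
    defer r.Stop()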
-type IngesterRegistration struct { - Ring *Ring - - consul ConsulClient - numTokens int - skipUnregister bool - - id string - addr string - quit chan struct{} - wait sync.WaitGroup - - // We need to remember the ingester state just in case consul goes away and comes - // back empty. Channel is used to tell the actor to update consul on state changes. - state IngesterState - stateChange chan IngesterState -} - -// RegisterIngester registers an ingester with Consul. -func RegisterIngester(cfg IngesterRegistrationConfig) (*IngesterRegistration, error) { - ring := cfg.mock - if ring == nil { - var err error - ring, err = New(cfg.Config) - if err != nil { - return nil, err - } - } - - hostname := cfg.Hostname - if hostname == "" { - var err error - hostname, err = os.Hostname() - if err != nil { - return nil, err - } - } - - addr := cfg.Addr - if addr == "" { - var err error - addr, err = getFirstAddressOf(infName) - if err != nil { - return nil, err - } - } - - r := &IngesterRegistration{ - Ring: ring, - - consul: ring.consul, - numTokens: cfg.NumTokens, - skipUnregister: cfg.skipUnregister, - - id: hostname, - // hostname is the ip+port of this instance, written to consul so - // the distributors know where to connect. - addr: fmt.Sprintf("%s:%d", addr, *cfg.ListenPort), - quit: make(chan struct{}), - - // Only read/written on actor goroutine. - state: ACTIVE, - stateChange: make(chan IngesterState), - } - - r.wait.Add(1) - go r.loop() - return r, nil -} - -// ChangeState changes the state of an ingester in the ring. -func (r *IngesterRegistration) ChangeState(state IngesterState) { - log.Infof("Changing ingester state to %v", state) - r.stateChange <- state -} - -// Unregister removes ingester config from Consul; will block -// until we'll successfully unregistered. -func (r *IngesterRegistration) Unregister() { - // closing r.quit triggers loop() to exit, which in turn will trigger - // the removal of our tokens. - close(r.quit) - r.wait.Wait() -} - -func (r *IngesterRegistration) loop() { - defer r.wait.Done() - tokens, err := r.pickTokens() - if err != nil { - log.Fatalf("Failed to pick tokens in consul: %v", err) - } - - if !r.skipUnregister { - defer r.unregister() - } - - r.heartbeat(tokens) -} - -func (r *IngesterRegistration) pickTokens() ([]uint32, error) { - var tokens []uint32 - pickTokens := func(in interface{}) (out interface{}, retry bool, err error) { - var ringDesc *Desc - if in == nil { - ringDesc = newDesc() - } else { - ringDesc = in.(*Desc) - } - - var takenTokens, myTokens []uint32 - for _, token := range ringDesc.Tokens { - takenTokens = append(takenTokens, token.Token) - - if token.Ingester == r.id { - myTokens = append(myTokens, token.Token) - } - } - - if len(myTokens) > 0 { - log.Infof("%d tokens already exist for this ingester!", len(myTokens)) - } - - newTokens := generateTokens(r.numTokens-len(myTokens), takenTokens) - ringDesc.addIngester(r.id, r.addr, newTokens, r.state) - - tokens := append(myTokens, newTokens...) 
- sort.Sort(sortableUint32(tokens)) - return ringDesc, true, nil - } - if err := r.consul.CAS(consulKey, pickTokens); err != nil { - return nil, err - } - log.Infof("Ingester added to consul") - return tokens, nil -} - -func (r *IngesterRegistration) heartbeat(tokens []uint32) { - - updateConsul := func(in interface{}) (out interface{}, retry bool, err error) { - var ringDesc *Desc - if in == nil { - ringDesc = newDesc() - } else { - ringDesc = in.(*Desc) - } - - ingesterDesc, ok := ringDesc.Ingesters[r.id] - if !ok { - // consul must have restarted - log.Infof("Found empty ring, inserting tokens!") - ringDesc.addIngester(r.id, r.addr, tokens, r.state) - } else { - ingesterDesc.Timestamp = time.Now().Unix() - ingesterDesc.State = r.state - ingesterDesc.Addr = r.addr - - // Set ProtoRing back to true for the case where an existing ingester that didn't understand this field removed it whilst updating the ring. - ingesterDesc.ProtoRing = true - ringDesc.Ingesters[r.id] = ingesterDesc - } - - return ringDesc, true, nil - } - - ticker := time.NewTicker(heartbeatInterval) - defer ticker.Stop() - for { - select { - case r.state = <-r.stateChange: - if err := r.consul.CAS(consulKey, updateConsul); err != nil { - log.Errorf("Failed to write to consul, sleeping: %v", err) - } - case <-ticker.C: - consulHeartbeats.Inc() - if err := r.consul.CAS(consulKey, updateConsul); err != nil { - log.Errorf("Failed to write to consul, sleeping: %v", err) - } - case <-r.quit: - return - } - } -} - -func (r *IngesterRegistration) unregister() { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.removeIngester(r.id) - return ringDesc, true, nil - } - if err := r.consul.CAS(consulKey, unregister); err != nil { - log.Fatalf("Failed to unregister from consul: %v", err) - } - log.Infof("Ingester removed from consul") -} - -type sortableUint32 []uint32 - -func (ts sortableUint32) Len() int { return len(ts) } -func (ts sortableUint32) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } -func (ts sortableUint32) Less(i, j int) bool { return ts[i] < ts[j] } - -// generateTokens make numTokens random tokens, none of which clash -// with takenTokens. Assumes takenTokens is sorted. -func generateTokens(numTokens int, takenTokens []uint32) []uint32 { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - tokens := sortableUint32{} - for i := 0; i < numTokens; { - candidate := r.Uint32() - j := sort.Search(len(takenTokens), func(i int) bool { - return takenTokens[i] >= candidate - }) - if j < len(takenTokens) && takenTokens[j] == candidate { - continue - } - tokens = append(tokens, candidate) - i++ - } - return tokens -} - -// getFirstAddressOf returns the first IPv4 address of the supplied interface name. 
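The registration, heartbeat and unregister paths above, like the forget handler in ring/http.go, all share one CAS callback shape: the callback receives the currently stored value (nil when the key is absent) and returns the value to write back, a retry flag, and an error. A compressed sketch of that contract, assuming package ring, a ConsulClient named consul, and the usual log import; the mutation itself is illustrative:

    err := consul.CAS(ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
        var ringDesc *Desc
        if in == nil {
            // Key missing (e.g. a fresh consul): start from an empty ring.
            ringDesc = NewDesc()
        } else {
            ringDesc = in.(*Desc)
        }
        // Mutate ringDesc here: refresh a heartbeat timestamp, add or remove an ingester, etc.
        return ringDesc, true, nil
    })
    if err != nil {
        log.Errorf("CAS failed: %v", err)
    }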
-func getFirstAddressOf(name string) (string, error) { - inf, err := net.InterfaceByName(name) - if err != nil { - return "", err - } - - addrs, err := inf.Addrs() - if err != nil { - return "", err - } - if len(addrs) <= 0 { - return "", fmt.Errorf("No address found for %s", name) - } - - for _, addr := range addrs { - switch v := addr.(type) { - case *net.IPNet: - if ip := v.IP.To4(); ip != nil { - return v.IP.String(), nil - } - } - } - - return "", fmt.Errorf("No address found for %s", name) -} diff --git a/ring/ingester_lifecycle_test.go b/ring/ingester_lifecycle_test.go deleted file mode 100644 index 2ebefc96930..00000000000 --- a/ring/ingester_lifecycle_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package ring - -import ( - "reflect" - "runtime" - "testing" - "time" -) - -func TestIngesterRestart(t *testing.T) { - consul := newMockConsulClient() - ring, err := New(Config{ - ConsulConfig: ConsulConfig{ - mock: consul, - }, - }) - if err != nil { - t.Fatal(err) - } - - { - registra, err := RegisterIngester(IngesterRegistrationConfig{ - mock: ring, - skipUnregister: true, - - NumTokens: 1, - ListenPort: func(i int) *int { return &i }(0), - Addr: "localhost", - Hostname: "localhost", - }) - if err != nil { - t.Fatal(err) - } - registra.Unregister() // doesn't actually unregister due to skipUnregister: true - } - - poll(t, 100*time.Millisecond, 1, func() interface{} { - return ring.numTokens("localhost") - }) - - { - registra, err := RegisterIngester(IngesterRegistrationConfig{ - mock: ring, - skipUnregister: true, - - NumTokens: 1, - ListenPort: func(i int) *int { return &i }(0), - Addr: "localhost", - Hostname: "localhost", - }) - if err != nil { - t.Fatal(err) - } - registra.Unregister() // doesn't actually unregister due to skipUnregister: true - } - - time.Sleep(200 * time.Millisecond) - - poll(t, 100*time.Millisecond, 1, func() interface{} { - return ring.numTokens("localhost") - }) -} - -func (r *Ring) numTokens(name string) int { - r.mtx.RLock() - defer r.mtx.RUnlock() - count := 0 - for _, token := range r.ringDesc.Tokens { - if token.Ingester == name { - count++ - } - } - return count -} - -// poll repeatedly evaluates condition until we either timeout, or it succeeds. -func poll(t *testing.T, d time.Duration, want interface{}, have func() interface{}) { - deadline := time.Now().Add(d) - for { - if time.Now().After(deadline) { - break - } - if reflect.DeepEqual(want, have()) { - return - } - time.Sleep(d / 10) - } - h := have() - if !reflect.DeepEqual(want, h) { - _, file, line, _ := runtime.Caller(1) - t.Fatalf("%s:%d: %v != %v", file, line, want, h) - } -} diff --git a/ring/model.go b/ring/model.go index 39771795fbe..9d6c9716910 100644 --- a/ring/model.go +++ b/ring/model.go @@ -16,16 +16,18 @@ func (ts ByToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token } // ProtoDescFactory makes new Descs func ProtoDescFactory() proto.Message { - return newDesc() + return NewDesc() } -func newDesc() *Desc { +// NewDesc returns an empty ring.Desc +func NewDesc() *Desc { return &Desc{ Ingesters: map[string]*IngesterDesc{}, } } -func (d *Desc) addIngester(id, addr string, tokens []uint32, state IngesterState) { +// AddIngester adds the given ingester to the ring. 
+func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) { if d.Ingesters == nil { d.Ingesters = map[string]*IngesterDesc{} } @@ -33,7 +35,6 @@ func (d *Desc) addIngester(id, addr string, tokens []uint32, state IngesterState Addr: addr, Timestamp: time.Now().Unix(), State: state, - ProtoRing: true, } for _, token := range tokens { @@ -46,7 +47,8 @@ func (d *Desc) addIngester(id, addr string, tokens []uint32, state IngesterState sort.Sort(ByToken(d.Tokens)) } -func (d *Desc) removeIngester(id string) { +// RemoveIngester removes the given ingester and all its tokens. +func (d *Desc) RemoveIngester(id string) { delete(d.Ingesters, id) output := []*TokenDesc{} for i := 0; i < len(d.Tokens); i++ { @@ -56,3 +58,52 @@ func (d *Desc) removeIngester(id string) { } d.Tokens = output } + +// ClaimTokens transfers all the tokens from one ingester to another, +// returning the claimed token. +func (d *Desc) ClaimTokens(from, to string) []uint32 { + var result []uint32 + for i := 0; i < len(d.Tokens); i++ { + if d.Tokens[i].Ingester == from { + d.Tokens[i].Ingester = to + result = append(result, d.Tokens[i].Token) + } + } + return result +} + +// FindIngestersByState returns the list of ingesters in the given state +func (d *Desc) FindIngestersByState(state IngesterState) []*IngesterDesc { + var result []*IngesterDesc + for _, ing := range d.Ingesters { + if ing.State == state { + result = append(result, ing) + } + } + return result +} + +// Ready is true when all ingesters are active and healthy. +func (d *Desc) Ready(heartbeatTimeout time.Duration) bool { + for _, ingester := range d.Ingesters { + if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout { + return false + } else if ingester.State != ACTIVE { + return false + } + } + + return len(d.Tokens) > 0 +} + +// TokensFor partitions the tokens into those for the given ID, and those for others. +func (d *Desc) TokensFor(id string) (tokens, other []uint32) { + var takenTokens, myTokens []uint32 + for _, token := range d.Tokens { + takenTokens = append(takenTokens, token.Token) + if token.Ingester == id { + myTokens = append(myTokens, token.Token) + } + } + return myTokens, takenTokens +} diff --git a/ring/ring.go b/ring/ring.go index 84504cdeef9..3ec97651bd0 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -16,6 +16,9 @@ import ( const ( unhealthy = "Unhealthy" + + // ConsulKey is the key under which we store the ring in consul. + ConsulKey = "ring" ) // Operation can be Read or Write @@ -105,7 +108,7 @@ func (r *Ring) Stop() { func (r *Ring) loop() { defer close(r.done) - r.consul.WatchKey(consulKey, r.quit, func(value interface{}) bool { + r.consul.WatchKey(ConsulKey, r.quit, func(value interface{}) bool { if value == nil { log.Infof("Ring doesn't exist in consul yet.") return true @@ -165,17 +168,18 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, er distinctHosts[token.Ingester] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] - // Ingesters that are Leaving do not count to the replication limit. We do + // Ingesters that are not ACTIVE do not count to the replication limit. We do // not want to Write to them because they are about to go away, but we do // want to write the extra replica somewhere. So we increase the size of the // set of replicas for the key. This means we have to also increase the // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. 
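ClaimTokens, FindIngestersByState and Ready added above are the building blocks for hand-over during rollouts (see the -ingester.claim-on-rollout flag in the ingester deployment earlier in this diff). A rough sketch of how they compose, given some desc *Desc; the ingester IDs are hypothetical, a real caller would presumably run the claim inside a consul CAS, and time and log imports are assumed:

    // Hand tokens from a leaving ingester to a joining one; both IDs are made up here.
    claimed := desc.ClaimTokens("ingester-leaving-1", "ingester-joining-1")
    log.Infof("claimed %d tokens", len(claimed))

    // Desc.Ready can then back a readiness check: every ingester must be ACTIVE
    // and have heartbeated within the timeout, and the ring must own some tokens.
    if desc.Ready(1 * time.Minute) {
        // report ready
    }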
- if ingester.State == LEAVING { + if op == Write && ingester.State != ACTIVE { + n++ + continue + } else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) { n++ - if op == Write { - continue - } + continue } ingesters = append(ingesters, ingester) @@ -202,26 +206,6 @@ func (r *Ring) GetAll() []*IngesterDesc { return ingesters } -// Ready is true when all ingesters are active and healthy. -func (r *Ring) Ready() bool { - r.mtx.RLock() - defer r.mtx.RUnlock() - - if r.ringDesc == nil { - return false - } - - for _, ingester := range r.ringDesc.Ingesters { - if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > r.heartbeatTimeout { - return false - } else if ingester.State != ACTIVE { - return false - } - } - - return len(r.ringDesc.Tokens) > 0 -} - func (r *Ring) search(key uint32) int { i := sort.Search(len(r.ringDesc.Tokens), func(x int) bool { return r.ringDesc.Tokens[x].Token > key diff --git a/ring/ring.proto b/ring/ring.proto index a3ee9872eb4..a54b98b79b8 100644 --- a/ring/ring.proto +++ b/ring/ring.proto @@ -8,10 +8,11 @@ message Desc { } message IngesterDesc { + reserved 4, 5; // old, deprecated fields + string addr = 1; int64 timestamp = 2; IngesterState state = 3; - bool protoRing = 5; } message TokenDesc { @@ -22,4 +23,7 @@ message TokenDesc { enum IngesterState { ACTIVE = 0; LEAVING = 1; + + PENDING = 2; + JOINING = 3; } diff --git a/ring/ring_test.go b/ring/ring_test.go index eb5806dc587..75884045d75 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -12,24 +12,24 @@ const ( func BenchmarkRing(b *testing.B) { // Make a random ring with N ingesters, and M tokens per ingests - desc := newDesc() + desc := NewDesc() takenTokens := []uint32{} for i := 0; i < numIngester; i++ { - tokens := generateTokens(numTokens, takenTokens) + tokens := GenerateTokens(numTokens, takenTokens) takenTokens = append(takenTokens, tokens...) - desc.addIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE) + desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE) } - consul := newMockConsulClient() + consul := NewMockConsulClient() ringBytes, err := ProtoCodec{}.Encode(desc) if err != nil { b.Fatal(err) } - consul.PutBytes(consulKey, ringBytes) + consul.PutBytes(ConsulKey, ringBytes) r, err := New(Config{ ConsulConfig: ConsulConfig{ - mock: consul, + Mock: consul, }, }) if err != nil { @@ -39,7 +39,7 @@ func BenchmarkRing(b *testing.B) { // Generate a batch of N random keys, and look them up b.ResetTimer() for i := 0; i < b.N; i++ { - keys := generateTokens(100, nil) + keys := GenerateTokens(100, nil) r.BatchGet(keys, 3, Write) } } diff --git a/ring/util.go b/ring/util.go new file mode 100644 index 00000000000..734f9b20bb7 --- /dev/null +++ b/ring/util.go @@ -0,0 +1,26 @@ +package ring + +import ( + "math/rand" + "sort" + "time" +) + +// GenerateTokens make numTokens random tokens, none of which clash +// with takenTokens. Assumes takenTokens is sorted. 
+func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + tokens := []uint32{} + for i := 0; i < numTokens; { + candidate := r.Uint32() + j := sort.Search(len(takenTokens), func(i int) bool { + return takenTokens[i] >= candidate + }) + if j < len(takenTokens) && takenTokens[j] == candidate { + continue + } + tokens = append(tokens, candidate) + i++ + } + return tokens +} diff --git a/util/compat.go b/util/compat.go index 79f94eef1db..715a0f91dca 100644 --- a/util/compat.go +++ b/util/compat.go @@ -16,7 +16,7 @@ func FromWriteRequest(req *cortex.WriteRequest) []model.Sample { for _, ts := range req.Timeseries { for _, s := range ts.Samples { samples = append(samples, model.Sample{ - Metric: fromLabelPairs(ts.Labels), + Metric: FromLabelPairs(ts.Labels), Value: model.SampleValue(s.Value), Timestamp: model.Time(s.TimestampMs), }) @@ -33,7 +33,7 @@ func ToWriteRequest(samples []model.Sample) *cortex.WriteRequest { for _, s := range samples { ts := cortex.TimeSeries{ - Labels: toLabelPairs(s.Metric), + Labels: ToLabelPairs(s.Metric), Samples: []cortex.Sample{ { Value: float64(s.Value), @@ -77,7 +77,7 @@ func ToQueryResponse(matrix model.Matrix) *cortex.QueryResponse { resp := &cortex.QueryResponse{} for _, ss := range matrix { ts := cortex.TimeSeries{ - Labels: toLabelPairs(ss.Metric), + Labels: ToLabelPairs(ss.Metric), Samples: make([]cortex.Sample, 0, len(ss.Values)), } for _, s := range ss.Values { @@ -96,7 +96,7 @@ func FromQueryResponse(resp *cortex.QueryResponse) model.Matrix { m := make(model.Matrix, 0, len(resp.Timeseries)) for _, ts := range resp.Timeseries { var ss model.SampleStream - ss.Metric = fromLabelPairs(ts.Labels) + ss.Metric = FromLabelPairs(ts.Labels) ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) for _, s := range ts.Samples { ss.Values = append(ss.Values, model.SamplePair{ @@ -152,7 +152,7 @@ func ToMetricsForLabelMatchersResponse(metrics []model.Metric) *cortex.MetricsFo } for _, metric := range metrics { resp.Metric = append(resp.Metric, &cortex.Metric{ - Labels: toLabelPairs(metric), + Labels: ToLabelPairs(metric), }) } return resp @@ -162,7 +162,7 @@ func ToMetricsForLabelMatchersResponse(metrics []model.Metric) *cortex.MetricsFo func FromMetricsForLabelMatchersResponse(resp *cortex.MetricsForLabelMatchersResponse) []model.Metric { metrics := []model.Metric{} for _, m := range resp.Metric { - metrics = append(metrics, fromLabelPairs(m.Labels)) + metrics = append(metrics, FromLabelPairs(m.Labels)) } return metrics } @@ -217,7 +217,8 @@ func fromLabelMatchers(matchers []*cortex.LabelMatcher) ([]*metric.LabelMatcher, return result, nil } -func toLabelPairs(metric model.Metric) []cortex.LabelPair { +// ToLabelPairs builds a []cortex.LabelPair from a model.Metric +func ToLabelPairs(metric model.Metric) []cortex.LabelPair { labelPairs := make([]cortex.LabelPair, 0, len(metric)) for k, v := range metric { labelPairs = append(labelPairs, cortex.LabelPair{ @@ -228,7 +229,8 @@ func toLabelPairs(metric model.Metric) []cortex.LabelPair { return labelPairs } -func fromLabelPairs(labelPairs []cortex.LabelPair) model.Metric { +// FromLabelPairs unpack a []cortex.LabelPair to a model.Metric +func FromLabelPairs(labelPairs []cortex.LabelPair) model.Metric { metric := make(model.Metric, len(labelPairs)) for _, l := range labelPairs { metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) diff --git a/util/net.go b/util/net.go new file mode 100644 index 00000000000..8c66e084433 --- /dev/null +++ 
b/util/net.go @@ -0,0 +1,33 @@ +package util + +import ( + "fmt" + "net" +) + +// GetFirstAddressOf returns the first IPv4 address of the supplied interface name. +func GetFirstAddressOf(name string) (string, error) { + inf, err := net.InterfaceByName(name) + if err != nil { + return "", err + } + + addrs, err := inf.Addrs() + if err != nil { + return "", err + } + if len(addrs) <= 0 { + return "", fmt.Errorf("No address found for %s", name) + } + + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + if ip := v.IP.To4(); ip != nil { + return v.IP.String(), nil + } + } + } + + return "", fmt.Errorf("No address found for %s", name) +} diff --git a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go index cfae0cb5d3c..9a781216e39 100644 --- a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go +++ b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go @@ -15,9 +15,9 @@ var ServerLoggingInterceptor = func(ctx context.Context, req interface{}, info * begin := time.Now() resp, err := handler(ctx, req) if err != nil { - log.Debugf("%s %s (%v) %s", gRPC, info.FullMethod, err, time.Since(begin)) + log.Warnf("%s %s (%v) %s", gRPC, info.FullMethod, err, time.Since(begin)) } else { - log.Infof("%s %s (success) %s", gRPC, info.FullMethod, time.Since(begin)) + log.Debugf("%s %s (success) %s", gRPC, info.FullMethod, time.Since(begin)) } return resp, err } diff --git a/vendor/manifest b/vendor/manifest index 8813c0ca5b0..d408dd2c54b 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -1497,7 +1497,7 @@ "importpath": "github.com/weaveworks/common", "repository": "https://github.com/weaveworks/common", "vcs": "git", - "revision": "7f3a36b04c07e159b0ed739fb684b29682688133", + "revision": "52057d9043e1d0d3e9903d2584beec7a905bcc90", "branch": "master", "notests": true },
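GetFirstAddressOf in the new util/net.go is the exported counterpart of the helper deleted from ring/ingester_lifecycle.go, which combined the eth0 address with the configured listen port to build the address advertised in the ring. A sketch of that pattern; the interface name comes from the deleted code, while the port value and the log/fmt imports are assumptions:

    addr, err := util.GetFirstAddressOf("eth0")
    if err != nil {
        log.Fatalf("could not find an address to advertise: %v", err)
    }
    listenPort := 9095 // stand-in for the gRPC listen port carried in the ingester config
    advertised := fmt.Sprintf("%s:%d", addr, listenPort)
    log.Infof("advertising %s in the ring", advertised)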