Skip to content

Faster rolling upgrades. #325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ endef
$(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependencies and what goes into each exe
%.pb.go: %.proto
cortex.pb.go: cortex.proto
ring/ring.pb.go: ring/ring.proto
all: $(UPTODATE_FILES)
test: $(PROTO_GOS)

Expand Down
36 changes: 12 additions & 24 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/weaveworks/cortex"
"github.com/weaveworks/cortex/chunk"
"github.com/weaveworks/cortex/ingester"
"github.com/weaveworks/cortex/ring"
"github.com/weaveworks/cortex/util"
)

Expand All @@ -25,44 +24,33 @@ func main() {
middleware.ServerUserHeaderInterceptor,
},
}
ingesterRegistrationConfig ring.IngesterRegistrationConfig
chunkStoreConfig chunk.StoreConfig
ingesterConfig ingester.Config
chunkStoreConfig chunk.StoreConfig
ingesterConfig ingester.Config
)
// IngesterRegistrator needs to know our gRPC listen port
ingesterRegistrationConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &ingesterRegistrationConfig, &chunkStoreConfig, &ingesterConfig)
// Ingester needs to know our gRPC listen port.
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &ingesterConfig)
flag.Parse()

registration, err := ring.RegisterIngester(ingesterRegistrationConfig)
chunkStore, err := chunk.NewStore(chunkStoreConfig)
if err != nil {
log.Fatalf("Could not register ingester: %v", err)
log.Fatal(err)
}
defer registration.Ring.Stop()

chunkStore, err := chunk.NewStore(chunkStoreConfig)
server, err := server.New(serverConfig)
if err != nil {
log.Fatal(err)
log.Fatalf("Error initializing server: %v", err)
}
defer server.Shutdown()

ingester, err := ingester.New(ingesterConfig, chunkStore, registration.Ring)
ingester, err := ingester.New(ingesterConfig, chunkStore)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(ingester)
defer ingester.Shutdown()

server, err := server.New(serverConfig)
if err != nil {
log.Fatalf("Error initializing server: %v", err)
}
cortex.RegisterIngesterServer(server.GRPC, ingester)
server.HTTP.Handle("/ring", registration.Ring)
server.HTTP.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler))
server.Run()

// Shutdown order is important!
registration.ChangeState(ring.LEAVING)
ingester.Stop()
registration.Unregister()
server.Shutdown()
}
19 changes: 19 additions & 0 deletions cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ service Ingester {
rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse) {};
rpc UserStats(UserStatsRequest) returns (UserStatsResponse) {};
rpc MetricsForLabelMatchers(MetricsForLabelMatchersRequest) returns (MetricsForLabelMatchersResponse) {};

// TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
}

message WriteRequest {
Expand Down Expand Up @@ -56,6 +59,22 @@ message MetricsForLabelMatchersResponse {
repeated Metric metric = 1;
}

message TimeSeriesChunk {
string from_ingester_id = 1;
string user_id = 2;
repeated LabelPair labels = 3 [(gogoproto.nullable) = false];
repeated Chunk chunks = 4 [(gogoproto.nullable) = false];
}

message Chunk {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
int32 encoding = 3;
bytes data = 4;
}

message TransferChunksResponse {
}

message TimeSeries {
repeated LabelPair labels = 1 [(gogoproto.nullable) = false];
Expand Down
52 changes: 15 additions & 37 deletions distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,23 @@ import (
"flag"
"fmt"
"hash/fnv"
"io"
"sync"
"sync/atomic"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"golang.org/x/time/rate"
"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"

"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex"
ingester_client "github.com/weaveworks/cortex/ingester/client"
"github.com/weaveworks/cortex/ring"
"github.com/weaveworks/cortex/util"
)
Expand All @@ -46,7 +43,7 @@ type Distributor struct {
cfg Config
ring ReadRing
clientsMtx sync.RWMutex
clients map[string]ingesterClient
clients map[string]cortex.IngesterClient
quit chan struct{}
done chan struct{}

Expand All @@ -63,11 +60,6 @@ type Distributor struct {
ingesterQueryFailures *prometheus.CounterVec
}

type ingesterClient struct {
cortex.IngesterClient
conn *grpc.ClientConn
}

// ReadRing represents the read interface to the ring.
type ReadRing interface {
prometheus.Collector
Expand All @@ -88,7 +80,7 @@ type Config struct {
IngestionBurstSize int

// for testing
ingesterClientFactory func(string) cortex.IngesterClient
ingesterClientFactory func(addr string, timeout time.Duration) (cortex.IngesterClient, error)
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -106,10 +98,14 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = ingester_client.MakeIngesterClient
}

d := &Distributor{
cfg: cfg,
ring: ring,
clients: map[string]ingesterClient{},
clients: map[string]cortex.IngesterClient{},
quit: make(chan struct{}),
done: make(chan struct{}),
ingestLimiters: map[string]*rate.Limiter{},
Expand Down Expand Up @@ -193,11 +189,11 @@ func (d *Distributor) removeStaleIngesterClients() {

// Do the gRPC closing in the background since it might take a while and
// we're holding a mutex.
go func(addr string, conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
go func(addr string, closer io.Closer) {
if err := closer.Close(); err != nil {
log.Errorf("Error closing connection to ingester %q: %v", addr, err)
}
}(addr, client.conn)
}(addr, client.(io.Closer))
}
}

Expand All @@ -216,27 +212,9 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
return client, nil
}

if d.cfg.ingesterClientFactory != nil {
client = ingesterClient{
IngesterClient: d.cfg.ingesterClientFactory(ingester.Addr),
}
} else {
conn, err := grpc.Dial(
ingester.Addr,
grpc.WithTimeout(d.cfg.RemoteTimeout),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
)),
)
if err != nil {
return nil, err
}
client = ingesterClient{
IngesterClient: cortex.NewIngesterClient(conn),
conn: conn,
}
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout)
if err != nil {
return nil, err
}
d.clients[ingester.Addr] = client
return client, nil
Expand Down
69 changes: 43 additions & 26 deletions distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (r mockRing) GetAll() []*ring.IngesterDesc {
}

type mockIngester struct {
cortex.IngesterClient
happy bool
}

Expand Down Expand Up @@ -78,18 +79,6 @@ func (i mockIngester) Query(ctx context.Context, in *cortex.QueryRequest, opts .
}, nil
}

func (i mockIngester) LabelValues(ctx context.Context, in *cortex.LabelValuesRequest, opts ...grpc.CallOption) (*cortex.LabelValuesResponse, error) {
return nil, nil
}

func (i mockIngester) UserStats(ctx context.Context, in *cortex.UserStatsRequest, opts ...grpc.CallOption) (*cortex.UserStatsResponse, error) {
return nil, nil
}

func (i mockIngester) MetricsForLabelMatchers(ctx context.Context, in *cortex.MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*cortex.MetricsForLabelMatchersResponse, error) {
return nil, nil
}

func TestDistributorPush(t *testing.T) {
ctx := user.Inject(context.Background(), "user")
for i, tc := range []struct {
Expand All @@ -106,22 +95,34 @@ func TestDistributorPush(t *testing.T) {

// A push to 3 happy ingesters should succeed
{
samples: 10,
ingesters: []mockIngester{{true}, {true}, {true}},
samples: 10,
ingesters: []mockIngester{
{happy: true},
{happy: true},
{happy: true},
},
expectedResponse: &cortex.WriteResponse{},
},

// A push to 2 happy ingesters should succeed
{
samples: 10,
ingesters: []mockIngester{{}, {true}, {true}},
samples: 10,
ingesters: []mockIngester{
{},
{happy: true},
{happy: true},
},
expectedResponse: &cortex.WriteResponse{},
},

// A push to 1 happy ingester should fail
{
samples: 10,
ingesters: []mockIngester{{}, {}, {true}},
samples: 10,
ingesters: []mockIngester{
{},
{},
{happy: true},
},
expectedError: fmt.Errorf("Fail"),
},

Expand Down Expand Up @@ -159,8 +160,8 @@ func TestDistributorPush(t *testing.T) {
IngestionRateLimit: 10000,
IngestionBurstSize: 10000,

ingesterClientFactory: func(addr string) cortex.IngesterClient {
return ingesters[addr]
ingesterClientFactory: func(addr string, _ time.Duration) (cortex.IngesterClient, error) {
return ingesters[addr], nil
},
}, ring)
if err != nil {
Expand Down Expand Up @@ -219,25 +220,41 @@ func TestDistributorQuery(t *testing.T) {
}{
// A query to 3 happy ingesters should succeed
{
ingesters: []mockIngester{{true}, {true}, {true}},
ingesters: []mockIngester{
{happy: true},
{happy: true},
{happy: true},
},
expectedResponse: expectedResponse(0, 2),
},

// A query to 2 happy ingesters should succeed
{
ingesters: []mockIngester{{}, {true}, {true}},
ingesters: []mockIngester{
{happy: false},
{happy: true},
{happy: true},
},
expectedResponse: expectedResponse(0, 2),
},

// A query to 1 happy ingester should fail
{
ingesters: []mockIngester{{}, {}, {true}},
ingesters: []mockIngester{
{happy: false},
{happy: false},
{happy: true},
},
expectedError: fmt.Errorf("Fail"),
},

// A query to 0 happy ingesters should fail
{
ingesters: []mockIngester{{}, {}, {}},
ingesters: []mockIngester{
{happy: false},
{happy: false},
{happy: false},
},
expectedError: fmt.Errorf("Fail"),
},
} {
Expand Down Expand Up @@ -268,8 +285,8 @@ func TestDistributorQuery(t *testing.T) {
IngestionRateLimit: 10000,
IngestionBurstSize: 10000,

ingesterClientFactory: func(addr string) cortex.IngesterClient {
return ingesters[addr]
ingesterClientFactory: func(addr string, _ time.Duration) (cortex.IngesterClient, error) {
return ingesters[addr], nil
},
}, ring)
if err != nil {
Expand Down
Loading