Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/edsrzf/mmap-go v1.2.0 // indirect
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand Down Expand Up @@ -300,9 +299,6 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497

// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0

replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97

replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.302.1
Expand Down
1,167 changes: 66 additions & 1,101 deletions go.sum

Large diffs are not rendered by default.

288 changes: 288 additions & 0 deletions integration/grpc_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

type mockGprcServer struct {
ingester_client.IngesterServer
}

func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
md, _ := metadata.FromIncomingContext(streamServer.Context())
i, _ := strconv.Atoi(md["i"][0])
return streamServer.Send(createStreamResponse(i))
}

func (m mockGprcServer) PushStream(srv ingester_client.Ingester_PushStreamServer) error {
for {
req, err := srv.Recv()
if err == io.EOF {
return nil
}
ctx := metadata.NewIncomingContext(srv.Context(), metadata.MD{"i": []string{req.TenantID}})
res, err := m.Push(ctx, req.Request)
req.Free()
if err != nil {
return err
}
err = srv.Send(res)
if err != nil {
return err
}
}
}

func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
defer request.Free()
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
md, _ := metadata.FromIncomingContext(ctx)
i, _ := strconv.Atoi(md["i"][0])
expected := createRequest(i)
// Need to do this so the .String method return the same value for MessageWithBufRef
expected.MessageWithBufRef = request.MessageWithBufRef

if expected.String() != request.String() {
return nil, fmt.Errorf("expected %v, got %v", expected, request)
}
return &cortexpb.WriteResponse{}, nil
}

func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()

grpcPort, closeGrpcPort, err := getLocalHostPort()
require.NoError(t, err)
httpPort, closeHTTPPort, err := getLocalHostPort()
require.NoError(t, err)

err = closeGrpcPort()
require.NoError(t, err)
err = closeHTTPPort()
require.NoError(t, err)

cfg.HTTPListenPort = httpPort
cfg.GRPCListenPort = grpcPort

serv, err := server.New(cfg)
require.NoError(t, err)
register(serv.GRPC)

go func() {
err := serv.Run()
require.NoError(t, err)
}()

defer serv.Shutdown()

grpcHost := fmt.Sprintf("localhost:%d", grpcPort)

clientConfig := grpcclient.Config{}
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

dialOptions, err := clientConfig.DialOption(nil, nil)
assert.NoError(t, err)
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)

conn, err := grpc.NewClient(grpcHost, dialOptions...)
assert.NoError(t, err)
validate(t, conn)
}

func TestConcurrentGrpcCalls(t *testing.T) {
cfg := server.Config{}
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

tc := map[string]struct {
cfg server.Config
register func(s *grpc.Server)
validate func(t *testing.T, con *grpc.ClientConn)
}{
"distributor": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
distributorpb.RegisterDistributorServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := distributorpb.NewDistributorClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
_, err := client.Push(ctx, createRequest(i))
require.NoError(t, err)
}(i)
}

wg.Wait()
},
},
"distributor push stream": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
ctx := context.Background()
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
stream, err := client.PushStream(ctx)
require.NoError(t, err)

ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
err = stream.Send(&cortexpb.StreamWriteRequest{TenantID: strconv.Itoa(i), Request: createRequest(i)})
require.NoError(t, err)
_, err = stream.Recv()
require.NoError(t, err)
//err = stream.Send(&cortexpb.StreamWriteRequest{"i", createRequest(i + 1)})
//require.NoError(t, err)
require.NoError(t, stream.CloseSend())
}(i)
}

wg.Wait()
},
},
"ingester": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
require.NoError(t, err)
resp, err := s.Recv()
require.NoError(t, err)
expected := createStreamResponse(i)
require.Equal(t, expected.String(), resp.String())
}(i)
}

wg.Wait()
},
},
}

for name, c := range tc {
t.Run(name, func(t *testing.T) {
run(t, c.cfg, c.register, c.validate)
})
}
}

func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
{
FromIngesterId: strconv.Itoa(i),
Labels: createLabels(i),
Chunks: []ingester_client.Chunk{
{
StartTimestampMs: int64(i),
EndTimestampMs: int64(i),
Encoding: int32(i),
Data: []byte(strconv.Itoa(i)),
},
},
},
}}
}

func createRequest(i int) *cortexpb.WriteRequest {
labels := createLabels(i)
return &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Samples: []cortexpb.Sample{
{TimestampMs: int64(i), Value: float64(i)},
},
Exemplars: []cortexpb.Exemplar{
{
Labels: labels,
Value: float64(i),
TimestampMs: int64(i),
},
},
},
},
},
}
}

func createLabels(i int) []cortexpb.LabelAdapter {
labels := make([]cortexpb.LabelAdapter, 0, 100)
for j := 0; j < 100; j++ {
labels = append(labels, cortexpb.LabelAdapter{
Name: fmt.Sprintf("test%d_%d", i, j),
Value: fmt.Sprintf("test%d_%d", i, j),
})
}
return labels
}

func getLocalHostPort() (int, func() error, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, nil, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, nil, err
}

closePort := func() error {
return l.Close()
}
return l.Addr().(*net.TCPAddr).Port, closePort, nil
}
84 changes: 84 additions & 0 deletions pkg/cortexpb/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cortexpb

import (
"fmt"

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

func init() {
c := encoding.GetCodecV2("proto")
encoding.RegisterCodecV2(&cortexCodec{c: c})
}

type ReleasableMessage interface {
RegisterBuffer(mem.Buffer)
}

type cortexCodec struct {
c encoding.CodecV2
}

func (c cortexCodec) Name() string {
return c.c.Name()
}

func (c cortexCodec) Marshal(v any) (mem.BufferSlice, error) {
return c.c.Marshal(v)
}

// Unmarshal Copied from https://github.com/grpc/grpc-go/blob/d2e836604b36400a54fbf04af495d12b38fa1e3a/encoding/proto/proto.go#L69-L81
// but without releasing the buffer
func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

// To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically.
// This should simulate the same behavior of grpc v1.65.0 and before.
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())

err := proto.Unmarshal(buf.ReadOnlyData(), vv)

if err != nil {
defer buf.Free()
return err
}

// If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used
if fm, ok := v.(ReleasableMessage); ok {
fm.RegisterBuffer(buf)
}

return err
}

func messageV2Of(v any) proto.Message {
switch v := v.(type) {
case protoadapt.MessageV1:
return protoadapt.MessageV2Of(v)
case protoadapt.MessageV2:
return v
}

return nil
}

var _ ReleasableMessage = &MessageWithBufRef{}

type MessageWithBufRef struct {
bs mem.BufferSlice
}

func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) {
m.bs = append(m.bs, buffer)
}

func (m *MessageWithBufRef) Free() {
m.bs.Free()
m.bs = m.bs[:0]
}
Loading
Loading