From 045ec9aff2f093bedd4c7149b22ea6b138f40000 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Mon, 30 Jun 2025 12:08:26 -0700 Subject: [PATCH] Created new grpc instrument to record stream push metrics Signed-off-by: Alex Le --- pkg/ingester/client/client.go | 8 +++- pkg/util/grpcclient/instrumentation.go | 15 ++++++++ pkg/util/middleware/grpc.go | 51 ++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 40987d185ee..b52ac69634b 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -139,7 +139,11 @@ func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*co // MakeIngesterClient makes a new IngesterClient func MakeIngesterClient(addr string, cfg Config, useStreamConnection bool) (HealthAndIngesterClient, error) { - dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration)) + unaryClientInterceptor, streamClientInterceptor := grpcclient.Instrument(ingesterClientRequestDuration) + if useStreamConnection { + unaryClientInterceptor, streamClientInterceptor = grpcclient.InstrumentReusableStream(ingesterClientRequestDuration) + } + dialOpts, err := cfg.GRPCClientConfig.DialOption(unaryClientInterceptor, streamClientInterceptor) if err != nil { return nil, err } @@ -202,7 +206,7 @@ func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJo var workerErr error var wg sync.WaitGroup for i := 0; i < INGESTER_CLIENT_STREAM_WORKER_COUNT; i++ { - workerName := fmt.Sprintf("stream-push-worker-%d", i) + workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", c.addr, i) wg.Add(1) go func() { workerCtx := user.InjectOrgID(streamCtx, workerName) diff --git a/pkg/util/grpcclient/instrumentation.go b/pkg/util/grpcclient/instrumentation.go index 93353eda09b..a4e6cad731f 100644 --- a/pkg/util/grpcclient/instrumentation.go +++ b/pkg/util/grpcclient/instrumentation.go @@ -25,3 +25,18 @@ func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInt cortexmiddleware.PrometheusGRPCStreamInstrumentation(requestDuration), } } + +func InstrumentReusableStream(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ + grpcutil.HTTPHeaderPropagationClientInterceptor, + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + cortexmiddleware.PrometheusGRPCUnaryInstrumentation(requestDuration), + }, []grpc.StreamClientInterceptor{ + grpcutil.HTTPHeaderPropagationStreamClientInterceptor, + unwrapErrorStreamClientInterceptor(), + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.StreamClientUserHeaderInterceptor, + cortexmiddleware.PrometheusGRPCReusableStreamInstrumentation(requestDuration), + } +} diff --git a/pkg/util/middleware/grpc.go b/pkg/util/middleware/grpc.go index 66f0d376608..aee899095b0 100644 --- a/pkg/util/middleware/grpc.go +++ b/pkg/util/middleware/grpc.go @@ -84,6 +84,57 @@ func (s *instrumentedClientStream) Header() (metadata.MD, error) { return md, err } +// PrometheusGRPCReusableStreamInstrumentation records duration of reusable streaming gRPC requests client side. +func PrometheusGRPCReusableStreamInstrumentation(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, + streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + stream, err := streamer(ctx, desc, cc, method, opts...) + return &instrumentedReusableClientStream{ + metric: metric, + method: method, + ClientStream: stream, + }, err + } +} + +type instrumentedReusableClientStream struct { + metric *prometheus.HistogramVec + method string + grpc.ClientStream +} + +func (s *instrumentedReusableClientStream) SendMsg(m interface{}) error { + start := time.Now() + err := s.ClientStream.SendMsg(m) + if err != nil && err != io.EOF { + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds()) + return err + } + s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(start).Seconds()) + return err +} + +func (s *instrumentedReusableClientStream) RecvMsg(m interface{}) error { + start := time.Now() + err := s.ClientStream.RecvMsg(m) + if err != nil && err != io.EOF { + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds()) + return err + } + s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(start).Seconds()) + return err +} + +func (s *instrumentedReusableClientStream) Header() (metadata.MD, error) { + start := time.Now() + md, err := s.ClientStream.Header() + if err != nil { + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds()) + } + return md, err +} + func errorCode(err error) string { respStatus := "2xx" if err != nil {