From 2d6391c655b53fbb56aa0f250b77d82829a0cc84 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Wed, 28 Jun 2023 11:06:42 -0700 Subject: [PATCH 01/11] Wal Receiver Collector and Test Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 306 ++++++++++++++++++++++++++ collector/pg_stat_walreceiver_test.go | 266 ++++++++++++++++++++++ 2 files changed, 572 insertions(+) create mode 100644 collector/pg_stat_walreceiver.go create mode 100644 collector/pg_stat_walreceiver_test.go diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go new file mode 100644 index 000000000..deb9aa733 --- /dev/null +++ b/collector/pg_stat_walreceiver.go @@ -0,0 +1,306 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector) +} + +type PGStatWalReceiverCollector struct { + log log.Logger +} + +const statWalReceiverSubsystem = "stat_wal_receiver" + +func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { + return &PGStatWalReceiverCollector{log: config.logger}, nil +} + +var ( + statWalReceiverStatus = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "status"), + "Activity status of the WAL receiver process", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), + "First write-ahead log location used when WAL receiver is started represented as a decimal", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"), + "First timeline number used when WAL receiver is started", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverFlushedLSN = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"), + "Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceivedTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"), + "Timeline number of last write-ahead log location received and flushed to disk", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLastMsgSendTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"), + "Send time of last message received from origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLastMsgReceiptTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"), + "Send time of last message received from origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLatestEndLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"), + "Last write-ahead log location reported to origin WAL sender as integer", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLatestEndTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"), + "Time of last write-ahead log location reported to origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverUpstreamNode = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"), + "Node ID of the upstream node", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + + pgStatWalColumnQuery = ` + SELECT + column_name + FROM information_schema.columns + WHERE + table_name = 'pg_stat_wal_receiver' and + column_name = 'flushed_lsn' + ` + + pgStatWalReceiverQueryWithNoFlushedLSN = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + case status + when 'stopped' then 0 + when 'starting' then 1 + when 'streaming' then 2 + when 'waiting' then 3 + when 'restarting' then 4 + when 'stopping' then 5 else -1 + end as status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + receive_start_tli, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` + + pgStatWalReceiverQueryWithFlushedLSN = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + case status + when 'stopped' then 0 + when 'starting' then 1 + when 'streaming' then 2 + when 'waiting' then 3 + when 'restarting' then 4 + when 'stopping' then 5 else -1 + end as status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + receive_start_tli, + (flushed_lsn- '0/0') % (2^52)::bigint as flushed_lsn, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` +) + +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) + if err != nil { + return err + } + + defer hasFlushedLSNRows.Close() + hasFlushedLSN := hasFlushedLSNRows.Next() + var query string + if hasFlushedLSN { + query = pgStatWalReceiverQueryWithFlushedLSN + } else { + query = pgStatWalReceiverQueryWithNoFlushedLSN + } + rows, err := db.QueryContext(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var upstreamHost, slotName sql.NullString + var status, receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 + + if hasFlushedLSN { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } else { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } + upstreamHostLabel := "unknown" + if upstreamHost.Valid { + upstreamHostLabel = upstreamHost.String + } + slotNameLabel := "unknown" + if slotName.Valid { + slotNameLabel = slotName.String + } + labels := []string{upstreamHostLabel, slotNameLabel} + + statusMetric := 0.0 + if status.Valid { + statusMetric = float64(status.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverStatus, + prometheus.GaugeValue, + statusMetric, + labels...) + + receiveStartLsnMetric := 0.0 + if receiveStartLsn.Valid { + receiveStartLsnMetric = float64(receiveStartLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartLsn, + prometheus.CounterValue, + receiveStartLsnMetric, + labels...) + + receiveStartTliMetric := 0.0 + if receiveStartTli.Valid { + receiveStartTliMetric = float64(receiveStartTli.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartTli, + prometheus.GaugeValue, + receiveStartTliMetric, + labels...) + + if hasFlushedLSN { + flushedLsnMetric := 0.0 + if flushedLsn.Valid { + flushedLsnMetric = float64(flushedLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverFlushedLSN, + prometheus.CounterValue, + flushedLsnMetric, + labels...) + } + + receivedTliMetric := 0.0 + if receivedTli.Valid { + receivedTliMetric = float64(receivedTli.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceivedTli, + prometheus.GaugeValue, + receivedTliMetric, + labels...) + + lastMsgSendTimeMetric := 0.0 + if lastMsgSendTime.Valid { + lastMsgSendTimeMetric = float64(lastMsgSendTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgSendTime, + prometheus.CounterValue, + lastMsgSendTimeMetric, + labels...) + + lastMsgReceiptTimeMetric := 0.0 + if lastMsgReceiptTime.Valid { + lastMsgReceiptTimeMetric = float64(lastMsgReceiptTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgReceiptTime, + prometheus.CounterValue, + lastMsgReceiptTimeMetric, + labels...) + + latestEndLsnMetric := 0.0 + if latestEndLsn.Valid { + latestEndLsnMetric = float64(latestEndLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndLsn, + prometheus.CounterValue, + latestEndLsnMetric, + labels...) + + latestEndTimeMetric := 0.0 + if latestEndTime.Valid { + latestEndTimeMetric = float64(latestEndTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndTime, + prometheus.CounterValue, + latestEndTimeMetric, + labels...) + + upstreamNodeMetric := 0.0 + if upstreamNode.Valid { + upstreamNodeMetric = float64(upstreamNode.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverUpstreamNode, + prometheus.GaugeValue, + upstreamNodeMetric, + labels...) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go new file mode 100644 index 000000000..6a7dc1cce --- /dev/null +++ b/collector/pg_stat_walreceiver_test.go @@ -0,0 +1,266 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + 2, + 1200668684563608, + 1687321285, + 1200668684563609, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + 2, + 1200668684563608, + 1687321285, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithNoFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} From 4b94c96a1502ee6c1d5a8993249036d31bc92462 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Wed, 28 Jun 2023 11:10:39 -0700 Subject: [PATCH 02/11] Add more escapes Signed-off-by: Felix Yuan --- collector/collector_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector/collector_test.go b/collector/collector_test.go index 00c21ed23..18101f00e 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -49,6 +49,7 @@ func readMetric(m prometheus.Metric) MetricResult { func sanitizeQuery(q string) string { q = strings.Join(strings.Fields(q), " ") q = strings.Replace(q, "(", "\\(", -1) + q = strings.Replace(q, "?", "\\?", -1) q = strings.Replace(q, ")", "\\)", -1) q = strings.Replace(q, "[", "\\[", -1) q = strings.Replace(q, "]", "\\]", -1) From 43a96a103773bd416066e1ef8d93fd34898aeb0c Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 29 Jun 2023 15:33:45 -0700 Subject: [PATCH 03/11] Corrections to wal_receiver Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 72 ++++++++++++--------------- collector/pg_stat_walreceiver_test.go | 21 +++++--- 2 files changed, 45 insertions(+), 48 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index deb9aa733..fa649867e 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -15,6 +15,7 @@ package collector import ( "context" "database/sql" + "fmt" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -105,44 +106,13 @@ var ( column_name = 'flushed_lsn' ` - pgStatWalReceiverQueryWithNoFlushedLSN = ` + pgStatWalReceiverQueryTemplate = ` SELECT trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, slot_name, - case status - when 'stopped' then 0 - when 'starting' then 1 - when 'streaming' then 2 - when 'waiting' then 3 - when 'restarting' then 4 - when 'stopping' then 5 else -1 - end as status, + status, (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, - receive_start_tli, - received_tli, - extract(epoch from last_msg_send_time) as last_msg_send_time, - extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, - (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, - extract(epoch from latest_end_time) as latest_end_time, - substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node - FROM pg_catalog.pg_stat_wal_receiver - ` - - pgStatWalReceiverQueryWithFlushedLSN = ` - SELECT - trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, - slot_name, - case status - when 'stopped' then 0 - when 'starting' then 1 - when 'streaming' then 2 - when 'waiting' then 3 - when 'restarting' then 4 - when 'stopping' then 5 else -1 - end as status, - (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, - receive_start_tli, - (flushed_lsn- '0/0') % (2^52)::bigint as flushed_lsn, + %sreceive_start_tli, received_tli, extract(epoch from last_msg_send_time) as last_msg_send_time, extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, @@ -164,9 +134,9 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta hasFlushedLSN := hasFlushedLSNRows.Next() var query string if hasFlushedLSN { - query = pgStatWalReceiverQueryWithFlushedLSN + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") } else { - query = pgStatWalReceiverQueryWithNoFlushedLSN + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") } rows, err := db.QueryContext(ctx, query) if err != nil { @@ -174,8 +144,8 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta } defer rows.Close() for rows.Next() { - var upstreamHost, slotName sql.NullString - var status, receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var upstreamHost, slotName, status sql.NullString + var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 if hasFlushedLSN { @@ -197,9 +167,31 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta } labels := []string{upstreamHostLabel, slotNameLabel} - statusMetric := 0.0 + statusMetric := -3.0 if status.Valid { - statusMetric = float64(status.Int64) + switch status.String { + case "stopped": + statusMetric = 0.0 + break + case "starting": + statusMetric = 1.0 + break + case "streaming": + statusMetric = 2.0 + break + case "waiting": + statusMetric = 3.0 + break + case "restarting": + statusMetric = 4.0 + break + case "stopping": + statusMetric = -1.0 + break + default: + statusMetric = -2.0 + break + } } ch <- prometheus.MustNewConstMetric( statWalReceiverStatus, diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go index 6a7dc1cce..d0206977f 100644 --- a/collector/pg_stat_walreceiver_test.go +++ b/collector/pg_stat_walreceiver_test.go @@ -14,6 +14,7 @@ package collector import ( "context" + "fmt" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -22,6 +23,9 @@ import ( "github.com/smartystreets/goconvey/convey" ) +var queryWithFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") +var queryWithNoFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -59,7 +63,7 @@ func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { AddRow( "foo", "bar", - 2, + "stopping", 1200668684563608, 1687321285, 1200668684563609, @@ -70,7 +74,8 @@ func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { 1687321277, 5, ) - mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + + mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -82,7 +87,7 @@ func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: -1.0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, @@ -138,7 +143,7 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { AddRow( "foo", "bar", - 2, + "starting", 1200668684563608, 1687321285, 1687321280, @@ -148,7 +153,7 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { 1687321277, 5, ) - mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithNoFlushedLSN)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(queryWithNoFlushedLSN)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -160,7 +165,7 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1.0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, @@ -230,7 +235,7 @@ func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { nil, nil, ) - mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -242,7 +247,7 @@ func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: -3.0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, From 0209a53a1582a99ed024fede7d9575e0e39e0c98 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 29 Jun 2023 15:35:11 -0700 Subject: [PATCH 04/11] Continue on null labels Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 13 ++++++------- collector/pg_stat_walreceiver_test.go | 24 ++++++++++++------------ 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index fa649867e..23dadb674 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -157,15 +157,14 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta return err } } - upstreamHostLabel := "unknown" - if upstreamHost.Valid { - upstreamHostLabel = upstreamHost.String + if !upstreamHost.Valid { + continue } - slotNameLabel := "unknown" - if slotName.Valid { - slotNameLabel = slotName.String + + if !slotName.Valid { + continue } - labels := []string{upstreamHostLabel, slotNameLabel} + labels := []string{upstreamHost.String, slotName.String} statusMetric := -3.0 if status.Valid { diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go index d0206977f..49468bfc7 100644 --- a/collector/pg_stat_walreceiver_test.go +++ b/collector/pg_stat_walreceiver_test.go @@ -222,8 +222,8 @@ func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { } rows := sqlmock.NewRows(columns). AddRow( - nil, - nil, + "foo", + "bar", nil, nil, nil, @@ -247,16 +247,16 @@ func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: -3.0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: -3.0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { for _, expect := range expected { From 69c1bb50f39e3fbc6684684dd20781e4a9cdf1ff Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 29 Jun 2023 15:44:57 -0700 Subject: [PATCH 05/11] Skip nulls and log a message Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 138 ++++++++++++++------------ collector/pg_stat_walreceiver_test.go | 83 ---------------- 2 files changed, 77 insertions(+), 144 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 23dadb674..56ed6335b 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) @@ -158,60 +159,96 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta } } if !upstreamHost.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null") continue } if !slotName.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null") continue } labels := []string{upstreamHost.String, slotName.String} + if !status.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null") + continue + } - statusMetric := -3.0 - if status.Valid { - switch status.String { - case "stopped": - statusMetric = 0.0 - break - case "starting": - statusMetric = 1.0 - break - case "streaming": - statusMetric = 2.0 - break - case "waiting": - statusMetric = 3.0 - break - case "restarting": - statusMetric = 4.0 - break - case "stopping": - statusMetric = -1.0 - break - default: - statusMetric = -2.0 - break - } + var statusMetric float64 + switch status.String { + case "stopped": + statusMetric = 0.0 + break + case "starting": + statusMetric = 1.0 + break + case "streaming": + statusMetric = 2.0 + break + case "waiting": + statusMetric = 3.0 + break + case "restarting": + statusMetric = 4.0 + break + case "stopping": + statusMetric = -1.0 + break + default: + statusMetric = -2.0 + break + } + + if !receiveStartLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null") + continue + } + if !receiveStartTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null") + continue + } + if hasFlushedLSN && !flushedLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null") + continue + } + if !receivedTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null") + continue + } + if !lastMsgSendTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null") + continue + } + if !lastMsgReceiptTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null") + continue } + if !latestEndLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null") + continue + } + if !latestEndTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null") + continue + } + if !upstreamNode.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") + continue + } + + receiveStartLsnMetric := float64(receiveStartLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverStatus, prometheus.GaugeValue, statusMetric, labels...) - receiveStartLsnMetric := 0.0 - if receiveStartLsn.Valid { - receiveStartLsnMetric = float64(receiveStartLsn.Int64) - } ch <- prometheus.MustNewConstMetric( statWalReceiverReceiveStartLsn, prometheus.CounterValue, receiveStartLsnMetric, labels...) - receiveStartTliMetric := 0.0 - if receiveStartTli.Valid { - receiveStartTliMetric = float64(receiveStartTli.Int64) - } + receiveStartTliMetric := float64(receiveStartTli.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverReceiveStartTli, prometheus.GaugeValue, @@ -219,10 +256,7 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta labels...) if hasFlushedLSN { - flushedLsnMetric := 0.0 - if flushedLsn.Valid { - flushedLsnMetric = float64(flushedLsn.Int64) - } + flushedLsnMetric := float64(flushedLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverFlushedLSN, prometheus.CounterValue, @@ -230,60 +264,42 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta labels...) } - receivedTliMetric := 0.0 - if receivedTli.Valid { - receivedTliMetric = float64(receivedTli.Int64) - } + receivedTliMetric := float64(receivedTli.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverReceivedTli, prometheus.GaugeValue, receivedTliMetric, labels...) - lastMsgSendTimeMetric := 0.0 - if lastMsgSendTime.Valid { - lastMsgSendTimeMetric = float64(lastMsgSendTime.Float64) - } + lastMsgSendTimeMetric := float64(lastMsgSendTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLastMsgSendTime, prometheus.CounterValue, lastMsgSendTimeMetric, labels...) - lastMsgReceiptTimeMetric := 0.0 - if lastMsgReceiptTime.Valid { - lastMsgReceiptTimeMetric = float64(lastMsgReceiptTime.Float64) - } + lastMsgReceiptTimeMetric := float64(lastMsgReceiptTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLastMsgReceiptTime, prometheus.CounterValue, lastMsgReceiptTimeMetric, labels...) - latestEndLsnMetric := 0.0 - if latestEndLsn.Valid { - latestEndLsnMetric = float64(latestEndLsn.Int64) - } + latestEndLsnMetric := float64(latestEndLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverLatestEndLsn, prometheus.CounterValue, latestEndLsnMetric, labels...) - latestEndTimeMetric := 0.0 - if latestEndTime.Valid { - latestEndTimeMetric = float64(latestEndTime.Float64) - } + latestEndTimeMetric := float64(latestEndTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLatestEndTime, prometheus.CounterValue, latestEndTimeMetric, labels...) - upstreamNodeMetric := 0.0 - if upstreamNode.Valid { - upstreamNodeMetric = float64(upstreamNode.Int64) - } + upstreamNodeMetric := float64(upstreamNode.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverUpstreamNode, prometheus.GaugeValue, diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go index 49468bfc7..d8113a0ff 100644 --- a/collector/pg_stat_walreceiver_test.go +++ b/collector/pg_stat_walreceiver_test.go @@ -186,86 +186,3 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { } } - -func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { - db, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("Error opening a stub db connection: %s", err) - } - defer db.Close() - - inst := &instance{db: db} - infoSchemaColumns := []string{ - "column_name", - } - - infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). - AddRow( - "flushed_lsn", - ) - - mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) - - columns := []string{ - "upstream_host", - "slot_name", - "status", - "receive_start_lsn", - "receive_start_tli", - "flushed_lsn", - "received_tli", - "last_msg_send_time", - "last_msg_receipt_time", - "latest_end_lsn", - "latest_end_time", - "upstream_node", - } - rows := sqlmock.NewRows(columns). - AddRow( - "foo", - "bar", - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - ) - mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) - - ch := make(chan prometheus.Metric) - go func() { - defer close(ch) - c := PGStatWalReceiverCollector{} - - if err := c.Update(context.Background(), inst, ch); err != nil { - t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) - } - }() - expected := []MetricResult{ - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: -3.0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE}, - } - convey.Convey("Metrics comparison", t, func() { - for _, expect := range expected { - m := readMetric(<-ch) - convey.So(expect, convey.ShouldResemble, m) - } - }) - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("there were unfulfilled exceptions: %s", err) - } - -} From a55ffe849fb57924f9d470f7fbef0647503206bf Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 29 Jun 2023 15:56:54 -0700 Subject: [PATCH 06/11] Redundant breaks Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 56ed6335b..6de395c90 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -177,25 +177,18 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta switch status.String { case "stopped": statusMetric = 0.0 - break case "starting": statusMetric = 1.0 - break case "streaming": statusMetric = 2.0 - break case "waiting": statusMetric = 3.0 - break case "restarting": statusMetric = 4.0 - break case "stopping": statusMetric = -1.0 - break default: statusMetric = -2.0 - break } if !receiveStartLsn.Valid { From 771fd4c227cef0ed564e9920c3657d747271ce32 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Mon, 10 Jul 2023 10:30:50 -0700 Subject: [PATCH 07/11] Fix up walreceiver Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 49 +++++++-------------------- collector/pg_stat_walreceiver_test.go | 36 ++++++++++---------- 2 files changed, 30 insertions(+), 55 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 6de395c90..a5b6f8e96 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -37,64 +37,65 @@ func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { } var ( + labelCats = []string{"upstream_host", "slot_name", "status"} statWalReceiverStatus = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "status"), "Activity status of the WAL receiver process", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverReceiveStartLsn = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), "First write-ahead log location used when WAL receiver is started represented as a decimal", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverReceiveStartTli = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"), "First timeline number used when WAL receiver is started", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverFlushedLSN = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"), "Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverReceivedTli = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"), "Timeline number of last write-ahead log location received and flushed to disk", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverLastMsgSendTime = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"), "Send time of last message received from origin WAL sender", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverLastMsgReceiptTime = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"), "Send time of last message received from origin WAL sender", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverLatestEndLsn = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"), "Last write-ahead log location reported to origin WAL sender as integer", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverLatestEndTime = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"), "Time of last write-ahead log location reported to origin WAL sender", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) statWalReceiverUpstreamNode = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"), "Node ID of the upstream node", - []string{"upstream_host", "slot_name"}, + labelCats, prometheus.Labels{}, ) @@ -167,29 +168,12 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null") continue } - labels := []string{upstreamHost.String, slotName.String} + if !status.Valid { level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null") continue } - - var statusMetric float64 - switch status.String { - case "stopped": - statusMetric = 0.0 - case "starting": - statusMetric = 1.0 - case "streaming": - statusMetric = 2.0 - case "waiting": - statusMetric = 3.0 - case "restarting": - statusMetric = 4.0 - case "stopping": - statusMetric = -1.0 - default: - statusMetric = -2.0 - } + labels := []string{upstreamHost.String, slotName.String, status.String} if !receiveStartLsn.Valid { level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null") @@ -227,14 +211,7 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") continue } - receiveStartLsnMetric := float64(receiveStartLsn.Int64) - ch <- prometheus.MustNewConstMetric( - statWalReceiverStatus, - prometheus.GaugeValue, - statusMetric, - labels...) - ch <- prometheus.MustNewConstMetric( statWalReceiverReceiveStartLsn, prometheus.CounterValue, diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go index d8113a0ff..3e2418b25 100644 --- a/collector/pg_stat_walreceiver_test.go +++ b/collector/pg_stat_walreceiver_test.go @@ -87,16 +87,15 @@ func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: -1.0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 5, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { for _, expect := range expected { @@ -165,15 +164,14 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { } }() expected := []MetricResult{ - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1.0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, - {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 5, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { for _, expect := range expected { From 2e9288708eec7faef176937bb56333fba1e831fb Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Mon, 10 Jul 2023 10:33:00 -0700 Subject: [PATCH 08/11] Remove extra label Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index a5b6f8e96..9ffc35d31 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -37,13 +37,7 @@ func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { } var ( - labelCats = []string{"upstream_host", "slot_name", "status"} - statWalReceiverStatus = prometheus.NewDesc( - prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "status"), - "Activity status of the WAL receiver process", - labelCats, - prometheus.Labels{}, - ) + labelCats = []string{"upstream_host", "slot_name", "status"} statWalReceiverReceiveStartLsn = prometheus.NewDesc( prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), "First write-ahead log location used when WAL receiver is started represented as a decimal", From 67ab6f388f3854438df396fce5ad93076faa5d3e Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 14 Jul 2023 09:28:52 -0700 Subject: [PATCH 09/11] Update collector/pg_stat_walreceiver.go Co-authored-by: Ben Kochie Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 9ffc35d31..5c03861dc 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -256,11 +256,10 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta latestEndLsnMetric, labels...) - latestEndTimeMetric := float64(latestEndTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLatestEndTime, prometheus.CounterValue, - latestEndTimeMetric, + latestEndTime.Float64, labels...) upstreamNodeMetric := float64(upstreamNode.Int64) From f740d918653296aacea628f12e9bef94a4b4828d Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 14 Jul 2023 09:33:24 -0700 Subject: [PATCH 10/11] Clean up the extra assignments Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 5c03861dc..62107f7f1 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -205,55 +205,48 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") continue } - receiveStartLsnMetric := float64(receiveStartLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverReceiveStartLsn, prometheus.CounterValue, - receiveStartLsnMetric, + float64(receiveStartLsn.Int64), labels...) - receiveStartTliMetric := float64(receiveStartTli.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverReceiveStartTli, prometheus.GaugeValue, - receiveStartTliMetric, + float64(receiveStartTli.Int64), labels...) if hasFlushedLSN { - flushedLsnMetric := float64(flushedLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverFlushedLSN, prometheus.CounterValue, - flushedLsnMetric, + float64(flushedLsn.Int64), labels...) } - receivedTliMetric := float64(receivedTli.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverReceivedTli, prometheus.GaugeValue, - receivedTliMetric, + float64(receivedTli.Int64), labels...) - lastMsgSendTimeMetric := float64(lastMsgSendTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLastMsgSendTime, prometheus.CounterValue, - lastMsgSendTimeMetric, + float64(lastMsgSendTime.Float64), labels...) - lastMsgReceiptTimeMetric := float64(lastMsgReceiptTime.Float64) ch <- prometheus.MustNewConstMetric( statWalReceiverLastMsgReceiptTime, prometheus.CounterValue, - lastMsgReceiptTimeMetric, + float64(lastMsgReceiptTime.Float64), labels...) - latestEndLsnMetric := float64(latestEndLsn.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverLatestEndLsn, prometheus.CounterValue, - latestEndLsnMetric, + float64(latestEndLsn.Int64), labels...) ch <- prometheus.MustNewConstMetric( @@ -262,11 +255,10 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta latestEndTime.Float64, labels...) - upstreamNodeMetric := float64(upstreamNode.Int64) ch <- prometheus.MustNewConstMetric( statWalReceiverUpstreamNode, prometheus.GaugeValue, - upstreamNodeMetric, + float64(upstreamNode.Int64), labels...) } if err := rows.Err(); err != nil { From b0037d6a35b892064e3e03bd089935429b84ad97 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Mon, 17 Jul 2023 09:59:16 -0700 Subject: [PATCH 11/11] Update collector/pg_stat_walreceiver.go Co-authored-by: Joe Adams Signed-off-by: Felix Yuan --- collector/pg_stat_walreceiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index 62107f7f1..3134c025b 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -108,7 +108,8 @@ var ( slot_name, status, (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, - %sreceive_start_tli, + %s +receive_start_tli, received_tli, extract(epoch from last_msg_send_time) as last_msg_send_time, extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,