Skip to content

Commit 03b9dee

Browse files
committed
Add pg_stat_activity_marginalia sampler
Signed-off-by: Felix Yuan <[email protected]>
1 parent 40863b7 commit 03b9dee

File tree

3 files changed

+185
-0
lines changed

3 files changed

+185
-0
lines changed

collector/collector_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func readMetric(m prometheus.Metric) MetricResult {
4949
func sanitizeQuery(q string) string {
5050
q = strings.Join(strings.Fields(q), " ")
5151
q = strings.Replace(q, "(", "\\(", -1)
52+
q = strings.Replace(q, "?", "\\?", -1)
5253
q = strings.Replace(q, ")", "\\)", -1)
5354
q = strings.Replace(q, "[", "\\[", -1)
5455
q = strings.Replace(q, "]", "\\]", -1)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
19+
"github.com/go-kit/log"
20+
"github.com/prometheus/client_golang/prometheus"
21+
)
22+
23+
const statActivityMarginaliaSubsystem = "slow"
24+
25+
func init() {
26+
registerCollector(statActivityMarginaliaSubsystem, defaultEnabled, NewPGStatActivityMarginaliaCollector)
27+
}
28+
29+
type PGStatActivityMarginaliaCollector struct {
30+
log log.Logger
31+
}
32+
33+
func NewPGStatActivityMarginaliaCollector(config collectorConfig) (Collector, error) {
34+
return &PGStatActivityMarginaliaCollector{log: config.logger}, nil
35+
}
36+
37+
var (
38+
statActivityMarginaliaActiveCount = prometheus.NewDesc(
39+
prometheus.BuildFQName(namespace, statActivityMarginaliaSubsystem, "active_count"),
40+
"Number of active queries at time of sample",
41+
[]string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"},
42+
prometheus.Labels{},
43+
)
44+
statActivityMarginaliaMaxTxAgeInSeconds = prometheus.NewDesc(
45+
prometheus.BuildFQName(namespace, statActivityMarginaliaSubsystem, "max_tx_age_in_seconds"),
46+
"Number of active queries at time of sample",
47+
[]string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"},
48+
prometheus.Labels{},
49+
)
50+
51+
statActivityMarginaliaQuery = `
52+
SELECT
53+
usename AS usename,
54+
a.matches[1] AS application,
55+
a.matches[2] AS endpoint,
56+
a.matches[3] AS command,
57+
a.state AS state,
58+
a.wait_event AS wait_event,
59+
a.wait_event_type AS wait_event_type,
60+
COUNT(*) active_count,
61+
MAX(age_in_seconds) AS max_tx_age_in_seconds
62+
FROM (
63+
SELECT
64+
usename,
65+
regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches,
66+
state,
67+
wait_event,
68+
wait_event_type,
69+
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds
70+
FROM
71+
pg_catalog.pg_stat_activity
72+
) a
73+
GROUP BY usename, application, endpoint, command, state, wait_event, wait_event_type
74+
ORDER BY active_count DESC
75+
`
76+
)
77+
78+
func (PGStatActivityMarginaliaCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
79+
db := instance.getDB()
80+
rows, err := db.QueryContext(ctx,
81+
statActivityMarginaliaQuery)
82+
83+
if err != nil {
84+
return err
85+
}
86+
defer rows.Close()
87+
88+
for rows.Next() {
89+
var usename, application, endpoint, command, state, wait_event, wait_event_type string
90+
var count, max_tx_age float64
91+
92+
if err := rows.Scan(&usename, &application, &endpoint, &command, &state, &wait_event, &wait_event_type, &count, &max_tx_age); err != nil {
93+
return err
94+
}
95+
96+
ch <- prometheus.MustNewConstMetric(
97+
statActivityMarginaliaActiveCount,
98+
prometheus.GaugeValue,
99+
count,
100+
usename, application, endpoint, command, state, wait_event, wait_event_type,
101+
)
102+
ch <- prometheus.MustNewConstMetric(
103+
statActivityMarginaliaMaxTxAgeInSeconds,
104+
prometheus.GaugeValue,
105+
max_tx_age,
106+
usename, application, endpoint, command, state, wait_event, wait_event_type,
107+
)
108+
}
109+
if err := rows.Err(); err != nil {
110+
return err
111+
}
112+
return nil
113+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"context"
17+
"testing"
18+
19+
"github.com/DATA-DOG/go-sqlmock"
20+
"github.com/prometheus/client_golang/prometheus"
21+
dto "github.com/prometheus/client_model/go"
22+
"github.com/smartystreets/goconvey/convey"
23+
)
24+
25+
func xTestPGStatActivityMarginaliaCollector(t *testing.T) {
26+
db, mock, err := sqlmock.New()
27+
if err != nil {
28+
t.Fatalf("Error opening a stub db connection: %s", err)
29+
}
30+
defer db.Close()
31+
inst := &instance{db: db}
32+
columns := []string{
33+
"usename",
34+
"application",
35+
"endpoint",
36+
"command",
37+
"wait_event",
38+
"state",
39+
"wait_event_type",
40+
"active_count",
41+
"max_tx_age_in_seconds",
42+
}
43+
rows := sqlmock.NewRows(columns).
44+
AddRow("postgres", "service_thing", "", "COMMIT", "ClientRead", "idle", "Client", 25, 0)
45+
46+
// TODO: Query has a lot of things to escape, figure out how to get this test to work
47+
mock.ExpectQuery(sanitizeQuery(statActivityMarginaliaQuery)).WillReturnRows(rows)
48+
49+
ch := make(chan prometheus.Metric)
50+
go func() {
51+
defer close(ch)
52+
c := PGStatActivityMarginaliaCollector{}
53+
54+
if err := c.Update(context.Background(), inst, ch); err != nil {
55+
t.Errorf("Error calling PGStatActivityMarginaliaCollector.Update: %s", err)
56+
}
57+
}()
58+
expected := []MetricResult{
59+
{labels: labelMap{"usename": "postgres", "application": "service_thing", "endpoint": "", "command": "COMMIT", "wait_event": "ClientRead", "state": "idle", "wait_event_type": "Client"}, value: 25, metricType: dto.MetricType_GAUGE},
60+
{labels: labelMap{"usename": "postgres", "application": "service_thing", "endpoint": "", "command": "COMMIT", "wait_event": "ClientRead", "state": "idle", "wait_event_type": "Client"}, value: 0, metricType: dto.MetricType_GAUGE},
61+
}
62+
convey.Convey("Metrics comparison", t, func() {
63+
for _, expect := range expected {
64+
m := readMetric(<-ch)
65+
convey.So(expect, convey.ShouldResemble, m)
66+
}
67+
})
68+
if err := mock.ExpectationsWereMet(); err != nil {
69+
t.Errorf("there were unfulfilled exceptions: %s", err)
70+
}
71+
}

0 commit comments

Comments
 (0)