Skip to content

Commit 26d98e9

Browse files
committed
Long running transactions marginalia
Signed-off-by: Felix Yuan <[email protected]>
1 parent 5401e23 commit 26d98e9

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
19+
"github.com/go-kit/log"
20+
"github.com/prometheus/client_golang/prometheus"
21+
)
22+
23+
const longRunningTransactionsMarginaliaSubsystem = "long_running_transactions_marginalia"
24+
25+
func init() {
26+
registerCollector(longRunningTransactionsMarginaliaSubsystem, defaultEnabled, NewPGLongRunningTransactionsMarginaliaCollector)
27+
}
28+
29+
type PGLongRunningTransactionsMarginaliaCollector struct {
30+
log log.Logger
31+
}
32+
33+
func NewPGLongRunningTransactionsMarginaliaCollector(config collectorConfig) (Collector, error) {
34+
return &PGLongRunningTransactionsMarginaliaCollector{log: config.logger}, nil
35+
}
36+
37+
var (
38+
longRunningTransactionsMarginaliaMaxAgeInSeconds = prometheus.NewDesc(
39+
prometheus.BuildFQName(namespace, longRunningTransactionsMarginaliaSubsystem, "max_age_in_seconds"),
40+
"The current maximum transaction age in seconds",
41+
[]string{"application", "endpoint"},
42+
prometheus.Labels{},
43+
)
44+
45+
longRunningTransactionsMarginaliaQuery = `
46+
SELECT
47+
activity.matches[1] AS application,
48+
activity.matches[2] AS endpoint,
49+
MAX(age_in_seconds) AS max_age_in_seconds
50+
FROM (
51+
SELECT
52+
regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches,
53+
EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds
54+
FROM
55+
pg_catalog.pg_stat_activity
56+
WHERE state <> 'idle'
57+
AND (clock_timestamp() - xact_start) > '30 seconds'::interval
58+
AND query NOT LIKE 'autovacuum:%'
59+
) activity
60+
GROUP BY application, endpoint
61+
ORDER BY max_age_in_seconds DESC
62+
`
63+
)
64+
65+
func (PGLongRunningTransactionsMarginaliaCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
66+
db := instance.getDB()
67+
rows, err := db.QueryContext(ctx,
68+
longRunningTransactionsMarginaliaQuery)
69+
70+
if err != nil {
71+
return err
72+
}
73+
defer rows.Close()
74+
75+
for rows.Next() {
76+
var application, endpoint string
77+
var max_age_in_seconds float64
78+
79+
if err := rows.Scan(&application, &endpoint, &max_age_in_seconds); err != nil {
80+
return err
81+
}
82+
83+
ch <- prometheus.MustNewConstMetric(
84+
longRunningTransactionsAgeInSeconds,
85+
prometheus.GaugeValue,
86+
max_age_in_seconds,
87+
application, endpoint,
88+
)
89+
}
90+
if err := rows.Err(); err != nil {
91+
return err
92+
}
93+
return nil
94+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"context"
17+
"testing"
18+
19+
"github.com/DATA-DOG/go-sqlmock"
20+
"github.com/prometheus/client_golang/prometheus"
21+
dto "github.com/prometheus/client_model/go"
22+
"github.com/smartystreets/goconvey/convey"
23+
)
24+
25+
func xTestPGLongRunningTransactionsMarginaliaCollector(t *testing.T) {
26+
db, mock, err := sqlmock.New()
27+
if err != nil {
28+
t.Fatalf("Error opening a stub db connection: %s", err)
29+
}
30+
defer db.Close()
31+
inst := &instance{db: db}
32+
columns := []string{
33+
"application",
34+
"endpoint",
35+
"max_age_in_seconds",
36+
}
37+
rows := sqlmock.NewRows(columns).
38+
AddRow("reddit", "GET /r/programming", 32)
39+
40+
// TODO: Fix the sanitizeQuery escaper to deal better with regex
41+
mock.ExpectQuery(sanitizeQuery(longRunningTransactionsMarginaliaQuery)).WillReturnRows(rows)
42+
43+
ch := make(chan prometheus.Metric)
44+
go func() {
45+
defer close(ch)
46+
c := PGLongRunningTransactionsMarginaliaCollector{}
47+
48+
if err := c.Update(context.Background(), inst, ch); err != nil {
49+
t.Errorf("Error calling PGLongRunningTransactionsMarginaliaCollector.Update: %s", err)
50+
}
51+
}()
52+
expected := []MetricResult{
53+
{labels: labelMap{}, value: 25, metricType: dto.MetricType_GAUGE},
54+
}
55+
convey.Convey("Metrics comparison", t, func() {
56+
for _, expect := range expected {
57+
m := readMetric(<-ch)
58+
convey.So(expect, convey.ShouldResemble, m)
59+
}
60+
})
61+
if err := mock.ExpectationsWereMet(); err != nil {
62+
t.Errorf("there were unfulfilled exceptions: %s", err)
63+
}
64+
}

0 commit comments

Comments
 (0)