Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit de42ef7

Browse files
authoredMay 16, 2025··
Export query itself together with queryId in stat_statement metrics (#940)
* Export query itself together with queryId in stat_statement metrics * The feature must be enabled via flag. * Limit length of selected query. The query is not added to every metrics, but instead of new metric stat_statement_query_id is introduced that contains mapping between queryId and query. Fixes: #813 --------- Signed-off-by: Jakub Štiller <[email protected]> Signed-off-by: Jakub Štiller <[email protected]>
1 parent bd8a613 commit de42ef7

File tree

4 files changed

+270
-17
lines changed

4 files changed

+270
-17
lines changed
 

‎README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
150150
* `[no-]collector.stat_statements`
151151
Enable the `stat_statements` collector (default: disabled).
152152

153+
* `[no-]collector.stat_statements.include_query`
154+
Enable selecting statement query together with queryId. (default: disabled)
155+
156+
* `--collector.stat_statements.query_length`
157+
Maximum length of the statement text. Default is 120.
158+
153159
* `[no-]collector.stat_user_tables`
154160
Enable the `stat_user_tables` collector (default: enabled).
155161

‎collector/collector.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ const (
3737
// Namespace for all metrics.
3838
namespace = "pg"
3939

40-
defaultEnabled = true
41-
defaultDisabled = false
40+
collectorFlagPrefix = "collector."
41+
defaultEnabled = true
42+
defaultDisabled = false
4243
)
4344

4445
var (
@@ -74,7 +75,7 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle
7475
}
7576

7677
// Create flag for this collector
77-
flagName := fmt.Sprintf("collector.%s", name)
78+
flagName := collectorFlagPrefix + name
7879
flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState)
7980
defaultValue := fmt.Sprintf("%v", isDefaultEnabled)
8081

‎collector/pg_stat_statements.go

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,51 @@ package collector
1616
import (
1717
"context"
1818
"database/sql"
19+
"fmt"
1920
"log/slog"
2021

22+
"github.com/alecthomas/kingpin/v2"
2123
"github.com/blang/semver/v4"
2224
"github.com/prometheus/client_golang/prometheus"
2325
)
2426

2527
const statStatementsSubsystem = "stat_statements"
2628

29+
var (
30+
includeQueryFlag *bool = nil
31+
statementLengthFlag *uint = nil
32+
)
33+
2734
func init() {
2835
// WARNING:
2936
// Disabled by default because this set of metrics can be quite expensive on a busy server
3037
// Every unique query will cause a new timeseries to be created
3138
registerCollector(statStatementsSubsystem, defaultDisabled, NewPGStatStatementsCollector)
39+
40+
includeQueryFlag = kingpin.Flag(
41+
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".include_query"),
42+
"Enable selecting statement query together with queryId. (default: disabled)").
43+
Default(fmt.Sprintf("%v", defaultDisabled)).
44+
Bool()
45+
statementLengthFlag = kingpin.Flag(
46+
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".query_length"),
47+
"Maximum length of the statement text.").
48+
Default("120").
49+
Uint()
3250
}
3351

3452
type PGStatStatementsCollector struct {
35-
log *slog.Logger
53+
log *slog.Logger
54+
includeQueryStatement bool
55+
statementLength uint
3656
}
3757

3858
func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) {
39-
return &PGStatStatementsCollector{log: config.logger}, nil
59+
return &PGStatStatementsCollector{
60+
log: config.logger,
61+
includeQueryStatement: *includeQueryFlag,
62+
statementLength: *statementLengthFlag,
63+
}, nil
4064
}
4165

4266
var (
@@ -71,10 +95,22 @@ var (
7195
prometheus.Labels{},
7296
)
7397

98+
statStatementsQuery = prometheus.NewDesc(
99+
prometheus.BuildFQName(namespace, statStatementsSubsystem, "query_id"),
100+
"SQL Query to queryid mapping",
101+
[]string{"queryid", "query"},
102+
prometheus.Labels{},
103+
)
104+
)
105+
106+
const (
107+
pgStatStatementQuerySelect = `LEFT(pg_stat_statements.query, %d) as query,`
108+
74109
pgStatStatementsQuery = `SELECT
75110
pg_get_userbyid(userid) as user,
76111
pg_database.datname,
77112
pg_stat_statements.queryid,
113+
%s
78114
pg_stat_statements.calls as calls_total,
79115
pg_stat_statements.total_time / 1000.0 as seconds_total,
80116
pg_stat_statements.rows as rows_total,
@@ -96,6 +132,7 @@ var (
96132
pg_get_userbyid(userid) as user,
97133
pg_database.datname,
98134
pg_stat_statements.queryid,
135+
%s
99136
pg_stat_statements.calls as calls_total,
100137
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
101138
pg_stat_statements.rows as rows_total,
@@ -117,6 +154,7 @@ var (
117154
pg_get_userbyid(userid) as user,
118155
pg_database.datname,
119156
pg_stat_statements.queryid,
157+
%s
120158
pg_stat_statements.calls as calls_total,
121159
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
122160
pg_stat_statements.rows as rows_total,
@@ -135,30 +173,42 @@ var (
135173
LIMIT 100;`
136174
)
137175

138-
func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
139-
var query string
176+
func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
177+
var queryTemplate string
140178
switch {
141179
case instance.version.GE(semver.MustParse("17.0.0")):
142-
query = pgStatStatementsQuery_PG17
180+
queryTemplate = pgStatStatementsQuery_PG17
143181
case instance.version.GE(semver.MustParse("13.0.0")):
144-
query = pgStatStatementsNewQuery
182+
queryTemplate = pgStatStatementsNewQuery
145183
default:
146-
query = pgStatStatementsQuery
184+
queryTemplate = pgStatStatementsQuery
147185
}
186+
var querySelect = ""
187+
if c.includeQueryStatement {
188+
querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength)
189+
}
190+
query := fmt.Sprintf(queryTemplate, querySelect)
148191

149192
db := instance.getDB()
150193
rows, err := db.QueryContext(ctx, query)
151194

195+
var presentQueryIds = make(map[string]struct{})
196+
152197
if err != nil {
153198
return err
154199
}
155200
defer rows.Close()
156201
for rows.Next() {
157-
var user, datname, queryid sql.NullString
202+
var user, datname, queryid, statement sql.NullString
158203
var callsTotal, rowsTotal sql.NullInt64
159204
var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64
160-
161-
if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil {
205+
var columns []any
206+
if c.includeQueryStatement {
207+
columns = []any{&user, &datname, &queryid, &statement, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
208+
} else {
209+
columns = []any{&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
210+
}
211+
if err := rows.Scan(columns...); err != nil {
162212
return err
163213
}
164214

@@ -229,6 +279,25 @@ func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance,
229279
blockWriteSecondsTotalMetric,
230280
userLabel, datnameLabel, queryidLabel,
231281
)
282+
283+
if c.includeQueryStatement {
284+
_, ok := presentQueryIds[queryidLabel]
285+
if !ok {
286+
presentQueryIds[queryidLabel] = struct{}{}
287+
288+
queryLabel := "unknown"
289+
if statement.Valid {
290+
queryLabel = statement.String
291+
}
292+
293+
ch <- prometheus.MustNewConstMetric(
294+
statStatementsQuery,
295+
prometheus.CounterValue,
296+
1,
297+
queryidLabel, queryLabel,
298+
)
299+
}
300+
}
232301
}
233302
if err := rows.Err(); err != nil {
234303
return err

‎collector/pg_stat_statements_test.go

Lines changed: 181 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package collector
1414

1515
import (
1616
"context"
17+
"fmt"
1718
"testing"
1819

1920
"github.com/DATA-DOG/go-sqlmock"
@@ -35,7 +36,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
3536
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
3637
rows := sqlmock.NewRows(columns).
3738
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
38-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows)
39+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, ""))).WillReturnRows(rows)
3940

4041
ch := make(chan prometheus.Metric)
4142
go func() {
@@ -66,6 +67,50 @@ func TestPGStateStatementsCollector(t *testing.T) {
6667
}
6768
}
6869

70+
func TestPGStateStatementsCollectorWithStatement(t *testing.T) {
71+
db, mock, err := sqlmock.New()
72+
if err != nil {
73+
t.Fatalf("Error opening a stub db connection: %s", err)
74+
}
75+
defer db.Close()
76+
77+
inst := &instance{db: db, version: semver.MustParse("12.0.0")}
78+
79+
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
80+
rows := sqlmock.NewRows(columns).
81+
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
82+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100)))).WillReturnRows(rows)
83+
84+
ch := make(chan prometheus.Metric)
85+
go func() {
86+
defer close(ch)
87+
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 100}
88+
89+
if err := c.Update(context.Background(), inst, ch); err != nil {
90+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
91+
}
92+
}()
93+
94+
expected := []MetricResult{
95+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
96+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
97+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
98+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
99+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
100+
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
101+
}
102+
103+
convey.Convey("Metrics comparison", t, func() {
104+
for _, expect := range expected {
105+
m := readMetric(<-ch)
106+
convey.So(expect, convey.ShouldResemble, m)
107+
}
108+
})
109+
if err := mock.ExpectationsWereMet(); err != nil {
110+
t.Errorf("there were unfulfilled exceptions: %s", err)
111+
}
112+
}
113+
69114
func TestPGStateStatementsCollectorNull(t *testing.T) {
70115
db, mock, err := sqlmock.New()
71116
if err != nil {
@@ -78,7 +123,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
78123
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
79124
rows := sqlmock.NewRows(columns).
80125
AddRow(nil, nil, nil, nil, nil, nil, nil, nil)
81-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
126+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)
82127

83128
ch := make(chan prometheus.Metric)
84129
go func() {
@@ -109,6 +154,50 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
109154
}
110155
}
111156

157+
func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) {
158+
db, mock, err := sqlmock.New()
159+
if err != nil {
160+
t.Fatalf("Error opening a stub db connection: %s", err)
161+
}
162+
defer db.Close()
163+
164+
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
165+
166+
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
167+
rows := sqlmock.NewRows(columns).
168+
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil)
169+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 200)))).WillReturnRows(rows)
170+
171+
ch := make(chan prometheus.Metric)
172+
go func() {
173+
defer close(ch)
174+
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 200}
175+
176+
if err := c.Update(context.Background(), inst, ch); err != nil {
177+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
178+
}
179+
}()
180+
181+
expected := []MetricResult{
182+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
183+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
184+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
185+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
186+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
187+
{labels: labelMap{"queryid": "unknown", "query": "unknown"}, metricType: dto.MetricType_COUNTER, value: 1},
188+
}
189+
190+
convey.Convey("Metrics comparison", t, func() {
191+
for _, expect := range expected {
192+
m := readMetric(<-ch)
193+
convey.So(expect, convey.ShouldResemble, m)
194+
}
195+
})
196+
if err := mock.ExpectationsWereMet(); err != nil {
197+
t.Errorf("there were unfulfilled exceptions: %s", err)
198+
}
199+
}
200+
112201
func TestPGStateStatementsCollectorNewPG(t *testing.T) {
113202
db, mock, err := sqlmock.New()
114203
if err != nil {
@@ -121,7 +210,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
121210
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
122211
rows := sqlmock.NewRows(columns).
123212
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
124-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
213+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)
125214

126215
ch := make(chan prometheus.Metric)
127216
go func() {
@@ -152,6 +241,50 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
152241
}
153242
}
154243

244+
func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) {
245+
db, mock, err := sqlmock.New()
246+
if err != nil {
247+
t.Fatalf("Error opening a stub db connection: %s", err)
248+
}
249+
defer db.Close()
250+
251+
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
252+
253+
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
254+
rows := sqlmock.NewRows(columns).
255+
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
256+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows)
257+
258+
ch := make(chan prometheus.Metric)
259+
go func() {
260+
defer close(ch)
261+
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300}
262+
263+
if err := c.Update(context.Background(), inst, ch); err != nil {
264+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
265+
}
266+
}()
267+
268+
expected := []MetricResult{
269+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
270+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
271+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
272+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
273+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
274+
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
275+
}
276+
277+
convey.Convey("Metrics comparison", t, func() {
278+
for _, expect := range expected {
279+
m := readMetric(<-ch)
280+
convey.So(expect, convey.ShouldResemble, m)
281+
}
282+
})
283+
if err := mock.ExpectationsWereMet(); err != nil {
284+
t.Errorf("there were unfulfilled exceptions: %s", err)
285+
}
286+
}
287+
155288
func TestPGStateStatementsCollector_PG17(t *testing.T) {
156289
db, mock, err := sqlmock.New()
157290
if err != nil {
@@ -164,7 +297,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) {
164297
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
165298
rows := sqlmock.NewRows(columns).
166299
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
167-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery_PG17)).WillReturnRows(rows)
300+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, ""))).WillReturnRows(rows)
168301

169302
ch := make(chan prometheus.Metric)
170303
go func() {
@@ -194,3 +327,47 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) {
194327
t.Errorf("there were unfulfilled exceptions: %s", err)
195328
}
196329
}
330+
331+
func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) {
332+
db, mock, err := sqlmock.New()
333+
if err != nil {
334+
t.Fatalf("Error opening a stub db connection: %s", err)
335+
}
336+
defer db.Close()
337+
338+
inst := &instance{db: db, version: semver.MustParse("17.0.0")}
339+
340+
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
341+
rows := sqlmock.NewRows(columns).
342+
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
343+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery_PG17, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows)
344+
345+
ch := make(chan prometheus.Metric)
346+
go func() {
347+
defer close(ch)
348+
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300}
349+
350+
if err := c.Update(context.Background(), inst, ch); err != nil {
351+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
352+
}
353+
}()
354+
355+
expected := []MetricResult{
356+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
357+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
358+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
359+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
360+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
361+
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
362+
}
363+
364+
convey.Convey("Metrics comparison", t, func() {
365+
for _, expect := range expected {
366+
m := readMetric(<-ch)
367+
convey.So(expect, convey.ShouldResemble, m)
368+
}
369+
})
370+
if err := mock.ExpectationsWereMet(); err != nil {
371+
t.Errorf("there were unfulfilled exceptions: %s", err)
372+
}
373+
}

0 commit comments

Comments
 (0)
Please sign in to comment.