Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ where
pool_config.pool_mode.to_string(),
];
for column in &columns[3..columns.len()] {
let value = pool_stats.get(column.0).unwrap_or(&0).to_string();
let value = match column.0 {
"maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also show these in Prometheus metrics (prometheus.rs), it would be good to update this logic there too.

"maxwait_us" => (pool_stats.get(column.0).unwrap_or(&0) % 1_000_000).to_string(),
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string()
};
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the microsecond split to admin instead of doing on stats side

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prometheus metrics would be incorrect in this case.

row.push(value);
}
res.put(data_row(&row));
Expand Down
9 changes: 5 additions & 4 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ impl ConnectionPool {
process_id: i32, // client id
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
let now = Instant::now();

// Indicate we're waiting on a server connection from a pool.
self.stats.client_waiting(process_id);

let mut candidates: Vec<&Address> = self.addresses[shard]
.iter()
.filter(|address| address.role == role)
Expand All @@ -357,9 +361,6 @@ impl ConnectionPool {
continue;
}

// Indicate we're waiting on a server connection from a pool.
self.stats.client_waiting(process_id);

// Check if we can connect
let mut conn = match self.databases[address.shard][address.address_index]
.get()
Expand Down Expand Up @@ -397,7 +398,7 @@ impl ConnectionPool {

match tokio::time::timeout(
tokio::time::Duration::from_millis(healthcheck_timeout),
server.query(";"), // Cheap query (query parser not used in PG)
server.query(";"), // Cheap query as it skips the query planner
)
.await
{
Expand Down
41 changes: 21 additions & 20 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl Reporter {

/// Reports the time spent by a client waiting to get a healthy connection from the pool
pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) {
let event = Event {
let event = Event {
name: EventName::CheckoutTime {
client_id,
server_id,
Expand Down Expand Up @@ -580,15 +580,15 @@ impl Collector {
server_info.query_count += stat.value as u64;
server_info.application_name = app_name;

let pool_stats = address_stat_lookup
let address_stats = address_stat_lookup
.entry(server_info.address_id)
.or_insert(HashMap::default());
let counter = pool_stats
let counter = address_stats
.entry("total_query_count".to_string())
.or_insert(0);
*counter += stat.value;

let duration = pool_stats
let duration = address_stats
.entry("total_query_time".to_string())
.or_insert(0);
*duration += duration_ms as i64;
Expand Down Expand Up @@ -681,26 +681,21 @@ impl Collector {
Some(server_info) => {
server_info.application_name = app_name;

let pool_stats = address_stat_lookup
let address_stats = address_stat_lookup
.entry(server_info.address_id)
.or_insert(HashMap::default());
let counter =
pool_stats.entry("total_wait_time".to_string()).or_insert(0);
address_stats.entry("total_wait_time".to_string()).or_insert(0);
*counter += stat.value;

let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0);
let mic_part = stat.value % 1_000_000;
let pool_stats = pool_stat_lookup
.entry((server_info.pool_name.clone(), server_info.username.clone()))
.or_insert(HashMap::default());

// Report max time here
if mic_part > *counter {
*counter = mic_part;
}

let counter = pool_stats.entry("maxwait".to_string()).or_insert(0);
let seconds = *counter / 1_000_000;

if seconds > *counter {
*counter = seconds;
// We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin
let old_microseconds = pool_stats.entry("maxwait_us".to_string()).or_insert(0);
if stat.value > *old_microseconds {
*old_microseconds = stat.value;
}
}
None => (),
Expand Down Expand Up @@ -903,8 +898,6 @@ impl Collector {
"sv_active",
"sv_tested",
"sv_login",
"maxwait",
"maxwait_us",
] {
pool_stats.insert(stat.to_string(), 0);
}
Expand Down Expand Up @@ -957,11 +950,19 @@ impl Collector {
};
}


// The following calls publish the internal stats making it visible
// to clients using admin database to issue queries like `SHOW STATS`
LATEST_CLIENT_STATS.store(Arc::new(client_states.clone()));
LATEST_SERVER_STATS.store(Arc::new(server_states.clone()));
LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone()));

// Clear maxwait after reporting
pool_stat_lookup
.entry((pool_name.clone(), username.clone()))
.or_insert(HashMap::default())
.insert("maxwait_us".to_string(), 0);

}

EventName::UpdateAverages { address_id } => {
Expand Down
22 changes: 22 additions & 0 deletions tests/ruby/admin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@
threads.map(&:join)
connections.map(&:close)
end

it "shows correct maxwait" do
threads = []
connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
end

sleep(2.5) # Allow time for stats to update
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should introduce a command to refresh stats immediately. I can see our CI getting slower and slower over time as we add features :)

admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW POOLS")[0]

expect(results["maxwait"]).to eq("1")
expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000)

sleep(4.5) # Allow time for stats to update
results = admin_conn.async_exec("SHOW POOLS")[0]
expect(results["maxwait"]).to eq("0")

threads.map(&:join)
connections.map(&:close)
end
end
end

Expand Down