Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 11 additions & 12 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use nexus_db_queries::{
use nexus_external_api::TimeseriesSchemaPaginationParams;
use nexus_types::external_api::params::SystemMetricName;
use omicron_common::api::external::{Error, InternalContext};
use oximeter_db::{Measurement, TimeseriesSchema};
use oximeter_db::{
Measurement, TimeseriesSchema, oxql::query::QueryAuthzScope,
};
use std::num::NonZeroU32;

impl super::Nexus {
Expand Down Expand Up @@ -138,7 +140,7 @@ impl super::Nexus {
// resources they have access to.
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.timeseries_client
.oxql_query(query)
.oxql_query(query, QueryAuthzScope::Fleet)
.await
// TODO-observability: The query method returns information
// about the duration of the OxQL query and the database
Expand All @@ -161,17 +163,14 @@ impl super::Nexus {
// Ensure the user has read access to the project
let (authz_silo, authz_project) =
project_lookup.lookup_for(authz::Action::Read).await?;

// Ensure the query only refers to the project
let filtered_query = format!(
"{} | filter silo_id == \"{}\" && project_id == \"{}\"",
query.as_ref(),
authz_silo.id(),
authz_project.id()
);

self.timeseries_client
.oxql_query(filtered_query)
.oxql_query(
query,
QueryAuthzScope::Project {
silo_id: authz_silo.id(),
project_id: authz_project.id(),
},
)
.await
.map(|result| result.tables)
.map_err(map_timeseries_err)
Expand Down
54 changes: 45 additions & 9 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,10 @@ async fn test_project_timeseries_query(
let _p2 = create_project(&client, "project2").await;

// Create resources in each project
let i1 = create_instance(&client, "project1", "instance1").await;
let _i2 = create_instance(&client, "project2", "instance2").await;
let i1p1 = create_instance(&client, "project1", "instance1").await;
// need a second instance to test group_by
let i2p1 = create_instance(&client, "project1", "instance2").await;
let _i3p2 = create_instance(&client, "project2", "instance3").await;

let internal_client = &cptestctx.internal_client;

Expand Down Expand Up @@ -520,14 +522,16 @@ async fn test_project_timeseries_query(

let result = metrics_querier.project_timeseries_query("project1", q2).await;
assert_eq!(result.len(), 1);
assert!(result[0].timeseries().len() > 0);
// we get 2 timeseries because there are two instances
assert!(result[0].timeseries().len() == 2);

let result = metrics_querier.project_timeseries_query("project2", q2).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 0);

// with instance specified
let q3 = &format!("{} | filter instance_id == \"{}\"", q1, i1.identity.id);
let q3 =
&format!("{} | filter instance_id == \"{}\"", q1, i1p1.identity.id);

// project containing instance gives me something
let result = metrics_querier.project_timeseries_query("project1", q3).await;
Expand All @@ -539,11 +543,43 @@ async fn test_project_timeseries_query(
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 0);

// now let's test it with group_by
let q4 = &format!(
"{} | align mean_within(1m) | group_by [instance_id], sum",
q1
);
let result = metrics_querier.project_timeseries_query("project1", q4).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 2);

// test with a nested query
let q5 = &format!(
"{{ \
get virtual_machine:check | filter instance_id == \"{}\"; \
get virtual_machine:check | filter instance_id == \"{}\" \
}} | filter timestamp < @now()",
i1p1.identity.id, i2p1.identity.id,
);
let result = metrics_querier.project_timeseries_query("project1", q5).await;

// we get two results, each contains one timeseries, and the instance ID
// on each corresponds to the one we requested
assert_eq!(result.len(), 2);
assert_eq!(result[0].timeseries().len(), 1);
let timeseries = result[0].timeseries().next().unwrap();
let instance_id = timeseries.fields.get("instance_id").unwrap().to_string();
assert_eq!(instance_id, i1p1.identity.id.to_string());

assert_eq!(result[1].timeseries().len(), 1);
let timeseries = result[1].timeseries().next().unwrap();
let instance_id = timeseries.fields.get("instance_id").unwrap().to_string();
assert_eq!(instance_id, i2p1.identity.id.to_string());

// expect error when querying a metric that has no project_id on it
let q4 = "get integration_target:integration_metric";
let q6 = "get integration_target:integration_metric";
let url = "/v1/timeseries/query?project=project1";
let body = nexus_types::external_api::params::TimeseriesQuery {
query: q4.to_string(),
query: q6.to_string(),
};
let result =
object_create_error(client, url, &body, StatusCode::BAD_REQUEST).await;
Expand All @@ -556,14 +592,14 @@ async fn test_project_timeseries_query(
The filter expression refers to \
identifiers that are not valid for its input \
table \"integration_target:integration_metric\". \
Invalid identifiers: [\"silo_id\", \"project_id\"], \
Invalid identifiers: [\"silo_id\"], \
valid identifiers: [\"datum\", \"metric_name\", \"target_name\", \"timestamp\"]";
assert!(result.message.ends_with(EXPECTED_ERROR_MESSAGE));

// nonexistent project
let url = "/v1/timeseries/query?project=nonexistent";
let body = nexus_types::external_api::params::TimeseriesQuery {
query: q4.to_string(),
query: q6.to_string(),
};
let result =
object_create_error(client, url, &body, StatusCode::NOT_FOUND).await;
Expand Down Expand Up @@ -606,7 +642,7 @@ async fn test_project_timeseries_query(
.execute_and_parse_unwrap::<OxqlQueryResult>()
.await;
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0].timeseries().len(), 1);
assert_eq!(result.tables[0].timeseries().len(), 2); // two instances
}

#[nexus_test]
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ nom.workspace = true
quote.workspace = true

[dev-dependencies]
assert_matches.workspace = true
camino-tempfile.workspace = true
criterion = { workspace = true, features = [ "async_tokio" ] }
expectorate.workspace = true
Expand Down
33 changes: 14 additions & 19 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::oxql::ast::table_ops::filter;
use crate::oxql::ast::table_ops::filter::Filter;
use crate::oxql::ast::table_ops::limit::Limit;
use crate::oxql::ast::table_ops::limit::LimitKind;
use crate::oxql::query::QueryAuthzScope;
use crate::query::field_table_name;
use oximeter::Measurement;
use oximeter::TimeseriesSchema;
Expand Down Expand Up @@ -147,22 +148,13 @@ impl Client {
pub async fn oxql_query(
&self,
query: impl AsRef<str>,
scope: QueryAuthzScope,
) -> Result<OxqlResult, Error> {
// TODO-security: Need a way to implement authz checks for things like
// viewing resources in another project or silo.
//
// I think one way to do that is look at the predicates and make sure
// they refer to things the user has access to. Another is to add some
// implicit predicates here, indicating the subset of fields that the
// query should be able to access.
//
// This probably means we'll need to parse the query in Nexus, so that
// we can attach the other filters ourselves.
//
// See https://github.com/oxidecomputer/omicron/issues/5298.
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
let plan = self.build_query_plan(&parsed_query).await?;
let filtered_query = parsed_query.insert_authz_filters(scope);

let plan = self.build_query_plan(&filtered_query).await?;
if plan.requires_full_table_scan() {
return Err(Error::Oxql(anyhow::anyhow!(
"This query requires at least one full table scan. \
Expand All @@ -179,6 +171,7 @@ impl Client {
"parsed OxQL query";
"query" => query,
"parsed_query" => ?parsed_query,
"filtered_query" => ?filtered_query,
);
let id = usdt::UniqueId::new();
probes::oxql__query__start!(|| (&id, &query_id, query));
Expand All @@ -188,7 +181,7 @@ impl Client {
&query_log,
&mut self.claim_connection().await?,
query_id,
parsed_query,
filtered_query,
&mut total_rows_fetched,
None,
None,
Expand Down Expand Up @@ -1187,7 +1180,9 @@ fn update_total_rows_and_check(
#[cfg(test)]
mod tests {
use super::ConsistentKeyGroup;
use crate::client::oxql::chunk_consistent_key_groups_impl;
use crate::client::oxql::{
QueryAuthzScope, chunk_consistent_key_groups_impl,
};
use crate::oxql::ast::grammar::query_parser;
use crate::{Client, DATABASE_TIMESTAMP_FORMAT, DbWrite};
use crate::{Metric, Target};
Expand Down Expand Up @@ -1348,7 +1343,7 @@ mod tests {
"get some_target:some_metric | filter timestamp > @2020-01-01";
let result = ctx
.client
.oxql_query(query)
.oxql_query(query, QueryAuthzScope::Fleet)
.await
.expect("failed to run OxQL query");
assert_eq!(result.tables.len(), 1, "Should be exactly 1 table");
Expand Down Expand Up @@ -1407,7 +1402,7 @@ mod tests {
);
let result = ctx
.client
.oxql_query(&query)
.oxql_query(&query, QueryAuthzScope::Fleet)
.await
.expect("failed to run OxQL query");
assert_eq!(result.tables.len(), 1, "Should be exactly 1 table");
Expand Down Expand Up @@ -1463,7 +1458,7 @@ mod tests {
);
let result = ctx
.client
.oxql_query(&query)
.oxql_query(&query, QueryAuthzScope::Fleet)
.await
.expect("failed to run OxQL query");
assert_eq!(result.tables.len(), 1, "Should be exactly 1 table");
Expand Down Expand Up @@ -1695,7 +1690,7 @@ mod tests {
);
let result = ctx
.client
.oxql_query(&query)
.oxql_query(&query, QueryAuthzScope::Fleet)
.await
.expect("failed to run OxQL query");
assert_eq!(result.tables.len(), 1, "Should be exactly 1 table");
Expand Down
34 changes: 33 additions & 1 deletion oximeter/db/src/oxql/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::fmt;
use chrono::DateTime;
use chrono::Utc;
use oximeter::TimeseriesName;
use table_ops::filter::Filter;

use self::table_ops::BasicTableOp;
use self::table_ops::GroupedTableOp;
Expand Down Expand Up @@ -44,7 +45,7 @@ impl fmt::Display for Query {

impl Query {
// Return the first operation in the query, which is always a form of `get`.
fn first_op(&self) -> &TableOp {
pub(crate) fn first_op(&self) -> &TableOp {
self.ops.first().expect("Should have parsed at least 1 operation")
}

Expand Down Expand Up @@ -188,6 +189,37 @@ impl Query {
}
}
}

/// Insert filters after the `get`, or in the case of subqueries, recurse
/// down the tree and insert them after each get.
pub(crate) fn insert_filters(&self, filters: Vec<Filter>) -> Self {
let mut new_ops = self.ops.clone();

match self.first_op() {
// for a basic query, just insert the filters after the first entry (the get)
TableOp::Basic(_) => {
let filter_ops = filters
.iter()
.map(|filter| {
TableOp::Basic(BasicTableOp::Filter(filter.clone()))
})
.collect::<Vec<_>>();
new_ops.splice(1..1, filter_ops);
}
// for a grouped query, recurse to insert the filters in all subqueries
TableOp::Grouped(op) => {
new_ops[0] = TableOp::Grouped(GroupedTableOp {
ops: op
.ops
.iter()
.map(|query| query.insert_filters(filters.clone()))
.collect(),
});
}
}

Self { ops: new_ops }
}
}

// Either a flat query or one with nested subqueries.
Expand Down
Loading
Loading