Skip to content

Commit

Permalink
Add a query planner for OxQL (#7152)
Browse files Browse the repository at this point in the history
- Add types to represent the plan tree and the currently-supported plan
tree nodes. These mostly correspond to the existing query AST nodes, but
include information about the expected schema for the input and output
tables, along with the query AST nodes that "implement" that
transformation.
- Add an explicit node for computing deltas from a cumulative
timeseries, inserted automatically after the node for fetching its data
from the DB. This is currently done implicitly after fetching the data,
but will be part of an explicit plan step going forward. The ultimate
goal is to push that into the database itself where possible.
- Add methods to optimize a query plan, which currently includes the
predicate-pushdown and limit-pushdown tricks we already do to limit the
amount of data we get from the database. Add some tests to verify the
behavior of these optimizations, in particular that they don't change
the _planned_ output of the query itself.
- Add pretty-printing of the plan tree, and include a way to show that
in the OxQL shell.
- Add detection of full table scans. Use the planner in OxQL queries,
_only to verify them_ and check that there are no scans. The queries
themselves are executed in the original method today.
  • Loading branch information
bnaecker authored Jan 17, 2025
1 parent 017830d commit 9093ac6
Show file tree
Hide file tree
Showing 32 changed files with 4,395 additions and 32 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ tar = "0.4"
tempfile = "3.10"
term = "0.7"
termios = "0.3"
termtree = "0.5.1"
textwrap = "0.16.1"
test-strategy = "0.3.1"
thiserror = "1.0"
Expand Down
14 changes: 11 additions & 3 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ async fn test_instance_watcher_metrics(
const STATE_STARTING: &str = "starting";
const STATE_RUNNING: &str = "running";
const STATE_STOPPING: &str = "stopping";
const OXQL_QUERY: &str = "get virtual_machine:check";
const OXQL_QUERY: &str = "get virtual_machine:check | \
filter timestamp > @2000-01-01";

let client = &cptestctx.external_client;
let internal_client = &cptestctx.internal_client;
Expand Down Expand Up @@ -706,7 +707,13 @@ async fn test_project_timeseries_query(
// fields are. This is helpful generally, but here it would be better if
// we could say something more like "you can't query this timeseries from
// this endpoint"
assert_eq!(result.message, "The filter expression contains identifiers that are not valid for its input timeseries. Invalid identifiers: [\"project_id\", \"silo_id\"], timeseries fields: {\"datum\", \"metric_name\", \"target_name\", \"timestamp\"}");
const EXPECTED_ERROR_MESSAGE: &str = "\
The filter expression refers to \
identifiers that are not valid for its input \
table \"integration_target:integration_metric\". \
Invalid identifiers: [\"silo_id\", \"project_id\"], \
valid identifiers: [\"datum\", \"metric_name\", \"target_name\", \"timestamp\"]";
assert!(result.message.ends_with(EXPECTED_ERROR_MESSAGE));

// nonexistent project
let url = "/v1/timeseries/query?project=nonexistent";
Expand Down Expand Up @@ -871,7 +878,8 @@ async fn test_mgs_metrics(
return;
}

let query = format!("get {metric_name}");
let query =
format!("get {metric_name} | filter timestamp > @2000-01-01");

// MGS polls SP sensor data once every second. It's possible that, when
// we triggered Oximeter to collect samples from MGS, it may not have
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
strum.workspace = true
termtree.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
usdt.workspace = true
Expand Down
54 changes: 47 additions & 7 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::oxql::ast::table_ops::filter;
use crate::oxql::ast::table_ops::filter::Filter;
use crate::oxql::ast::table_ops::limit::Limit;
use crate::oxql::ast::table_ops::limit::LimitKind;
use crate::oxql::Query;
use crate::query::field_table_name;
use crate::Error;
use crate::Metric;
Expand Down Expand Up @@ -113,6 +114,34 @@ struct ConsistentKeyGroup {
}

impl Client {
/// Parse the provided string as an OxQL query and build a plan for it.
///
/// Any error from parsing the query, or from building the plan itself, is
/// returned to the caller.
pub async fn plan_oxql_query(
    &self,
    query: impl AsRef<str>,
) -> Result<oxql::plan::Plan, Error> {
    let text: &str = query.as_ref();
    let parsed = oxql::Query::new(text)?;
    self.build_query_plan(&parsed).await
}

/// Build a query plan from an already-parsed OxQL query.
///
/// The schema for every timeseries named in the query is looked up first.
/// If any of them has no schema, `Error::TimeseriesNotFound` is returned
/// with that timeseries name.
async fn build_query_plan(
    &self,
    query: &Query,
) -> Result<oxql::plan::Plan, Error> {
    let mut schema = BTreeMap::new();
    for name in query.all_timeseries_names() {
        match self.schema_for_timeseries(name).await? {
            Some(found) => {
                schema.insert(name.clone(), found);
            }
            None => {
                return Err(Error::TimeseriesNotFound(name.to_string()));
            }
        }
    }
    // The planner takes the parsed query by value, so hand it a copy.
    let ast = query.parsed_query().clone();
    Ok(oxql::plan::Plan::new(ast, &schema)?)
}

/// Run an OxQL query.
pub async fn oxql_query(
&self,
Expand All @@ -132,6 +161,15 @@ impl Client {
// See https://github.com/oxidecomputer/omicron/issues/5298.
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
let plan = self.build_query_plan(&parsed_query).await?;
if plan.requires_full_table_scan() {
return Err(Error::Oxql(anyhow::anyhow!(
"This query requires at least one full table scan. \
Please rewrite the query to filter either the fields \
or timestamps, in order to reduce the amount of data \
fetched from the database."
)));
}
let query_id = Uuid::new_v4();
let query_log =
self.log.new(slog::o!("query_id" => query_id.to_string()));
Expand Down Expand Up @@ -837,12 +875,12 @@ impl Client {
// return.
//
// This is used to ensure that we never go above the limit in
// `MAX_RESULT_SIZE`. That restricts the _total_ number of rows we want
// to retch from the database. So we set our limit to be one more than
// the remainder on our allotment. If we get exactly as many as we set
// in the limit, then we fail the query because there are more rows that
// _would_ be returned. We don't know how many more, but there is at
// least 1 that pushes us over the limit. This prevents tricky
// `MAX_DATABASE_ROWS`. That restricts the _total_ number of rows we
// want to fetch from the database. So we set our limit to be one more
// than the remainder on our allotment. If we get exactly as many as we
// set in the limit, then we fail the query because there are more rows
// that _would_ be returned. We don't know how many more, but there is
// at least 1 that pushes us over the limit. This prevents tricky
// TOCTOU-like bugs where we need to check the limit twice, and improves
// performance, since we don't return much more than we could possibly
// handle.
Expand Down Expand Up @@ -1293,7 +1331,9 @@ mod tests {
#[tokio::test]
async fn test_get_entire_table() {
let ctx = setup_oxql_test("test_get_entire_table").await;
let query = "get some_target:some_metric";
// We need _some_ filter here to avoid a provable full-table scan.
let query =
"get some_target:some_metric | filter timestamp > @2020-01-01";
let result = ctx
.client
.oxql_query(query)
Expand Down
5 changes: 5 additions & 0 deletions oximeter/db/src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ pub enum Error {

#[error("Expected an empty data block")]
ExpectedEmptyDataBlock,

#[error(
"A query unexpectedly resulted in an empty data block; query: {query}"
)]
UnexpectedEmptyBlock { query: String },
}

impl Error {
Expand Down
31 changes: 31 additions & 0 deletions oximeter/db/src/oxql/ast/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::DateTime;
use chrono::Utc;
use oximeter::FieldType;
use oximeter::FieldValue;
use oxql_types::point::DataType;
use regex::Regex;
use std::borrow::Borrow;
use std::fmt;
Expand All @@ -35,6 +36,20 @@ pub enum Literal {
}

impl Literal {
// Return the name of this literal's type as a string.
pub(crate) fn type_name(&self) -> &'static str {
match self {
Literal::Integer(_) => "Integer",
Literal::Double(_) => "Double",
Literal::String(_) => "String",
Literal::Boolean(_) => "Boolean",
Literal::Uuid(_) => "Uuid",
Literal::Duration(_) => "Duration",
Literal::Timestamp(_) => "Timestamp",
Literal::IpAddr(_) => "IpAddr",
}
}

// Format the literal as a safe, typed string for ClickHouse.
pub(crate) fn as_db_safe_string(&self) -> String {
match self {
Expand Down Expand Up @@ -93,6 +108,22 @@ impl Literal {
}
}

// Return true if this literal can be compared to a datum of the provided
// type.
pub(crate) fn is_compatible_with_datum(&self, data_type: DataType) -> bool {
match (self, data_type) {
(Literal::Integer(_), DataType::Integer)
| (Literal::Double(_), DataType::Double)
| (Literal::String(_), DataType::String)
| (Literal::Boolean(_), DataType::Boolean)
| (Literal::Duration(_), DataType::Integer)
| (Literal::Duration(_), DataType::Double)
| (Literal::Timestamp(_), DataType::Integer)
| (Literal::Timestamp(_), DataType::Double) => true,
(_, _) => false,
}
}

/// Apply the comparison op between self and the provided field.
///
/// Return None if the comparison cannot be applied, either because the type
Expand Down
53 changes: 53 additions & 0 deletions oximeter/db/src/oxql/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
// Copyright 2024 Oxide Computer Company

use std::collections::BTreeSet;
use std::fmt;

use chrono::DateTime;
use chrono::Utc;
use oximeter::TimeseriesName;
Expand All @@ -26,12 +29,35 @@ pub struct Query {
ops: Vec<TableOp>,
}

// Format the query as its table operations, in order, joined by ` | `.
impl fmt::Display for Query {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut remaining = self.ops.iter();
        // The first operation is written bare; each later one is preceded by
        // the pipe separator, so no trailing separator is ever emitted.
        if let Some(first) = remaining.next() {
            write!(f, "{first}")?;
        }
        for op in remaining {
            write!(f, " | {op}")?;
        }
        Ok(())
    }
}

impl Query {
// Return the first operation in the query, which is always a form of `get`.
fn first_op(&self) -> &TableOp {
self.ops.first().expect("Should have parsed at least 1 operation")
}

/// Return an iterator over this query's table operations, in the order
/// they are stored.
pub(crate) fn table_ops(
    &self,
) -> impl ExactSizeIterator<Item = &TableOp> + '_ {
    self.ops.as_slice().iter()
}

/// Return the name of the first referenced timeseries.
///
/// This is from the first `get`, which might be from a subquery.
pub(crate) fn timeseries_name(&self) -> &TimeseriesName {
match self.first_op() {
TableOp::Basic(BasicTableOp::Get(n)) => n,
Expand All @@ -42,6 +68,33 @@ impl Query {
}
}

/// Collect the name of _every_ timeseries referenced by a `get` table
/// operation in this query, including those inside subqueries.
pub(crate) fn all_timeseries_names(&self) -> BTreeSet<&TimeseriesName> {
    let mut names = BTreeSet::new();
    self.all_timeseries_names_impl(&mut names);
    names
}

// Insert the names from every `get` operation into `set`, descending into
// each subquery of any grouped operation.
fn all_timeseries_names_impl<'a>(
    &'a self,
    set: &mut BTreeSet<&'a TimeseriesName>,
) {
    for op in &self.ops {
        match op {
            TableOp::Basic(basic) => {
                // Only `get` names a timeseries; other basic ops add nothing.
                if let BasicTableOp::Get(name) = basic {
                    set.insert(name);
                }
            }
            TableOp::Grouped(grouped) => {
                for subquery in grouped.ops.iter() {
                    subquery.all_timeseries_names_impl(set);
                }
            }
        }
    }
}

// Check that this query (and any subqueries) start with a get table op, and
// that there are no following get operations. I.e., we have:
//
Expand Down
20 changes: 18 additions & 2 deletions oximeter/db/src/oxql/ast/table_ops/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use oxql_types::point::Values;
use oxql_types::Alignment;
use oxql_types::Table;
use oxql_types::Timeseries;
use std::fmt;
use std::time::Duration;

// The maximum factor by which an alignment operation may upsample data.
Expand Down Expand Up @@ -68,7 +69,7 @@ fn verify_max_upsampling_ratio(
///
/// Alignment is used to produce data at the defined timestamps, so that samples
/// from multiple timeseries may be combined or correlated in meaningful ways.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Align {
/// The alignment method, used to describe how data over the input period
/// is used to generate an output sample.
Expand All @@ -87,6 +88,12 @@ pub struct Align {
pub period: Duration,
}

impl std::fmt::Display for Align {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}({:?})", self.method, self.period)
}
}

impl Align {
// Apply the alignment function to the set of tables.
pub(crate) fn apply(
Expand All @@ -108,7 +115,7 @@ impl Align {
}

/// An alignment method.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AlignmentMethod {
/// Alignment is done by interpolating the output data at the specified
/// period.
Expand All @@ -118,6 +125,15 @@ pub enum AlignmentMethod {
MeanWithin,
}

impl fmt::Display for AlignmentMethod {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlignmentMethod::Interpolate => write!(f, "interpolate"),
AlignmentMethod::MeanWithin => write!(f, "mean_within"),
}
}
}

// Align the timeseries in a table by computing the average within each output
// period.
fn align_mean_within(
Expand Down
Loading

0 comments on commit 9093ac6

Please sign in to comment.