Skip to content

Re-enable query parser and parse multiple statements #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = "0.23.0"
sqlparser = "0.26.0"
log = "0.4"
arc-swap = "1"
env_logger = "0.9"
Expand Down
112 changes: 74 additions & 38 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ pub struct QueryRouter {
active_role: Option<Role>,

/// Should we try to parse queries to route them to replicas or primary automatically
query_parser_enabled: bool,
query_parser_enabled: Option<bool>,

/// Include the primary into the replica pool for reads.
primary_reads_enabled: bool,
primary_reads_enabled: Option<bool>,

/// Pool configuration.
pool_settings: PoolSettings,
Expand Down Expand Up @@ -95,8 +95,8 @@ impl QueryRouter {
QueryRouter {
active_shard: None,
active_role: None,
query_parser_enabled: false,
primary_reads_enabled: false,
query_parser_enabled: None,
primary_reads_enabled: None,
pool_settings: PoolSettings::default(),
}
}
Expand Down Expand Up @@ -172,15 +172,15 @@ impl QueryRouter {
Some(Role::Primary) => Role::Primary.to_string(),
Some(Role::Replica) => Role::Replica.to_string(),
None => {
if self.query_parser_enabled {
if self.query_parser_enabled() {
String::from("auto")
} else {
String::from("any")
}
}
},

Command::ShowPrimaryReads => match self.primary_reads_enabled {
Command::ShowPrimaryReads => match self.primary_reads_enabled() {
true => String::from("on"),
false => String::from("off"),
},
Expand All @@ -207,28 +207,28 @@ impl QueryRouter {
Command::SetServerRole => {
self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => {
self.query_parser_enabled = false;
self.query_parser_enabled = Some(false);
Some(Role::Primary)
}

"replica" => {
self.query_parser_enabled = false;
self.query_parser_enabled = Some(false);
Some(Role::Replica)
}

"any" => {
self.query_parser_enabled = false;
self.query_parser_enabled = Some(false);
None
}

"auto" => {
self.query_parser_enabled = true;
self.query_parser_enabled = Some(true);
None
}

"default" => {
self.active_role = self.pool_settings.default_role;
self.query_parser_enabled = self.query_parser_enabled;
self.query_parser_enabled = None;
self.active_role
}

Expand All @@ -239,13 +239,13 @@ impl QueryRouter {
Command::SetPrimaryReads => {
if value == "on" {
debug!("Setting primary reads to on");
self.primary_reads_enabled = true;
self.primary_reads_enabled = Some(true);
} else if value == "off" {
debug!("Setting primary reads to off");
self.primary_reads_enabled = false;
self.primary_reads_enabled = Some(false);
} else if value == "default" {
debug!("Setting primary reads to default");
self.primary_reads_enabled = self.pool_settings.primary_reads_enabled;
self.primary_reads_enabled = None;
}
}

Expand Down Expand Up @@ -300,34 +300,44 @@ impl QueryRouter {
let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) {
Ok(ast) => ast,
Err(err) => {
debug!("{}", err.to_string());
// SELECT ... FOR UPDATE won't get parsed correctly.
error!("{}: {}", err, query);
self.active_role = Some(Role::Primary);
return false;
}
};

debug!("AST: {:?}", ast);

if ast.len() == 0 {
// That's weird, no idea, let's go to primary
self.active_role = Some(Role::Primary);
return false;
}

match ast[0] {
// All transactions go to the primary, probably a write.
StartTransaction { .. } => {
self.active_role = Some(Role::Primary);
}
for q in &ast {
match q {
// All transactions go to the primary, probably a write.
StartTransaction { .. } => {
self.active_role = Some(Role::Primary);
break;
}

// Likely a read-only query
Query { .. } => {
self.active_role = match self.primary_reads_enabled {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
// Likely a read-only query
Query { .. } => {
self.active_role = match self.primary_reads_enabled() {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
}
}
}

// Likely a write
_ => {
self.active_role = Some(Role::Primary);
}
};
// Likely a write
_ => {
self.active_role = Some(Role::Primary);
break;
}
};
}

true
}
Expand All @@ -350,9 +360,18 @@ impl QueryRouter {
}

/// Should we attempt to parse queries?
#[allow(dead_code)]
pub fn query_parser_enabled(&self) -> bool {
self.query_parser_enabled
match self.query_parser_enabled {
None => self.pool_settings.query_parser_enabled,
Some(value) => value,
}
}

pub fn primary_reads_enabled(&self) -> bool {
match self.primary_reads_enabled {
None => self.pool_settings.primary_reads_enabled,
Some(value) => value,
}
}
}

Expand Down Expand Up @@ -616,7 +635,7 @@ mod test {
assert!(qr.query_parser_enabled());
let query = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(query) != None);
assert!(qr.query_parser_enabled());
assert!(!qr.query_parser_enabled());
}

#[test]
Expand All @@ -635,16 +654,16 @@ mod test {
let mut qr = QueryRouter::new();
assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false);
assert_eq!(qr.primary_reads_enabled, false);
assert_eq!(qr.query_parser_enabled, None);
assert_eq!(qr.primary_reads_enabled, None);

// Internal state must not be changed due to this, only defaults
qr.update_pool_settings(pool_settings.clone());

assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None);
assert_eq!(qr.query_parser_enabled, false);
assert_eq!(qr.primary_reads_enabled, false);
assert_eq!(qr.query_parser_enabled(), true);
assert_eq!(qr.primary_reads_enabled(), false);

let q1 = simple_query("SET SERVER ROLE TO 'primary'");
assert!(qr.try_execute_command(q1) != None);
Expand All @@ -654,4 +673,21 @@ mod test {
assert!(qr.try_execute_command(q2) != None);
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);
}

#[test]
fn test_parse_multiple_queries() {
QueryRouter::setup();

let mut qr = QueryRouter::new();
assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;")));
assert_eq!(qr.role(), Role::Primary);

assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;")));
assert_eq!(qr.role(), Role::Replica);

assert!(qr.infer_role(simple_query(
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
)));
assert_eq!(qr.role(), Role::Primary);
}
}