Skip to content

Commit 0524787

Browse files
authored
Automatic sharding: part one of many (#194)
Starting automatic sharding
1 parent fa26773 commit 0524787

File tree

5 files changed

+169
-17
lines changed

5 files changed

+169
-17
lines changed

pgcat.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ primary_reads_enabled = true
8383
#
8484
sharding_function = "pg_bigint_hash"
8585

86+
# Automatically parse this from queries and route queries to the right shard!
87+
automatic_sharding_key = "id"
88+
8689
# Credentials for users that may connect to this cluster
8790
[pools.sharded_db.users.0]
8891
username = "sharding_user"

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ where
672672
// Normal query, not a custom command.
673673
None => {
674674
if query_router.query_parser_enabled() {
675-
query_router.infer_role(message.clone());
675+
query_router.infer(message.clone());
676676
}
677677
}
678678

src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ pub struct Pool {
267267
pub connect_timeout: Option<u64>,
268268

269269
pub sharding_function: ShardingFunction,
270+
271+
#[serde(default = "Pool::default_automatic_sharding_key")]
272+
pub automatic_sharding_key: Option<String>,
273+
270274
pub shards: BTreeMap<String, Shard>,
271275
pub users: BTreeMap<String, User>,
272276
}
@@ -276,6 +280,10 @@ impl Pool {
276280
PoolMode::Transaction
277281
}
278282

283+
pub fn default_automatic_sharding_key() -> Option<String> {
284+
None
285+
}
286+
279287
pub fn validate(&self) -> Result<(), Error> {
280288
match self.default_role.as_ref() {
281289
"any" => (),
@@ -318,6 +326,7 @@ impl Default for Pool {
318326
query_parser_enabled: false,
319327
primary_reads_enabled: false,
320328
sharding_function: ShardingFunction::PgBigintHash,
329+
automatic_sharding_key: None,
321330
connect_timeout: None,
322331
}
323332
}

src/pool.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ pub struct PoolSettings {
7979

8080
// Sharding function.
8181
pub sharding_function: ShardingFunction,
82+
83+
// Sharding key
84+
pub automatic_sharding_key: Option<String>,
8285
}
8386

8487
impl Default for PoolSettings {
@@ -91,6 +94,7 @@ impl Default for PoolSettings {
9194
query_parser_enabled: false,
9295
primary_reads_enabled: true,
9396
sharding_function: ShardingFunction::PgBigintHash,
97+
automatic_sharding_key: None,
9498
}
9599
}
96100
}
@@ -254,6 +258,7 @@ impl ConnectionPool {
254258
query_parser_enabled: pool_config.query_parser_enabled.clone(),
255259
primary_reads_enabled: pool_config.primary_reads_enabled,
256260
sharding_function: pool_config.sharding_function,
261+
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
257262
},
258263
};
259264

src/query_router.rs

Lines changed: 151 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ use log::{debug, error};
55
use once_cell::sync::OnceCell;
66
use regex::{Regex, RegexSet};
77
use sqlparser::ast::Statement::{Query, StartTransaction};
8+
use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Value};
89
use sqlparser::dialect::PostgreSqlDialect;
910
use sqlparser::parser::Parser;
1011

1112
use crate::config::Role;
1213
use crate::pool::PoolSettings;
1314
use crate::sharding::Sharder;
1415

16+
use std::collections::BTreeSet;
17+
1518
/// Regexes used to parse custom commands.
1619
const CUSTOM_SQL_REGEXES: [&str; 7] = [
1720
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
@@ -256,7 +259,7 @@ impl QueryRouter {
256259
}
257260

258261
/// Try to infer which server to connect to based on the contents of the query.
259-
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool {
262+
pub fn infer(&mut self, mut buf: BytesMut) -> bool {
260263
debug!("Inferring role");
261264

262265
let code = buf.get_u8() as char;
@@ -324,7 +327,21 @@ impl QueryRouter {
324327
}
325328

326329
// Likely a read-only query
327-
Query { .. } => {
330+
Query(query) => {
331+
match &self.pool_settings.automatic_sharding_key {
332+
Some(_) => {
333+
// TODO: if we have multiple queries in the same message,
334+
// we can either split them and execute them individually
335+
// or discard shard selection. If they point to the same shard though,
336+
// we can let them through as-is.
337+
// This is basically building a database now :)
338+
self.active_shard = self.infer_shard(query);
339+
debug!("Automatically using shard: {:?}", self.active_shard);
340+
}
341+
342+
None => (),
343+
};
344+
328345
self.active_role = match self.primary_reads_enabled() {
329346
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
330347
true => None, // Any server role is fine in this case.
@@ -342,6 +359,118 @@ impl QueryRouter {
342359
true
343360
}
344361

362+
/// A `selection` is the `WHERE` clause. This parses
363+
/// the clause and extracts the sharding key, if present.
364+
fn selection_parser(&self, expr: &Expr) -> Vec<i64> {
365+
let mut result = Vec::new();
366+
let mut found = false;
367+
368+
match expr {
369+
// This parses `sharding_key = 5`. But it's technically
370+
// legal to write `5 = sharding_key`. I don't judge the people
371+
// who do that, but I think ORMs will still use the first variant,
372+
// so we can leave the second as a TODO.
373+
Expr::BinaryOp { left, op, right } => {
374+
match &**left {
375+
Expr::BinaryOp { .. } => result.extend(self.selection_parser(&left)),
376+
Expr::Identifier(ident) => {
377+
found = ident.value
378+
== *self.pool_settings.automatic_sharding_key.as_ref().unwrap();
379+
}
380+
_ => (),
381+
};
382+
383+
match op {
384+
BinaryOperator::Eq => (),
385+
BinaryOperator::Or => (),
386+
BinaryOperator::And => (),
387+
_ => {
388+
// TODO: support other operators than equality.
389+
debug!("Unsupported operation: {:?}", op);
390+
return Vec::new();
391+
}
392+
};
393+
394+
match &**right {
395+
Expr::BinaryOp { .. } => result.extend(self.selection_parser(&right)),
396+
Expr::Value(Value::Number(value, ..)) => {
397+
if found {
398+
match value.parse::<i64>() {
399+
Ok(value) => result.push(value),
400+
Err(_) => {
401+
debug!("Sharding key was not an integer: {}", value);
402+
}
403+
};
404+
}
405+
}
406+
_ => (),
407+
};
408+
}
409+
410+
_ => (),
411+
};
412+
413+
debug!("Sharding keys found: {:?}", result);
414+
415+
result
416+
}
417+
418+
/// Try to figure out which shard the query should go to.
419+
fn infer_shard(&self, query: &sqlparser::ast::Query) -> Option<usize> {
420+
let mut shards = BTreeSet::new();
421+
422+
match &*query.body {
423+
SetExpr::Query(query) => {
424+
match self.infer_shard(&*query) {
425+
Some(shard) => {
426+
shards.insert(shard);
427+
}
428+
None => (),
429+
};
430+
}
431+
432+
SetExpr::Select(select) => {
433+
match &select.selection {
434+
Some(selection) => {
435+
let sharding_keys = self.selection_parser(&selection);
436+
437+
// TODO: Add support for prepared statements here.
438+
// This should just give us the position of the value in the `B` message.
439+
440+
let sharder = Sharder::new(
441+
self.pool_settings.shards,
442+
self.pool_settings.sharding_function,
443+
);
444+
445+
for value in sharding_keys {
446+
let shard = sharder.shard(value);
447+
shards.insert(shard);
448+
}
449+
}
450+
451+
None => (),
452+
};
453+
}
454+
_ => (),
455+
};
456+
457+
match shards.len() {
458+
// Didn't find a sharding key, you're on your own.
459+
0 => {
460+
debug!("No sharding keys found");
461+
None
462+
}
463+
464+
1 => Some(shards.into_iter().last().unwrap()),
465+
466+
// TODO: support querying multiple shards (some day...)
467+
_ => {
468+
debug!("More than one sharding key found");
469+
None
470+
}
471+
}
472+
}
473+
345474
/// Get the current desired server role we should be talking to.
346475
pub fn role(&self) -> Option<Role> {
347476
self.active_role
@@ -392,7 +521,7 @@ mod test {
392521
}
393522

394523
#[test]
395-
fn test_infer_role_replica() {
524+
fn test_infer_replica() {
396525
QueryRouter::setup();
397526
let mut qr = QueryRouter::new();
398527
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
@@ -410,13 +539,13 @@ mod test {
410539

411540
for query in queries {
412541
// It's a recognized query
413-
assert!(qr.infer_role(query));
542+
assert!(qr.infer(query));
414543
assert_eq!(qr.role(), Some(Role::Replica));
415544
}
416545
}
417546

418547
#[test]
419-
fn test_infer_role_primary() {
548+
fn test_infer_primary() {
420549
QueryRouter::setup();
421550
let mut qr = QueryRouter::new();
422551

@@ -429,24 +558,24 @@ mod test {
429558

430559
for query in queries {
431560
// It's a recognized query
432-
assert!(qr.infer_role(query));
561+
assert!(qr.infer(query));
433562
assert_eq!(qr.role(), Some(Role::Primary));
434563
}
435564
}
436565

437566
#[test]
438-
fn test_infer_role_primary_reads_enabled() {
567+
fn test_infer_primary_reads_enabled() {
439568
QueryRouter::setup();
440569
let mut qr = QueryRouter::new();
441570
let query = simple_query("SELECT * FROM items WHERE id = 5");
442571
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);
443572

444-
assert!(qr.infer_role(query));
573+
assert!(qr.infer(query));
445574
assert_eq!(qr.role(), None);
446575
}
447576

448577
#[test]
449-
fn test_infer_role_parse_prepared() {
578+
fn test_infer_parse_prepared() {
450579
QueryRouter::setup();
451580
let mut qr = QueryRouter::new();
452581
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
@@ -461,7 +590,7 @@ mod test {
461590
res.put(prepared_stmt);
462591
res.put_i16(0);
463592

464-
assert!(qr.infer_role(res));
593+
assert!(qr.infer(res));
465594
assert_eq!(qr.role(), Some(Role::Replica));
466595
}
467596

@@ -625,11 +754,11 @@ mod test {
625754
assert_eq!(qr.role(), None);
626755

627756
let query = simple_query("INSERT INTO test_table VALUES (1)");
628-
assert_eq!(qr.infer_role(query), true);
757+
assert_eq!(qr.infer(query), true);
629758
assert_eq!(qr.role(), Some(Role::Primary));
630759

631760
let query = simple_query("SELECT * FROM test_table");
632-
assert_eq!(qr.infer_role(query), true);
761+
assert_eq!(qr.infer(query), true);
633762
assert_eq!(qr.role(), Some(Role::Replica));
634763

635764
assert!(qr.query_parser_enabled());
@@ -644,12 +773,13 @@ mod test {
644773

645774
let pool_settings = PoolSettings {
646775
pool_mode: PoolMode::Transaction,
647-
shards: 0,
776+
shards: 2,
648777
user: crate::config::User::default(),
649778
default_role: Some(Role::Replica),
650779
query_parser_enabled: true,
651780
primary_reads_enabled: false,
652781
sharding_function: ShardingFunction::PgBigintHash,
782+
automatic_sharding_key: Some(String::from("id")),
653783
};
654784
let mut qr = QueryRouter::new();
655785
assert_eq!(qr.active_role, None);
@@ -672,20 +802,25 @@ mod test {
672802
let q2 = simple_query("SET SERVER ROLE TO 'default'");
673803
assert!(qr.try_execute_command(q2) != None);
674804
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);
805+
806+
// Here we go :)
807+
let q3 = simple_query("SELECT * FROM test WHERE id = 5 AND values IN (1, 2, 3)");
808+
assert!(qr.infer(q3));
809+
assert_eq!(qr.shard(), 1);
675810
}
676811

677812
#[test]
678813
fn test_parse_multiple_queries() {
679814
QueryRouter::setup();
680815

681816
let mut qr = QueryRouter::new();
682-
assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;")));
817+
assert!(qr.infer(simple_query("BEGIN; SELECT 1; COMMIT;")));
683818
assert_eq!(qr.role(), Role::Primary);
684819

685-
assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;")));
820+
assert!(qr.infer(simple_query("SELECT 1; SELECT 2;")));
686821
assert_eq!(qr.role(), Role::Replica);
687822

688-
assert!(qr.infer_role(simple_query(
823+
assert!(qr.infer(simple_query(
689824
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
690825
)));
691826
assert_eq!(qr.role(), Role::Primary);

0 commit comments

Comments
 (0)