diff --git a/src/client.rs b/src/client.rs index f7fffab5..419448fb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -84,9 +84,6 @@ pub struct Client { /// Used to notify clients about an impending shutdown shutdown: Receiver<()>, - - // Allow only admin connections. - admin_only: bool, } /// Client entrypoint. @@ -224,17 +221,7 @@ pub async fn client_entrypoint( let (read, write) = split(stream); // Continue with cancel query request. - match Client::cancel( - read, - write, - addr, - bytes, - client_server_map, - shutdown, - admin_only, - ) - .await - { + match Client::cancel(read, write, addr, bytes, client_server_map, shutdown).await { Ok(mut client) => { info!("Client {:?} issued a cancel query request", addr); @@ -384,6 +371,20 @@ where .count() == 1; + // Kick any client that's not admin while we're in admin-only mode. + if !admin && admin_only { + debug!( + "Rejecting non-admin connection to {} when in admin only mode", + pool_name + ); + error_response_terminal( + &mut write, + &format!("terminating connection due to administrator command"), + ) + .await?; + return Err(Error::ShuttingDown); + } + // Generate random backend ID and secret key let process_id: i32 = rand::random(); let secret_key: i32 = rand::random(); @@ -494,7 +495,6 @@ where username: username.clone(), shutdown, connected_to_server: false, - admin_only, }); } @@ -506,7 +506,6 @@ where mut bytes: BytesMut, // The rest of the startup message. client_server_map: ClientServerMap, shutdown: Receiver<()>, - admin_only: bool, ) -> Result, Error> { let process_id = bytes.get_i32(); let secret_key = bytes.get_i32(); @@ -529,7 +528,6 @@ where username: String::from("undefined"), shutdown, connected_to_server: false, - admin_only, }); } @@ -565,16 +563,6 @@ where return Ok(Server::cancel(&address, port, process_id, secret_key).await?); } - // Kick any client that's not admin while we're in admin-only mode. - if !self.admin && self.admin_only { - error_response_terminal( - &mut self.write, - &format!("terminating connection due to administrator command"), - ) - .await?; - return Err(Error::ShuttingDown); - } - // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); diff --git a/tests/python/tests.py b/tests/python/tests.py index a674cee6..092fc8cc 100644 --- a/tests/python/tests.py +++ b/tests/python/tests.py @@ -14,6 +14,7 @@ def pgcat_start(): pg_cat_send_signal(signal.SIGTERM) os.system("./target/debug/pgcat .circleci/pgcat.toml &") + time.sleep(2) def pg_cat_send_signal(signal: signal.Signals): @@ -27,11 +28,23 @@ def pg_cat_send_signal(signal: signal.Signals): raise Exception("pgcat not closed after SIGTERM") -def connect_normal_db( - autocommit: bool = False, +def connect_db( + autocommit: bool = True, + admin: bool = False, ) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]: + + if admin: + user = "admin_user" + password = "admin_pass" + db = "pgcat" + else: + user = "sharding_user" + password = "sharding_user" + db = "sharded_db" + conn = psycopg2.connect( - f"postgres://sharding_user:sharding_user@{PGCAT_HOST}:{PGCAT_PORT}/sharded_db?application_name=testing_pgcat" + f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat", + connect_timeout=2, ) conn.autocommit = autocommit cur = conn.cursor() @@ -45,7 +58,7 @@ def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions. def test_normal_db_access(): - conn, cur = connect_normal_db() + conn, cur = connect_db(autocommit=False) cur.execute("SELECT 1") res = cur.fetchall() print(res) @@ -53,11 +66,7 @@ def test_normal_db_access(): def test_admin_db_access(): - conn = psycopg2.connect( - f"postgres://admin_user:admin_pass@{PGCAT_HOST}:{PGCAT_PORT}/pgcat" - ) - conn.autocommit = True # BEGIN/COMMIT is not supported by admin db - cur = conn.cursor() + conn, cur = connect_db(admin=True) cur.execute("SHOW POOLS") res = cur.fetchall() @@ -67,15 +76,14 @@ def test_admin_db_access(): def test_shutdown_logic(): - ##### NO ACTIVE QUERIES SIGINT HANDLING ##### + # - - - - - - - - - - - - - - - - - - + # NO ACTIVE QUERIES SIGINT HANDLING + # Start pgcat pgcat_start() - # Wait for server to fully start up - time.sleep(2) - # Create client connection and send query (not in transaction) - conn, cur = connect_normal_db(True) + conn, cur = connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") @@ -97,17 +105,14 @@ def test_shutdown_logic(): cleanup_conn(conn, cur) pg_cat_send_signal(signal.SIGTERM) - ##### END ##### + # - - - - - - - - - - - - - - - - - - + # HANDLE TRANSACTION WITH SIGINT - ##### HANDLE TRANSACTION WITH SIGINT ##### # Start pgcat pgcat_start() - # Wait for server to fully start up - time.sleep(2) - # Create client connection and begin transaction - conn, cur = connect_normal_db(True) + conn, cur = connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") @@ -126,17 +131,97 @@ def test_shutdown_logic(): cleanup_conn(conn, cur) pg_cat_send_signal(signal.SIGTERM) - ##### END ##### + # - - - - - - - - - - - - - - - - - - + # NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN + # Start pgcat + pgcat_start() + + # Create client connection and begin transaction + transaction_conn, transaction_cur = connect_db() + + transaction_cur.execute("BEGIN;") + transaction_cur.execute("SELECT 1;") + + # Send sigint to pgcat while still in transaction + pg_cat_send_signal(signal.SIGINT) + time.sleep(1) - ##### HANDLE SHUTDOWN TIMEOUT WITH SIGINT ##### + start = time.perf_counter() + try: + conn, cur = connect_db() + cur.execute("SELECT 1;") + cleanup_conn(conn, cur) + except psycopg2.OperationalError as e: + time_taken = time.perf_counter() - start + if time_taken > 0.1: + raise Exception( + "Failed to reject connection within 0.1 seconds, got", time_taken, "seconds") + pass + else: + raise Exception("Able connect to database during shutdown") + + cleanup_conn(transaction_conn, transaction_cur) + pg_cat_send_signal(signal.SIGTERM) + + # - - - - - - - - - - - - - - - - - - + # ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN # Start pgcat pgcat_start() - # Wait for server to fully start up - time.sleep(3) + # Create client connection and begin transaction + transaction_conn, transaction_cur = connect_db() + + transaction_cur.execute("BEGIN;") + transaction_cur.execute("SELECT 1;") + + # Send sigint to pgcat while still in transaction + pg_cat_send_signal(signal.SIGINT) + time.sleep(1) + + try: + conn, cur = connect_db(admin=True) + cur.execute("SHOW DATABASES;") + cleanup_conn(conn, cur) + except psycopg2.OperationalError as e: + raise Exception(e) + + cleanup_conn(transaction_conn, transaction_cur) + pg_cat_send_signal(signal.SIGTERM) + + # - - - - - - - - - - - - - - - - - - + # ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN + # Start pgcat + pgcat_start() + + # Create client connection and begin transaction + transaction_conn, transaction_cur = connect_db() + transaction_cur.execute("BEGIN;") + transaction_cur.execute("SELECT 1;") + + admin_conn, admin_cur = connect_db(admin=True) + admin_cur.execute("SHOW DATABASES;") + + # Send sigint to pgcat while still in transaction + pg_cat_send_signal(signal.SIGINT) + time.sleep(1) + + try: + admin_cur.execute("SHOW DATABASES;") + except psycopg2.OperationalError as e: + raise Exception("Could not execute admin command:", e) + + cleanup_conn(transaction_conn, transaction_cur) + cleanup_conn(admin_conn, admin_cur) + pg_cat_send_signal(signal.SIGTERM) + + # - - - - - - - - - - - - - - - - - - + # HANDLE SHUTDOWN TIMEOUT WITH SIGINT + + # Start pgcat + pgcat_start() # Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached - conn, cur = connect_normal_db(True) + conn, cur = connect_db() cur.execute("BEGIN;") cur.execute("SELECT 1;") @@ -159,7 +244,7 @@ def test_shutdown_logic(): cleanup_conn(conn, cur) pg_cat_send_signal(signal.SIGTERM) - ##### END ##### + # - - - - - - - - - - - - - - - - - - test_normal_db_access()