From 9393375fec399dfc16a88bf7bbcc350526ab287a Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Mon, 3 Feb 2025 16:41:11 +0100 Subject: [PATCH 1/4] sqlite: handle exceptions in filter callback database.applyChangeset() --- doc/api/sqlite.md | 4 +- src/node_sqlite.cc | 20 ++++-- test/parallel/test-sqlite-session.js | 94 +++++++++++++++++++++++----- 3 files changed, 93 insertions(+), 25 deletions(-) diff --git a/doc/api/sqlite.md b/doc/api/sqlite.md index cfd57f48922dfd..341bc172b4ebf8 100644 --- a/doc/api/sqlite.md +++ b/doc/api/sqlite.md @@ -241,8 +241,8 @@ added: * `changeset` {Uint8Array} A binary changeset or patchset. * `options` {Object} The configuration options for how the changes will be applied. - * `filter` {Function} Skip changes that, when targeted table name is supplied to this function, return a truthy value. - By default, all changes are attempted. + * `filter` {Function} A table name is provided as an argument to this callback. Returning a truthy value means changes + for the table with that table name should be attempted. By default, changes for all tables are attempted. * `onConflict` {Function} A function that determines how to handle conflicts. The function receives one argument, which can be one of the following values: diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 2c830961e72817..196156cbef9bc2 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -825,13 +825,21 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { Local filterFunc = filterValue.As(); filterCallback = [env, filterFunc](std::string item) -> bool { + TryCatch try_catch(env->isolate()); Local argv[] = {String::NewFromUtf8(env->isolate(), - item.c_str(), - NewStringType::kNormal) - .ToLocalChecked()}; - Local result = - filterFunc->Call(env->context(), Null(env->isolate()), 1, argv) - .ToLocalChecked(); + item.c_str(), + NewStringType::kNormal) + .ToLocalChecked()}; + MaybeLocal maybe_result = + filterFunc->Call(env->context(), Null(env->isolate()), 1, argv); + if (try_catch.HasCaught()) { + try_catch.ReThrow(); + return false; + } + if (maybe_result.IsEmpty()) { + return false; + } + Local result = maybe_result.ToLocalChecked(); return result->BooleanValue(env->isolate()); }; } diff --git a/test/parallel/test-sqlite-session.js b/test/parallel/test-sqlite-session.js index 5cba37e337e835..22d00188bd1fc8 100644 --- a/test/parallel/test-sqlite-session.js +++ b/test/parallel/test-sqlite-session.js @@ -361,29 +361,89 @@ suite('conflict resolution', () => { }); }); -test('database.createSession() - filter changes', (t) => { - const database1 = new DatabaseSync(':memory:'); - const database2 = new DatabaseSync(':memory:'); - const createTableSql = 'CREATE TABLE data1(key INTEGER PRIMARY KEY); CREATE TABLE data2(key INTEGER PRIMARY KEY);'; - database1.exec(createTableSql); - database2.exec(createTableSql); +suite('filter tables', () => { + function testFilter(t, options) { + const database1 = new DatabaseSync(':memory:'); + const database2 = new DatabaseSync(':memory:'); + const createTableSql = 'CREATE TABLE data1(key INTEGER PRIMARY KEY); CREATE TABLE data2(key INTEGER PRIMARY KEY);'; + + database1.exec(createTableSql); + database2.exec(createTableSql); + + const session = database1.createSession(); + database1.exec('INSERT INTO data1 (key) VALUES (1), (2), (3)'); + database1.exec('INSERT INTO data2 (key) VALUES (1), (2), (3), (4), (5)'); - const session = database1.createSession(); + const applyChangeset = () => database2.applyChangeset(session.changeset(), { + ...(options.filter ? { filter: options.filter } : {}) + }); + if (options.error) { + t.assert.throws(applyChangeset, options.error); + } else { + applyChangeset(); + } - database1.exec('INSERT INTO data1 (key) VALUES (1), (2), (3)'); - database1.exec('INSERT INTO data2 (key) VALUES (1), (2), (3), (4), (5)'); + t.assert.strictEqual(database2.prepare('SELECT * FROM data1').all().length, options.data1); + t.assert.strictEqual(database2.prepare('SELECT * FROM data2').all().length, options.data2); + } - database2.applyChangeset(session.changeset(), { - filter: (tableName) => tableName === 'data2' + test('database.createSession() - filter one table', (t) => { + testFilter(t, { + filter: (tableName) => tableName === 'data2', + // Only changes from data2 should be included + data1: 0, + data2: 5 + }); }); - const data1Rows = database2.prepare('SELECT * FROM data1').all(); - const data2Rows = database2.prepare('SELECT * FROM data2').all(); + test('database.createSession() - throw in filter callback', (t) => { + const error = Error('hello world'); + testFilter(t, { + filter: () => { throw error; }, + error: error, + // When an exception is thrown in the filter function, no changes should be applied + data1: 0, + data2: 0 + }); + }); - // Expect no rows since all changes were filtered out - t.assert.strictEqual(data1Rows.length, 0); - // Expect 5 rows since these changes were not filtered out - t.assert.strictEqual(data2Rows.length, 5); + test('database.createSession() - do not return anything in filter callback', (t) => { + testFilter(t, { + filter: () => {}, + // Undefined is falsy, so it is interpreted as "do not include changes from this table" + data1: 0, + data2: 0 + }); + }); + + test('database.createSession() - return true for all tables', (t) => { + const tables = new Set(); + testFilter(t, { + filter: (tableName) => { tables.add(tableName); return true; }, + // Changes from all tables should be included + data1: 3, + data2: 5 + }); + t.assert.deepEqual(tables, new Set(['data1', 'data2'])); + }); + + test('database.createSession() - return truthy value for all tables', (t) => { + testFilter(t, { + filter: () => 'yes', + // Truthy, so changes from all tables should be included + data1: 3, + data2: 5 + }); + }); + + test('database.createSession() - no filter callback', (t) => { + testFilter(t, { + filter: undefined, + // all changes should be applied + data1: 3, + data2: 5 + }); + }); }); test('database.createSession() - specify other database', (t) => { From 4adb1bfa79e154bb523e9367fb4f677d4355b787 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Mon, 3 Feb 2025 16:49:45 +0100 Subject: [PATCH 2/4] sqlite: add test conditional exception filter callback --- test/parallel/test-sqlite-session.js | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-sqlite-session.js b/test/parallel/test-sqlite-session.js index 22d00188bd1fc8..25393c4e9b6664 100644 --- a/test/parallel/test-sqlite-session.js +++ b/test/parallel/test-sqlite-session.js @@ -380,7 +380,11 @@ suite('filter tables', () => { if (options.error) { t.assert.throws(applyChangeset, options.error); } else { - applyChangeset(); + try { + applyChangeset(); + } catch (error) { + if (!options.expectError) throw error; + } } t.assert.strictEqual(database2.prepare('SELECT * FROM data1').all().length, options.data1); @@ -407,6 +411,18 @@ suite('filter tables', () => { }); }); + test('database.createSession() - throw sometimes in filter callback', (t) => { + testFilter(t, { + filter: (tableName) => { if (tableName === 'data2') throw new Error(); else { return true; } }, + // Only changes to data1 should be applied + // note that the changeset was not aborted + data1: 3, + data2: 0, + expectError: true + }); + }); + + test('database.createSession() - do not return anything in filter callback', (t) => { testFilter(t, { filter: () => {}, @@ -439,7 +455,7 @@ suite('filter tables', () => { test('database.createSession() - no filter callback', (t) => { testFilter(t, { filter: undefined, - // all changes should be applied + // All changes should be applied data1: 3, data2: 5 }); From 2deacb3dcc548ca456d16bc9122e9d5646437e79 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Wed, 12 Feb 2025 11:31:54 +0100 Subject: [PATCH 3/4] sqlite: add failing tests --- test/parallel/test-sqlite-session.js | 89 ++++++++++++++++++++++++---- test/parallel/test-sqlite.js | 10 +--- test/sqlite/next-db.js | 12 ++++ test/sqlite/worker.js | 20 +++++++ 4 files changed, 112 insertions(+), 19 deletions(-) create mode 100644 test/sqlite/next-db.js create mode 100644 test/sqlite/worker.js diff --git a/test/parallel/test-sqlite-session.js b/test/parallel/test-sqlite-session.js index 25393c4e9b6664..bb9aeb87c4ed6e 100644 --- a/test/parallel/test-sqlite-session.js +++ b/test/parallel/test-sqlite-session.js @@ -6,6 +6,9 @@ const { constants, } = require('node:sqlite'); const { test, suite } = require('node:test'); +const { nextDb } = require("../sqlite/next-db.js"); +const { Worker } = require('worker_threads'); +const { once } = require('events'); /** * Convenience wrapper around assert.deepStrictEqual that sets a null @@ -377,14 +380,10 @@ suite('filter tables', () => { const applyChangeset = () => database2.applyChangeset(session.changeset(), { ...(options.filter ? { filter: options.filter } : {}) }); - if (options.error) { - t.assert.throws(applyChangeset, options.error); + if (options.apply) { + options.apply(applyChangeset); } else { - try { - applyChangeset(); - } catch (error) { - if (!options.expectError) throw error; - } + applyChangeset(); } t.assert.strictEqual(database2.prepare('SELECT * FROM data1').all().length, options.data1); @@ -414,14 +413,24 @@ suite('filter tables', () => { test('database.createSession() - throw sometimes in filter callback', (t) => { testFilter(t, { filter: (tableName) => { if (tableName === 'data2') throw new Error(); else { return true; } }, - // Only changes to data1 should be applied - // note that the changeset was not aborted - data1: 3, + data1: 0, data2: 0, expectError: true }); }); + test('database.createSession() - throw sometimes in filter callback', (t) => { + testFilter(t, { + filter: (tableName) => { + if (tableName === "data1") + throw new Error(tableName); + return true; + }, + data1: 0, + data2: 0, + expectError: true + }); + }); test('database.createSession() - do not return anything in filter callback', (t) => { testFilter(t, { @@ -614,3 +623,63 @@ test('session.close() - closing twice', (t) => { message: 'session is not open' }); }); + +test('concurrent applyChangeset with workers', async (t) => { + function modeToString(mode) { + if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT'; + if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT'; + } + + const dbPath = nextDb(); + const db1 = new DatabaseSync(dbPath); + const db2 = new DatabaseSync(':memory:'); + const createTable = ` + CREATE TABLE data( + key INTEGER PRIMARY KEY, + value TEXT + ) STRICT`; + db1.exec(createTable); + db2.exec(createTable); + db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello'); + const session = db2.createSession(); + db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world'); + const changeset = session.changeset(); // changeset with conflict (for db1) + + const iterations = 100; // Increase chances of race condition + for (let i = 0; i < iterations; i++) { + const workers = []; + const expectedResults = new Map([[constants.SQLITE_CHANGESET_ABORT, false], [constants.SQLITE_CHANGESET_OMIT, true]]); + + // Launch two workers (abort and omit modes) + for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) { + const worker = new Worker(`${__dirname}/../sqlite/worker.js`, { + workerData: { + dbPath, + changeset, + mode + }, + }); + workers.push(worker); + } + + const results = await Promise.all(workers.map(async (worker) => { + const [message] = await once(worker, 'message'); + return message; + })); + + // Verify each result + for (const res of results) { + if (res.error) { + t.assert.fail(`Worker error: ${res.error}`); + } + const expected = expectedResults.get(res.mode); + t.assert.strictEqual( + res.result, + expected, + `Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}` + ); + } + + workers.forEach(worker => worker.terminate()); // Cleanup + } +}); diff --git a/test/parallel/test-sqlite.js b/test/parallel/test-sqlite.js index c9a45fa22aecc2..699916bf653848 100644 --- a/test/parallel/test-sqlite.js +++ b/test/parallel/test-sqlite.js @@ -1,16 +1,8 @@ 'use strict'; const { spawnPromisified } = require('../common'); -const tmpdir = require('../common/tmpdir'); -const { join } = require('node:path'); const { DatabaseSync, constants } = require('node:sqlite'); const { suite, test } = require('node:test'); -let cnt = 0; - -tmpdir.refresh(); - -function nextDb() { - return join(tmpdir.path, `database-${cnt++}.db`); -} +const { nextDb } = require("../sqlite/next-db.js"); suite('accessing the node:sqlite module', () => { test('cannot be accessed without the node: scheme', (t) => { diff --git a/test/sqlite/next-db.js b/test/sqlite/next-db.js new file mode 100644 index 00000000000000..9b620b0f094acf --- /dev/null +++ b/test/sqlite/next-db.js @@ -0,0 +1,12 @@ +const tmpdir = require('../common/tmpdir'); +const { join } = require('node:path'); + +let cnt = 0; + +tmpdir.refresh(); + +function nextDb() { + return join(tmpdir.path, `database-${cnt++}.db`); +} + +module.exports = { nextDb }; diff --git a/test/sqlite/worker.js b/test/sqlite/worker.js new file mode 100644 index 00000000000000..b8de0f6565cf22 --- /dev/null +++ b/test/sqlite/worker.js @@ -0,0 +1,20 @@ +// this worker is used for one of the tests in test-sqlite-session.js + +const { parentPort, workerData } = require('worker_threads'); +const { DatabaseSync, constants } = require('node:sqlite'); +const { changeset, mode, dbPath } = workerData; + +const db = new DatabaseSync(dbPath); + +const options = {} +if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) { + throw new Error("Unexpected value for mode"); +} +options.onConflict = () => mode; + +try { + const result = db.applyChangeset(changeset, options); + parentPort.postMessage({ mode, result, error: null }); +} catch (e) { + parentPort.postMessage({ mode, result: null, error: e.message }); +} From 7d3e6b8ff58372606775b137ff95237e868a6fe1 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Wed, 12 Feb 2025 12:17:09 +0100 Subject: [PATCH 4/4] sqlite: fix crash --- src/node_sqlite.cc | 37 ++++++++++++++-------------- test/parallel/test-sqlite-session.js | 14 ++++++++--- test/sqlite/worker.js | 6 +++-- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 196156cbef9bc2..f39c5b0a35ff1f 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -743,26 +743,28 @@ void DatabaseSync::CreateSession(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(session->object()); } +struct ConflictCallbackContext { + std::function filterCallback; + std::function conflictCallback; +}; + // the reason for using static functions here is that SQLite needs a // function pointer -static std::function conflictCallback; static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) { - if (!conflictCallback) return SQLITE_CHANGESET_ABORT; - return conflictCallback(eConflict); + auto ctx = static_cast(pCtx); + if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT; + return ctx->conflictCallback(eConflict); } -static std::function filterCallback; - static int xFilter(void* pCtx, const char* zTab) { - if (!filterCallback) return 1; - - return filterCallback(zTab) ? 1 : 0; + auto ctx = static_cast(pCtx); + if (!ctx->filterCallback) return 1; + return ctx->filterCallback(zTab) ? 1 : 0; } void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { - conflictCallback = nullptr; - filterCallback = nullptr; + ConflictCallbackContext context; DatabaseSync* db; ASSIGN_OR_RETURN_UNWRAP(&db, args.This()); @@ -794,7 +796,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { return; } Local conflictFunc = conflictValue.As(); - conflictCallback = [env, conflictFunc](int conflictType) -> int { + context.conflictCallback = [env, conflictFunc](int conflictType) -> int { Local argv[] = {Integer::New(env->isolate(), conflictType)}; TryCatch try_catch(env->isolate()); Local result = @@ -824,22 +826,21 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { Local filterFunc = filterValue.As(); - filterCallback = [env, filterFunc](std::string item) -> bool { - TryCatch try_catch(env->isolate()); + context.filterCallback = [env, filterFunc](std::string item) -> bool { Local argv[] = {String::NewFromUtf8(env->isolate(), item.c_str(), NewStringType::kNormal) .ToLocalChecked()}; MaybeLocal maybe_result = filterFunc->Call(env->context(), Null(env->isolate()), 1, argv); - if (try_catch.HasCaught()) { - try_catch.ReThrow(); + + if (maybe_result.IsEmpty()) { return false; } - if (maybe_result.IsEmpty()) { + Local result; + if (!maybe_result.ToLocal(&result)) { return false; } - Local result = maybe_result.ToLocalChecked(); return result->BooleanValue(env->isolate()); }; } @@ -852,7 +853,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { const_cast(static_cast(buf.data())), xFilter, xConflict, - nullptr); + static_cast(&context)); if (r == SQLITE_OK) { args.GetReturnValue().Set(true); return; diff --git a/test/parallel/test-sqlite-session.js b/test/parallel/test-sqlite-session.js index bb9aeb87c4ed6e..a75bd665022e3a 100644 --- a/test/parallel/test-sqlite-session.js +++ b/test/parallel/test-sqlite-session.js @@ -625,6 +625,10 @@ test('session.close() - closing twice', (t) => { }); test('concurrent applyChangeset with workers', async (t) => { + // before adding this test, the callbacks were stored in static variables + // this could result in a crash + // this test is a regression test for that scenario + function modeToString(mode) { if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT'; if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT'; @@ -641,11 +645,12 @@ test('concurrent applyChangeset with workers', async (t) => { db1.exec(createTable); db2.exec(createTable); db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello'); + db1.close(); const session = db2.createSession(); db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world'); const changeset = session.changeset(); // changeset with conflict (for db1) - const iterations = 100; // Increase chances of race condition + const iterations = 10; for (let i = 0; i < iterations; i++) { const workers = []; const expectedResults = new Map([[constants.SQLITE_CHANGESET_ABORT, false], [constants.SQLITE_CHANGESET_OMIT, true]]); @@ -669,8 +674,11 @@ test('concurrent applyChangeset with workers', async (t) => { // Verify each result for (const res of results) { - if (res.error) { - t.assert.fail(`Worker error: ${res.error}`); + if (res.errorMessage) { + if (res.errcode === 5) { // SQLITE_BUSY + break; // ignore + } + t.assert.fail(`Worker error: ${res.error.message}`); } const expected = expectedResults.get(res.mode); t.assert.strictEqual( diff --git a/test/sqlite/worker.js b/test/sqlite/worker.js index b8de0f6565cf22..0b490bbf7e0611 100644 --- a/test/sqlite/worker.js +++ b/test/sqlite/worker.js @@ -15,6 +15,8 @@ options.onConflict = () => mode; try { const result = db.applyChangeset(changeset, options); parentPort.postMessage({ mode, result, error: null }); -} catch (e) { - parentPort.postMessage({ mode, result: null, error: e.message }); +} catch (error) { + parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode }); +} finally { + db.close(); // just to make sure it is closed ASAP }