Skip to content

Commit 3eeb7b4

Browse files
louwerstargos
authored andcommitted
sqlite: fix crash session extension callbacks with workers
PR-URL: #59848 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Zeyu "Alex" Yang <[email protected]> Reviewed-By: Edy Silva <[email protected]>
1 parent 0fe5337 commit 3eeb7b4

File tree

5 files changed

+135
-26
lines changed

5 files changed

+135
-26
lines changed

src/node_sqlite.cc

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,26 +1668,28 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16681668
job->ScheduleBackup();
16691669
}
16701670

1671+
struct ConflictCallbackContext {
1672+
std::function<bool(std::string_view)> filterCallback;
1673+
std::function<int(int)> conflictCallback;
1674+
};
1675+
16711676
// the reason for using static functions here is that SQLite needs a
16721677
// function pointer
1673-
static std::function<int(int)> conflictCallback;
16741678

16751679
static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
1676-
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
1677-
return conflictCallback(eConflict);
1680+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1681+
if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT;
1682+
return ctx->conflictCallback(eConflict);
16781683
}
16791684

1680-
static std::function<bool(std::string)> filterCallback;
1681-
16821685
static int xFilter(void* pCtx, const char* zTab) {
1683-
if (!filterCallback) return 1;
1684-
1685-
return filterCallback(zTab) ? 1 : 0;
1686+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1687+
if (!ctx->filterCallback) return 1;
1688+
return ctx->filterCallback(zTab) ? 1 : 0;
16861689
}
16871690

16881691
void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
1689-
conflictCallback = nullptr;
1690-
filterCallback = nullptr;
1692+
ConflictCallbackContext context;
16911693

16921694
DatabaseSync* db;
16931695
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
@@ -1723,7 +1725,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17231725
return;
17241726
}
17251727
Local<Function> conflictFunc = conflictValue.As<Function>();
1726-
conflictCallback = [env, conflictFunc](int conflictType) -> int {
1728+
context.conflictCallback = [env, conflictFunc](int conflictType) -> int {
17271729
Local<Value> argv[] = {Integer::New(env->isolate(), conflictType)};
17281730
TryCatch try_catch(env->isolate());
17291731
Local<Value> result =
@@ -1761,15 +1763,18 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17611763

17621764
Local<Function> filterFunc = filterValue.As<Function>();
17631765

1764-
filterCallback = [env, filterFunc](std::string item) -> bool {
1766+
context.filterCallback = [env,
1767+
filterFunc](std::string_view item) -> bool {
17651768
// TODO(@jasnell): The use of ToLocalChecked here means that if
17661769
// the filter function throws an error the process will crash.
17671770
// The filterCallback should be updated to avoid the check and
17681771
// propagate the error correctly.
1769-
Local<Value> argv[] = {String::NewFromUtf8(env->isolate(),
1770-
item.c_str(),
1771-
NewStringType::kNormal)
1772-
.ToLocalChecked()};
1772+
Local<Value> argv[] = {
1773+
String::NewFromUtf8(env->isolate(),
1774+
item.data(),
1775+
NewStringType::kNormal,
1776+
static_cast<int>(item.size()))
1777+
.ToLocalChecked()};
17731778
Local<Value> result =
17741779
filterFunc->Call(env->context(), Null(env->isolate()), 1, argv)
17751780
.ToLocalChecked();
@@ -1785,7 +1790,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17851790
const_cast<void*>(static_cast<const void*>(buf.data())),
17861791
xFilter,
17871792
xConflict,
1788-
nullptr);
1793+
static_cast<void*>(&context));
17891794
if (r == SQLITE_OK) {
17901795
args.GetReturnValue().Set(true);
17911796
return;

test/parallel/test-sqlite-session.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const {
77
constants,
88
} = require('node:sqlite');
99
const { test, suite } = require('node:test');
10+
const { nextDb } = require('../sqlite/next-db.js');
11+
const { Worker } = require('worker_threads');
12+
const { once } = require('events');
1013

1114
/**
1215
* Convenience wrapper around assert.deepStrictEqual that sets a null
@@ -555,3 +558,74 @@ test('session supports ERM', (t) => {
555558
message: /session is not open/,
556559
});
557560
});
561+
562+
test('concurrent applyChangeset with workers', async (t) => {
563+
// Before adding this test, the callbacks were stored in static variables
564+
// this could result in a crash
565+
// this test is a regression test for that scenario
566+
567+
function modeToString(mode) {
568+
if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT';
569+
if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT';
570+
}
571+
572+
const dbPath = nextDb();
573+
const db1 = new DatabaseSync(dbPath);
574+
const db2 = new DatabaseSync(':memory:');
575+
const createTable = `
576+
CREATE TABLE data(
577+
key INTEGER PRIMARY KEY,
578+
value TEXT
579+
) STRICT`;
580+
db1.exec(createTable);
581+
db2.exec(createTable);
582+
db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello');
583+
db1.close();
584+
const session = db2.createSession();
585+
db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world');
586+
const changeset = session.changeset(); // Changeset with conflict (for db1)
587+
588+
const iterations = 10;
589+
for (let i = 0; i < iterations; i++) {
590+
const workers = [];
591+
const expectedResults = new Map([
592+
[constants.SQLITE_CHANGESET_ABORT, false],
593+
[constants.SQLITE_CHANGESET_OMIT, true]]
594+
);
595+
596+
// Launch two workers (abort and omit modes)
597+
for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) {
598+
const worker = new Worker(`${__dirname}/../sqlite/worker.js`, {
599+
workerData: {
600+
dbPath,
601+
changeset,
602+
mode
603+
},
604+
});
605+
workers.push(worker);
606+
}
607+
608+
const results = await Promise.all(workers.map(async (worker) => {
609+
const [message] = await once(worker, 'message');
610+
return message;
611+
}));
612+
613+
// Verify each result
614+
for (const res of results) {
615+
if (res.errorMessage) {
616+
if (res.errcode === 5) { // SQLITE_BUSY
617+
break; // ignore
618+
}
619+
t.assert.fail(`Worker error: ${res.error.message}`);
620+
}
621+
const expected = expectedResults.get(res.mode);
622+
t.assert.strictEqual(
623+
res.result,
624+
expected,
625+
`Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}`
626+
);
627+
}
628+
629+
workers.forEach((worker) => worker.terminate()); // Cleanup
630+
}
631+
});

test/parallel/test-sqlite.js

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
'use strict';
22
const { spawnPromisified, skipIfSQLiteMissing } = require('../common');
33
skipIfSQLiteMissing();
4-
const tmpdir = require('../common/tmpdir');
5-
const { join } = require('node:path');
64
const { DatabaseSync, constants } = require('node:sqlite');
75
const { suite, test } = require('node:test');
86
const { pathToFileURL } = require('node:url');
9-
let cnt = 0;
10-
11-
tmpdir.refresh();
12-
13-
function nextDb() {
14-
return join(tmpdir.path, `database-${cnt++}.db`);
15-
}
7+
const { nextDb } = require('../sqlite/next-db.js');
168

179
suite('accessing the node:sqlite module', () => {
1810
test('cannot be accessed without the node: scheme', (t) => {

test/sqlite/next-db.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use strict';
2+
require('../common');
3+
const tmpdir = require('../common/tmpdir');
4+
const { join } = require('node:path');
5+
6+
let cnt = 0;
7+
8+
tmpdir.refresh();
9+
10+
function nextDb() {
11+
return join(tmpdir.path, `database-${cnt++}.db`);
12+
}
13+
14+
module.exports = { nextDb };

test/sqlite/worker.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// This worker is used for one of the tests in test-sqlite-session.js
2+
3+
'use strict';
4+
require('../common');
5+
const { parentPort, workerData } = require('worker_threads');
6+
const { DatabaseSync, constants } = require('node:sqlite');
7+
const { changeset, mode, dbPath } = workerData;
8+
9+
const db = new DatabaseSync(dbPath);
10+
11+
const options = {};
12+
if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) {
13+
throw new Error('Unexpected value for mode');
14+
}
15+
options.onConflict = () => mode;
16+
17+
try {
18+
const result = db.applyChangeset(changeset, options);
19+
parentPort.postMessage({ mode, result, error: null });
20+
} catch (error) {
21+
parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode });
22+
} finally {
23+
db.close(); // Just to make sure it is closed ASAP
24+
}

0 commit comments

Comments
 (0)