Skip to content

Commit 213f008

Browse files
committed
Lock on the current Fiber rather than current Thread
Applications using fiber are able to do concurrent queries from the same thread.
1 parent ba4d465 commit 213f008

File tree

5 files changed

+30
-26
lines changed

5 files changed

+30
-26
lines changed

ext/mysql2/client.c

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ static void rb_mysql_client_mark(void * wrapper) {
193193
mysql_client_wrapper * w = wrapper;
194194
if (w) {
195195
rb_gc_mark(w->encoding);
196-
rb_gc_mark(w->active_thread);
196+
rb_gc_mark(w->active_fiber);
197197
}
198198
}
199199

@@ -297,7 +297,7 @@ static void *nogvl_close(void *ptr) {
297297
mysql_close(wrapper->client);
298298
wrapper->closed = 1;
299299
wrapper->reconnect_enabled = 0;
300-
wrapper->active_thread = Qnil;
300+
wrapper->active_fiber = Qnil;
301301
}
302302

303303
return NULL;
@@ -342,7 +342,7 @@ static VALUE allocate(VALUE klass) {
342342
mysql_client_wrapper * wrapper;
343343
obj = Data_Make_Struct(klass, mysql_client_wrapper, rb_mysql_client_mark, rb_mysql_client_free, wrapper);
344344
wrapper->encoding = Qnil;
345-
wrapper->active_thread = Qnil;
345+
wrapper->active_fiber = Qnil;
346346
wrapper->automatic_close = 1;
347347
wrapper->server_version = 0;
348348
wrapper->reconnect_enabled = 0;
@@ -543,7 +543,7 @@ static VALUE do_send_query(VALUE args) {
543543
mysql_client_wrapper *wrapper = query_args->wrapper;
544544
if ((VALUE)rb_thread_call_without_gvl(nogvl_send_query, query_args, RUBY_UBF_IO, 0) == Qfalse) {
545545
/* an error occurred, we're not active anymore */
546-
wrapper->active_thread = Qnil;
546+
wrapper->active_fiber = Qnil;
547547
rb_raise_mysql2_error(wrapper);
548548
}
549549
return Qnil;
@@ -573,7 +573,7 @@ static void *nogvl_do_result(void *ptr, char use_result) {
573573

574574
/* once our result is stored off, this connection is
575575
ready for another command to be issued */
576-
wrapper->active_thread = Qnil;
576+
wrapper->active_fiber = Qnil;
577577

578578
return result;
579579
}
@@ -599,13 +599,13 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
599599
GET_CLIENT(self);
600600

601601
/* if we're not waiting on a result, do nothing */
602-
if (NIL_P(wrapper->active_thread))
602+
if (NIL_P(wrapper->active_fiber))
603603
return Qnil;
604604

605605
REQUIRE_CONNECTED(wrapper);
606606
if ((VALUE)rb_thread_call_without_gvl(nogvl_read_query_result, wrapper->client, RUBY_UBF_IO, 0) == Qfalse) {
607607
/* an error occurred, mark this connection inactive */
608-
wrapper->active_thread = Qnil;
608+
wrapper->active_fiber = Qnil;
609609
rb_raise_mysql2_error(wrapper);
610610
}
611611

@@ -618,7 +618,7 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
618618

619619
if (result == NULL) {
620620
if (mysql_errno(wrapper->client) != 0) {
621-
wrapper->active_thread = Qnil;
621+
wrapper->active_fiber = Qnil;
622622
rb_raise_mysql2_error(wrapper);
623623
}
624624
/* no data and no error, so query was not a SELECT */
@@ -645,7 +645,7 @@ struct async_query_args {
645645
static VALUE disconnect_and_raise(VALUE self, VALUE error) {
646646
GET_CLIENT(self);
647647

648-
wrapper->active_thread = Qnil;
648+
wrapper->active_fiber = Qnil;
649649

650650
/* Invalidate the MySQL socket to prevent further communication.
651651
* The GC will come along later and call mysql_close to free it.
@@ -710,7 +710,7 @@ static VALUE disconnect_and_mark_inactive(VALUE self) {
710710
GET_CLIENT(self);
711711

712712
/* Check if execution terminated while result was still being read. */
713-
if (!NIL_P(wrapper->active_thread)) {
713+
if (!NIL_P(wrapper->active_fiber)) {
714714
if (CONNECTED(wrapper)) {
715715
/* Invalidate the MySQL socket to prevent further communication. */
716716
#ifndef _WIN32
@@ -725,24 +725,24 @@ static VALUE disconnect_and_mark_inactive(VALUE self) {
725725
}
726726
/* Skip mysql client check performed before command execution. */
727727
wrapper->client->status = MYSQL_STATUS_READY;
728-
wrapper->active_thread = Qnil;
728+
wrapper->active_fiber = Qnil;
729729
}
730730

731731
return Qnil;
732732
}
733733

734-
void rb_mysql_client_set_active_thread(VALUE self) {
735-
VALUE thread_current = rb_thread_current();
734+
static void rb_mysql_client_set_active_fiber(VALUE self) {
735+
VALUE fiber_current = rb_fiber_current();
736736
GET_CLIENT(self);
737737

738738
// see if this connection is still waiting on a result from a previous query
739-
if (NIL_P(wrapper->active_thread)) {
739+
if (NIL_P(wrapper->active_fiber)) {
740740
// mark this connection active
741-
wrapper->active_thread = thread_current;
742-
} else if (wrapper->active_thread == thread_current) {
741+
wrapper->active_fiber = fiber_current;
742+
} else if (wrapper->active_fiber == fiber_current) {
743743
rb_raise(cMysql2Error, "This connection is still waiting for a result, try again once you have the result");
744744
} else {
745-
VALUE inspect = rb_inspect(wrapper->active_thread);
745+
VALUE inspect = rb_inspect(wrapper->active_fiber);
746746
const char *thr = StringValueCStr(inspect);
747747

748748
rb_raise(cMysql2Error, "This connection is in use by: %s", thr);
@@ -806,7 +806,7 @@ static VALUE rb_mysql_query(VALUE self, VALUE sql, VALUE current) {
806806
args.sql_len = RSTRING_LEN(args.sql);
807807
args.wrapper = wrapper;
808808

809-
rb_mysql_client_set_active_thread(self);
809+
rb_mysql_client_set_active_fiber(self);
810810

811811
#ifndef _WIN32
812812
rb_rescue2(do_send_query, (VALUE)&args, disconnect_and_raise, self, rb_eException, (VALUE)0);

ext/mysql2/client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
typedef struct {
55
VALUE encoding;
6-
VALUE active_thread; /* rb_thread_current() or Qnil */
6+
VALUE active_fiber; /* rb_fiber_current() or Qnil */
77
long server_version;
88
int reconnect_enabled;
99
unsigned int connect_timeout;
@@ -15,7 +15,6 @@ typedef struct {
1515
MYSQL *client;
1616
} mysql_client_wrapper;
1717

18-
void rb_mysql_client_set_active_thread(VALUE self);
1918
void rb_mysql_set_server_query_flags(MYSQL *client, VALUE result);
2019

2120
#define GET_CLIENT(self) \

ext/mysql2/statement.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) {
448448
if (metadata == NULL) {
449449
if (mysql_stmt_errno(stmt) != 0) {
450450
// either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal.
451-
wrapper->active_thread = Qnil;
451+
wrapper->active_fiber = Qnil;
452452
rb_raise_mysql2_stmt_error(stmt_wrapper);
453453
}
454454
// no data and no error, so query was not a SELECT
@@ -461,7 +461,7 @@ static VALUE rb_mysql_stmt_execute(int argc, VALUE *argv, VALUE self) {
461461
mysql_free_result(metadata);
462462
rb_raise_mysql2_stmt_error(stmt_wrapper);
463463
}
464-
wrapper->active_thread = Qnil;
464+
wrapper->active_fiber = Qnil;
465465
}
466466

467467
resultObj = rb_mysql_result_to_obj(stmt_wrapper->client, wrapper->encoding, current, metadata, self);
@@ -502,7 +502,7 @@ static VALUE rb_mysql_stmt_fields(VALUE self) {
502502
if (metadata == NULL) {
503503
if (mysql_stmt_errno(stmt) != 0) {
504504
// either CR_OUT_OF_MEMORY or CR_UNKNOWN_ERROR. both fatal.
505-
wrapper->active_thread = Qnil;
505+
wrapper->active_fiber = Qnil;
506506
rb_raise_mysql2_stmt_error(stmt_wrapper);
507507
}
508508
// no data and no error, so query was not a SELECT

spec/mysql2/client_spec.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -624,10 +624,13 @@ def run_gc
624624
end
625625

626626
it "should describe the thread holding the active query" do
627-
thr = Thread.new { @client.query("SELECT 1", async: true) }
627+
thr = Thread.new do
628+
@client.query("SELECT 1", async: true)
629+
Fiber.current
630+
end
628631

629-
thr.join
630-
expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(thr.inspect)))
632+
fiber = thr.value
633+
expect { @client.query('SELECT 1') }.to raise_error(Mysql2::Error, Regexp.new(Regexp.escape(fiber.inspect)))
631634
end
632635

633636
it "should timeout if we wait longer than :read_timeout" do

spec/spec_helper.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
require 'mysql2'
33
require 'timeout'
44
require 'yaml'
5+
require 'fiber'
6+
57
DatabaseCredentials = YAML.load_file('spec/configuration.yml')
68

79
if GC.respond_to?(:verify_compaction_references)

0 commit comments

Comments
 (0)