Skip to content

Commit d81c1c9

Browse files
committed
api: proposal to add the context support
This patch adds the support of using context in API. The proposed API is based on using request objects. Added tests that cover almost all cases of using the context in a query. Added benchamrk tests are equivalent to other, that use the same query but without any context. Closes #48
1 parent c943276 commit d81c1c9

File tree

8 files changed

+373
-43
lines changed

8 files changed

+373
-43
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ CI and documentation.
3636
- Master discovery (#113)
3737
- SQL support (#62)
3838
- Add public API with a request object for Select/Update/Upstream (#126)
39+
- Context support for request objects
3940

4041
### Changed
4142

config.lua

+6
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ local function simple_incr(a)
110110
end
111111
rawset(_G, 'simple_incr', simple_incr)
112112

113+
local function simple_sleep(a)
114+
require('fiber').sleep(15)
115+
return a + 1
116+
end
117+
rawset(_G, 'simple_sleep', simple_sleep)
118+
113119
box.space.test:truncate()
114120

115121
--box.schema.user.revoke('guest', 'read,write,execute', 'universe')

connection.go

+24-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package tarantool
55
import (
66
"bufio"
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -916,9 +917,14 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
916917
// Do verifies, sends the request and returns a response.
917918
//
918919
// An error is returned if the request was formed incorrectly, or failure to
919-
// communicate by the connection, or unable to decode the response.
920-
func (conn *Connection) Do(req Request) (*Response, error) {
921-
fut, err := conn.DoAsync(req)
920+
// communicate by the connection, or unable to decode the response, or nil context is passed
921+
// or context is canceled or done.
922+
func (conn *Connection) Do(ctx context.Context, req Request) (*Response, error) {
923+
if ctx == nil {
924+
return nil, errors.New("passed nil context")
925+
}
926+
927+
fut, err := conn.DoAsync(ctx, req)
922928
if err != nil {
923929
return nil, err
924930
}
@@ -929,8 +935,12 @@ func (conn *Connection) Do(req Request) (*Response, error) {
929935
//
930936
// An error is returned if the request was formed incorrectly, or failure to
931937
// communicate by the connection, or unable to decode the response.
932-
func (conn *Connection) DoTyped(req Request, result interface{}) error {
933-
fut, err := conn.DoAsync(req)
938+
func (conn *Connection) DoTyped(ctx context.Context, req Request, result interface{}) error {
939+
if ctx == nil {
940+
return errors.New("passed nil context")
941+
}
942+
943+
fut, err := conn.DoAsync(ctx, req)
934944
if err != nil {
935945
return err
936946
}
@@ -941,12 +951,20 @@ func (conn *Connection) DoTyped(req Request, result interface{}) error {
941951
//
942952
// An error is returned if the request was formed incorrectly, or failure to
943953
// create the future.
944-
func (conn *Connection) DoAsync(req Request) (*Future, error) {
954+
func (conn *Connection) DoAsync(ctx context.Context, req Request) (*Future, error) {
945955
bodyFunc, err := req.BodyFunc(conn.Schema)
946956
if err != nil {
947957
return nil, err
948958
}
949959
future := conn.newFuture(req.Code())
960+
if ctx != nil {
961+
select {
962+
case <-ctx.Done():
963+
return nil, fmt.Errorf("context is done")
964+
default:
965+
future.WithCtx(ctx)
966+
}
967+
}
950968
return future.send(conn, bodyFunc), nil
951969
}
952970

connection_pool/connection_pool.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package connection_pool
1212

1313
import (
14+
"context"
1415
"errors"
1516
"log"
1617
"sync/atomic"
@@ -483,33 +484,33 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
483484
}
484485

485486
// Do sends the request and returns a response.
486-
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487+
func (connPool *ConnectionPool) Do(ctx context.Context, req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487488
conn, err := connPool.getNextConnection(userMode)
488489
if err != nil {
489490
return nil, err
490491
}
491492

492-
return conn.Do(req)
493+
return conn.Do(ctx, req)
493494
}
494495

495496
// DoTyped sends the request and fills the typed result.
496-
func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{}, userMode Mode) error {
497+
func (connPool *ConnectionPool) DoTyped(ctx context.Context, req tarantool.Request, result interface{}, userMode Mode) error {
497498
conn, err := connPool.getNextConnection(userMode)
498499
if err != nil {
499500
return err
500501
}
501502

502-
return conn.DoTyped(req, result)
503+
return conn.DoTyped(ctx, req, result)
503504
}
504505

505506
// DoAsync sends the request and returns a future.
506-
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) (*tarantool.Future, error) {
507+
func (connPool *ConnectionPool) DoAsync(ctx context.Context, req tarantool.Request, userMode Mode) (*tarantool.Future, error) {
507508
conn, err := connPool.getNextConnection(userMode)
508509
if err != nil {
509510
return nil, err
510511
}
511512

512-
return conn.DoAsync(req)
513+
return conn.DoAsync(ctx, req)
513514
}
514515

515516
//

connector.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package tarantool
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
type Connector interface {
69
ConnectedNow() bool
@@ -39,7 +42,7 @@ type Connector interface {
3942
Call17Async(functionName string, args interface{}) *Future
4043
EvalAsync(expr string, args interface{}) *Future
4144

42-
Do(req Request) (resp *Response, err error)
43-
DoTyped(req Request, result interface{}) (err error)
44-
DoAsync(req Request) (fut *Future, err error)
45+
Do(ctx context.Context, req Request) (resp *Response, err error)
46+
DoTyped(ctx context.Context, req Request, result interface{}) (err error)
47+
DoAsync(ctx context.Context, req Request) (fut *Future, err error)
4548
}

example_test.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tarantool_test
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -129,11 +130,13 @@ func ExampleConnection_SelectAsync() {
129130
func ExampleSelectRequest() {
130131
conn := example_connect()
131132
defer conn.Close()
133+
var ctx, cancel = context.WithCancel(context.Background())
134+
defer cancel()
132135

133136
req := tarantool.NewSelectRequest(517).
134137
Limit(100).
135138
Key(tarantool.IntKey{1111})
136-
resp, err := conn.Do(req)
139+
resp, err := conn.Do(ctx, req)
137140
if err != nil {
138141
fmt.Printf("error in do select request is %v", err)
139142
return
@@ -144,7 +147,7 @@ func ExampleSelectRequest() {
144147
Index("primary").
145148
Limit(100).
146149
Key(tarantool.IntKey{1111})
147-
fut, err := conn.DoAsync(req)
150+
fut, err := conn.DoAsync(ctx, req)
148151
if err != nil {
149152
fmt.Printf("error in do async select request is %v", err)
150153
}
@@ -162,11 +165,13 @@ func ExampleSelectRequest() {
162165
func ExampleUpdateRequest() {
163166
conn := example_connect()
164167
defer conn.Close()
168+
var ctx, cancel = context.WithCancel(context.Background())
169+
defer cancel()
165170

166171
req := tarantool.NewUpdateRequest(517).
167172
Key(tarantool.IntKey{1111}).
168173
Operations(tarantool.NewOperations().Assign(1, "bye"))
169-
resp, err := conn.Do(req)
174+
resp, err := conn.Do(ctx, req)
170175
if err != nil {
171176
fmt.Printf("error in do update request is %v", err)
172177
return
@@ -177,7 +182,7 @@ func ExampleUpdateRequest() {
177182
Index("primary").
178183
Key(tarantool.IntKey{1111}).
179184
Operations(tarantool.NewOperations().Assign(1, "hello"))
180-
fut, err := conn.DoAsync(req)
185+
fut, err := conn.DoAsync(ctx, req)
181186
if err != nil {
182187
fmt.Printf("error in do async update request is %v", err)
183188
}
@@ -195,12 +200,14 @@ func ExampleUpdateRequest() {
195200
func ExampleUpsertRequest() {
196201
conn := example_connect()
197202
defer conn.Close()
203+
var ctx, cancel = context.WithCancel(context.Background())
204+
defer cancel()
198205

199206
var req tarantool.Request
200207
req = tarantool.NewUpsertRequest(517).
201208
Tuple([]interface{}{uint(1113), "first", "first"}).
202209
Operations(tarantool.NewOperations().Assign(1, "updated"))
203-
resp, err := conn.Do(req)
210+
resp, err := conn.Do(ctx, req)
204211
if err != nil {
205212
fmt.Printf("error in do select upsert is %v", err)
206213
return
@@ -210,7 +217,7 @@ func ExampleUpsertRequest() {
210217
req = tarantool.NewUpsertRequest("test").
211218
Tuple([]interface{}{uint(1113), "second", "second"}).
212219
Operations(tarantool.NewOperations().Assign(2, "updated"))
213-
fut, err := conn.DoAsync(req)
220+
fut, err := conn.DoAsync(ctx, req)
214221
if err != nil {
215222
fmt.Printf("error in do async upsert request is %v", err)
216223
}
@@ -224,7 +231,7 @@ func ExampleUpsertRequest() {
224231
req = tarantool.NewSelectRequest(517).
225232
Limit(100).
226233
Key(tarantool.IntKey{1113})
227-
resp, err = conn.Do(req)
234+
resp, err = conn.Do(ctx, req)
228235
if err != nil {
229236
fmt.Printf("error in do select request is %v", err)
230237
return

future.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tarantool
22

33
import (
4+
"context"
5+
"fmt"
46
"time"
57

68
"gopkg.in/vmihailenco/msgpack.v2"
@@ -15,6 +17,14 @@ type Future struct {
1517
err error
1618
ready chan struct{}
1719
next *Future
20+
ctx context.Context
21+
}
22+
23+
// WithCtx sets a passed context to a Future object and
24+
// returns the same object with that context.
25+
func (fut *Future) WithCtx(ctx context.Context) *Future {
26+
fut.ctx = ctx
27+
return fut
1828
}
1929

2030
// NewErrorFuture returns new set empty Future with filled error field.
@@ -127,5 +137,22 @@ func (fut *Future) wait() {
127137
if fut.ready == nil {
128138
return
129139
}
130-
<-fut.ready
140+
if fut.ctx == nil {
141+
<-fut.ready
142+
return
143+
}
144+
select {
145+
case <-fut.ready:
146+
default:
147+
select {
148+
case <-fut.ctx.Done():
149+
fut.err = fmt.Errorf("context is done")
150+
default:
151+
select {
152+
case <-fut.ready:
153+
case <-fut.ctx.Done():
154+
fut.err = fmt.Errorf("context is done")
155+
}
156+
}
157+
}
131158
}

0 commit comments

Comments
 (0)