From 05c0dafadd73d967f16d92b6be772c045a7bcbdf Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 25 May 2022 13:40:46 +0300 Subject: [PATCH 1/2] code health: extract the Future type to future.go This is not a critical change. It just splits request.go into request.go and future.go. --- future.go | 131 ++++++++++++++++++++++++++++++++++++++++++++++++ request.go | 143 ++++------------------------------------------------- 2 files changed, 140 insertions(+), 134 deletions(-) create mode 100644 future.go diff --git a/future.go b/future.go new file mode 100644 index 000000000..fbf141f82 --- /dev/null +++ b/future.go @@ -0,0 +1,131 @@ +package tarantool + +import ( + "time" + + "gopkg.in/vmihailenco/msgpack.v2" +) + +// Future is a handle for asynchronous request. +type Future struct { + requestId uint32 + requestCode int32 + timeout time.Duration + resp *Response + err error + ready chan struct{} + next *Future +} + +// NewErrorFuture returns new set empty Future with filled error field. +func NewErrorFuture(err error) *Future { + return &Future{err: err} +} + +// Get waits for Future to be filled and returns Response and error. +// +// Response will contain deserialized result in Data field. +// It will be []interface{}, so if you want more performace, use GetTyped method. +// +// Note: Response could be equal to nil if ClientError is returned in error. +// +// "error" could be Error, if it is error returned by Tarantool, +// or ClientError, if something bad happens in a client process. +func (fut *Future) Get() (*Response, error) { + fut.wait() + if fut.err != nil { + return fut.resp, fut.err + } + fut.err = fut.resp.decodeBody() + return fut.resp, fut.err +} + +// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. +// It is could be much faster than Get() function. +// +// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions). +func (fut *Future) GetTyped(result interface{}) error { + fut.wait() + if fut.err != nil { + return fut.err + } + fut.err = fut.resp.decodeBodyTyped(result) + return fut.err +} + +var closedChan = make(chan struct{}) + +func init() { + close(closedChan) +} + +// WaitChan returns channel which becomes closed when response arrived or error occured. +func (fut *Future) WaitChan() <-chan struct{} { + if fut.ready == nil { + return closedChan + } + return fut.ready +} + +// Err returns error set on Future. +// It waits for future to be set. +// Note: it doesn't decode body, therefore decoding error are not set here. +func (fut *Future) Err() error { + fut.wait() + return fut.err +} + +func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { + rid := fut.requestId + hl := h.Len() + h.Write([]byte{ + 0xce, 0, 0, 0, 0, // Length. + 0x82, // 2 element map. + KeyCode, byte(fut.requestCode), // Request code. + KeySync, 0xce, + byte(rid >> 24), byte(rid >> 16), + byte(rid >> 8), byte(rid), + }) + + if err = body(enc); err != nil { + return + } + + l := uint32(h.Len() - 5 - hl) + h.b[hl+1] = byte(l >> 24) + h.b[hl+2] = byte(l >> 16) + h.b[hl+3] = byte(l >> 8) + h.b[hl+4] = byte(l) + + return +} + +func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future { + if fut.ready == nil { + return fut + } + conn.putFuture(fut, body) + return fut +} + +func (fut *Future) markReady(conn *Connection) { + close(fut.ready) + if conn.rlimit != nil { + <-conn.rlimit + } +} + +func (fut *Future) fail(conn *Connection, err error) *Future { + if f := conn.fetchFuture(fut.requestId); f == fut { + f.err = err + fut.markReady(conn) + } + return fut +} + +func (fut *Future) wait() { + if fut.ready == nil { + return + } + <-fut.ready +} diff --git a/request.go b/request.go index dd9486ae1..541fb2b4f 100644 --- a/request.go +++ b/request.go @@ -5,29 +5,17 @@ import ( "reflect" "strings" "sync" - "time" "gopkg.in/vmihailenco/msgpack.v2" ) -// Future is a handle for asynchronous request. -type Future struct { - requestId uint32 - requestCode int32 - timeout time.Duration - resp *Response - err error - ready chan struct{} - next *Future -} - // Ping sends empty request to Tarantool to check connection. func (conn *Connection) Ping() (resp *Response, err error) { future := conn.newFuture(PingRequest) return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get() } -func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error { +func fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error { enc.EncodeUint64(KeySpaceNo) enc.EncodeUint64(uint64(spaceNo)) enc.EncodeUint64(KeyIndexNo) @@ -36,7 +24,7 @@ func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key return enc.Encode(key) } -func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) { +func fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) { enc.EncodeUint64(KeyIterator) enc.EncodeUint64(uint64(iterator)) enc.EncodeUint64(KeyOffset) @@ -45,7 +33,7 @@ func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator ui enc.EncodeUint64(uint64(limit)) } -func (req *Future) fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error { +func fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error { enc.EncodeUint64(KeySpaceNo) enc.EncodeUint64(uint64(spaceNo)) enc.EncodeUint64(KeyTuple) @@ -242,8 +230,8 @@ func (conn *Connection) SelectAsync(space, index interface{}, offset, limit, ite } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(6) - future.fillIterator(enc, offset, limit, iterator) - return future.fillSearch(enc, spaceNo, indexNo, key) + fillIterator(enc, offset, limit, iterator) + return fillSearch(enc, spaceNo, indexNo, key) }) } @@ -257,7 +245,7 @@ func (conn *Connection) InsertAsync(space interface{}, tuple interface{}) *Futur } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) - return future.fillInsert(enc, spaceNo, tuple) + return fillInsert(enc, spaceNo, tuple) }) } @@ -271,7 +259,7 @@ func (conn *Connection) ReplaceAsync(space interface{}, tuple interface{}) *Futu } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) - return future.fillInsert(enc, spaceNo, tuple) + return fillInsert(enc, spaceNo, tuple) }) } @@ -285,7 +273,7 @@ func (conn *Connection) DeleteAsync(space, index interface{}, key interface{}) * } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(3) - return future.fillSearch(enc, spaceNo, indexNo, key) + return fillSearch(enc, spaceNo, indexNo, key) }) } @@ -299,7 +287,7 @@ func (conn *Connection) UpdateAsync(space, index interface{}, key, ops interface } return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(4) - if err := future.fillSearch(enc, spaceNo, indexNo, key); err != nil { + if err := fillSearch(enc, spaceNo, indexNo, key); err != nil { return err } enc.EncodeUint64(KeyTuple) @@ -520,116 +508,3 @@ func encodeSQLBind(enc *msgpack.Encoder, from interface{}) error { } return nil } - -func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { - rid := fut.requestId - hl := h.Len() - h.Write([]byte{ - 0xce, 0, 0, 0, 0, // length - 0x82, // 2 element map - KeyCode, byte(fut.requestCode), // request code - KeySync, 0xce, - byte(rid >> 24), byte(rid >> 16), - byte(rid >> 8), byte(rid), - }) - - if err = body(enc); err != nil { - return - } - - l := uint32(h.Len() - 5 - hl) - h.b[hl+1] = byte(l >> 24) - h.b[hl+2] = byte(l >> 16) - h.b[hl+3] = byte(l >> 8) - h.b[hl+4] = byte(l) - - return -} - -func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future { - if fut.ready == nil { - return fut - } - conn.putFuture(fut, body) - return fut -} - -func (fut *Future) markReady(conn *Connection) { - close(fut.ready) - if conn.rlimit != nil { - <-conn.rlimit - } -} - -func (fut *Future) fail(conn *Connection, err error) *Future { - if f := conn.fetchFuture(fut.requestId); f == fut { - f.err = err - fut.markReady(conn) - } - return fut -} - -func (fut *Future) wait() { - if fut.ready == nil { - return - } - <-fut.ready -} - -// Get waits for Future to be filled and returns Response and error. -// -// Response will contain deserialized result in Data field. -// It will be []interface{}, so if you want more performace, use GetTyped method. -// -// Note: Response could be equal to nil if ClientError is returned in error. -// -// "error" could be Error, if it is error returned by Tarantool, -// or ClientError, if something bad happens in a client process. -func (fut *Future) Get() (*Response, error) { - fut.wait() - if fut.err != nil { - return fut.resp, fut.err - } - fut.err = fut.resp.decodeBody() - return fut.resp, fut.err -} - -// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. -// It is could be much faster than Get() function. -// -// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions) -func (fut *Future) GetTyped(result interface{}) error { - fut.wait() - if fut.err != nil { - return fut.err - } - fut.err = fut.resp.decodeBodyTyped(result) - return fut.err -} - -var closedChan = make(chan struct{}) - -func init() { - close(closedChan) -} - -// WaitChan returns channel which becomes closed when response arrived or error occured. -func (fut *Future) WaitChan() <-chan struct{} { - if fut.ready == nil { - return closedChan - } - return fut.ready -} - -// Err returns error set on Future. -// It waits for future to be set. -// Note: it doesn't decode body, therefore decoding error are not set here. -func (fut *Future) Err() error { - fut.wait() - return fut.err -} - -// NewErrorFuture returns new set empty Future with filled error field. -func NewErrorFuture(err error) *Future { - return &Future{err: err} -} From f1b94201995eee8fbf086b0f0668f7adbc910007 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 6 Apr 2022 15:40:49 +0300 Subject: [PATCH 2/2] api: support IPROTO_PUSH messages This patch adds support for receiving messages sent using box.session.push() via an iterator in the manner of asynchronous case of a Lua implementation[1]. Now the calls Future.Get() and Future.GetTyped() ignore push messages, and do not report an error. 1. https://www.tarantool.io/ru/doc/latest/reference/reference_lua/box_session/push/ Closes #67 --- CHANGELOG.md | 1 + config.lua | 8 ++ connection.go | 121 ++++++++++++++++----- const.go | 1 + example_test.go | 33 ++++++ future.go | 269 +++++++++++++++++++++++++++++++++++----------- future_test.go | 235 ++++++++++++++++++++++++++++++++++++++++ request.go | 34 +++--- response.go | 4 +- response_it.go | 26 +++++ tarantool_test.go | 236 +++++++++++++++++++++------------------- 11 files changed, 750 insertions(+), 218 deletions(-) create mode 100644 future_test.go create mode 100644 response_it.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a1f304a9..7f4e2f887 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added - SSL support (#155) +- IPROTO_PUSH messages support (#67) ### Changed diff --git a/config.lua b/config.lua index d024b3a28..abea45742 100644 --- a/config.lua +++ b/config.lua @@ -110,6 +110,14 @@ local function simple_incr(a) end rawset(_G, 'simple_incr', simple_incr) +local function push_func(cnt) + for i = 1, cnt do + box.session.push(i) + end + return cnt +end +rawset(_G, 'push_func', push_func) + box.space.test:truncate() --box.schema.user.revoke('guest', 'read,write,execute', 'universe') diff --git a/connection.go b/connection.go index 71c22890e..5f5425531 100644 --- a/connection.go +++ b/connection.go @@ -163,8 +163,9 @@ type Greeting struct { // Opts is a way to configure Connection type Opts struct { - // Timeout for any particular request. If Timeout is zero request, any - // request can be blocked infinitely. + // Timeout for response to a particular request. The timeout is reset when + // push messages are received. If Timeout is zero, any request can be + // blocked infinitely. // Also used to setup net.TCPConn.Set(Read|Write)Deadline. Timeout time.Duration // Timeout between reconnect attempts. If Reconnect is zero, no @@ -568,8 +569,8 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error) requests[pos].first = nil requests[pos].last = &requests[pos].first for fut != nil { - fut.err = neterr - fut.markReady(conn) + fut.SetError(neterr) + conn.markDone(fut) fut, fut.next = fut.next, nil } } @@ -685,26 +686,39 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) { conn.reconnect(err, c) return } - if fut := conn.fetchFuture(resp.RequestId); fut != nil { - fut.resp = resp - fut.markReady(conn) + + var fut *Future = nil + if resp.Code == PushCode { + if fut = conn.peekFuture(resp.RequestId); fut != nil { + fut.AppendPush(resp) + } } else { + if fut = conn.fetchFuture(resp.RequestId); fut != nil { + fut.SetResponse(resp) + conn.markDone(fut) + } + } + if fut == nil { conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp) } } } func (conn *Connection) newFuture(requestCode int32) (fut *Future) { - fut = &Future{} + fut = NewFuture() if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop { select { case conn.rlimit <- struct{}{}: default: - fut.err = ClientError{ErrRateLimited, "Request is rate limited on client"} + fut.err = ClientError{ + ErrRateLimited, + "Request is rate limited on client", + } + fut.ready = nil + fut.done = nil return } } - fut.ready = make(chan struct{}) fut.requestId = conn.nextRequestId() fut.requestCode = requestCode shardn := fut.requestId & (conn.opts.Concurrency - 1) @@ -712,13 +726,21 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) { shard.rmut.Lock() switch conn.state { case connClosed: - fut.err = ClientError{ErrConnectionClosed, "using closed connection"} + fut.err = ClientError{ + ErrConnectionClosed, + "using closed connection", + } fut.ready = nil + fut.done = nil shard.rmut.Unlock() return case connDisconnected: - fut.err = ClientError{ErrConnectionNotReady, "client connection is not ready"} + fut.err = ClientError{ + ErrConnectionNotReady, + "client connection is not ready", + } fut.ready = nil + fut.done = nil shard.rmut.Unlock() return } @@ -737,9 +759,9 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) { runtime.Gosched() select { case conn.rlimit <- struct{}{}: - case <-fut.ready: + case <-fut.done: if fut.err == nil { - panic("fut.ready is closed, but err is nil") + panic("fut.done is closed, but err is nil") } } } @@ -747,12 +769,28 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) { return } +func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future { + if fut.ready == nil { + return fut + } + conn.putFuture(fut, body) + return fut +} + +func (conn *Connection) failFuture(fut *Future, err error) *Future { + if f := conn.fetchFuture(fut.requestId); f == fut { + fut.SetError(err) + conn.markDone(fut) + } + return fut +} + func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] shard.bufmut.Lock() select { - case <-fut.ready: + case <-fut.done: shard.bufmut.Unlock() return default: @@ -767,8 +805,8 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error shard.buf.Trunc(blen) shard.bufmut.Unlock() if f := conn.fetchFuture(fut.requestId); f == fut { - fut.markReady(conn) - fut.err = err + fut.SetError(err) + conn.markDone(fut) } else if f != nil { /* in theory, it is possible. In practice, you have * to have race condition that lasts hours */ @@ -782,7 +820,7 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error // packing error is more important than connection // error, because it is indication of programmer's // mistake. - fut.err = err + fut.SetError(err) } } return @@ -793,15 +831,40 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error } } +func (conn *Connection) markDone(fut *Future) { + if conn.rlimit != nil { + <-conn.rlimit + } +} + +func (conn *Connection) peekFuture(reqid uint32) (fut *Future) { + shard := &conn.shard[reqid&(conn.opts.Concurrency-1)] + pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1) + shard.rmut.Lock() + defer shard.rmut.Unlock() + + if conn.opts.Timeout > 0 { + fut = conn.getFutureImp(reqid, true) + pair := &shard.requests[pos] + *pair.last = fut + pair.last = &fut.next + fut.timeout = time.Since(epoch) + conn.opts.Timeout + } else { + fut = conn.getFutureImp(reqid, false) + } + + return fut +} + func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) { shard := &conn.shard[reqid&(conn.opts.Concurrency-1)] shard.rmut.Lock() - fut = conn.fetchFutureImp(reqid) + fut = conn.getFutureImp(reqid, true) shard.rmut.Unlock() return fut } -func (conn *Connection) fetchFutureImp(reqid uint32) *Future { +func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future { shard := &conn.shard[reqid&(conn.opts.Concurrency-1)] pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1) pair := &shard.requests[pos] @@ -812,11 +875,13 @@ func (conn *Connection) fetchFutureImp(reqid uint32) *Future { return nil } if fut.requestId == reqid { - *root = fut.next - if fut.next == nil { - pair.last = root - } else { - fut.next = nil + if fetch { + *root = fut.next + if fut.next == nil { + pair.last = root + } else { + fut.next = nil + } } return fut } @@ -851,11 +916,11 @@ func (conn *Connection) timeouts() { } else { fut.next = nil } - fut.err = ClientError{ + fut.SetError(ClientError{ Code: ErrTimeouted, Msg: fmt.Sprintf("client timeout for request %d", fut.requestId), - } - fut.markReady(conn) + }) + conn.markDone(fut) shard.bufmut.Unlock() } if pair.first != nil && pair.first.timeout < minNext { diff --git a/const.go b/const.go index 5152f8e43..f542c8171 100644 --- a/const.go +++ b/const.go @@ -61,6 +61,7 @@ const ( RLimitWait = 2 OkCode = uint32(0) + PushCode = uint32(0x80) ErrorCodeBit = 0x8000 PacketLengthBytes = 5 ErSpaceExistsCode = 0xa diff --git a/example_test.go b/example_test.go index b14dc4e81..b8f8ee110 100644 --- a/example_test.go +++ b/example_test.go @@ -126,6 +126,39 @@ func ExampleConnection_SelectAsync() { // Future 2 Data [[18 val 18 bla]] } +func ExampleFuture_GetIterator() { + conn := example_connect() + defer conn.Close() + + const timeout = 3 * time.Second + // Or any other Connection.*Async() call. + fut := conn.Call17Async("push_func", []interface{}{4}) + + var it tarantool.ResponseIterator + for it = fut.GetIterator().WithTimeout(timeout); it.Next(); { + resp := it.Value() + if resp.Code == tarantool.PushCode { + // It is a push message. + fmt.Printf("push message: %d\n", resp.Data[0].(uint64)) + } else if resp.Code == tarantool.OkCode { + // It is a regular response. + fmt.Printf("response: %d", resp.Data[0].(uint64)) + } else { + fmt.Printf("an unexpected response code %d", resp.Code) + } + } + if err := it.Err(); err != nil { + fmt.Printf("error in call of push_func is %v", err) + return + } + // Output: + // push message: 1 + // push message: 2 + // push message: 3 + // push message: 4 + // response: 4 +} + func ExampleConnection_Ping() { conn := example_connect() defer conn.Close() diff --git a/future.go b/future.go index fbf141f82..c077e8271 100644 --- a/future.go +++ b/future.go @@ -1,6 +1,7 @@ package tarantool import ( + "sync" "time" "gopkg.in/vmihailenco/msgpack.v2" @@ -11,15 +12,193 @@ type Future struct { requestId uint32 requestCode int32 timeout time.Duration + mutex sync.Mutex + pushes []*Response resp *Response err error ready chan struct{} + done chan struct{} next *Future } +func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { + rid := fut.requestId + hl := h.Len() + h.Write([]byte{ + 0xce, 0, 0, 0, 0, // Length. + 0x82, // 2 element map. + KeyCode, byte(fut.requestCode), // Request code. + KeySync, 0xce, + byte(rid >> 24), byte(rid >> 16), + byte(rid >> 8), byte(rid), + }) + + if err = body(enc); err != nil { + return + } + + l := uint32(h.Len() - 5 - hl) + h.b[hl+1] = byte(l >> 24) + h.b[hl+2] = byte(l >> 16) + h.b[hl+3] = byte(l >> 8) + h.b[hl+4] = byte(l) + + return +} + +func (fut *Future) wait() { + if fut.done == nil { + return + } + <-fut.done +} + +func (fut *Future) isDone() bool { + if fut.done == nil { + return true + } + select { + case <-fut.done: + return true + default: + return false + } +} + +type asyncResponseIterator struct { + fut *Future + timeout time.Duration + resp *Response + err error + curPos int + done bool +} + +func (it *asyncResponseIterator) Next() bool { + if it.done || it.err != nil { + it.resp = nil + return false + } + + var last = false + var exit = false + for !exit { + // We try to read at least once. + it.fut.mutex.Lock() + it.resp = it.nextResponse() + it.err = it.fut.err + last = it.resp == it.fut.resp + it.fut.mutex.Unlock() + + if it.timeout == 0 || it.resp != nil || it.err != nil { + break + } + + select { + case <-it.fut.ready: + case <-time.After(it.timeout): + exit = true + } + } + + if it.resp == nil { + return false + } + + if it.err = it.resp.decodeBody(); it.err != nil { + it.resp = nil + return false + } + + if last { + it.done = true + } else { + it.curPos += 1 + } + + return true +} + +func (it *asyncResponseIterator) Value() *Response { + return it.resp +} + +func (it *asyncResponseIterator) Err() error { + return it.err +} + +func (it *asyncResponseIterator) WithTimeout(timeout time.Duration) TimeoutResponseIterator { + it.timeout = timeout + return it +} + +func (it *asyncResponseIterator) nextResponse() (resp *Response) { + fut := it.fut + pushesLen := len(fut.pushes) + + if it.curPos < pushesLen { + resp = fut.pushes[it.curPos] + } else if it.curPos == pushesLen { + resp = fut.resp + } + + return resp +} + +// NewFuture creates a new empty Future. +func NewFuture() (fut *Future) { + fut = &Future{} + fut.ready = make(chan struct{}, 1000000000) + fut.done = make(chan struct{}) + fut.pushes = make([]*Response, 0) + return fut +} + // NewErrorFuture returns new set empty Future with filled error field. func NewErrorFuture(err error) *Future { - return &Future{err: err} + fut := NewFuture() + fut.SetError(err) + return fut +} + +// AppendPush appends the push response to the future. +// Note: it works only before SetResponse() or SetError() +func (fut *Future) AppendPush(resp *Response) { + if fut.isDone() { + return + } + resp.Code = PushCode + fut.mutex.Lock() + fut.pushes = append(fut.pushes, resp) + fut.mutex.Unlock() + + fut.ready <- struct{}{} +} + +// SetResponse sets a response for the future and finishes the future. +func (fut *Future) SetResponse(resp *Response) { + if fut.isDone() { + return + } + fut.mutex.Lock() + fut.resp = resp + fut.mutex.Unlock() + + close(fut.ready) + close(fut.done) +} + +// SetError sets an error for the future and finishes the future. +func (fut *Future) SetError(err error) { + if fut.isDone() { + return + } + fut.mutex.Lock() + fut.err = err + fut.mutex.Unlock() + + close(fut.ready) + close(fut.done) } // Get waits for Future to be filled and returns Response and error. @@ -36,8 +215,11 @@ func (fut *Future) Get() (*Response, error) { if fut.err != nil { return fut.resp, fut.err } - fut.err = fut.resp.decodeBody() - return fut.resp, fut.err + err := fut.resp.decodeBody() + if err != nil { + fut.err = err + } + return fut.resp, err } // GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens. @@ -49,8 +231,26 @@ func (fut *Future) GetTyped(result interface{}) error { if fut.err != nil { return fut.err } - fut.err = fut.resp.decodeBodyTyped(result) - return fut.err + err := fut.resp.decodeBodyTyped(result) + if err != nil { + fut.err = err + } + return err +} + +// GetIterator returns an iterator for iterating through push messages +// and a response. Push messages and the response will contain deserialized +// result in Data field as for the Get() function. +// +// See also +// +// * box.session.push() https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/push/ +// +func (fut *Future) GetIterator() (it TimeoutResponseIterator) { + futit := &asyncResponseIterator{ + fut: fut, + } + return futit } var closedChan = make(chan struct{}) @@ -61,10 +261,10 @@ func init() { // WaitChan returns channel which becomes closed when response arrived or error occured. func (fut *Future) WaitChan() <-chan struct{} { - if fut.ready == nil { + if fut.done == nil { return closedChan } - return fut.ready + return fut.done } // Err returns error set on Future. @@ -74,58 +274,3 @@ func (fut *Future) Err() error { fut.wait() return fut.err } - -func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) { - rid := fut.requestId - hl := h.Len() - h.Write([]byte{ - 0xce, 0, 0, 0, 0, // Length. - 0x82, // 2 element map. - KeyCode, byte(fut.requestCode), // Request code. - KeySync, 0xce, - byte(rid >> 24), byte(rid >> 16), - byte(rid >> 8), byte(rid), - }) - - if err = body(enc); err != nil { - return - } - - l := uint32(h.Len() - 5 - hl) - h.b[hl+1] = byte(l >> 24) - h.b[hl+2] = byte(l >> 16) - h.b[hl+3] = byte(l >> 8) - h.b[hl+4] = byte(l) - - return -} - -func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future { - if fut.ready == nil { - return fut - } - conn.putFuture(fut, body) - return fut -} - -func (fut *Future) markReady(conn *Connection) { - close(fut.ready) - if conn.rlimit != nil { - <-conn.rlimit - } -} - -func (fut *Future) fail(conn *Connection, err error) *Future { - if f := conn.fetchFuture(fut.requestId); f == fut { - f.err = err - fut.markReady(conn) - } - return fut -} - -func (fut *Future) wait() { - if fut.ready == nil { - return - } - <-fut.ready -} diff --git a/future_test.go b/future_test.go new file mode 100644 index 000000000..d6d800530 --- /dev/null +++ b/future_test.go @@ -0,0 +1,235 @@ +package tarantool_test + +import ( + "errors" + "sync" + "testing" + "time" + + . "github.com/tarantool/go-tarantool" +) + +func assertResponseIteratorValue(t testing.TB, it ResponseIterator, + code uint32, resp *Response) { + t.Helper() + + if it.Err() != nil { + t.Errorf("An unexpected iteration error: %q", it.Err().Error()) + } + + if it.Value() == nil { + t.Errorf("An unexpected nil value") + } else if it.Value().Code != code { + t.Errorf("An unexpected response code %d, expected %d", it.Value().Code, code) + } + + if it.Value() != resp { + t.Errorf("An unexpected response %v, expected %v", it.Value(), resp) + } +} + +func assertResponseIteratorFinished(t testing.TB, it ResponseIterator) { + t.Helper() + + if it.Err() != nil { + t.Errorf("An unexpected iteration error: %q", it.Err().Error()) + } + if it.Value() != nil { + t.Errorf("An unexpected value %v", it.Value()) + } +} + +func TestFutureGetIteratorNoItems(t *testing.T) { + fut := NewFuture() + + it := fut.GetIterator() + if it.Next() { + t.Errorf("An unexpected next value.") + } else { + assertResponseIteratorFinished(t, it) + } +} + +func TestFutureGetIteratorNoResponse(t *testing.T) { + push := &Response{} + fut := NewFuture() + fut.AppendPush(push) + + if it := fut.GetIterator(); it.Next() { + assertResponseIteratorValue(t, it, PushCode, push) + if it.Next() == true { + t.Errorf("An unexpected next value.") + } + assertResponseIteratorFinished(t, it) + } else { + t.Errorf("A push message expected.") + } +} + +func TestFutureGetIteratorNoResponseTimeout(t *testing.T) { + push := &Response{} + fut := NewFuture() + fut.AppendPush(push) + + if it := fut.GetIterator().WithTimeout(1 * time.Nanosecond); it.Next() { + assertResponseIteratorValue(t, it, PushCode, push) + if it.Next() == true { + t.Errorf("An unexpected next value.") + } + assertResponseIteratorFinished(t, it) + } else { + t.Errorf("A push message expected.") + } +} + +func TestFutureGetIteratorResponseOnTimeout(t *testing.T) { + push := &Response{} + resp := &Response{} + fut := NewFuture() + fut.AppendPush(push) + + var done sync.WaitGroup + var wait sync.WaitGroup + wait.Add(1) + done.Add(1) + + go func() { + defer done.Done() + + var it ResponseIterator + var cnt = 0 + for it = fut.GetIterator().WithTimeout(5 * time.Second); it.Next(); { + code := PushCode + r := push + if cnt == 1 { + code = OkCode + r = resp + } + assertResponseIteratorValue(t, it, code, r) + cnt += 1 + if cnt == 1 { + wait.Done() + } + } + assertResponseIteratorFinished(t, it) + + if cnt != 2 { + t.Errorf("An unexpected count of responses %d != %d", cnt, 2) + } + }() + + wait.Wait() + fut.SetResponse(resp) + done.Wait() +} + +func TestFutureGetIteratorFirstResponse(t *testing.T) { + resp1 := &Response{} + resp2 := &Response{} + fut := NewFuture() + fut.SetResponse(resp1) + fut.SetResponse(resp2) + + if it := fut.GetIterator(); it.Next() { + assertResponseIteratorValue(t, it, OkCode, resp1) + if it.Next() == true { + t.Errorf("An unexpected next value.") + } + assertResponseIteratorFinished(t, it) + } else { + t.Errorf("A response expected.") + } +} + +func TestFutureGetIteratorFirstError(t *testing.T) { + const errMsg1 = "error1" + const errMsg2 = "error2" + + fut := NewFuture() + fut.SetError(errors.New(errMsg1)) + fut.SetError(errors.New(errMsg2)) + + it := fut.GetIterator() + if it.Next() { + t.Errorf("An unexpected value.") + } else if it.Err() == nil { + t.Errorf("An error expected.") + } else if it.Err().Error() != errMsg1 { + t.Errorf("An unexpected error %q, expected %q", it.Err().Error(), errMsg1) + } +} + +func TestFutureGetIteratorResponse(t *testing.T) { + responses := []*Response{ + {}, + {}, + {Code: OkCode}, + } + fut := NewFuture() + for i, resp := range responses { + if i == len(responses)-1 { + fut.SetResponse(resp) + } else { + fut.AppendPush(resp) + } + } + + var its = []ResponseIterator{ + fut.GetIterator(), + fut.GetIterator().WithTimeout(5 * time.Second), + } + for _, it := range its { + var cnt = 0 + for it.Next() { + code := PushCode + if cnt == len(responses)-1 { + code = OkCode + } + assertResponseIteratorValue(t, it, code, responses[cnt]) + cnt += 1 + } + assertResponseIteratorFinished(t, it) + + if cnt != len(responses) { + t.Errorf("An unexpected count of responses %d != %d", cnt, len(responses)) + } + } +} + +func TestFutureGetIteratorError(t *testing.T) { + const errMsg = "error message" + responses := []*Response{ + {}, + {}, + } + err := errors.New(errMsg) + fut := NewFuture() + for _, resp := range responses { + fut.AppendPush(resp) + } + fut.SetError(err) + + var its = []ResponseIterator{ + fut.GetIterator(), + fut.GetIterator().WithTimeout(5 * time.Second), + } + for _, it := range its { + var cnt = 0 + for it.Next() { + code := PushCode + assertResponseIteratorValue(t, it, code, responses[cnt]) + cnt += 1 + } + if err = it.Err(); err != nil { + if err.Error() != errMsg { + t.Errorf("An unexpected error %q, expected %q", err.Error(), errMsg) + } + } else { + t.Errorf("An error expected.") + } + + if cnt != len(responses) { + t.Errorf("An unexpected count of responses %d != %d", cnt, len(responses)) + } + } +} diff --git a/request.go b/request.go index 541fb2b4f..e672f0b17 100644 --- a/request.go +++ b/request.go @@ -12,7 +12,7 @@ import ( // Ping sends empty request to Tarantool to check connection. func (conn *Connection) Ping() (resp *Response, err error) { future := conn.newFuture(PingRequest) - return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get() + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get() } func fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error { @@ -226,9 +226,9 @@ func (conn *Connection) SelectAsync(space, index interface{}, offset, limit, ite future := conn.newFuture(SelectRequest) spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(6) fillIterator(enc, offset, limit, iterator) return fillSearch(enc, spaceNo, indexNo, key) @@ -241,9 +241,9 @@ func (conn *Connection) InsertAsync(space interface{}, tuple interface{}) *Futur future := conn.newFuture(InsertRequest) spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) return fillInsert(enc, spaceNo, tuple) }) @@ -255,9 +255,9 @@ func (conn *Connection) ReplaceAsync(space interface{}, tuple interface{}) *Futu future := conn.newFuture(ReplaceRequest) spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) return fillInsert(enc, spaceNo, tuple) }) @@ -269,9 +269,9 @@ func (conn *Connection) DeleteAsync(space, index interface{}, key interface{}) * future := conn.newFuture(DeleteRequest) spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(3) return fillSearch(enc, spaceNo, indexNo, key) }) @@ -283,9 +283,9 @@ func (conn *Connection) UpdateAsync(space, index interface{}, key, ops interface future := conn.newFuture(UpdateRequest) spaceNo, indexNo, err := conn.Schema.resolveSpaceIndex(space, index) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(4) if err := fillSearch(enc, spaceNo, indexNo, key); err != nil { return err @@ -301,9 +301,9 @@ func (conn *Connection) UpsertAsync(space interface{}, tuple interface{}, ops in future := conn.newFuture(UpsertRequest) spaceNo, _, err := conn.Schema.resolveSpaceIndex(space, nil) if err != nil { - return future.fail(conn, err) + return conn.failFuture(future, err) } - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(3) enc.EncodeUint64(KeySpaceNo) enc.EncodeUint64(uint64(spaceNo)) @@ -320,7 +320,7 @@ func (conn *Connection) UpsertAsync(space interface{}, tuple interface{}, ops in // It uses request code for Tarantool 1.6, so future's result is always array of arrays func (conn *Connection) CallAsync(functionName string, args interface{}) *Future { future := conn.newFuture(CallRequest) - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyFunctionName) enc.EncodeString(functionName) @@ -334,7 +334,7 @@ func (conn *Connection) CallAsync(functionName string, args interface{}) *Future // (though, keep in mind, result is always array) func (conn *Connection) Call17Async(functionName string, args interface{}) *Future { future := conn.newFuture(Call17Request) - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyFunctionName) enc.EncodeString(functionName) @@ -346,7 +346,7 @@ func (conn *Connection) Call17Async(functionName string, args interface{}) *Futu // EvalAsync sends a Lua expression for evaluation and returns Future. func (conn *Connection) EvalAsync(expr string, args interface{}) *Future { future := conn.newFuture(EvalRequest) - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeyExpression) enc.EncodeString(expr) @@ -359,7 +359,7 @@ func (conn *Connection) EvalAsync(expr string, args interface{}) *Future { // Since 1.6.0 func (conn *Connection) ExecuteAsync(expr string, args interface{}) *Future { future := conn.newFuture(ExecuteRequest) - return future.send(conn, func(enc *msgpack.Encoder) error { + return conn.sendFuture(future, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(2) enc.EncodeUint64(KeySQLText) enc.EncodeString(expr) diff --git a/response.go b/response.go index 9fcca64da..b7a15706f 100644 --- a/response.go +++ b/response.go @@ -184,7 +184,7 @@ func (resp *Response) decodeBody() (err error) { } } } - if resp.Code != OkCode { + if resp.Code != OkCode && resp.Code != PushCode { resp.Code &^= ErrorCodeBit err = Error{resp.Code, resp.Error} } @@ -227,7 +227,7 @@ func (resp *Response) decodeBodyTyped(res interface{}) (err error) { } } } - if resp.Code != OkCode { + if resp.Code != OkCode && resp.Code != PushCode { resp.Code &^= ErrorCodeBit err = Error{resp.Code, resp.Error} } diff --git a/response_it.go b/response_it.go new file mode 100644 index 000000000..3ae296449 --- /dev/null +++ b/response_it.go @@ -0,0 +1,26 @@ +package tarantool + +import ( + "time" +) + +// ResponseIterator is an interface for iteration over a set of responses. +type ResponseIterator interface { + // Next tries to switch to a next Response and returns true if it exists. + Next() bool + // Value returns a current Response if it exists, nil otherwise. + Value() *Response + // Err returns error if it happens. + Err() error +} + +// TimeoutResponseIterator is an interface that extends ResponseIterator +// and adds the ability to change a timeout for the Next() call. +type TimeoutResponseIterator interface { + ResponseIterator + // WithTimeout allows to set up a timeout for the Next() call. + // Note: in the current implementation, there is a timeout for each + // response (the timeout for the request is reset by each push message): + // Connection's Opts.Timeout. You need to increase the value if necessary. + WithTimeout(timeout time.Duration) TimeoutResponseIterator +} diff --git a/tarantool_test.go b/tarantool_test.go index acd8577c9..438f3a18b 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -22,6 +22,19 @@ type Member struct { Val uint } +func connect(t testing.TB, server string, opts Opts) (conn *Connection) { + t.Helper() + + conn, err := Connect(server, opts) + if err != nil { + t.Fatalf("Failed to connect: %s", err.Error()) + } + if conn == nil { + t.Fatalf("conn is nil after Connect") + } + return conn +} + func (m *Member) EncodeMsgpack(e *msgpack.Encoder) error { if err := e.EncodeSliceLen(2); err != nil { return err @@ -71,11 +84,7 @@ const N = 500 func BenchmarkClientSerial(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -95,11 +104,7 @@ func BenchmarkClientSerial(b *testing.B) { func BenchmarkClientSerialTyped(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -120,11 +125,7 @@ func BenchmarkClientSerialTyped(b *testing.B) { func BenchmarkClientFuture(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Error(err) - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -151,11 +152,7 @@ func BenchmarkClientFuture(b *testing.B) { func BenchmarkClientFutureTyped(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -185,11 +182,7 @@ func BenchmarkClientFutureTyped(b *testing.B) { func BenchmarkClientFutureParallel(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -222,11 +215,7 @@ func BenchmarkClientFutureParallel(b *testing.B) { func BenchmarkClientFutureParallelTyped(b *testing.B) { var err error - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) @@ -262,14 +251,10 @@ func BenchmarkClientFutureParallelTyped(b *testing.B) { } func BenchmarkClientParallel(b *testing.B) { - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { b.Fatal("No connection available") } @@ -287,14 +272,10 @@ func BenchmarkClientParallel(b *testing.B) { } func BenchmarkClientParallelMassive(b *testing.B) { - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { b.Fatal("No connection available") } @@ -326,14 +307,10 @@ func BenchmarkClientParallelMassive(b *testing.B) { } func BenchmarkClientParallelMassiveUntyped(b *testing.B) { - conn, err := Connect(server, opts) - if err != nil { - b.Errorf("No connection available") - return - } + conn := connect(b, server, opts) defer conn.Close() - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { b.Errorf("No connection available") } @@ -488,15 +465,8 @@ func BenchmarkSQLSerial(b *testing.B) { func TestClient(t *testing.T) { var resp *Response var err error - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatalf("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() // Ping @@ -798,6 +768,100 @@ func TestClient(t *testing.T) { } } +func TestClientSessionPush(t *testing.T) { + conn := connect(t, server, opts) + defer conn.Close() + + var it ResponseIterator + const pushMax = 3 + // It will be iterated immediately. + fut0 := conn.Call17Async("push_func", []interface{}{pushMax}) + respCnt := 0 + for it = fut0.GetIterator(); it.Next(); { + err := it.Err() + resp := it.Value() + if err != nil { + t.Errorf("Unexpected error after it.Next() == true: %q", err.Error()) + break + } + if resp == nil { + t.Errorf("Response is empty after it.Next() == true") + break + } + respCnt += 1 + } + if err := it.Err(); err != nil { + t.Errorf("An unexpected iteration error: %s", err.Error()) + } + if respCnt > pushMax+1 { + t.Errorf("Unexpected respCnt = %d, expected 0 <= respCnt <= %d", respCnt, pushMax+1) + } + _, _ = fut0.Get() + + // It will wait a response before iteration. + fut1 := conn.Call17Async("push_func", []interface{}{pushMax}) + // Future.Get ignores push messages. + resp, err := fut1.Get() + if err != nil { + t.Errorf("Failed to Call: %s", err.Error()) + } else if resp == nil { + t.Errorf("Response is nil after CallAsync") + } else if len(resp.Data) < 1 { + t.Errorf("Response.Data is empty after CallAsync") + } else if resp.Data[0].(uint64) != pushMax { + t.Errorf("result is not {{1}} : %v", resp.Data) + } + + // It will will be iterated with a timeout. + fut2 := conn.Call17Async("push_func", []interface{}{pushMax}) + + var its = []ResponseIterator{ + fut1.GetIterator(), + fut2.GetIterator().WithTimeout(5 * time.Second), + } + + for i := 0; i < len(its); i++ { + pushCnt := uint64(0) + respCnt := uint64(0) + + it = its[i] + for it.Next() { + resp = it.Value() + if resp == nil { + t.Errorf("Response is empty after it.Next() == true") + break + } + if len(resp.Data) < 1 { + t.Errorf("Response.Data is empty after CallAsync") + break + } + if resp.Code == PushCode { + pushCnt += 1 + if resp.Data[0].(uint64) != pushCnt { + t.Errorf("Unexpected push data = %v", resp.Data) + } + } else { + respCnt += 1 + if resp.Data[0].(uint64) != pushMax { + t.Errorf("result is not {{1}} : %v", resp.Data) + } + } + } + + if err = it.Err(); err != nil { + t.Errorf("An unexpected iteration error: %s", err.Error()) + } + + if pushCnt != pushMax { + t.Errorf("Expect %d pushes but got %d", pushMax, pushCnt) + } + + if respCnt != 1 { + t.Errorf("Expect %d responses but got %d", 1, respCnt) + } + } +} + const ( createTableQuery = "CREATE TABLE SQL_SPACE (id INTEGER PRIMARY KEY AUTOINCREMENT, name STRING COLLATE \"unicode\" DEFAULT NULL);" insertQuery = "INSERT INTO SQL_SPACE VALUES (?, ?);" @@ -974,10 +1038,7 @@ func TestSQL(t *testing.T) { }, } - var conn *Connection - conn, err = Connect(server, opts) - assert.Nil(t, err, "Failed to Connect") - assert.NotNil(t, conn, "conn is nil after Connect") + conn := connect(t, server, opts) defer conn.Close() for i, test := range testCases { @@ -1011,15 +1072,7 @@ func TestSQLTyped(t *testing.T) { t.Skip() } - var conn *Connection - - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatal("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() mem := []Member{} @@ -1054,15 +1107,8 @@ func TestSQLBindings(t *testing.T) { } var resp *Response - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatal("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() // test all types of supported bindings @@ -1170,15 +1216,8 @@ func TestStressSQL(t *testing.T) { } var resp *Response - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatalf("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() resp, err = conn.Execute(createTableQuery, []interface{}{}) @@ -1273,15 +1312,8 @@ func TestStressSQL(t *testing.T) { func TestSchema(t *testing.T) { var err error - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatalf("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() // Schema @@ -1455,15 +1487,8 @@ func TestSchema(t *testing.T) { func TestClientNamed(t *testing.T) { var resp *Response var err error - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatalf("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() // Insert @@ -1551,15 +1576,8 @@ func TestClientNamed(t *testing.T) { func TestComplexStructs(t *testing.T) { var err error - var conn *Connection - conn, err = Connect(server, opts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if conn == nil { - t.Fatalf("conn is nil after Connect") - } + conn := connect(t, server, opts) defer conn.Close() tuple := Tuple2{Cid: 777, Orig: "orig", Members: []Member{{"lol", "", 1}, {"wut", "", 3}}}