Skip to content

Support IPROTO_PUSH messages #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Added

- SSL support (#155)
- IPROTO_PUSH messages support (#67)

### Changed

Expand Down
8 changes: 8 additions & 0 deletions config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ local function simple_incr(a)
end
rawset(_G, 'simple_incr', simple_incr)

local function push_func(cnt)
for i = 1, cnt do
box.session.push(i)
end
return cnt
end
rawset(_G, 'push_func', push_func)

box.space.test:truncate()

--box.schema.user.revoke('guest', 'read,write,execute', 'universe')
Expand Down
121 changes: 93 additions & 28 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ type Greeting struct {

// Opts is a way to configure Connection
type Opts struct {
// Timeout for any particular request. If Timeout is zero request, any
// request can be blocked infinitely.
// Timeout for response to a particular request. The timeout is reset when
// push messages are received. If Timeout is zero, any request can be
// blocked infinitely.
// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
Timeout time.Duration
// Timeout between reconnect attempts. If Reconnect is zero, no
Expand Down Expand Up @@ -568,8 +569,8 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
requests[pos].first = nil
requests[pos].last = &requests[pos].first
for fut != nil {
fut.err = neterr
fut.markReady(conn)
fut.SetError(neterr)
conn.markDone(fut)
fut, fut.next = fut.next, nil
}
}
Expand Down Expand Up @@ -685,40 +686,61 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
conn.reconnect(err, c)
return
}
if fut := conn.fetchFuture(resp.RequestId); fut != nil {
fut.resp = resp
fut.markReady(conn)

var fut *Future = nil
if resp.Code == PushCode {
if fut = conn.peekFuture(resp.RequestId); fut != nil {
fut.AppendPush(resp)
}
} else {
if fut = conn.fetchFuture(resp.RequestId); fut != nil {
fut.SetResponse(resp)
conn.markDone(fut)
}
}
if fut == nil {
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
}
}
}

func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
fut = &Future{}
fut = NewFuture()
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
case conn.rlimit <- struct{}{}:
default:
fut.err = ClientError{ErrRateLimited, "Request is rate limited on client"}
fut.err = ClientError{
ErrRateLimited,
"Request is rate limited on client",
}
fut.ready = nil
fut.done = nil
return
}
}
fut.ready = make(chan struct{})
fut.requestId = conn.nextRequestId()
fut.requestCode = requestCode
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.rmut.Lock()
switch conn.state {
case connClosed:
fut.err = ClientError{ErrConnectionClosed, "using closed connection"}
fut.err = ClientError{
ErrConnectionClosed,
"using closed connection",
}
fut.ready = nil
fut.done = nil
shard.rmut.Unlock()
return
case connDisconnected:
fut.err = ClientError{ErrConnectionNotReady, "client connection is not ready"}
fut.err = ClientError{
ErrConnectionNotReady,
"client connection is not ready",
}
fut.ready = nil
fut.done = nil
shard.rmut.Unlock()
return
}
Expand All @@ -737,22 +759,38 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
runtime.Gosched()
select {
case conn.rlimit <- struct{}{}:
case <-fut.ready:
case <-fut.done:
if fut.err == nil {
panic("fut.ready is closed, but err is nil")
panic("fut.done is closed, but err is nil")
}
}
}
}
return
}

func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future {
if fut.ready == nil {
return fut
}
conn.putFuture(fut, body)
return fut
}

func (conn *Connection) failFuture(fut *Future, err error) *Future {
if f := conn.fetchFuture(fut.requestId); f == fut {
fut.SetError(err)
conn.markDone(fut)
}
return fut
}

func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
shard.bufmut.Lock()
select {
case <-fut.ready:
case <-fut.done:
shard.bufmut.Unlock()
return
default:
Expand All @@ -767,8 +805,8 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
shard.buf.Trunc(blen)
shard.bufmut.Unlock()
if f := conn.fetchFuture(fut.requestId); f == fut {
fut.markReady(conn)
fut.err = err
fut.SetError(err)
conn.markDone(fut)
} else if f != nil {
/* in theory, it is possible. In practice, you have
* to have race condition that lasts hours */
Expand All @@ -782,7 +820,7 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
// packing error is more important than connection
// error, because it is indication of programmer's
// mistake.
fut.err = err
fut.SetError(err)
}
}
return
Expand All @@ -793,15 +831,40 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
}
}

func (conn *Connection) markDone(fut *Future) {
if conn.rlimit != nil {
<-conn.rlimit
}
}

func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
shard.rmut.Lock()
defer shard.rmut.Unlock()

if conn.opts.Timeout > 0 {
fut = conn.getFutureImp(reqid, true)
pair := &shard.requests[pos]
*pair.last = fut
pair.last = &fut.next
fut.timeout = time.Since(epoch) + conn.opts.Timeout
} else {
fut = conn.getFutureImp(reqid, false)
}

return fut
}

func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
shard.rmut.Lock()
fut = conn.fetchFutureImp(reqid)
fut = conn.getFutureImp(reqid, true)
shard.rmut.Unlock()
return fut
}

func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
pair := &shard.requests[pos]
Expand All @@ -812,11 +875,13 @@ func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
return nil
}
if fut.requestId == reqid {
*root = fut.next
if fut.next == nil {
pair.last = root
} else {
fut.next = nil
if fetch {
*root = fut.next
if fut.next == nil {
pair.last = root
} else {
fut.next = nil
}
}
return fut
}
Expand Down Expand Up @@ -851,11 +916,11 @@ func (conn *Connection) timeouts() {
} else {
fut.next = nil
}
fut.err = ClientError{
fut.SetError(ClientError{
Code: ErrTimeouted,
Msg: fmt.Sprintf("client timeout for request %d", fut.requestId),
}
fut.markReady(conn)
})
conn.markDone(fut)
shard.bufmut.Unlock()
}
if pair.first != nil && pair.first.timeout < minNext {
Expand Down
1 change: 1 addition & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
RLimitWait = 2

OkCode = uint32(0)
PushCode = uint32(0x80)
ErrorCodeBit = 0x8000
PacketLengthBytes = 5
ErSpaceExistsCode = 0xa
Expand Down
33 changes: 33 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,39 @@ func ExampleConnection_SelectAsync() {
// Future 2 Data [[18 val 18 bla]]
}

func ExampleFuture_GetIterator() {
conn := example_connect()
defer conn.Close()

const timeout = 3 * time.Second
// Or any other Connection.*Async() call.
fut := conn.Call17Async("push_func", []interface{}{4})

var it tarantool.ResponseIterator
for it = fut.GetIterator().WithTimeout(timeout); it.Next(); {
resp := it.Value()
if resp.Code == tarantool.PushCode {
// It is a push message.
fmt.Printf("push message: %d\n", resp.Data[0].(uint64))
} else if resp.Code == tarantool.OkCode {
// It is a regular response.
fmt.Printf("response: %d", resp.Data[0].(uint64))
} else {
fmt.Printf("an unexpected response code %d", resp.Code)
}
}
if err := it.Err(); err != nil {
fmt.Printf("error in call of push_func is %v", err)
return
}
// Output:
// push message: 1
// push message: 2
// push message: 3
// push message: 4
// response: 4
}

func ExampleConnection_Ping() {
conn := example_connect()
defer conn.Close()
Expand Down
Loading