Skip to content

Commit e844c03

Browse files
committed
code health: extract the Future type to future.go
This is not a critical change. It just splits request.go into request.go and future.go.
1 parent 1c80774 commit e844c03

File tree

2 files changed

+140
-134
lines changed

2 files changed

+140
-134
lines changed

future.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package tarantool
2+
3+
import (
4+
"time"
5+
6+
"gopkg.in/vmihailenco/msgpack.v2"
7+
)
8+
9+
// Future is a handle for asynchronous request.
10+
type Future struct {
11+
requestId uint32
12+
requestCode int32
13+
timeout time.Duration
14+
resp *Response
15+
err error
16+
ready chan struct{}
17+
next *Future
18+
}
19+
20+
// NewErrorFuture returns new set empty Future with filled error field.
21+
func NewErrorFuture(err error) *Future {
22+
return &Future{err: err}
23+
}
24+
25+
// Get waits for Future to be filled and returns Response and error.
26+
//
27+
// Response will contain deserialized result in Data field.
28+
// It will be []interface{}, so if you want more performace, use GetTyped method.
29+
//
30+
// Note: Response could be equal to nil if ClientError is returned in error.
31+
//
32+
// "error" could be Error, if it is error returned by Tarantool,
33+
// or ClientError, if something bad happens in a client process.
34+
func (fut *Future) Get() (*Response, error) {
35+
fut.wait()
36+
if fut.err != nil {
37+
return fut.resp, fut.err
38+
}
39+
fut.err = fut.resp.decodeBody()
40+
return fut.resp, fut.err
41+
}
42+
43+
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
44+
// It is could be much faster than Get() function.
45+
//
46+
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
47+
func (fut *Future) GetTyped(result interface{}) error {
48+
fut.wait()
49+
if fut.err != nil {
50+
return fut.err
51+
}
52+
fut.err = fut.resp.decodeBodyTyped(result)
53+
return fut.err
54+
}
55+
56+
var closedChan = make(chan struct{})
57+
58+
func init() {
59+
close(closedChan)
60+
}
61+
62+
// WaitChan returns channel which becomes closed when response arrived or error occured.
63+
func (fut *Future) WaitChan() <-chan struct{} {
64+
if fut.ready == nil {
65+
return closedChan
66+
}
67+
return fut.ready
68+
}
69+
70+
// Err returns error set on Future.
71+
// It waits for future to be set.
72+
// Note: it doesn't decode body, therefore decoding error are not set here.
73+
func (fut *Future) Err() error {
74+
fut.wait()
75+
return fut.err
76+
}
77+
78+
func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) {
79+
rid := fut.requestId
80+
hl := h.Len()
81+
h.Write([]byte{
82+
0xce, 0, 0, 0, 0, // Length.
83+
0x82, // 2 element map.
84+
KeyCode, byte(fut.requestCode), // Request code.
85+
KeySync, 0xce,
86+
byte(rid >> 24), byte(rid >> 16),
87+
byte(rid >> 8), byte(rid),
88+
})
89+
90+
if err = body(enc); err != nil {
91+
return
92+
}
93+
94+
l := uint32(h.Len() - 5 - hl)
95+
h.b[hl+1] = byte(l >> 24)
96+
h.b[hl+2] = byte(l >> 16)
97+
h.b[hl+3] = byte(l >> 8)
98+
h.b[hl+4] = byte(l)
99+
100+
return
101+
}
102+
103+
func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future {
104+
if fut.ready == nil {
105+
return fut
106+
}
107+
conn.putFuture(fut, body)
108+
return fut
109+
}
110+
111+
func (fut *Future) markReady(conn *Connection) {
112+
close(fut.ready)
113+
if conn.rlimit != nil {
114+
<-conn.rlimit
115+
}
116+
}
117+
118+
func (fut *Future) fail(conn *Connection, err error) *Future {
119+
if f := conn.fetchFuture(fut.requestId); f == fut {
120+
f.err = err
121+
fut.markReady(conn)
122+
}
123+
return fut
124+
}
125+
126+
func (fut *Future) wait() {
127+
if fut.ready == nil {
128+
return
129+
}
130+
<-fut.ready
131+
}

request.go

Lines changed: 9 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,17 @@ import (
55
"reflect"
66
"strings"
77
"sync"
8-
"time"
98

109
"gopkg.in/vmihailenco/msgpack.v2"
1110
)
1211

13-
// Future is a handle for asynchronous request.
14-
type Future struct {
15-
requestId uint32
16-
requestCode int32
17-
timeout time.Duration
18-
resp *Response
19-
err error
20-
ready chan struct{}
21-
next *Future
22-
}
23-
2412
// Ping sends empty request to Tarantool to check connection.
2513
func (conn *Connection) Ping() (resp *Response, err error) {
2614
future := conn.newFuture(PingRequest)
2715
return future.send(conn, func(enc *msgpack.Encoder) error { enc.EncodeMapLen(0); return nil }).Get()
2816
}
2917

30-
func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error {
18+
func fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key interface{}) error {
3119
enc.EncodeUint64(KeySpaceNo)
3220
enc.EncodeUint64(uint64(spaceNo))
3321
enc.EncodeUint64(KeyIndexNo)
@@ -36,7 +24,7 @@ func (req *Future) fillSearch(enc *msgpack.Encoder, spaceNo, indexNo uint32, key
3624
return enc.Encode(key)
3725
}
3826

39-
func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) {
27+
func fillIterator(enc *msgpack.Encoder, offset, limit, iterator uint32) {
4028
enc.EncodeUint64(KeyIterator)
4129
enc.EncodeUint64(uint64(iterator))
4230
enc.EncodeUint64(KeyOffset)
@@ -45,7 +33,7 @@ func (req *Future) fillIterator(enc *msgpack.Encoder, offset, limit, iterator ui
4533
enc.EncodeUint64(uint64(limit))
4634
}
4735

48-
func (req *Future) fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error {
36+
func fillInsert(enc *msgpack.Encoder, spaceNo uint32, tuple interface{}) error {
4937
enc.EncodeUint64(KeySpaceNo)
5038
enc.EncodeUint64(uint64(spaceNo))
5139
enc.EncodeUint64(KeyTuple)
@@ -242,8 +230,8 @@ func (conn *Connection) SelectAsync(space, index interface{}, offset, limit, ite
242230
}
243231
return future.send(conn, func(enc *msgpack.Encoder) error {
244232
enc.EncodeMapLen(6)
245-
future.fillIterator(enc, offset, limit, iterator)
246-
return future.fillSearch(enc, spaceNo, indexNo, key)
233+
fillIterator(enc, offset, limit, iterator)
234+
return fillSearch(enc, spaceNo, indexNo, key)
247235
})
248236
}
249237

@@ -257,7 +245,7 @@ func (conn *Connection) InsertAsync(space interface{}, tuple interface{}) *Futur
257245
}
258246
return future.send(conn, func(enc *msgpack.Encoder) error {
259247
enc.EncodeMapLen(2)
260-
return future.fillInsert(enc, spaceNo, tuple)
248+
return fillInsert(enc, spaceNo, tuple)
261249
})
262250
}
263251

@@ -271,7 +259,7 @@ func (conn *Connection) ReplaceAsync(space interface{}, tuple interface{}) *Futu
271259
}
272260
return future.send(conn, func(enc *msgpack.Encoder) error {
273261
enc.EncodeMapLen(2)
274-
return future.fillInsert(enc, spaceNo, tuple)
262+
return fillInsert(enc, spaceNo, tuple)
275263
})
276264
}
277265

@@ -285,7 +273,7 @@ func (conn *Connection) DeleteAsync(space, index interface{}, key interface{}) *
285273
}
286274
return future.send(conn, func(enc *msgpack.Encoder) error {
287275
enc.EncodeMapLen(3)
288-
return future.fillSearch(enc, spaceNo, indexNo, key)
276+
return fillSearch(enc, spaceNo, indexNo, key)
289277
})
290278
}
291279

@@ -299,7 +287,7 @@ func (conn *Connection) UpdateAsync(space, index interface{}, key, ops interface
299287
}
300288
return future.send(conn, func(enc *msgpack.Encoder) error {
301289
enc.EncodeMapLen(4)
302-
if err := future.fillSearch(enc, spaceNo, indexNo, key); err != nil {
290+
if err := fillSearch(enc, spaceNo, indexNo, key); err != nil {
303291
return err
304292
}
305293
enc.EncodeUint64(KeyTuple)
@@ -520,116 +508,3 @@ func encodeSQLBind(enc *msgpack.Encoder, from interface{}) error {
520508
}
521509
return nil
522510
}
523-
524-
func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) {
525-
rid := fut.requestId
526-
hl := h.Len()
527-
h.Write([]byte{
528-
0xce, 0, 0, 0, 0, // length
529-
0x82, // 2 element map
530-
KeyCode, byte(fut.requestCode), // request code
531-
KeySync, 0xce,
532-
byte(rid >> 24), byte(rid >> 16),
533-
byte(rid >> 8), byte(rid),
534-
})
535-
536-
if err = body(enc); err != nil {
537-
return
538-
}
539-
540-
l := uint32(h.Len() - 5 - hl)
541-
h.b[hl+1] = byte(l >> 24)
542-
h.b[hl+2] = byte(l >> 16)
543-
h.b[hl+3] = byte(l >> 8)
544-
h.b[hl+4] = byte(l)
545-
546-
return
547-
}
548-
549-
func (fut *Future) send(conn *Connection, body func(*msgpack.Encoder) error) *Future {
550-
if fut.ready == nil {
551-
return fut
552-
}
553-
conn.putFuture(fut, body)
554-
return fut
555-
}
556-
557-
func (fut *Future) markReady(conn *Connection) {
558-
close(fut.ready)
559-
if conn.rlimit != nil {
560-
<-conn.rlimit
561-
}
562-
}
563-
564-
func (fut *Future) fail(conn *Connection, err error) *Future {
565-
if f := conn.fetchFuture(fut.requestId); f == fut {
566-
f.err = err
567-
fut.markReady(conn)
568-
}
569-
return fut
570-
}
571-
572-
func (fut *Future) wait() {
573-
if fut.ready == nil {
574-
return
575-
}
576-
<-fut.ready
577-
}
578-
579-
// Get waits for Future to be filled and returns Response and error.
580-
//
581-
// Response will contain deserialized result in Data field.
582-
// It will be []interface{}, so if you want more performace, use GetTyped method.
583-
//
584-
// Note: Response could be equal to nil if ClientError is returned in error.
585-
//
586-
// "error" could be Error, if it is error returned by Tarantool,
587-
// or ClientError, if something bad happens in a client process.
588-
func (fut *Future) Get() (*Response, error) {
589-
fut.wait()
590-
if fut.err != nil {
591-
return fut.resp, fut.err
592-
}
593-
fut.err = fut.resp.decodeBody()
594-
return fut.resp, fut.err
595-
}
596-
597-
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
598-
// It is could be much faster than Get() function.
599-
//
600-
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions)
601-
func (fut *Future) GetTyped(result interface{}) error {
602-
fut.wait()
603-
if fut.err != nil {
604-
return fut.err
605-
}
606-
fut.err = fut.resp.decodeBodyTyped(result)
607-
return fut.err
608-
}
609-
610-
var closedChan = make(chan struct{})
611-
612-
func init() {
613-
close(closedChan)
614-
}
615-
616-
// WaitChan returns channel which becomes closed when response arrived or error occured.
617-
func (fut *Future) WaitChan() <-chan struct{} {
618-
if fut.ready == nil {
619-
return closedChan
620-
}
621-
return fut.ready
622-
}
623-
624-
// Err returns error set on Future.
625-
// It waits for future to be set.
626-
// Note: it doesn't decode body, therefore decoding error are not set here.
627-
func (fut *Future) Err() error {
628-
fut.wait()
629-
return fut.err
630-
}
631-
632-
// NewErrorFuture returns new set empty Future with filled error field.
633-
func NewErrorFuture(err error) *Future {
634-
return &Future{err: err}
635-
}

0 commit comments

Comments
 (0)