Skip to content

Commit 1120a47

Browse files
iskandarov-egorandrewkavegor.iskandarov
authored
Add utube handling (without unrelated changes) (#85)
* Add utube handling * undo changes from previous commit that are unrelated to the utube handling PR Co-authored-by: andrew <[email protected]> Co-authored-by: egor.iskandarov <[email protected]>
1 parent f4ece35 commit 1120a47

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

queue/queue.go

+5
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type Opts struct {
103103
Ttl time.Duration // task time to live
104104
Ttr time.Duration // task time to execute
105105
Delay time.Duration // delayed execution
106+
Utube string
106107
}
107108

108109
func (opts Opts) toMap() map[string]interface{} {
@@ -124,6 +125,10 @@ func (opts Opts) toMap() map[string]interface{} {
124125
ret["pri"] = opts.Pri
125126
}
126127

128+
if opts.Utube != "" {
129+
ret["utube"] = opts.Utube
130+
}
131+
127132
return ret
128133
}
129134

queue/queue_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package queue_test
22

33
import (
44
"fmt"
5+
"math"
56
"testing"
67
"time"
78

@@ -747,3 +748,73 @@ func TestTtlQueue_Put(t *testing.T) {
747748
}
748749
}
749750
}
751+
752+
func TestUtube_Put(t *testing.T) {
753+
conn, err := Connect(server, opts)
754+
if err != nil {
755+
t.Errorf("Failed to connect: %s", err.Error())
756+
return
757+
}
758+
if conn == nil {
759+
t.Errorf("conn is nil after Connect")
760+
return
761+
}
762+
defer conn.Close()
763+
764+
name := "test_utube"
765+
cfg := queue.Cfg{
766+
Temporary: true,
767+
Kind: queue.UTUBE,
768+
IfNotExists: true,
769+
}
770+
q := queue.New(conn, name)
771+
if err = q.Create(cfg); err != nil {
772+
t.Errorf("Failed to create queue: %s", err.Error())
773+
return
774+
}
775+
defer func() {
776+
//Drop
777+
err := q.Drop()
778+
if err != nil {
779+
t.Errorf("Failed drop queue: %s", err.Error())
780+
}
781+
}()
782+
783+
data1 := &customData{"test-data-0"}
784+
_, err = q.PutWithOpts(data1, queue.Opts{Utube: "test-utube-consumer-key"})
785+
if err != nil {
786+
t.Fatalf("Failed put task to queue: %s", err.Error())
787+
}
788+
data2 := &customData{"test-data-1"}
789+
_, err = q.PutWithOpts(data2, queue.Opts{Utube: "test-utube-consumer-key"})
790+
if err != nil {
791+
t.Fatalf("Failed put task to queue: %s", err.Error())
792+
}
793+
794+
go func() {
795+
t1, err := q.TakeTimeout(2 * time.Second)
796+
if err != nil {
797+
t.Fatalf("Failed to take task from utube: %s", err.Error())
798+
}
799+
800+
time.Sleep(2 * time.Second)
801+
if err := t1.Ack(); err != nil {
802+
t.Fatalf("Failed to ack task: %s", err.Error())
803+
}
804+
}()
805+
806+
time.Sleep(100 * time.Millisecond)
807+
// the queue should be blocked for ~2 seconds
808+
start := time.Now()
809+
t2, err := q.TakeTimeout(2 * time.Second)
810+
if err != nil {
811+
t.Fatalf("Failed to take task from utube: %s", err.Error())
812+
}
813+
if err := t2.Ack(); err != nil {
814+
t.Fatalf("Failed to ack task: %s", err.Error())
815+
}
816+
end := time.Now()
817+
if math.Abs(float64(end.Sub(start)-2*time.Second)) > float64(200*time.Millisecond) {
818+
t.Fatalf("Blocking time is less than expected: actual = %.2fs, expected = 1s", end.Sub(start).Seconds())
819+
}
820+
}

0 commit comments

Comments
 (0)