Skip to content

Commit c133955

Browse files
committed
Add ctx in state
Signed-off-by: pigletfly <[email protected]>
1 parent 9bb86a0 commit c133955

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+717
-661
lines changed

state/aerospike/aerospike.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package aerospike
1515

1616
import (
17+
"context"
1718
"encoding/json"
1819
"errors"
1920
"fmt"
@@ -110,7 +111,7 @@ func (aspike *Aerospike) Features() []state.Feature {
110111
}
111112

112113
// Set stores value for a key to Aerospike. It honors ETag (for concurrency) and consistency settings.
113-
func (aspike *Aerospike) Set(req *state.SetRequest) error {
114+
func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
114115
err := state.CheckRequestOptions(req.Options)
115116
if err != nil {
116117
return err
@@ -162,7 +163,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
162163
}
163164

164165
// Get retrieves state from Aerospike with a key.
165-
func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error) {
166+
func (aspike *Aerospike) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
166167
asKey, err := as.NewKey(aspike.namespace, aspike.set, req.Key)
167168
if err != nil {
168169
return nil, err
@@ -196,7 +197,7 @@ func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error)
196197
}
197198

198199
// Delete performs a delete operation.
199-
func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
200+
func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) error {
200201
err := state.CheckRequestOptions(req.Options)
201202
if err != nil {
202203
return err
@@ -238,7 +239,7 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
238239
return nil
239240
}
240241

241-
func (aspike *Aerospike) Ping() error {
242+
func (aspike *Aerospike) Ping(ctx context.Context) error {
242243
return nil
243244
}
244245

state/alicloud/tablestore/tablestore.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package tablestore
1515

1616
import (
17+
"context"
1718
"encoding/json"
1819

1920
"github.com/agrea/ptr"
@@ -68,7 +69,7 @@ func (s *AliCloudTableStore) Features() []state.Feature {
6869
return s.features
6970
}
7071

71-
func (s *AliCloudTableStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
72+
func (s *AliCloudTableStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
7273
criteria := &tablestore.SingleRowQueryCriteria{
7374
PrimaryKey: s.primaryKey(req.Key),
7475
TableName: s.metadata.TableName,
@@ -103,7 +104,7 @@ func (s *AliCloudTableStore) getResp(columns []*tablestore.AttributeColumn) *sta
103104
return getResp
104105
}
105106

106-
func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
107+
func (s *AliCloudTableStore) BulkGet(ctx context.Context, reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
107108
// "len == 0": empty request, directly return empty response
108109
if len(reqs) == 0 {
109110
return true, []state.BulkGetResponse{}, nil
@@ -139,7 +140,7 @@ func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.Bul
139140
return true, responseList, nil
140141
}
141142

142-
func (s *AliCloudTableStore) Set(req *state.SetRequest) error {
143+
func (s *AliCloudTableStore) Set(ctx context.Context, req *state.SetRequest) error {
143144
change := s.updateRowChange(req)
144145

145146
request := &tablestore.UpdateRowRequest{
@@ -183,7 +184,7 @@ func unmarshal(val interface{}) []byte {
183184
return []byte(output)
184185
}
185186

186-
func (s *AliCloudTableStore) Delete(req *state.DeleteRequest) error {
187+
func (s *AliCloudTableStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
187188
change := s.deleteRowChange(req)
188189

189190
deleteRowReq := &tablestore.DeleteRowRequest{
@@ -205,11 +206,11 @@ func (s *AliCloudTableStore) deleteRowChange(req *state.DeleteRequest) *tablesto
205206
return change
206207
}
207208

208-
func (s *AliCloudTableStore) BulkSet(reqs []state.SetRequest) error {
209+
func (s *AliCloudTableStore) BulkSet(ctx context.Context, reqs []state.SetRequest) error {
209210
return s.batchWrite(reqs, nil)
210211
}
211212

212-
func (s *AliCloudTableStore) BulkDelete(reqs []state.DeleteRequest) error {
213+
func (s *AliCloudTableStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest) error {
213214
return s.batchWrite(nil, reqs)
214215
}
215216

@@ -234,7 +235,7 @@ func (s *AliCloudTableStore) batchWrite(setReqs []state.SetRequest, deleteReqs [
234235
return nil
235236
}
236237

237-
func (s *AliCloudTableStore) Ping() error {
238+
func (s *AliCloudTableStore) Ping(ctx context.Context) error {
238239
return nil
239240
}
240241

state/alicloud/tablestore/tablestore_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package tablestore
1515

1616
import (
17+
"context"
1718
"testing"
1819

1920
"github.com/agrea/ptr"
@@ -63,15 +64,15 @@ func TestReadAndWrite(t *testing.T) {
6364
Value: "value of key",
6465
ETag: ptr.String("the etag"),
6566
}
66-
err := store.Set(setReq)
67+
err := store.Set(context.TODO(), setReq)
6768
assert.Nil(t, err)
6869
})
6970

7071
t.Run("test get 1", func(t *testing.T) {
7172
getReq := &state.GetRequest{
7273
Key: "theFirstKey",
7374
}
74-
resp, err := store.Get(getReq)
75+
resp, err := store.Get(context.TODO(), getReq)
7576
assert.Nil(t, err)
7677
assert.NotNil(t, resp)
7778
assert.Equal(t, "value of key", string(resp.Data))
@@ -83,22 +84,22 @@ func TestReadAndWrite(t *testing.T) {
8384
Value: "1234",
8485
ETag: ptr.String("the etag"),
8586
}
86-
err := store.Set(setReq)
87+
err := store.Set(context.TODO(), setReq)
8788
assert.Nil(t, err)
8889
})
8990

9091
t.Run("test get 2", func(t *testing.T) {
9192
getReq := &state.GetRequest{
9293
Key: "theSecondKey",
9394
}
94-
resp, err := store.Get(getReq)
95+
resp, err := store.Get(context.TODO(), getReq)
9596
assert.Nil(t, err)
9697
assert.NotNil(t, resp)
9798
assert.Equal(t, "1234", string(resp.Data))
9899
})
99100

100101
t.Run("test BulkSet", func(t *testing.T) {
101-
err := store.BulkSet([]state.SetRequest{{
102+
err := store.BulkSet(context.TODO(), []state.SetRequest{{
102103
Key: "theFirstKey",
103104
Value: "666",
104105
}, {
@@ -110,7 +111,7 @@ func TestReadAndWrite(t *testing.T) {
110111
})
111112

112113
t.Run("test BulkGet", func(t *testing.T) {
113-
_, resp, err := store.BulkGet([]state.GetRequest{{
114+
_, resp, err := store.BulkGet(context.TODO(), []state.GetRequest{{
114115
Key: "theFirstKey",
115116
}, {
116117
Key: "theSecondKey",
@@ -126,12 +127,12 @@ func TestReadAndWrite(t *testing.T) {
126127
req := &state.DeleteRequest{
127128
Key: "theFirstKey",
128129
}
129-
err := store.Delete(req)
130+
err := store.Delete(context.TODO(), req)
130131
assert.Nil(t, err)
131132
})
132133

133134
t.Run("test BulkGet2", func(t *testing.T) {
134-
_, resp, err := store.BulkGet([]state.GetRequest{{
135+
_, resp, err := store.BulkGet(context.TODO(), []state.GetRequest{{
135136
Key: "theFirstKey",
136137
}, {
137138
Key: "theSecondKey",

state/aws/dynamodb/dynamodb.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package dynamodb
1515

1616
import (
17+
"context"
1718
"encoding/json"
1819
"fmt"
1920
"strconv"
@@ -70,7 +71,7 @@ func (d *StateStore) Init(metadata state.Metadata) error {
7071
return nil
7172
}
7273

73-
func (d *StateStore) Ping() error {
74+
func (d *StateStore) Ping(ctx context.Context) error {
7475
return nil
7576
}
7677

@@ -80,7 +81,7 @@ func (d *StateStore) Features() []state.Feature {
8081
}
8182

8283
// Get retrieves a dynamoDB item.
83-
func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
84+
func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
8485
input := &dynamodb.GetItemInput{
8586
ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong),
8687
TableName: aws.String(d.table),
@@ -124,13 +125,13 @@ func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
124125
}
125126

126127
// BulkGet performs a bulk get operations.
127-
func (d *StateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
128+
func (d *StateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
128129
// TODO: replace with dynamodb.BatchGetItem for performance
129130
return false, nil, nil
130131
}
131132

132133
// Set saves a dynamoDB item.
133-
func (d *StateStore) Set(req *state.SetRequest) error {
134+
func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
134135
value, err := d.marshalToString(req.Value)
135136
if err != nil {
136137
return fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
@@ -176,7 +177,7 @@ func (d *StateStore) Set(req *state.SetRequest) error {
176177
}
177178

178179
// BulkSet performs a bulk set operation.
179-
func (d *StateStore) BulkSet(req []state.SetRequest) error {
180+
func (d *StateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
180181
writeRequests := []*dynamodb.WriteRequest{}
181182

182183
for _, r := range req {
@@ -234,7 +235,7 @@ func (d *StateStore) BulkSet(req []state.SetRequest) error {
234235
}
235236

236237
// Delete performs a delete operation.
237-
func (d *StateStore) Delete(req *state.DeleteRequest) error {
238+
func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
238239
input := &dynamodb.DeleteItemInput{
239240
Key: map[string]*dynamodb.AttributeValue{
240241
"key": {
@@ -249,7 +250,7 @@ func (d *StateStore) Delete(req *state.DeleteRequest) error {
249250
}
250251

251252
// BulkDelete performs a bulk delete operation.
252-
func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
253+
func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
253254
writeRequests := []*dynamodb.WriteRequest{}
254255

255256
for _, r := range req {

0 commit comments

Comments
 (0)