@@ -13,20 +13,24 @@ import (
13
13
"sync"
14
14
"time"
15
15
16
- "cloud.google.com/go/pubsub"
16
+ "cloud.google.com/go/pubsub/v2"
17
+ "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
17
18
pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context"
18
19
"github.com/cloudevents/sdk-go/v2/binding"
20
+ "google.golang.org/grpc/codes"
21
+ "google.golang.org/grpc/status"
22
+ "google.golang.org/protobuf/types/known/durationpb"
19
23
)
20
24
21
25
type topicInfo struct {
22
- topic * pubsub.Topic
26
+ topic * pubsub.Publisher
23
27
wasCreated bool
24
28
once sync.Once
25
29
err error
26
30
}
27
31
28
32
type subInfo struct {
29
- sub * pubsub.Subscription
33
+ sub * pubsub.Subscriber
30
34
wasCreated bool
31
35
once sync.Once
32
36
err error
@@ -89,8 +93,35 @@ const (
89
93
var DefaultReceiveSettings = pubsub.ReceiveSettings {
90
94
// Pubsub default receive settings will fill in other values.
91
95
// https://godoc.org/cloud.google.com/go/pubsub#Client.Subscription
96
+ }
92
97
93
- Synchronous : false ,
98
+ // pubsub/v2 dropped Exists methods and suggests optimistically using GetTopic instead
99
+ func topicExists (ctx context.Context , client * pubsub.Client , topicID string ) (bool , error ) {
100
+ // Check if the topic exists.
101
+ _ , err := client .TopicAdminClient .GetTopic (ctx , & pubsubpb.GetTopicRequest {
102
+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , client .Project (), topicID ),
103
+ })
104
+ if err != nil {
105
+ if st , ok := status .FromError (err ); ok && st .Code () == codes .NotFound {
106
+ return false , nil
107
+ }
108
+ return false , fmt .Errorf ("unable to get topic %q, %v" , topicID , err )
109
+ }
110
+ return true , nil
111
+ }
112
+
113
+ func subscriptionExists (ctx context.Context , client * pubsub.Client , subID string ) (bool , error ) {
114
+ // Check if the subscription exists.
115
+ _ , err := client .SubscriptionAdminClient .GetSubscription (ctx , & pubsubpb.GetSubscriptionRequest {
116
+ Subscription : fmt .Sprintf ("projects/%s/subscriptions/%s" , client .Project (), subID ),
117
+ })
118
+ if err != nil {
119
+ if st , ok := status .FromError (err ); ok && st .Code () == codes .NotFound {
120
+ return false , nil
121
+ }
122
+ return false , fmt .Errorf ("unable to get subscription %q, %v" , subID , err )
123
+ }
124
+ return true , nil
94
125
}
95
126
96
127
func (c * Connection ) getOrCreateTopicInfo (ctx context.Context , getAlreadyOpenOnly bool ) (* topicInfo , error ) {
@@ -110,9 +141,7 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
110
141
// Make sure the topic structure is initialized at most once.
111
142
ti .once .Do (func () {
112
143
var ok bool
113
- // Load the topic.
114
- topic := c .Client .Topic (c .TopicID )
115
- ok , ti .err = topic .Exists (ctx )
144
+ ok , ti .err = topicExists (ctx , c .Client , c .TopicID )
116
145
if ti .err != nil {
117
146
return
118
147
}
@@ -122,14 +151,17 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
122
151
ti .err = fmt .Errorf ("protocol not allowed to create topic %q" , c .TopicID )
123
152
return
124
153
}
125
- topic , ti .err = c .Client .CreateTopic (ctx , c .TopicID )
154
+ _ , ti .err = c .Client .TopicAdminClient .CreateTopic (ctx , & pubsubpb.Topic {
155
+ Name : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), c .TopicID ),
156
+ })
126
157
if ti .err != nil {
127
158
return
128
159
}
129
160
ti .wasCreated = true
130
161
}
131
- // Success.
132
- ti .topic = topic
162
+
163
+ // If the topic exists, we can use it
164
+ ti .topic = c .Client .Publisher (c .TopicID )
133
165
134
166
// EnableMessageOrdering is a runtime parameter only and not part of the topic
135
167
// Pub/Sub configuration. The Pub/Sub SDK requires this to be set to accept Pub/Sub
@@ -151,7 +183,7 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
151
183
return ti , nil
152
184
}
153
185
154
- func (c * Connection ) getOrCreateTopic (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Topic , error ) {
186
+ func (c * Connection ) getOrCreateTopic (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Publisher , error ) {
155
187
ti , err := c .getOrCreateTopicInfo (ctx , getAlreadyOpenOnly )
156
188
if ti != nil {
157
189
return ti .topic , nil
@@ -170,12 +202,17 @@ func (c *Connection) DeleteTopic(ctx context.Context) error {
170
202
if ! ti .wasCreated {
171
203
return errors .New ("topic was not created by pubsub protocol" )
172
204
}
173
- if err := ti .topic .Delete (ctx ); err != nil {
174
- return err
175
- }
176
205
206
+ // Stop the publisher and send all messages before deleting the topic
177
207
ti .topic .Stop ()
178
208
209
+ err = c .Client .TopicAdminClient .DeleteTopic (ctx , & pubsubpb.DeleteTopicRequest {
210
+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), ti .topic .ID ()),
211
+ })
212
+ if err != nil {
213
+ return err
214
+ }
215
+
179
216
c .initLock .Lock ()
180
217
if ti == c .topicInfo {
181
218
c .topicInfo = nil
@@ -211,10 +248,8 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
211
248
212
249
// Make sure the subscription structure is initialized at most once.
213
250
si .once .Do (func () {
214
- // Load the subscription.
215
251
var ok bool
216
- sub := c .Client .Subscription (c .SubscriptionID )
217
- ok , si .err = sub .Exists (ctx )
252
+ ok , si .err = subscriptionExists (ctx , c .Client , c .SubscriptionID )
218
253
if si .err != nil {
219
254
return
220
255
}
@@ -226,7 +261,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
226
261
}
227
262
228
263
// Load the topic.
229
- var topic * pubsub.Topic
264
+ var topic * pubsub.Publisher
230
265
topic , si .err = c .getOrCreateTopic (ctx , false )
231
266
if si .err != nil {
232
267
return
@@ -235,26 +270,29 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
235
270
// Create a new subscription to the previously created topic
236
271
// with the given name.
237
272
// TODO: allow to use push config + allow setting the SubscriptionConfig.
238
- sub , si .err = c .Client .CreateSubscription (ctx , c .SubscriptionID , pubsub.SubscriptionConfig {
239
- Topic : topic ,
240
- AckDeadline : * c .AckDeadline ,
241
- RetentionDuration : * c .RetentionDuration ,
242
- EnableMessageOrdering : c .MessageOrdering ,
243
- Filter : c .Filter ,
273
+ _ , si .err = c .Client .SubscriptionAdminClient .CreateSubscription (ctx , & pubsubpb.Subscription {
274
+ Name : fmt .Sprintf ("projects/%s/subscriptions/%s" , c .Client .Project (), c .SubscriptionID ),
275
+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), topic .ID ()),
276
+ AckDeadlineSeconds : int32 (c .AckDeadline .Seconds ()),
277
+ MessageRetentionDuration : durationpb .New (* c .RetentionDuration ),
278
+ EnableMessageOrdering : c .MessageOrdering ,
279
+ Filter : c .Filter ,
244
280
})
245
281
if si .err != nil {
246
282
return
247
283
}
248
284
249
285
si .wasCreated = true
250
286
}
287
+
288
+ // Success.
289
+ si .sub = c .Client .Subscriber (c .SubscriptionID )
290
+
251
291
if c .ReceiveSettings == nil {
252
- sub .ReceiveSettings = DefaultReceiveSettings
292
+ si . sub .ReceiveSettings = DefaultReceiveSettings
253
293
} else {
254
- sub .ReceiveSettings = * c .ReceiveSettings
294
+ si . sub .ReceiveSettings = * c .ReceiveSettings
255
295
}
256
- // Success.
257
- si .sub = sub
258
296
})
259
297
if si .sub == nil {
260
298
// Initialization failed, remove this attempt so that future callers
@@ -269,7 +307,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
269
307
return si , nil
270
308
}
271
309
272
- func (c * Connection ) getOrCreateSubscription (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Subscription , error ) {
310
+ func (c * Connection ) getOrCreateSubscription (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Subscriber , error ) {
273
311
si , err := c .getOrCreateSubscriptionInfo (ctx , getAlreadyOpenOnly )
274
312
if si != nil {
275
313
return si .sub , nil
@@ -289,7 +327,9 @@ func (c *Connection) DeleteSubscription(ctx context.Context) error {
289
327
if ! si .wasCreated {
290
328
return errors .New ("subscription was not created by pubsub protocol" )
291
329
}
292
- if err := si .sub .Delete (ctx ); err != nil {
330
+ if err := c .Client .SubscriptionAdminClient .DeleteSubscription (ctx , & pubsubpb.DeleteSubscriptionRequest {
331
+ Subscription : fmt .Sprintf ("projects/%s/subscriptions/%s" , c .Client .Project (), si .sub .ID ()),
332
+ }); err != nil {
293
333
return err
294
334
}
295
335
0 commit comments