1
1
using CloudBase: CloudCredentials, AWSCredentials, AbstractStore, AWS
2
2
using JSON3, HTTP, Sockets, Base64
3
3
using RustyObjectStore: SnowflakeConfig, ClientOptions
4
+ using Base: UUID
4
5
5
6
export SFGatewayMock, start, with
6
7
7
8
struct SFConfig
8
9
account:: String
10
+ database:: String
11
+ default_schema:: String
12
+ end
13
+
14
+ struct Stage
9
15
database:: String
10
16
schema:: String
11
- stage :: String
17
+ name :: String
12
18
end
13
19
14
20
mutable struct SFGatewayMock
15
21
credentials:: CloudCredentials
16
22
store:: AbstractStore
17
23
opts:: ClientOptions
18
24
config:: SFConfig
25
+ allowed_stages:: Vector{Stage}
19
26
encrypted:: Bool
20
27
keys_lock:: ReentrantLock
21
28
next_key_id:: Int
22
29
keys:: Dict{String, String}
23
30
end
24
31
25
- function SFGatewayMock (credentials:: CloudCredentials , store:: AbstractStore , encrypted:: Bool ; opts= ClientOptions ())
26
- stage_name = " teststage" * string (rand (UInt64), base= 16 )
32
+
33
+ function to_stage (stage:: AbstractString , config:: SFConfig )
34
+ parts = split (stage, " ." )
35
+ if length (parts) == 1
36
+ return Stage (uppercase (config. database), uppercase (config. default_schema), uppercase (parts[1 ]))
37
+ elseif length (parts) == 2
38
+ return Stage (uppercase (config. database), uppercase (parts[1 ]), uppercase (parts[2 ]))
39
+ elseif length (parts) == 3
40
+ return Stage (uppercase (parts[1 ]), uppercase (parts[2 ]), uppercase (parts[3 ]))
41
+ else
42
+ error (" Invalid stage spec" )
43
+ end
44
+ end
45
+
46
+ fqsn (s:: Stage ) = " $(s. database) .$(s. schema) .$(s. name) "
47
+ stage_uuid (s:: Stage ) = UUID ((hash (fqsn (s)), hash (fqsn (s))))
48
+ stage_path (s:: Stage ) = " stages/$(stage_uuid (s)) /"
49
+
50
+ function SFGatewayMock (
51
+ credentials:: CloudCredentials ,
52
+ store:: AbstractStore ,
53
+ encrypted:: Bool ;
54
+ opts= ClientOptions (),
55
+ default_schema:: String = " testschema" ,
56
+ allowed_stages:: Vector{String} = [" teststage" * string (rand (UInt64), base= 16 )]
57
+ )
27
58
config = SFConfig (
28
59
" testaccount" ,
29
60
" testdatabase" ,
30
- " testschema" ,
31
- stage_name
61
+ default_schema
32
62
)
63
+ allowed_stages_parsed = map (s -> to_stage (s, config), allowed_stages)
33
64
SFGatewayMock (
34
65
credentials,
35
66
store,
36
67
opts,
37
68
config,
69
+ allowed_stages_parsed,
38
70
encrypted,
39
71
ReentrantLock (),
40
72
1 ,
@@ -160,9 +192,13 @@ function start(gw::SFGatewayMock)
160
192
return error_json_response (" Missing stage name or file path" )
161
193
end
162
194
163
- stage_name = m. captures[1 ]
195
+ stage = try
196
+ to_stage (m. captures[1 ], gw. config)
197
+ catch e
198
+ return error_json_response (" $(e) " )
199
+ end
164
200
165
- if stage_name != gw. config . stage
201
+ if ! (stage in gw. allowed_stages)
166
202
return error_json_response (" Stage not found" )
167
203
end
168
204
@@ -179,9 +215,8 @@ function start(gw::SFGatewayMock)
179
215
end
180
216
181
217
182
- stage_path = " stages/a6688b33-acb4-44ed-bd46-30ff12238c2a/"
183
218
stage_info = if isa (gw. credentials, AWSCredentials) && isa (gw. store, AWS. Bucket)
184
- construct_stage_info (gw. credentials, gw. store, stage_path, gw. encrypted)
219
+ construct_stage_info (gw. credentials, gw. store, stage_path (stage) , gw. encrypted)
185
220
else
186
221
error (" unimplemented" )
187
222
end
@@ -203,16 +238,20 @@ function start(gw::SFGatewayMock)
203
238
return error_json_response (" Missing stage name or file path" )
204
239
end
205
240
206
- stage_name = m. captures[1 ]
207
- path = m. captures[2 ]
241
+ stage = try
242
+ to_stage (m. captures[1 ], gw. config)
243
+ catch e
244
+ return error_json_response (" $(e) " )
245
+ end
208
246
209
- if stage_name != gw. config . stage
247
+ if ! (stage in gw. allowed_stages)
210
248
return error_json_response (" Stage not found" )
211
249
end
212
250
213
- stage_path = " stages/a6688b33-acb4-44ed-bd46-30ff12238c2a/"
251
+ path = m. captures[2 ]
252
+
214
253
stage_info = if isa (gw. credentials, AWSCredentials) && isa (gw. store, AWS. Bucket)
215
- construct_stage_info (gw. credentials, gw. store, stage_path, gw. encrypted)
254
+ construct_stage_info (gw. credentials, gw. store, stage_path (stage) , gw. encrypted)
216
255
else
217
256
error (" unimplemented" )
218
257
end
@@ -257,10 +296,10 @@ function start(gw::SFGatewayMock)
257
296
master_path, fileio = mktemp ()
258
297
write (fileio, " dummy-file-master-token" )
259
298
sfconfig = SnowflakeConfig (
260
- stage= gw. config . stage ,
299
+ stage= fqsn ( gw. allowed_stages[ 1 ]) ,
261
300
account= gw. config. account,
262
301
database= gw. config. database,
263
- schema= gw. config . schema,
302
+ schema= gw. allowed_stages[ 1 ] . schema,
264
303
endpoint= " http://127.0.0.1:$(port) " ,
265
304
master_token_path= master_path,
266
305
opts= gw. opts
0 commit comments