@@ -70,9 +70,10 @@ use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWi
70
70
71
71
use crate :: error:: Result ;
72
72
use crate :: spec:: {
73
- COMMIT_MAX_RETRY_WAIT_MS , COMMIT_MAX_RETRY_WAIT_MS_DEFAULT , COMMIT_MIN_RETRY_WAIT_MS ,
74
- COMMIT_MIN_RETRY_WAIT_MS_DEFAULT , COMMIT_NUM_RETRIES , COMMIT_NUM_RETRIES_DEFAULT ,
75
- COMMIT_TOTAL_RETRY_TIME_MS , COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
73
+ PROPERTY_COMMIT_MAX_RETRY_WAIT_MS , PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ,
74
+ PROPERTY_COMMIT_MIN_RETRY_WAIT_MS , PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ,
75
+ PROPERTY_COMMIT_NUM_RETRIES , PROPERTY_COMMIT_NUM_RETRIES_DEFAULT ,
76
+ PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS , PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
76
77
} ;
77
78
use crate :: table:: Table ;
78
79
use crate :: transaction:: action:: BoxedTransactionAction ;
@@ -82,7 +83,7 @@ use crate::transaction::update_location::UpdateLocationAction;
82
83
use crate :: transaction:: update_properties:: UpdatePropertiesAction ;
83
84
use crate :: transaction:: update_statistics:: UpdateStatisticsAction ;
84
85
use crate :: transaction:: upgrade_format_version:: UpgradeFormatVersionAction ;
85
- use crate :: { Catalog , TableCommit , TableRequirement , TableUpdate } ;
86
+ use crate :: { Catalog , Error , ErrorKind , TableCommit , TableRequirement , TableUpdate } ;
86
87
87
88
/// Table transaction.
88
89
#[ derive( Clone ) ]
@@ -169,7 +170,7 @@ impl Transaction {
169
170
return Ok ( self . table ) ;
170
171
}
171
172
172
- let backoff = Self :: build_backoff ( self . table . metadata ( ) . properties ( ) ) ;
173
+ let backoff = Self :: build_backoff ( self . table . metadata ( ) . properties ( ) ) ? ;
173
174
let tx = self ;
174
175
175
176
( |mut tx : Transaction | async {
@@ -184,38 +185,55 @@ impl Transaction {
184
185
. 1
185
186
}
186
187
187
- fn build_backoff ( props : & HashMap < String , String > ) -> ExponentialBackoff {
188
- ExponentialBuilder :: new ( )
189
- . with_min_delay ( Duration :: from_millis (
190
- props
191
- . get ( COMMIT_MIN_RETRY_WAIT_MS )
192
- . map ( |s| s. parse ( ) )
193
- . unwrap_or_else ( || Ok ( COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ) )
194
- . expect ( "Invalid value for commit.retry.min-wait-ms" ) ,
195
- ) )
196
- . with_max_delay ( Duration :: from_millis (
197
- props
198
- . get ( COMMIT_MAX_RETRY_WAIT_MS )
199
- . map ( |s| s. parse ( ) )
200
- . unwrap_or_else ( || Ok ( COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ) )
201
- . expect ( "Invalid value for commit.retry.max-wait-ms" ) ,
202
- ) )
203
- . with_total_delay ( Some ( Duration :: from_millis (
204
- props
205
- . get ( COMMIT_TOTAL_RETRY_TIME_MS )
206
- . map ( |s| s. parse ( ) )
207
- . unwrap_or_else ( || Ok ( COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ) )
208
- . expect ( "Invalid value for commit.retry.total-timeout-ms" ) ,
209
- ) ) )
210
- . with_max_times (
211
- props
212
- . get ( COMMIT_NUM_RETRIES )
213
- . map ( |s| s. parse ( ) )
214
- . unwrap_or_else ( || Ok ( COMMIT_NUM_RETRIES_DEFAULT ) )
215
- . expect ( "Invalid value for commit.retry.num-retries" ) ,
216
- )
188
+ fn build_backoff ( props : & HashMap < String , String > ) -> Result < ExponentialBackoff > {
189
+ let min_delay = match props. get ( PROPERTY_COMMIT_MIN_RETRY_WAIT_MS ) {
190
+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
191
+ Error :: new (
192
+ ErrorKind :: DataInvalid ,
193
+ "Invalid value for commit.retry.min-wait-ms" ,
194
+ )
195
+ . with_source ( e)
196
+ } ) ?,
197
+ None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT ,
198
+ } ;
199
+ let max_delay = match props. get ( PROPERTY_COMMIT_MAX_RETRY_WAIT_MS ) {
200
+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
201
+ Error :: new (
202
+ ErrorKind :: DataInvalid ,
203
+ "Invalid value for commit.retry.max-wait-ms" ,
204
+ )
205
+ . with_source ( e)
206
+ } ) ?,
207
+ None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT ,
208
+ } ;
209
+ let total_delay = match props. get ( PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS ) {
210
+ Some ( value_str) => value_str. parse :: < u64 > ( ) . map_err ( |e| {
211
+ Error :: new (
212
+ ErrorKind :: DataInvalid ,
213
+ "Invalid value for commit.retry.total-timeout-ms" ,
214
+ )
215
+ . with_source ( e)
216
+ } ) ?,
217
+ None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT ,
218
+ } ;
219
+ let max_times = match props. get ( PROPERTY_COMMIT_NUM_RETRIES ) {
220
+ Some ( value_str) => value_str. parse :: < usize > ( ) . map_err ( |e| {
221
+ Error :: new (
222
+ ErrorKind :: DataInvalid ,
223
+ "Invalid value for commit.retry.num-retries" ,
224
+ )
225
+ . with_source ( e)
226
+ } ) ?,
227
+ None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT ,
228
+ } ;
229
+
230
+ Ok ( ExponentialBuilder :: new ( )
231
+ . with_min_delay ( Duration :: from_millis ( min_delay) )
232
+ . with_max_delay ( Duration :: from_millis ( max_delay) )
233
+ . with_total_delay ( Some ( Duration :: from_millis ( total_delay) ) )
234
+ . with_max_times ( max_times)
217
235
. with_factor ( 2.0 )
218
- . build ( )
236
+ . build ( ) )
219
237
}
220
238
221
239
async fn do_commit ( & mut self , catalog : & dyn Catalog ) -> Result < Table > {
@@ -259,6 +277,7 @@ mod tests {
259
277
use std:: fs:: File ;
260
278
use std:: io:: BufReader ;
261
279
use std:: sync:: Arc ;
280
+ use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
262
281
263
282
use crate :: catalog:: MockCatalog ;
264
283
use crate :: io:: FileIOBuilder ;
@@ -375,25 +394,22 @@ mod tests {
375
394
. expect_load_table ( )
376
395
. returning_st ( |_| Box :: pin ( async move { Ok ( make_v2_table ( ) ) } ) ) ;
377
396
397
+ let attempts = AtomicU32 :: new ( 0 ) ;
378
398
mock_catalog
379
399
. expect_update_table ( )
380
400
. times ( expected_calls)
381
401
. returning_st ( move |_| {
382
- if let Some ( attempts) = success_after_attempts {
383
- static mut ATTEMPTS : u32 = 0 ;
384
- unsafe {
385
- ATTEMPTS += 1 ;
386
- if ATTEMPTS <= attempts {
387
- Box :: pin ( async move {
388
- Err ( Error :: new (
389
- ErrorKind :: CatalogCommitConflicts ,
390
- "Commit conflict" ,
391
- )
392
- . with_retryable ( true ) )
393
- } )
394
- } else {
395
- Box :: pin ( async move { Ok ( make_v2_table ( ) ) } )
396
- }
402
+ if let Some ( success_after_attempts) = success_after_attempts {
403
+ attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
404
+ if attempts. load ( Ordering :: SeqCst ) <= success_after_attempts {
405
+ Box :: pin ( async move {
406
+ Err (
407
+ Error :: new ( ErrorKind :: CatalogCommitConflicts , "Commit conflict" )
408
+ . with_retryable ( true ) ,
409
+ )
410
+ } )
411
+ } else {
412
+ Box :: pin ( async move { Ok ( make_v2_table ( ) ) } )
397
413
}
398
414
} else {
399
415
// Always fail with retryable error
0 commit comments