@@ -202,19 +202,12 @@ vy_run_env_enable_coio(struct vy_run_env *env, int threads)
202
202
*/
203
203
static int
204
204
vy_page_info_create (struct vy_page_info * page_info , uint64_t offset ,
205
- const struct tuple * min_key , const struct key_def * cmp_def )
205
+ const char * min_key )
206
206
{
207
207
memset (page_info , 0 , sizeof (* page_info ));
208
208
page_info -> offset = offset ;
209
209
page_info -> unpacked_size = 0 ;
210
- struct region * region = & fiber ()-> gc ;
211
- size_t used = region_used (region );
212
- uint32_t size ;
213
- const char * region_key = tuple_extract_key (min_key , cmp_def , & size );
214
- if (region_key == NULL )
215
- return -1 ;
216
- page_info -> min_key = vy_key_dup (region_key );
217
- region_truncate (region , used );
210
+ page_info -> min_key = vy_key_dup (min_key );
218
211
return page_info -> min_key == NULL ? -1 : 0 ;
219
212
}
220
213
@@ -247,22 +240,33 @@ vy_run_new(int64_t id)
247
240
return run ;
248
241
}
249
242
250
- void
251
- vy_run_delete (struct vy_run * run )
243
+ static void
244
+ vy_run_clear (struct vy_run * run )
252
245
{
253
- assert (run -> refs == 0 );
254
- if (run -> fd >= 0 && close (run -> fd ) < 0 )
255
- say_syserror ("close failed" );
256
246
if (run -> page_info != NULL ) {
257
247
uint32_t page_no ;
258
248
for (page_no = 0 ; page_no < run -> info .page_count ; ++ page_no )
259
249
vy_page_info_destroy (run -> page_info + page_no );
260
250
free (run -> page_info );
261
251
}
252
+ run -> page_info = NULL ;
253
+ run -> info .page_count = 0 ;
262
254
if (run -> info .has_bloom )
263
255
bloom_destroy (& run -> info .bloom , runtime .quota );
256
+ run -> info .has_bloom = false;
264
257
free (run -> info .min_key );
258
+ run -> info .min_key = NULL ;
265
259
free (run -> info .max_key );
260
+ run -> info .max_key = NULL ;
261
+ }
262
+
263
+ void
264
+ vy_run_delete (struct vy_run * run )
265
+ {
266
+ assert (run -> refs == 0 );
267
+ if (run -> fd >= 0 && close (run -> fd ) < 0 )
268
+ say_syserror ("close failed" );
269
+ vy_run_clear (run );
266
270
TRASH (run );
267
271
free (run );
268
272
}
@@ -723,9 +727,7 @@ vy_page_stmt(struct vy_page *page, uint32_t stmt_no,
723
727
struct xrow_header xrow ;
724
728
if (vy_page_xrow (page , stmt_no , & xrow ) != 0 )
725
729
return NULL ;
726
- struct tuple_format * format_to_use = (xrow .type == IPROTO_UPSERT )
727
- ? upsert_format : format ;
728
- return vy_stmt_decode (& xrow , cmp_def , format_to_use , is_primary );
730
+ return vy_stmt_decode (& xrow , cmp_def , format , upsert_format , is_primary );
729
731
}
730
732
731
733
/**
@@ -1962,9 +1964,10 @@ vy_run_recover(struct vy_run *run, const char *dir,
1962
1964
xlog_cursor_close (& cursor , true);
1963
1965
return 0 ;
1964
1966
1965
- fail_close :
1967
+ fail_close :
1966
1968
xlog_cursor_close (& cursor , false);
1967
- fail :
1969
+ fail :
1970
+ vy_run_clear (run );
1968
1971
return -1 ;
1969
1972
}
1970
1973
@@ -2031,6 +2034,26 @@ vy_row_index_encode(const uint32_t *row_index, uint32_t row_count,
2031
2034
return 0 ;
2032
2035
}
2033
2036
2037
+ /**
2038
+ * Helper to extend run page info array
2039
+ */
2040
+ static inline int
2041
+ vy_run_alloc_page_info (struct vy_run * run , uint32_t * page_info_capacity )
2042
+ {
2043
+ uint32_t cap = * page_info_capacity > 0 ?
2044
+ * page_info_capacity * 2 : 16 ;
2045
+ struct vy_page_info * page_info = realloc (run -> page_info ,
2046
+ cap * sizeof (* page_info ));
2047
+ if (page_info == NULL ) {
2048
+ diag_set (OutOfMemory , cap * sizeof (* page_info ),
2049
+ "realloc" , "struct vy_page_info" );
2050
+ return -1 ;
2051
+ }
2052
+ run -> page_info = page_info ;
2053
+ * page_info_capacity = cap ;
2054
+ return 0 ;
2055
+ }
2056
+
2034
2057
/**
2035
2058
* Write statements from the iterator to a new page in the run,
2036
2059
* update page and run statistics.
@@ -2060,34 +2083,26 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog,
2060
2083
struct ibuf row_index_buf ;
2061
2084
ibuf_create (& row_index_buf , & cord ()-> slabc , sizeof (uint32_t ) * 4096 );
2062
2085
2063
- if (run -> info .page_count >= * page_info_capacity ) {
2064
- uint32_t cap = * page_info_capacity > 0 ?
2065
- * page_info_capacity * 2 : 16 ;
2066
- struct vy_page_info * page_info = realloc (run -> page_info ,
2067
- cap * sizeof (* page_info ));
2068
- if (page_info == NULL ) {
2069
- diag_set (OutOfMemory , cap * sizeof (* page_info ),
2070
- "realloc" , "struct vy_page_info" );
2071
- goto error_row_index ;
2072
- }
2073
- run -> page_info = page_info ;
2074
- * page_info_capacity = cap ;
2075
- }
2086
+ if (run -> info .page_count >= * page_info_capacity &&
2087
+ vy_run_alloc_page_info (run , page_info_capacity ) != 0 )
2088
+ goto error_row_index ;
2076
2089
assert (* page_info_capacity >= run -> info .page_count );
2077
2090
2091
+ /* See comment to run_info->max_key allocation below. */
2092
+ region_key = tuple_extract_key (* curr_stmt , cmp_def , NULL );
2093
+ if (region_key == NULL )
2094
+ goto error_row_index ;
2095
+
2078
2096
if (run -> info .page_count == 0 ) {
2079
- /* See comment to run_info->max_key allocation below. */
2080
- region_key = tuple_extract_key (* curr_stmt , cmp_def , NULL );
2081
- if (region_key == NULL )
2082
- goto error_row_index ;
2083
2097
assert (run -> info .min_key == NULL );
2084
2098
run -> info .min_key = vy_key_dup (region_key );
2085
2099
if (run -> info .min_key == NULL )
2086
2100
goto error_row_index ;
2087
2101
}
2088
2102
2089
2103
page = run -> page_info + run -> info .page_count ;
2090
- vy_page_info_create (page , data_xlog -> offset , * curr_stmt , cmp_def );
2104
+ if (vy_page_info_create (page , data_xlog -> offset , region_key ) != 0 )
2105
+ goto error_row_index ;
2091
2106
xlog_tx_begin (data_xlog );
2092
2107
2093
2108
do {
@@ -2456,6 +2471,9 @@ static int
2456
2471
vy_run_write_index (struct vy_run * run , const char * dirpath ,
2457
2472
uint32_t space_id , uint32_t iid )
2458
2473
{
2474
+ struct region * region = & fiber ()-> gc ;
2475
+ size_t mem_used = region_used (region );
2476
+
2459
2477
char path [PATH_MAX ];
2460
2478
vy_run_snprint_path (path , sizeof (path ), dirpath ,
2461
2479
space_id , iid , run -> id , VY_FILE_INDEX );
@@ -2489,10 +2507,10 @@ vy_run_write_index(struct vy_run *run, const char *dirpath,
2489
2507
xlog_rename (& index_xlog ) < 0 )
2490
2508
goto fail ;
2491
2509
xlog_close (& index_xlog , false);
2492
- fiber_gc ( );
2510
+ region_truncate ( region , mem_used );
2493
2511
return 0 ;
2494
- fail :
2495
- fiber_gc ( );
2512
+ fail :
2513
+ region_truncate ( region , mem_used );
2496
2514
xlog_tx_rollback (& index_xlog );
2497
2515
xlog_close (& index_xlog , false);
2498
2516
unlink (path );
@@ -2533,6 +2551,138 @@ vy_run_write(struct vy_run *run, const char *dirpath,
2533
2551
return 0 ;
2534
2552
}
2535
2553
2554
+ int
2555
+ vy_run_rebuild_index (struct vy_run * run , const char * dir ,
2556
+ uint32_t space_id , uint32_t iid ,
2557
+ const struct key_def * cmp_def ,
2558
+ const struct key_def * key_def ,
2559
+ struct tuple_format * space_format ,
2560
+ struct tuple_format * upsert_format ,
2561
+ const struct index_opts * opts )
2562
+ {
2563
+ assert (run -> info .has_bloom == false);
2564
+ assert (run -> page_info == NULL );
2565
+ struct region * region = & fiber ()-> gc ;
2566
+ size_t mem_used = region_used (region );
2567
+
2568
+ struct xlog_cursor cursor ;
2569
+ char path [PATH_MAX ];
2570
+ vy_run_snprint_path (path , sizeof (path ), dir ,
2571
+ space_id , iid , run -> id , VY_FILE_RUN );
2572
+
2573
+ say_warn ("Rebuilding run index from %s data file" , path );
2574
+ if (xlog_cursor_open (& cursor , path ))
2575
+ return -1 ;
2576
+
2577
+ int rc = 0 ;
2578
+ uint32_t page_info_capacity = 0 ;
2579
+ uint32_t run_row_count = 0 ;
2580
+
2581
+ const char * key = NULL ;
2582
+ int64_t max_lsn = 0 ;
2583
+ int64_t min_lsn = INT64_MAX ;
2584
+
2585
+ off_t page_offset , next_page_offset = xlog_cursor_pos (& cursor );
2586
+ while ((rc = xlog_cursor_next_tx (& cursor )) == 0 ) {
2587
+ page_offset = next_page_offset ;
2588
+ next_page_offset = xlog_cursor_pos (& cursor );
2589
+
2590
+ if (run -> info .page_count == page_info_capacity &&
2591
+ vy_run_alloc_page_info (run , & page_info_capacity ) != 0 )
2592
+ goto close_err ;
2593
+ const char * page_min_key = NULL ;
2594
+ uint32_t page_row_count = 0 ;
2595
+ uint64_t page_row_index_offset = 0 ;
2596
+ uint64_t row_offset = xlog_cursor_tx_pos (& cursor );
2597
+
2598
+ struct xrow_header xrow ;
2599
+ while ((rc = xlog_cursor_next_row (& cursor , & xrow )) == 0 ) {
2600
+ if (xrow .type == VY_RUN_ROW_INDEX ) {
2601
+ page_row_index_offset = row_offset ;
2602
+ row_offset = xlog_cursor_tx_pos (& cursor );
2603
+ continue ;
2604
+ }
2605
+ ++ page_row_count ;
2606
+ key = vy_stmt_extract_key (& xrow , cmp_def ,
2607
+ space_format , upsert_format ,
2608
+ iid == 0 );
2609
+ if (key == NULL )
2610
+ goto close_err ;
2611
+ if (run -> info .min_key == NULL ) {
2612
+ run -> info .min_key = vy_key_dup (key );
2613
+ if (run -> info .min_key == NULL )
2614
+ goto close_err ;
2615
+ }
2616
+ if (page_min_key == NULL )
2617
+ page_min_key = key ;
2618
+ if (xrow .lsn > max_lsn )
2619
+ max_lsn = xrow .lsn ;
2620
+ if (xrow .lsn < min_lsn )
2621
+ min_lsn = xrow .lsn ;
2622
+ row_offset = xlog_cursor_tx_pos (& cursor );
2623
+ }
2624
+ struct vy_page_info * info ;
2625
+ info = run -> page_info + run -> info .page_count ;
2626
+ if (vy_page_info_create (info , page_offset , page_min_key ) != 0 )
2627
+ goto close_err ;
2628
+ info -> row_count = page_row_count ;
2629
+ info -> size = next_page_offset - page_offset ;
2630
+ info -> unpacked_size = xlog_cursor_tx_pos (& cursor );
2631
+ info -> row_index_offset = page_row_index_offset ;
2632
+ ++ run -> info .page_count ;
2633
+ run_row_count += page_row_count ;
2634
+ region_truncate (region , mem_used );
2635
+ }
2636
+
2637
+ if (key != NULL ) {
2638
+ run -> info .max_key = vy_key_dup (key );
2639
+ if (run -> info .max_key == NULL )
2640
+ goto close_err ;
2641
+ }
2642
+ run -> info .max_lsn = max_lsn ;
2643
+ run -> info .min_lsn = min_lsn ;
2644
+ if (xlog_cursor_reset (& cursor ) != 0 )
2645
+ goto close_err ;
2646
+ if (bloom_create (& run -> info .bloom , run_row_count ,
2647
+ opts -> bloom_fpr , runtime .quota ) != 0 ) {
2648
+ diag_set (OutOfMemory , 0 ,
2649
+ "bloom_create" , "bloom" );
2650
+ goto close_err ;
2651
+ }
2652
+ struct xrow_header xrow ;
2653
+ while ((rc = xlog_cursor_next (& cursor , & xrow , false)) == 0 ) {
2654
+ if (xrow .type == VY_RUN_ROW_INDEX )
2655
+ continue ;
2656
+
2657
+ struct tuple * tuple = vy_stmt_decode (& xrow , cmp_def , space_format ,
2658
+ upsert_format , iid == 0 );
2659
+ if (tuple == NULL )
2660
+ goto close_err ;
2661
+ bloom_add (& run -> info .bloom , tuple_hash (tuple , key_def ));
2662
+ }
2663
+ run -> info .has_bloom = true;
2664
+
2665
+ region_truncate (region , mem_used );
2666
+ run -> fd = cursor .fd ;
2667
+ xlog_cursor_close (& cursor , true);
2668
+ /* New run index is ready for write, unlink old file if exists */
2669
+ vy_run_snprint_path (path , sizeof (path ), dir ,
2670
+ space_id , iid , run -> id , VY_FILE_INDEX );
2671
+ if (unlink (path ) < 0 && errno != ENOENT ) {
2672
+ diag_set (SystemError , "failed to unlink file '%s'" ,
2673
+ path );
2674
+ goto close_err ;
2675
+ }
2676
+ if (vy_run_write_index (run , dir , space_id , iid ) != 0 )
2677
+ goto close_err ;
2678
+ return 0 ;
2679
+ close_err :
2680
+ vy_run_clear (run );
2681
+ region_truncate (region , mem_used );
2682
+ xlog_cursor_close (& cursor , false);
2683
+ return -1 ;
2684
+ }
2685
+
2536
2686
/**
2537
2687
* Read a page with stream->page_no from the run and save it in stream->page.
2538
2688
* Support function of slice stream.
0 commit comments