@@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock};
22
22
23
23
use arrow_array:: RecordBatch ;
24
24
use futures:: channel:: mpsc:: { channel, Sender } ;
25
- use futures:: stream:: BoxStream ;
25
+ use futures:: stream:: { self , BoxStream } ;
26
26
use futures:: { SinkExt , StreamExt , TryFutureExt , TryStreamExt } ;
27
27
use serde:: { Deserialize , Serialize } ;
28
28
@@ -160,29 +160,33 @@ impl<'a> TableScanBuilder<'a> {
160
160
/// Build the table scan.
161
161
pub fn build ( self ) -> Result < TableScan > {
162
162
let snapshot = match self . snapshot_id {
163
- Some ( snapshot_id) => self
164
- . table
165
- . metadata ( )
166
- . snapshot_by_id ( snapshot_id)
167
- . ok_or_else ( || {
168
- Error :: new (
169
- ErrorKind :: DataInvalid ,
170
- format ! ( "Snapshot with id {} not found" , snapshot_id) ,
171
- )
172
- } ) ?
173
- . clone ( ) ,
174
- None => self
175
- . table
176
- . metadata ( )
177
- . current_snapshot ( )
178
- . ok_or_else ( || {
179
- Error :: new (
180
- ErrorKind :: FeatureUnsupported ,
181
- "Can't scan table without snapshots" ,
182
- )
183
- } ) ?
184
- . clone ( ) ,
163
+ Some ( snapshot_id) => Some (
164
+ self . table
165
+ . metadata ( )
166
+ . snapshot_by_id ( snapshot_id)
167
+ . ok_or_else ( || {
168
+ Error :: new (
169
+ ErrorKind :: DataInvalid ,
170
+ format ! ( "Snapshot with id {} not found" , snapshot_id) ,
171
+ )
172
+ } ) ?,
173
+ ) ,
174
+ None => self . table . metadata ( ) . current_snapshot ( ) ,
185
175
} ;
176
+ if snapshot. is_none ( ) {
177
+ return Ok ( TableScan {
178
+ plan_context : None ,
179
+ batch_size : None ,
180
+ file_io : self . table . file_io ( ) . clone ( ) ,
181
+ column_names : self . column_names ,
182
+ concurrency_limit_data_files : self . concurrency_limit_data_files ,
183
+ concurrency_limit_manifest_entries : self . concurrency_limit_manifest_entries ,
184
+ concurrency_limit_manifest_files : self . concurrency_limit_manifest_files ,
185
+ row_group_filtering_enabled : self . row_group_filtering_enabled ,
186
+ } ) ;
187
+ }
188
+
189
+ let snapshot = snapshot. unwrap ( ) ;
186
190
187
191
let schema = snapshot. schema ( self . table . metadata ( ) ) ?;
188
192
@@ -246,7 +250,7 @@ impl<'a> TableScanBuilder<'a> {
246
250
} ;
247
251
248
252
let plan_context = PlanContext {
249
- snapshot,
253
+ snapshot : snapshot . clone ( ) ,
250
254
table_metadata : self . table . metadata_ref ( ) ,
251
255
snapshot_schema : schema,
252
256
case_sensitive : self . case_sensitive ,
@@ -263,7 +267,7 @@ impl<'a> TableScanBuilder<'a> {
263
267
batch_size : self . batch_size ,
264
268
column_names : self . column_names ,
265
269
file_io : self . table . file_io ( ) . clone ( ) ,
266
- plan_context,
270
+ plan_context : Some ( plan_context ) ,
267
271
concurrency_limit_data_files : self . concurrency_limit_data_files ,
268
272
concurrency_limit_manifest_entries : self . concurrency_limit_manifest_entries ,
269
273
concurrency_limit_manifest_files : self . concurrency_limit_manifest_files ,
@@ -275,7 +279,7 @@ impl<'a> TableScanBuilder<'a> {
275
279
/// Table scan.
276
280
#[ derive( Debug ) ]
277
281
pub struct TableScan {
278
- plan_context : PlanContext ,
282
+ plan_context : Option < PlanContext > ,
279
283
batch_size : Option < usize > ,
280
284
file_io : FileIO ,
281
285
column_names : Vec < String > ,
@@ -316,6 +320,12 @@ struct PlanContext {
316
320
impl TableScan {
317
321
/// Returns a stream of [`FileScanTask`]s.
318
322
pub async fn plan_files ( & self ) -> Result < FileScanTaskStream > {
323
+ if self . plan_context . is_none ( ) {
324
+ return Ok ( stream:: empty ( ) . boxed ( ) ) ;
325
+ } ;
326
+
327
+ let plan_context = & self . plan_context . as_ref ( ) . unwrap ( ) ;
328
+
319
329
let concurrency_limit_manifest_files = self . concurrency_limit_manifest_files ;
320
330
let concurrency_limit_manifest_entries = self . concurrency_limit_manifest_entries ;
321
331
@@ -325,14 +335,13 @@ impl TableScan {
325
335
// used to stream the results back to the caller
326
336
let ( file_scan_task_tx, file_scan_task_rx) = channel ( concurrency_limit_manifest_entries) ;
327
337
328
- let manifest_list = self . plan_context . get_manifest_list ( ) . await ?;
338
+ let manifest_list = plan_context. get_manifest_list ( ) . await ?;
329
339
330
340
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
331
341
// whose content type is not Data or whose partitions cannot match this
332
342
// scan's filter
333
- let manifest_file_contexts = self
334
- . plan_context
335
- . build_manifest_file_contexts ( manifest_list, manifest_entry_ctx_tx) ?;
343
+ let manifest_file_contexts =
344
+ plan_context. build_manifest_file_contexts ( manifest_list, manifest_entry_ctx_tx) ?;
336
345
337
346
let mut channel_for_manifest_error = file_scan_task_tx. clone ( ) ;
338
347
@@ -392,8 +401,11 @@ impl TableScan {
392
401
& self . column_names
393
402
}
394
403
/// Returns a reference to the snapshot of the table scan.
395
- pub fn snapshot ( & self ) -> & SnapshotRef {
396
- & self . plan_context . snapshot
404
+ pub fn snapshot ( & self ) -> Option < & SnapshotRef > {
405
+ match & self . plan_context {
406
+ Some ( plan_context) => Some ( & plan_context. snapshot ) ,
407
+ None => None ,
408
+ }
397
409
}
398
410
399
411
async fn process_manifest_entry (
@@ -1175,7 +1187,7 @@ mod tests {
1175
1187
let table_scan = table. scan ( ) . build ( ) . unwrap ( ) ;
1176
1188
assert_eq ! (
1177
1189
table. metadata( ) . current_snapshot( ) . unwrap( ) . snapshot_id( ) ,
1178
- table_scan. snapshot( ) . snapshot_id( )
1190
+ table_scan. snapshot( ) . unwrap ( ) . snapshot_id( )
1179
1191
) ;
1180
1192
}
1181
1193
@@ -1196,7 +1208,10 @@ mod tests {
1196
1208
. snapshot_id ( 3051729675574597004 )
1197
1209
. build ( )
1198
1210
. unwrap ( ) ;
1199
- assert_eq ! ( table_scan. snapshot( ) . snapshot_id( ) , 3051729675574597004 ) ;
1211
+ assert_eq ! (
1212
+ table_scan. snapshot( ) . unwrap( ) . snapshot_id( ) ,
1213
+ 3051729675574597004
1214
+ ) ;
1200
1215
}
1201
1216
1202
1217
#[ tokio:: test]
0 commit comments