18
18
//! Describes the interface and built-in implementations of schemas,
19
19
//! representing collections of named tables.
20
20
21
- use parking_lot:: { Mutex , RwLock } ;
21
+ use parking_lot:: RwLock ;
22
22
use std:: any:: Any ;
23
23
use std:: collections:: HashMap ;
24
24
use std:: sync:: Arc ;
25
25
26
- use crate :: datasource:: listing:: { ListingTable , ListingTableConfig , ListingTableUrl } ;
27
- use crate :: datasource:: object_store:: ObjectStoreRegistry ;
28
26
use crate :: datasource:: TableProvider ;
29
27
use crate :: error:: { DataFusionError , Result } ;
30
- use datafusion_data_access:: object_store:: ObjectStore ;
31
28
32
29
/// Represents a schema, comprising a number of named tables.
33
30
pub trait SchemaProvider : Sync + Send {
@@ -130,135 +127,19 @@ impl SchemaProvider for MemorySchemaProvider {
130
127
}
131
128
}
132
129
133
- /// `ObjectStore` implementation of `SchemaProvider` to enable registering a `ListingTable`
134
- pub struct ObjectStoreSchemaProvider {
135
- tables : RwLock < HashMap < String , Arc < dyn TableProvider > > > ,
136
- object_store_registry : Arc < Mutex < ObjectStoreRegistry > > ,
137
- }
138
-
139
- impl ObjectStoreSchemaProvider {
140
- /// Instantiates a new `ObjectStoreSchemaProvider` with an empty collection of tables.
141
- pub fn new ( ) -> Self {
142
- Self {
143
- tables : RwLock :: new ( HashMap :: new ( ) ) ,
144
- object_store_registry : Arc :: new ( Mutex :: new ( ObjectStoreRegistry :: new ( ) ) ) ,
145
- }
146
- }
147
-
148
- /// Assign an `ObjectStore` which enables calling `register_listing_table`.
149
- pub fn register_object_store (
150
- & self ,
151
- scheme : impl Into < String > ,
152
- object_store : Arc < dyn ObjectStore > ,
153
- ) -> Option < Arc < dyn ObjectStore > > {
154
- self . object_store_registry
155
- . lock ( )
156
- . register_store ( scheme. into ( ) , object_store)
157
- }
158
-
159
- /// Retrieves a `ObjectStore` instance for a given Url
160
- pub fn object_store (
161
- & self ,
162
- url : impl AsRef < url:: Url > ,
163
- ) -> Result < Arc < dyn ObjectStore > > {
164
- self . object_store_registry
165
- . lock ( )
166
- . get_by_url ( url)
167
- . map_err ( DataFusionError :: from)
168
- }
169
-
170
- /// If supported by the implementation, adds a new table to this schema by creating a
171
- /// `ListingTable` from the provided `url` and a previously registered `ObjectStore`.
172
- /// If a table of the same name existed before, it returns "Table already exists" error.
173
- pub async fn register_listing_table (
174
- & self ,
175
- name : & str ,
176
- table_path : ListingTableUrl ,
177
- config : Option < ListingTableConfig > ,
178
- ) -> Result < ( ) > {
179
- let config = match config {
180
- Some ( cfg) => cfg,
181
- None => {
182
- let object_store = self . object_store ( & table_path) ?;
183
- ListingTableConfig :: new ( object_store, table_path)
184
- . infer ( )
185
- . await ?
186
- }
187
- } ;
188
-
189
- let table = Arc :: new ( ListingTable :: try_new ( config) ?) ;
190
- self . register_table ( name. into ( ) , table) ?;
191
- Ok ( ( ) )
192
- }
193
- }
194
-
195
- impl Default for ObjectStoreSchemaProvider {
196
- fn default ( ) -> Self {
197
- Self :: new ( )
198
- }
199
- }
200
-
201
- impl SchemaProvider for ObjectStoreSchemaProvider {
202
- fn as_any ( & self ) -> & dyn Any {
203
- self
204
- }
205
-
206
- fn table_names ( & self ) -> Vec < String > {
207
- let tables = self . tables . read ( ) ;
208
- tables. keys ( ) . cloned ( ) . collect ( )
209
- }
210
-
211
- fn table ( & self , name : & str ) -> Option < Arc < dyn TableProvider > > {
212
- let tables = self . tables . read ( ) ;
213
- tables. get ( name) . cloned ( )
214
- }
215
-
216
- fn register_table (
217
- & self ,
218
- name : String ,
219
- table : Arc < dyn TableProvider > ,
220
- ) -> Result < Option < Arc < dyn TableProvider > > > {
221
- if self . table_exist ( name. as_str ( ) ) {
222
- return Err ( DataFusionError :: Execution ( format ! (
223
- "The table {} already exists" ,
224
- name
225
- ) ) ) ;
226
- }
227
- let mut tables = self . tables . write ( ) ;
228
- Ok ( tables. insert ( name, table) )
229
- }
230
-
231
- fn deregister_table ( & self , name : & str ) -> Result < Option < Arc < dyn TableProvider > > > {
232
- let mut tables = self . tables . write ( ) ;
233
- Ok ( tables. remove ( name) )
234
- }
235
-
236
- fn table_exist ( & self , name : & str ) -> bool {
237
- let tables = self . tables . read ( ) ;
238
- tables. contains_key ( name)
239
- }
240
- }
241
-
242
130
#[ cfg( test) ]
243
131
mod tests {
244
- use std:: ffi:: OsStr ;
245
- use std:: path:: Path ;
246
132
use std:: sync:: Arc ;
247
133
248
134
use arrow:: datatypes:: Schema ;
135
+ use datafusion_data_access:: object_store:: local:: LocalFileSystem ;
249
136
250
137
use crate :: assert_batches_eq;
251
- use crate :: catalog:: catalog:: CatalogProvider ;
252
- use crate :: catalog:: catalog:: MemoryCatalogProvider ;
253
- use crate :: catalog:: schema:: {
254
- MemorySchemaProvider , ObjectStoreSchemaProvider , SchemaProvider ,
255
- } ;
256
- use crate :: datafusion_data_access:: object_store:: local:: LocalFileSystem ;
138
+ use crate :: catalog:: catalog:: { CatalogProvider , MemoryCatalogProvider } ;
139
+ use crate :: catalog:: schema:: { MemorySchemaProvider , SchemaProvider } ;
257
140
use crate :: datasource:: empty:: EmptyTable ;
258
- use crate :: execution:: context:: SessionContext ;
259
-
260
- use crate :: datasource:: listing:: ListingTableUrl ;
261
- use futures:: StreamExt ;
141
+ use crate :: datasource:: listing:: { ListingTable , ListingTableConfig , ListingTableUrl } ;
142
+ use crate :: prelude:: SessionContext ;
262
143
263
144
#[ tokio:: test]
264
145
async fn test_mem_provider ( ) {
@@ -282,22 +163,27 @@ mod tests {
282
163
#[ tokio:: test]
283
164
async fn test_schema_register_listing_table ( ) {
284
165
let testdata = crate :: test_util:: parquet_test_data ( ) ;
285
- let filename = format ! ( "{}/{}" , testdata, "alltypes_plain.parquet" ) ;
166
+ let filename = format ! ( "test:/// {}/{}" , testdata, "alltypes_plain.parquet" ) ;
286
167
let table_path = ListingTableUrl :: parse ( filename) . unwrap ( ) ;
287
168
288
- let schema = ObjectStoreSchemaProvider :: new ( ) ;
289
- let _store = schema . register_object_store ( "test" , Arc :: new ( LocalFileSystem { } ) ) ;
169
+ let catalog = MemoryCatalogProvider :: new ( ) ;
170
+ let schema = MemorySchemaProvider :: new ( ) ;
290
171
291
- schema
292
- . register_listing_table ( "alltypes_plain" , table_path, None )
172
+ let ctx = SessionContext :: new ( ) ;
173
+ let store = Arc :: new ( LocalFileSystem { } ) ;
174
+ ctx. runtime_env ( ) . register_object_store ( "test" , store) ;
175
+
176
+ let config = ListingTableConfig :: new ( table_path)
177
+ . infer ( & ctx. state ( ) )
293
178
. await
294
179
. unwrap ( ) ;
180
+ let table = ListingTable :: try_new ( config) . unwrap ( ) ;
295
181
296
- let catalog = MemoryCatalogProvider :: new ( ) ;
297
- catalog. register_schema ( "active" , Arc :: new ( schema) ) . unwrap ( ) ;
298
-
299
- let ctx = SessionContext :: new ( ) ;
182
+ schema
183
+ . register_table ( "alltypes_plain" . to_string ( ) , Arc :: new ( table) )
184
+ . unwrap ( ) ;
300
185
186
+ catalog. register_schema ( "active" , Arc :: new ( schema) ) . unwrap ( ) ;
301
187
ctx. register_catalog ( "cat" , Arc :: new ( catalog) ) ;
302
188
303
189
let df = ctx
@@ -323,61 +209,4 @@ mod tests {
323
209
] ;
324
210
assert_batches_eq ! ( expected, & actual) ;
325
211
}
326
-
327
- #[ tokio:: test]
328
- async fn test_schema_register_listing_tables ( ) {
329
- let testdata = crate :: test_util:: parquet_test_data ( ) ;
330
-
331
- let schema = ObjectStoreSchemaProvider :: new ( ) ;
332
- let store = schema
333
- . register_object_store ( "file" , Arc :: new ( LocalFileSystem { } ) )
334
- . unwrap ( ) ;
335
-
336
- let mut files = store. list_file ( & testdata) . await . unwrap ( ) ;
337
- while let Some ( file) = files. next ( ) . await {
338
- let sized_file = file. unwrap ( ) . sized_file ;
339
- let path = Path :: new ( & sized_file. path ) ;
340
- let file = path. file_name ( ) . unwrap ( ) ;
341
- if file == OsStr :: new ( "alltypes_dictionary.parquet" )
342
- || file == OsStr :: new ( "alltypes_plain.parquet" )
343
- {
344
- let name = path. file_stem ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
345
- let path = ListingTableUrl :: parse ( & sized_file. path ) . unwrap ( ) ;
346
- schema
347
- . register_listing_table ( name, path, None )
348
- . await
349
- . unwrap ( ) ;
350
- }
351
- }
352
-
353
- let tables = vec ! [
354
- String :: from( "alltypes_dictionary" ) ,
355
- String :: from( "alltypes_plain" ) ,
356
- ] ;
357
-
358
- let mut schema_tables = schema. table_names ( ) ;
359
- schema_tables. sort ( ) ;
360
- assert_eq ! ( schema_tables, tables) ;
361
- }
362
-
363
- #[ tokio:: test]
364
- #[ should_panic( expected = "already exists" ) ]
365
- async fn test_schema_register_same_listing_table ( ) {
366
- let testdata = crate :: test_util:: parquet_test_data ( ) ;
367
- let filename = format ! ( "{}/{}" , testdata, "alltypes_plain.parquet" ) ;
368
- let table_path = ListingTableUrl :: parse ( filename) . unwrap ( ) ;
369
-
370
- let schema = ObjectStoreSchemaProvider :: new ( ) ;
371
- let _store = schema. register_object_store ( "test" , Arc :: new ( LocalFileSystem { } ) ) ;
372
-
373
- schema
374
- . register_listing_table ( "alltypes_plain" , table_path. clone ( ) , None )
375
- . await
376
- . unwrap ( ) ;
377
-
378
- schema
379
- . register_listing_table ( "alltypes_plain" , table_path, None )
380
- . await
381
- . unwrap ( ) ;
382
- }
383
212
}
0 commit comments