@@ -218,6 +218,14 @@ impl NodeType {
218
218
NodeType :: Querier => "querier" ,
219
219
}
220
220
}
221
+
222
+ fn to_mode ( & self ) -> Mode {
223
+ match self {
224
+ NodeType :: Ingestor => Mode :: Ingest ,
225
+ NodeType :: Indexer => Mode :: Index ,
226
+ NodeType :: Querier => Mode :: Query ,
227
+ }
228
+ }
221
229
}
222
230
223
231
impl fmt:: Display for NodeType {
@@ -274,96 +282,119 @@ impl NodeMetadata {
274
282
storage : & dyn ObjectStorageProvider ,
275
283
node_type : NodeType ,
276
284
) -> Arc < Self > {
277
- // all the files should be in the staging directory root
278
- let entries = options
279
- . staging_dir ( )
280
- . read_dir ( )
281
- . expect ( "Couldn't read from file" ) ;
285
+ let staging_path = options. staging_dir ( ) ;
286
+ let node_type_str = node_type. as_str ( ) ;
282
287
283
- let mode = match node_type {
284
- NodeType :: Ingestor => Mode :: Ingest ,
285
- NodeType :: Indexer => Mode :: Index ,
286
- NodeType :: Querier => Mode :: Query ,
287
- } ;
288
+ // Attempt to load metadata from staging
289
+ if let Some ( mut meta) = Self :: load_from_staging ( staging_path, node_type_str, options) {
290
+ Self :: update_metadata ( & mut meta, options, node_type) ;
291
+ meta. put_on_disk ( staging_path)
292
+ . expect ( "Couldn't write updated metadata to disk" ) ;
293
+ return Arc :: new ( meta) ;
294
+ }
288
295
289
- let url = options. get_url ( mode) ;
290
- let port = url. port ( ) . unwrap_or ( 80 ) . to_string ( ) ;
291
- let url = url. to_string ( ) ;
292
- let Options {
293
- username, password, ..
294
- } = options;
295
- let staging_path = options. staging_dir ( ) ;
296
- let flight_port = options. flight_port . to_string ( ) ;
297
- let type_str = node_type. as_str ( ) ;
296
+ // If no metadata is found in staging, create a new one
297
+ let meta = Self :: create_new_metadata ( options, storage, node_type) ;
298
+ meta. put_on_disk ( staging_path)
299
+ . expect ( "Couldn't write new metadata to disk" ) ;
300
+ Arc :: new ( meta)
301
+ }
302
+
303
+ /// Load metadata from the staging directory
304
+ fn load_from_staging (
305
+ staging_path : & Path ,
306
+ node_type_str : & str ,
307
+ options : & Options ,
308
+ ) -> Option < Self > {
309
+ let entries = staging_path
310
+ . read_dir ( )
311
+ . expect ( "Couldn't read from staging directory" ) ;
298
312
299
313
for entry in entries {
300
- // the staging directory will have only one file with the node type in the name
301
- // so the JSON Parse should not error unless the file is corrupted
302
314
let path = entry. expect ( "Should be a directory entry" ) . path ( ) ;
303
- if !path
304
- . file_name ( )
305
- . and_then ( |s| s. to_str ( ) )
306
- . is_some_and ( |s| s. contains ( type_str) )
307
- {
315
+ if !Self :: is_valid_metadata_file ( & path, node_type_str) {
308
316
continue ;
309
317
}
310
318
311
- // get the metadata from staging
312
- let bytes = std:: fs:: read ( path) . expect ( "File should be present" ) ;
313
- let mut meta = Self :: from_bytes ( & bytes, options. flight_port )
314
- . unwrap_or_else ( |_| panic ! ( "Extracted {} metadata" , type_str) ) ;
315
-
316
- // compare url endpoint and port, update
317
- if meta. domain_name != url {
318
- info ! (
319
- "Domain Name was Updated. Old: {} New: {}" ,
320
- meta. domain_name, url
321
- ) ;
322
- meta. domain_name = url;
319
+ let bytes = std:: fs:: read ( & path) . expect ( "File should be present" ) ;
320
+ match Self :: from_bytes ( & bytes, options. flight_port ) {
321
+ Ok ( meta) => return Some ( meta) ,
322
+ Err ( e) => {
323
+ error ! ( "Failed to extract {} metadata: {}" , node_type_str, e) ;
324
+ return None ;
325
+ }
323
326
}
327
+ }
324
328
325
- if meta. port != port {
326
- info ! ( "Port was Updated. Old: {} New: {}" , meta. port, port) ;
327
- meta. port = port;
328
- }
329
+ None
330
+ }
331
+
332
+ /// Check if a file is a valid metadata file for the given node type
333
+ fn is_valid_metadata_file ( path : & Path , node_type_str : & str ) -> bool {
334
+ path. file_name ( )
335
+ . and_then ( |s| s. to_str ( ) )
336
+ . map_or ( false , |s| s. contains ( node_type_str) )
337
+ }
338
+
339
+ /// Update metadata fields if they differ from the current configuration
340
+ fn update_metadata ( meta : & mut Self , options : & Options , node_type : NodeType ) {
341
+ let url = options. get_url ( node_type. to_mode ( ) ) ;
342
+ let port = url. port ( ) . unwrap_or ( 80 ) . to_string ( ) ;
343
+ let url = url. to_string ( ) ;
329
344
330
- let token = format ! (
331
- "Basic {}" ,
332
- BASE64_STANDARD . encode( format!( "{username}:{password}" ) )
345
+ if meta. domain_name != url {
346
+ info ! (
347
+ "Domain Name was Updated. Old: {} New: {}" ,
348
+ meta. domain_name, url
333
349
) ;
334
- if meta. token != token {
335
- // TODO: Update the message to be more informative with username and password
336
- warn ! (
337
- "Credentials were Updated. Tokens updated; Old: {} New: {}" ,
338
- meta. token, token
339
- ) ;
340
- meta. token = token;
341
- }
350
+ meta. domain_name = url;
351
+ }
342
352
343
- meta. node_type . clone_from ( & node_type) ;
344
- meta. put_on_disk ( staging_path)
345
- . expect ( "Couldn't write to disk" ) ;
353
+ if meta. port != port {
354
+ info ! ( "Port was Updated. Old: {} New: {}" , meta. port, port) ;
355
+ meta. port = port;
356
+ }
346
357
347
- return Arc :: new ( meta) ;
358
+ let token = Self :: generate_token ( & options. username , & options. password ) ;
359
+ if meta. token != token {
360
+ warn ! (
361
+ "Credentials were Updated. Tokens updated; Old: {} New: {}" ,
362
+ meta. token, token
363
+ ) ;
364
+ meta. token = token;
348
365
}
349
366
350
- let storage = storage. get_object_store ( ) ;
351
- let node_id = get_node_id ( ) ;
367
+ meta. node_type = node_type;
368
+ }
369
+
370
+ /// Create a new metadata instance
371
+ fn create_new_metadata (
372
+ options : & Options ,
373
+ storage : & dyn ObjectStorageProvider ,
374
+ node_type : NodeType ,
375
+ ) -> Self {
376
+ let url = options. get_url ( node_type. to_mode ( ) ) ;
377
+ let port = url. port ( ) . unwrap_or ( 80 ) . to_string ( ) ;
378
+ let url = url. to_string ( ) ;
352
379
353
- let meta = Self :: new (
380
+ Self :: new (
354
381
port,
355
382
url,
356
- storage. get_bucket_name ( ) ,
357
- username,
358
- password,
359
- node_id ,
360
- flight_port,
383
+ storage. get_object_store ( ) . get_bucket_name ( ) ,
384
+ & options . username ,
385
+ & options . password ,
386
+ get_node_id ( ) ,
387
+ options . flight_port . to_string ( ) ,
361
388
node_type,
362
- ) ;
389
+ )
390
+ }
363
391
364
- meta. put_on_disk ( staging_path)
365
- . expect ( "Should Be valid Json" ) ;
366
- Arc :: new ( meta)
392
+ /// Generate a token from the username and password
393
+ fn generate_token ( username : & str , password : & str ) -> String {
394
+ format ! (
395
+ "Basic {}" ,
396
+ BASE64_STANDARD . encode( format!( "{username}:{password}" ) )
397
+ )
367
398
}
368
399
369
400
pub fn get_node_id ( & self ) -> String {
0 commit comments