@@ -329,7 +329,7 @@ def __init__(self, magic, compression_type, batch_size):
329
329
self ._batch_size = batch_size
330
330
self ._buffer = bytearray ()
331
331
332
- def append (self , offset , timestamp , key , value , headers = None ):
332
+ def append (self , offset , timestamp , key_bytes , value_bytes , headers = None ):
333
333
""" Append message to batch.
334
334
"""
335
335
assert not headers , "Headers not supported in v0/v1"
@@ -344,18 +344,18 @@ def append(self, offset, timestamp, key, value, headers=None):
344
344
raise TypeError (
345
345
"`timestamp` should be int, but {} provided" .format (
346
346
type (timestamp )))
347
- if not (key is None or
348
- isinstance (key , (bytes , bytearray , memoryview ))):
347
+ if not (key_bytes is None or
348
+ isinstance (key_bytes , (bytes , bytearray , memoryview ))):
349
349
raise TypeError (
350
- "Not supported type for key: {}" .format (type (key )))
351
- if not (value is None or
352
- isinstance (value , (bytes , bytearray , memoryview ))):
350
+ "Not supported type for key: {}" .format (type (key_bytes )))
351
+ if not (value_bytes is None or
352
+ isinstance (value_bytes , (bytes , bytearray , memoryview ))):
353
353
raise TypeError (
354
- "Not supported type for value: {}" .format (type (value )))
354
+ "Not supported type for value: {}" .format (type (value_bytes )))
355
355
356
356
# Check if we have room for another message
357
357
pos = len (self ._buffer )
358
- size = self .size_in_bytes (offset , timestamp , key , value )
358
+ size = self .size_in_bytes (offset , timestamp , key_bytes , value_bytes )
359
359
# We always allow at least one record to be appended
360
360
if offset != 0 and pos + size >= self ._batch_size :
361
361
return None
@@ -364,11 +364,11 @@ def append(self, offset, timestamp, key, value, headers=None):
364
364
self ._buffer .extend (bytearray (size ))
365
365
366
366
# Encode message
367
- crc = self ._encode_msg (pos , offset , timestamp , key , value )
367
+ crc = self ._encode_msg (pos , offset , timestamp , key_bytes , value_bytes )
368
368
369
369
return LegacyRecordMetadata (offset , crc , size , timestamp )
370
370
371
- def _encode_msg (self , start_pos , offset , timestamp , key , value ,
371
+ def _encode_msg (self , start_pos , offset , timestamp , key_bytes , value_bytes ,
372
372
attributes = 0 ):
373
373
""" Encode msg data into the `msg_buffer`, which should be allocated
374
374
to at least the size of this message.
@@ -380,24 +380,24 @@ def _encode_msg(self, start_pos, offset, timestamp, key, value,
380
380
# Write key and value
381
381
pos += self .KEY_OFFSET_V0 if magic == 0 else self .KEY_OFFSET_V1
382
382
383
- if key is None :
383
+ if key_bytes is None :
384
384
struct .pack_into (">i" , buf , pos , - 1 )
385
385
pos += self .KEY_LENGTH
386
386
else :
387
- key_size = len (key )
387
+ key_size = len (key_bytes )
388
388
struct .pack_into (">i" , buf , pos , key_size )
389
389
pos += self .KEY_LENGTH
390
- buf [pos : pos + key_size ] = key
390
+ buf [pos : pos + key_size ] = key_bytes
391
391
pos += key_size
392
392
393
- if value is None :
393
+ if value_bytes is None :
394
394
struct .pack_into (">i" , buf , pos , - 1 )
395
395
pos += self .VALUE_LENGTH
396
396
else :
397
- value_size = len (value )
397
+ value_size = len (value_bytes )
398
398
struct .pack_into (">i" , buf , pos , value_size )
399
399
pos += self .VALUE_LENGTH
400
- buf [pos : pos + value_size ] = value
400
+ buf [pos : pos + value_size ] = value_bytes
401
401
pos += value_size
402
402
length = (pos - start_pos ) - self .LOG_OVERHEAD
403
403
@@ -430,15 +430,15 @@ def _maybe_compress(self):
430
430
else :
431
431
compressed = lz4_encode (data )
432
432
size = self .size_in_bytes (
433
- 0 , timestamp = 0 , key = None , value = compressed )
433
+ 0 , timestamp = 0 , key_bytes = None , value_bytes = compressed )
434
434
# We will try to reuse the same buffer if we have enough space
435
435
if size > len (self ._buffer ):
436
436
self ._buffer = bytearray (size )
437
437
else :
438
438
del self ._buffer [size :]
439
439
self ._encode_msg (
440
440
start_pos = 0 ,
441
- offset = 0 , timestamp = 0 , key = None , value = compressed ,
441
+ offset = 0 , timestamp = 0 , key_bytes = None , value_bytes = compressed ,
442
442
attributes = self ._compression_type )
443
443
return True
444
444
return False
@@ -455,20 +455,20 @@ def size(self):
455
455
456
456
# Size calculations. Just copied Java's implementation
457
457
458
- def size_in_bytes (self , offset , timestamp , key , value , headers = None ):
458
+ def size_in_bytes (self , offset , timestamp , key_bytes , value_bytes , headers = None ):
459
459
""" Actual size of message to add
460
460
"""
461
461
assert not headers , "Headers not supported in v0/v1"
462
462
magic = self ._magic
463
- return self .LOG_OVERHEAD + self .record_size (magic , key , value )
463
+ return self .LOG_OVERHEAD + self .record_size (magic , key_bytes , value_bytes )
464
464
465
465
@classmethod
466
- def record_size (cls , magic , key , value ):
466
+ def record_size (cls , magic , key_bytes , value_bytes ):
467
467
message_size = cls .record_overhead (magic )
468
- if key is not None :
469
- message_size += len (key )
470
- if value is not None :
471
- message_size += len (value )
468
+ if key_bytes is not None :
469
+ message_size += len (key_bytes )
470
+ if value_bytes is not None :
471
+ message_size += len (value_bytes )
472
472
return message_size
473
473
474
474
@classmethod
@@ -480,17 +480,17 @@ def record_overhead(cls, magic):
480
480
return cls .RECORD_OVERHEAD_V1
481
481
482
482
@classmethod
483
- def estimate_size_in_bytes (cls , magic , compression_type , key , value ):
483
+ def estimate_size_in_bytes (cls , magic , compression_type , key_bytes , value_bytes ):
484
484
""" Upper bound estimate of record size.
485
485
"""
486
486
assert magic in [0 , 1 ], "Not supported magic"
487
487
# In case of compression we may need another overhead for inner msg
488
488
if compression_type :
489
489
return (
490
490
cls .LOG_OVERHEAD + cls .record_overhead (magic ) +
491
- cls .record_size (magic , key , value )
491
+ cls .record_size (magic , key_bytes , value_bytes )
492
492
)
493
- return cls .LOG_OVERHEAD + cls .record_size (magic , key , value )
493
+ return cls .LOG_OVERHEAD + cls .record_size (magic , key_bytes , value_bytes )
494
494
495
495
496
496
class LegacyRecordMetadata (object ):
0 commit comments