1
1
# -*- coding: utf-8 -*-
2
2
from __future__ import unicode_literals
3
3
import pytest
4
+ from mock import patch
5
+ import kafka .codec
4
6
from kafka .record .default_records import (
5
7
DefaultRecordBatch , DefaultRecordBatchBuilder
6
8
)
9
+ from kafka .errors import UnsupportedCodecError
7
10
8
11
9
12
@pytest .mark .parametrize ("compression_type" , [
@@ -17,7 +20,7 @@ def test_read_write_serde_v2(compression_type):
17
20
magic = 2 , compression_type = compression_type , is_transactional = 1 ,
18
21
producer_id = 123456 , producer_epoch = 123 , base_sequence = 9999 ,
19
22
batch_size = 999999 )
20
- headers = [] # [ ("header1", b"aaa"), ("header2", b"bbb")]
23
+ headers = [("header1" , b"aaa" ), ("header2" , b"bbb" )]
21
24
for offset in range (10 ):
22
25
builder .append (
23
26
offset , timestamp = 9999999 , key = b"test" , value = b"Super" ,
@@ -167,3 +170,35 @@ def test_default_batch_size_limit():
167
170
2 , timestamp = None , key = None , value = b"M" * 700 , headers = [])
168
171
assert meta is None
169
172
assert len (builder .build ()) < 1000
173
+
174
+
175
+ @pytest .mark .parametrize ("compression_type,name,checker_name" , [
176
+ (DefaultRecordBatch .CODEC_GZIP , "gzip" , "has_gzip" ),
177
+ (DefaultRecordBatch .CODEC_SNAPPY , "snappy" , "has_snappy" ),
178
+ (DefaultRecordBatch .CODEC_LZ4 , "lz4" , "has_lz4" )
179
+ ])
180
+ @pytest .mark .parametrize ("magic" , [0 , 1 ])
181
+ def test_unavailable_codec (magic , compression_type , name , checker_name ):
182
+ builder = DefaultRecordBatchBuilder (
183
+ magic = 2 , compression_type = compression_type , is_transactional = 0 ,
184
+ producer_id = - 1 , producer_epoch = - 1 , base_sequence = - 1 ,
185
+ batch_size = 1024 )
186
+ builder .append (0 , timestamp = None , key = None , value = b"M" * 2000 , headers = [])
187
+ correct_buffer = builder .build ()
188
+
189
+ with patch .object (kafka .codec , checker_name ) as mocked :
190
+ mocked .return_value = False
191
+ # Check that builder raises error
192
+ builder = DefaultRecordBatchBuilder (
193
+ magic = 2 , compression_type = compression_type , is_transactional = 0 ,
194
+ producer_id = - 1 , producer_epoch = - 1 , base_sequence = - 1 ,
195
+ batch_size = 1024 )
196
+ error_msg = "Libraries for {} compression codec not found" .format (name )
197
+ with pytest .raises (UnsupportedCodecError , match = error_msg ):
198
+ builder .append (0 , timestamp = None , key = None , value = b"M" , headers = [])
199
+ builder .build ()
200
+
201
+ # Check that reader raises same error
202
+ batch = DefaultRecordBatch (bytes (correct_buffer ))
203
+ with pytest .raises (UnsupportedCodecError , match = error_msg ):
204
+ list (batch )
0 commit comments