File tree Expand file tree Collapse file tree 1 file changed +14
-6
lines changed Expand file tree Collapse file tree 1 file changed +14
-6
lines changed Original file line number Diff line number Diff line change @@ -229,21 +229,29 @@ def lz4_encode_old_kafka(payload):
229229 assert xxhash is not None
230230 data = lz4_encode (payload )
231231 header_size = 7
232- if isinstance ( data [4 ], int ):
233- flg = data [ 4 ]
234- else :
235- flg = ord ( data [ 4 ])
232+ flg = data [4 ]
233+ if not isinstance ( flg , int ):
234+ flg = ord ( flg )
235+
236236 content_size_bit = ((flg >> 3 ) & 1 )
237237 if content_size_bit :
238- header_size += 8
238+ # Old kafka does not accept the content-size field
239+ # so we need to discard it and reset the header flag
240+ flg -= 8
241+ data = bytearray (data )
242+ data [4 ] = flg
243+ data = bytes (data )
244+ payload = data [header_size + 8 :]
245+ else :
246+ payload = data [header_size :]
239247
240248 # This is the incorrect hc
241249 hc = xxhash .xxh32 (data [0 :header_size - 1 ]).digest ()[- 2 :- 1 ] # pylint: disable-msg=no-member
242250
243251 return b'' .join ([
244252 data [0 :header_size - 1 ],
245253 hc ,
246- data [ header_size :]
254+ payload
247255 ])
248256
249257
You can’t perform that action at this time.
0 commit comments