File tree Expand file tree Collapse file tree 1 file changed +11
-3
lines changed Expand file tree Collapse file tree 1 file changed +11
-3
lines changed Original file line number Diff line number Diff line change @@ -66,13 +66,21 @@ def self.decode(decoder)
6666
6767 crc = message_decoder . int32
6868 magic_byte = message_decoder . int8
69+ attributes = message_decoder . int8
6970
70- unless magic_byte == MAGIC_BYTE
71+ # The magic byte indicates the message format version. There are situations
72+ # where an old message format can be returned from a newer version of Kafka,
73+ # because old messages are not necessarily rewritten on upgrades.
74+ case magic_byte
75+ when 0
76+ # No timestamp in the pre-0.10 message format.
77+ timestamp = nil
78+ when 1
79+ timestamp = message_decoder . int64
80+ else
7181 raise Kafka ::Error , "Invalid magic byte: #{ magic_byte } "
7282 end
7383
74- attributes = message_decoder . int8
75- timestamp = message_decoder . int64
7684 key = message_decoder . bytes
7785 value = message_decoder . bytes
7886
You can’t perform that action at this time.
0 commit comments