thirdparty package kafka updated to date

This commit is contained in:
John
2019-01-02 11:02:03 +08:00
parent d6aa2b2512
commit adf06a2b0d
98 changed files with 4670 additions and 1816 deletions

View File

@ -27,6 +27,29 @@ var (
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
overflowMessageFetchResponse = []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x05,
0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
0x00, 0x00, 0x00, 0x30,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
// overflow messageSet
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0xFF,
// overflow bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
oneRecordFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
@ -148,6 +171,66 @@ func TestOneMessageFetchResponse(t *testing.T) {
}
}
func TestOverflowMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}
if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}
block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}
overflow, err := block.Records.isOverflow()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !overflow {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}
n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.Records.MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}
func TestOneRecordFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)