From 7693d3705405aa839196d7396f0c390e3f452ec0 Mon Sep 17 00:00:00 2001 From: "Rob Hansen (github)" Date: Thu, 25 May 2023 21:38:40 -0700 Subject: [PATCH 1/3] CreateTopics: only suppress topic already exists errors --- createtopics.go | 16 +++------- createtopics_test.go | 71 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/createtopics.go b/createtopics.go index 561425a53..8ad9ebf44 100644 --- a/createtopics.go +++ b/createtopics.go @@ -3,7 +3,6 @@ package kafka import ( "bufio" "context" - "errors" "fmt" "net" "time" @@ -65,7 +64,6 @@ func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*C TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout), ValidateOnly: req.ValidateOnly, }) - if err != nil { return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err) } @@ -363,6 +361,9 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse return response, err } for _, tr := range response.TopicErrors { + if tr.ErrorCode == int16(TopicAlreadyExists) { + continue + } if tr.ErrorCode != 0 { return response, Error(tr.ErrorCode) } @@ -385,14 +386,5 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { _, err := c.createTopics(createTopicsRequestV0{ Topics: requestV0Topics, }) - if err != nil { - if errors.Is(err, TopicAlreadyExists) { - // ok - return nil - } - - return err - } - - return nil + return err } diff --git a/createtopics_test.go b/createtopics_test.go index 71cf456e0..b42e6d171 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -4,10 +4,80 @@ import ( "bufio" "bytes" "context" + "errors" + "net" "reflect" + "strconv" "testing" ) +func TestConnCreateTopics(t *testing.T) { + topic1 := makeTopic() + topic2 := makeTopic() + + conn, err := DialContext(context.Background(), "tcp", "localhost:9092") + defer conn.Close() + if err != nil { + panic(err) + } + + controller, _ := conn.Controller() + + controllerConn, err := Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + err = controllerConn.CreateTopics(TopicConfig{ + Topic: topic1, + NumPartitions: 1, + ReplicationFactor: 1, + }) + if err != nil { + t.Fatalf("unexpected error creating topic: %s", err.Error()) + } + + err = controllerConn.CreateTopics(TopicConfig{ + Topic: topic1, + NumPartitions: 1, + ReplicationFactor: 1, + }) + + // Duplicate topic should not return an error + if err != nil { + t.Fatalf("unexpected error creating duplicate topic topic: %v", err) + } + + err = controllerConn.CreateTopics( + TopicConfig{ + Topic: topic1, + NumPartitions: 1, + ReplicationFactor: 1, + }, + TopicConfig{ + Topic: topic2, + NumPartitions: 1, + ReplicationFactor: 1, + }, + TopicConfig{ + Topic: topic2, + NumPartitions: 1, + ReplicationFactor: 1, + }, + ) + + if err == nil { + t.Fatal("CreateTopics should have returned an error for invalid requests") + } + + if !errors.Is(err, InvalidRequest) { + t.Fatalf("expected invalid request: %v", err) + } + + deleteTopic(t, topic1, topic2) +} + func TestClientCreateTopics(t *testing.T) { const ( topic1 = "client-topic-1" @@ -59,7 +129,6 @@ func TestClientCreateTopics(t *testing.T) { }, }, }) - if err != nil { t.Fatal(err) } From e76ab9877cd5520d0b92c91cdae164f095311006 Mon Sep 17 00:00:00 2001 From: "Rob Hansen (github)" Date: Thu, 25 May 2023 21:55:11 -0700 Subject: [PATCH 2/3] only delete topic1 --- createtopics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/createtopics_test.go b/createtopics_test.go index b42e6d171..61349f2f9 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -75,7 +75,7 @@ func TestConnCreateTopics(t *testing.T) { t.Fatalf("expected invalid request: %v", err) } - deleteTopic(t, topic1, topic2) + deleteTopic(t, topic1) } func TestClientCreateTopics(t *testing.T) { From c96cc0e02c8bc9b4fe908a90c969a7755c564e83 Mon Sep 17 00:00:00 2001 From: "Rob Hansen (github)" Date: Thu, 25 May 2023 21:58:58 -0700 Subject: [PATCH 3/3] check conn Close error --- createtopics_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/createtopics_test.go b/createtopics_test.go index 61349f2f9..38819c382 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -16,11 +16,17 @@ func TestConnCreateTopics(t *testing.T) { topic2 := makeTopic() conn, err := DialContext(context.Background(), "tcp", "localhost:9092") - defer conn.Close() if err != nil { - panic(err) + t.Fatal(err) } + defer func() { + err := conn.Close() + if err != nil { + t.Fatalf("failed to close connection: %v", err) + } + }() + controller, _ := conn.Controller() controllerConn, err := Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))