diff --git a/balancer.go b/balancer.go index cd2e8c1c4..f4768cf88 100644 --- a/balancer.go +++ b/balancer.go @@ -260,7 +260,7 @@ func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) { // determine which partition to route messages to. This ensures that messages // with the same key are routed to the same partition. This balancer is // compatible with the partitioner used by the Java library and by librdkafka's -// "murmur2" and "murmur2_random" partitioners. / +// "murmur2" and "murmur2_random" partitioners. // // With the Consistent field false (default), this partitioner is equivalent to // the "murmur2_random" setting in librdkafka. When Consistent is true, this diff --git a/compress/compress.go b/compress/compress.go index 6e92968f2..054bf03d0 100644 --- a/compress/compress.go +++ b/compress/compress.go @@ -13,7 +13,7 @@ import ( "github.com/segmentio/kafka-go/compress/zstd" ) -// Compression represents the the compression applied to a record set. +// Compression represents the compression applied to a record set. type Compression int8 const ( diff --git a/fetch.go b/fetch.go index e682aeadb..eafd0de88 100644 --- a/fetch.go +++ b/fetch.go @@ -49,7 +49,7 @@ type FetchResponse struct { Topic string Partition int - // Informations about the topic partition layout returned from the broker. + // Information about the topic partition layout returned from the broker. // // LastStableOffset requires the kafka broker to support the Fetch API in // version 4 or above (otherwise the value is zero). diff --git a/produce.go b/produce.go index 1a196fe6b..26281a0b8 100644 --- a/produce.go +++ b/produce.go @@ -117,21 +117,21 @@ type ProduceResponse struct { // Time at which the broker wrote the records to the topic partition. // - // This field will be zero if the kafka broker did no support the Produce - // API in version 2 or above. + // This field will be zero if the kafka broker did not support Produce API + // version 2 or above. LogAppendTime time.Time // First offset in the topic partition that the records were written to. // - // This field will be zero if the kafka broker did no support the Produce - // API in version 5 or above (or if the first offset is zero). + // This field will be zero if the kafka broker did not support Produce API + // version 5 or above (or if the first offset is zero). LogStartOffset int64 // If errors occurred writing specific records, they will be reported in // this map. // - // This field will always be empty if the kafka broker did no support the - // Produce API in version 8 or above. + // This field will always be empty if the kafka broker did not support + // Produce API version 8 or above. RecordErrors map[int]error } diff --git a/reader.go b/reader.go index fed5d1d4e..13a336e51 100644 --- a/reader.go +++ b/reader.go @@ -19,7 +19,7 @@ const ( ) const ( - // defaultCommitRetries holds the number commit attempts to make + // defaultCommitRetries holds the number of commit attempts to make // before giving up. defaultCommitRetries = 3 ) @@ -785,7 +785,7 @@ func (r *Reader) Close() error { // offset when called. Note that this could result in an offset being committed // before the message is fully processed. // -// If more fine grained control of when offsets are committed is required, it +// If more fine-grained control of when offsets are committed is required, it // is recommended to use FetchMessage with CommitMessages instead. func (r *Reader) ReadMessage(ctx context.Context) (Message, error) { m, err := r.FetchMessage(ctx) @@ -1220,7 +1220,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { } // A reader reads messages from kafka and produces them on its channels, it's -// used as an way to asynchronously fetch messages while the main program reads +// used as a way to asynchronously fetch messages while the main program reads // them using the high level reader API. type reader struct { dialer *Dialer diff --git a/reader_test.go b/reader_test.go index d73bdfbe3..f413d7429 100644 --- a/reader_test.go +++ b/reader_test.go @@ -313,7 +313,7 @@ func createTopic(t *testing.T, topic string, partitions int) { }) if err != nil { if !errors.Is(err, TopicAlreadyExists) { - err = fmt.Errorf("creaetTopic, conn.createtTopics: %w", err) + err = fmt.Errorf("createTopic, conn.createTopics: %w", err) t.Error(err) t.FailNow() } diff --git a/transport.go b/transport.go index 6ba2d638c..b66422966 100644 --- a/transport.go +++ b/transport.go @@ -60,7 +60,7 @@ type Transport struct { // Time limit set for establishing connections to the kafka cluster. This // limit includes all round trips done to establish the connections (TLS - // hadbhaske, SASL negotiation, etc...). + // handshake, SASL negotiation, etc...). // // Defaults to 5s. DialTimeout time.Duration @@ -150,7 +150,7 @@ func (t *Transport) CloseIdleConnections() { // package. // // The type of the response message will match the type of the request. For -// exmple, if RoundTrip was called with a *fetch.Request as argument, the value +// example, if RoundTrip was called with a *fetch.Request as argument, the value // returned will be of type *fetch.Response. It is safe for the program to do a // type assertion after checking that no error was returned. // @@ -413,14 +413,16 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) case *meta.Response: m := req.(*meta.Request) // If we get here with allow auto topic creation then - // we didn't have that topic in our cache so we should update + // we didn't have that topic in our cache, so we should update // the cache. if m.AllowAutoTopicCreation { topicsToRefresh := make([]string, 0, len(resp.Topics)) for _, topic := range resp.Topics { - // fixes issue 806: don't refresh topics that failed to create, - // it may means kafka doesn't enable auto topic creation. - // This causes the library to hang indefinitely, same as createtopics process. + // Don't refresh topics that failed to create, since that may + // mean that enable automatic topic creation is not enabled. + // That causes the library to hang indefinitely, same as + // don't refresh topics that failed to create, + // createtopics process. Fixes issue 806. if topic.ErrorCode != 0 { continue } diff --git a/writer.go b/writer.go index f3c6bd4d1..ce69ab30c 100644 --- a/writer.go +++ b/writer.go @@ -533,7 +533,7 @@ func (w *Writer) enter() bool { // completed. func (w *Writer) leave() { w.group.Done() } -// spawn starts an new asynchronous operation on the writer. This method is used +// spawn starts a new asynchronous operation on the writer. This method is used // instead of starting goroutines inline to help manage the state of the // writer's wait group. The wait group is used to block Close calls until all // inflight operations have completed, therefore automatically including those diff --git a/writer_test.go b/writer_test.go index d34358865..70a44ca8d 100644 --- a/writer_test.go +++ b/writer_test.go @@ -131,7 +131,7 @@ func TestWriter(t *testing.T) { }, { - scenario: "writing messsages with a small batch byte size", + scenario: "writing messages with a small batch byte size", function: testWriterSmallBatchBytes, }, { @@ -159,7 +159,7 @@ func TestWriter(t *testing.T) { function: testWriterInvalidPartition, }, { - scenario: "writing a message to a non-existant topic creates the topic", + scenario: "writing a message to a non-existent topic creates the topic", function: testWriterAutoCreateTopic, }, {