Support interruptible send within the Producer #59

@idugalic

First of all, thanks for taking this initiative. I like it, a lot! It is an opportunity for idiomatic (async and reactive) Kotlin integration with Kafka (coroutines and Flow included).

I wonder if we can improve this suspending function in Producer.kt:

public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCoroutine { cont ->
    // Those can be a SerializationException when it fails to serialize the message,
    // a BufferExhaustedException or TimeoutException if the buffer is full,
    // or an InterruptException if the sending thread was interrupted.
    send(record) { a, e ->
      // null if an error occurred, see: org.apache.kafka.clients.producer.Callback
      if (a != null) cont.resume(a) else cont.resumeWithException(e)
    }
  }

Do we want to interrupt the thread when the coroutine is cancelled? Or am I overthinking this, and it is not needed?
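To make the question concrete, here is a minimal sketch of what a cancellation-aware variant might look like, using suspendCancellableCoroutine instead of suspendCoroutine. The name sendAwaitCancellable and the choice to extend the Producer interface rather than KafkaProducer are assumptions made for this example, not part of the library. Cancelling only stops the caller from waiting: as far as I know, the Future returned by KafkaProducer.send does not support cancellation, so the record may still be delivered.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical helper for illustration only; not the library's API.
public suspend fun <A, B> Producer<A, B>.sendAwaitCancellable(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    // send may still throw synchronously (e.g. SerializationException);
    // that exception propagates to the caller of this function.
    val future = send(record) { metadata, exception ->
      // Check the exception first: depending on the client version,
      // the metadata argument may be non-null even when the send failed.
      if (exception != null) cont.resumeWithException(exception)
      else cont.resume(metadata)
    }
    // Best effort only: the caller stops waiting on cancellation, but the
    // producer's future may ignore cancel() and the send may still complete.
    cont.invokeOnCancellation { future.cancel(true) }
  }
```

With this shape, a callback that arrives after cancellation is ignored by the continuation, so no thread needs to be interrupted: send does not block the calling thread while waiting for the acknowledgement.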

I can see that you have wrapped the poll method in Consumer.kt in runInterruptible:

runInterruptible(dispatcher) {
  poll(timeout.toJavaDuration())
}

Obviously, poll is a blocking function, and there is no (async) callback like the one in the send method on the Producer side. It was interesting to learn that the exception is of type org.apache.kafka.common.errors.InterruptException :) But this part is clear. It will work.
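As a small illustration of the mechanism, the sketch below wraps a generic blocking call in runInterruptible and cancels it. Thread.sleep is only a stand-in for a blocking call such as poll, and the function name interruptBlockingCall is made up for this example. How Kafka's own InterruptException surfaces to the caller is not shown here.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible

// Hypothetical demo: returns true if cancelling the coroutine
// interrupted the blocking call running inside runInterruptible.
fun interruptBlockingCall(): Boolean = runBlocking {
  val started = CompletableDeferred<Unit>()
  val interrupted = AtomicBoolean(false)
  val job = launch(Dispatchers.IO) {
    try {
      runInterruptible {
        started.complete(Unit)  // signal that the blocking section was entered
        Thread.sleep(60_000)    // stand-in for a blocking call such as poll
      }
    } catch (e: CancellationException) {
      // runInterruptible turns the InterruptedException into a CancellationException
      interrupted.set(true)
      throw e
    }
  }
  started.await()      // wait until the worker is inside the blocking section
  job.cancelAndJoin()  // cancellation interrupts the blocked thread
  interrupted.get()
}
```

The point of the comparison: runInterruptible exists for calls that hold a thread, while a callback-based call such as send has no blocked thread to interrupt.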

I am just trying to understand these two low-level methods first, and how they translate into the Kotlin coroutines (suspension) world.
