GH-2198: Spring Observability Initial Commit #2394

Merged · 14 commits · Sep 19, 2022

2 changes: 2 additions & 0 deletions build.gradle
@@ -335,12 +335,14 @@ project ('spring-kafka') {
optionalApi 'io.projectreactor:reactor-core'
optionalApi 'io.projectreactor.kafka:reactor-kafka'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'

testImplementation project (':spring-kafka-test')
testImplementation 'io.projectreactor:reactor-test'
testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion"
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
16 changes: 15 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -3310,7 +3310,6 @@ IMPORTANT: By default, the application context's event multicaster invokes event
If you change the multicaster to use an async executor, thread cleanup is not effective.

[[micrometer]]

==== Monitoring

===== Monitoring Listener Performance
@@ -3398,6 +3397,21 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")

A similar listener is provided for the `StreamsBuilderFactoryBean` - see <<streams-micrometer>>.

[[observation]]
===== Micrometer Observation

Starting with version 3.0, using Micrometer for observation is supported for the `KafkaTemplate` and listener containers.

Set `observationEnabled` to `true` on each component to enable observation; this disables <<micrometer,Micrometer Timers>>, because the timers are then managed by each observation.

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.
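
For example, a configuration along the lines of the following sketch (typically inside a `@Configuration` class) enables observation on both components; the bean definitions, generic types, and factory wiring are illustrative only:

[source, java]
----
@Bean
KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    template.setObservationEnabled(true); // Micrometer timers are replaced by observations
    return template;
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> cf) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    factory.getContainerProperties().setObservationEnabled(true); // applies to each created container
    return factory;
}
----

The template obtains the `ObservationRegistry` from the application context when a unique bean of that type is present (see `afterSingletonsInstantiated()` in the `KafkaTemplate` changes below).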

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` on the template or listener container, respectively.

The default implementations add the `bean.name` tag for template observations and the `listener.id` tag for containers.

You can subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention`, or provide completely new implementations.
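
As a sketch of the subclass approach on the template side, the following hypothetical convention extends the default one and adds one extra low-cardinality tag; the class name and tag are made up, and the overridden method assumes the standard Micrometer `ObservationConvention` contract:

[source, java]
----
import io.micrometer.common.KeyValue;
import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;

public class TaggedTemplateObservationConvention extends DefaultKafkaTemplateObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
        // keep the default tags (such as bean.name) and add an illustrative static tag
        return super.getLowCardinalityKeyValues(context)
                .and(KeyValue.of("app.name", "orders-service"));
    }

}
----

Register the convention with `KafkaTemplate.setObservationConvention(...)`; the listener-side equivalent is `ContainerProperties.setObservationConvention(...)`.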

[[transactions]]
==== Transactions

6 changes: 6 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -17,6 +17,12 @@ IMPORTANT: When using transactions, the minimum broker version is 2.5.

See <<exactly-once>> and https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.

[[x30-obs]]
==== Observation

Enabling observation for timers and tracing with Micrometer is now supported.
See <<observation>> for more information.

[[x30-global-embedded-kafka]]
==== Global Single Embedded Kafka

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
@@ -48,6 +48,8 @@

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
@@ -62,13 +64,20 @@
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* A template for executing high-level operations. When used with a
* {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory
Expand All @@ -90,7 +99,7 @@
*/
@SuppressWarnings("deprecation")
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean {
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR

@@ -126,11 +135,17 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private ConsumerFactory<K, V> consumerFactory;

private volatile boolean micrometerEnabled = true;
private ProducerInterceptor<K, V> producerInterceptor;

private boolean micrometerEnabled = true;

private volatile MicrometerHolder micrometerHolder;
private MicrometerHolder micrometerHolder;

private ProducerInterceptor<K, V> producerInterceptor;
private boolean observationEnabled;

private KafkaTemplateObservationConvention observationConvention;

private ObservationRegistry observationRegistry;

/**
* Create an instance using the supplied producer factory and autoFlush false.
@@ -382,6 +397,37 @@ public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor
this.producerInterceptor = producerInterceptor;
}

/**
* Set to true to enable observation via Micrometer.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set a custom {@link KafkaTemplateObservationConvention}.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(KafkaTemplateObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public void afterSingletonsInstantiated() {
if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) {
ObjectProvider<ObservationRegistry> registry =
this.applicationContext.getBeanProvider(ObservationRegistry.class);
this.observationRegistry = registry.getIfUnique();
}
else if (this.micrometerEnabled) {
this.micrometerHolder = obtainMicrometerHolder();
}
}

@Override
public void onApplicationEvent(ContextStoppedEvent event) {
if (this.customProducerFactory) {
@@ -412,33 +458,33 @@ public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long t
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data) {

ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
Assert.notNull(record, "'record' cannot be null");
return doSend(record);
return observeSend(record);
}

@SuppressWarnings("unchecked")
@@ -451,7 +497,7 @@ public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return doSend((ProducerRecord<K, V>) producerRecord);
return observeSend((ProducerRecord<K, V>) producerRecord);
}


@@ -621,28 +667,48 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
}
}

private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K, V> producerRecord) {
Observation observation;
if (!this.observationEnabled || this.observationRegistry == null) {
observation = Observation.NOOP;
}
else {
observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE,
new KafkaRecordSenderContext(producerRecord, this.beanName), this.observationRegistry);
}
try {
observation.start();
return doSend(producerRecord, observation);
}
catch (RuntimeException ex) {
observation.error(ex);
observation.stop();
throw ex;
}
}

/**
* Send the producer record.
* @param producerRecord the producer record.
* @param observation the observation.
* @return a Future for the {@link org.apache.kafka.clients.producer.RecordMetadata
* RecordMetadata}.
*/
protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord,
@Nullable Observation observation) {

final Producer<K, V> producer = getTheProducer(producerRecord.topic());
this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
final CompletableFuture<SendResult<K, V>> future = new CompletableFuture<>();
Object sample = null;
if (this.micrometerEnabled && this.micrometerHolder == null) {
this.micrometerHolder = obtainMicrometerHolder();
}
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
if (this.producerInterceptor != null) {
this.producerInterceptor.onSend(producerRecord);
}
Future<RecordMetadata> sendFuture =
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
// May be an immediate failure
if (sendFuture.isDone()) {
try {
@@ -664,7 +730,7 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
}

private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample) {
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample, Observation observation) {

return (metadata, exception) -> {
try {
@@ -680,6 +746,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
if (sample != null) {
this.micrometerHolder.success(sample);
}
observation.stop();
future.complete(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
@@ -691,6 +758,8 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
if (sample != null) {
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
}
observation.error(exception);
observation.stop();
future.completeExceptionally(
new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -32,6 +32,7 @@
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
@@ -262,6 +263,8 @@ public enum EOSMode {

private boolean micrometerEnabled = true;

private boolean observationEnabled;

private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;

private Boolean subBatchPerPartition;
@@ -282,6 +285,8 @@

private boolean pauseImmediate;

private KafkaListenerObservationConvention observationConvention;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
@@ -635,13 +640,28 @@ public boolean isMicrometerEnabled() {

/**
* Set to false to disable the Micrometer listener timers. Default true.
* Disabled when {@link #setObservationEnabled(boolean)} is true.
* @param micrometerEnabled false to disable.
* @since 2.3
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

public boolean isObservationEnabled() {
return this.observationEnabled;
}

/**
* Set to true to enable observation via Micrometer.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set additional tags for the Micrometer listener timers.
* @param tags the tags.
@@ -912,6 +932,19 @@ private void adviseListenerIfNeeded() {
}
}

public KafkaListenerObservationConvention getObservationConvention() {
return this.observationConvention;
}

/**
* Set a custom {@link KafkaListenerObservationConvention}.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(KafkaListenerObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public String toString() {
return "ContainerProperties ["
@@ -942,7 +975,12 @@ public String toString() {
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n idleBeforeDataMultiplier" + this.idleBeforeDataMultiplier
+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+ "\n micrometerEnabled=" + this.micrometerEnabled
+ "\n observationEnabled=" + this.observationEnabled
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ "\n]";
}
