Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ dependency-reduced-pom.xml
.flattened-pom.xml
*.java-version
*.DS_Store
.vscode*
.vscode*
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/cluster/ClusterCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,14 @@ public String toString() {
return sb.toString();
}

@Override
public void markEncodingError() {
command.markEncodingError();
}

@Override
public boolean hasEncodingError() {
return command.hasEncodingError();
}

}
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/protocol/AsyncCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,14 @@ public int hashCode() {
return toHash != null ? toHash.hashCode() : 0;
}

@Override
public void markEncodingError() {
command.markEncodingError();
}

@Override
public boolean hasEncodingError() {
return command.hasEncodingError();
}

}
16 changes: 16 additions & 0 deletions src/main/java/io/lettuce/core/protocol/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public class Command<K, V, T> implements RedisCommand<K, V, T> {

protected volatile byte status = ST_INITIAL;

/**
* Flag to track encoding failures. When true, indicates this command
* failed during encoding and was never successfully sent to Redis.
*/
private volatile boolean encodingError = false;

/**
* Create a new command with the supplied type.
*
Expand Down Expand Up @@ -183,4 +189,14 @@ public boolean isDone() {
return status != ST_INITIAL;
}

@Override
public void markEncodingError() {
this.encodingError = true;
}

@Override
public boolean hasEncodingError() {
return encodingError;
}

}
1 change: 1 addition & 0 deletions src/main/java/io/lettuce/core/protocol/CommandEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?
command.encode(out);
} catch (RuntimeException e) {
out.resetWriterIndex();
command.markEncodingError();
command.completeExceptionally(new EncoderException(
"Cannot encode command. Please close the connection as the connection state may be out of sync.", e));
}
Expand Down
45 changes: 34 additions & 11 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
/**
* Initialize a new instance that handles commands from the supplied queue.
*
* @param clientOptions client options for this connection, must not be {@code null}
* @param clientOptions client options for this connection, must not be {@code null}
* @param clientResources client resources for this connection, must not be {@code null}
* @param endpoint must not be {@code null}.
* @param endpoint must not be {@code null}.
*/
public CommandHandler(ClientOptions clientOptions, ClientResources clientResources, Endpoint endpoint) {

Expand Down Expand Up @@ -283,17 +283,28 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
InternalLogLevel logLevel = InternalLogLevel.WARN;

if (!stack.isEmpty()) {
RedisCommand<?, ?, ?> command = stack.poll();
if (debugEnabled) {
logger.debug("{} Storing exception in {}", logPrefix(), command);
// Clean up any encoding failures at head of stack first
while (!stack.isEmpty() && stack.peek().hasEncodingError()) {
RedisCommand<?, ?, ?> failed = stack.poll();
// Encoding failures were already completed exceptionally during encoding
if (debugEnabled) {
logger.debug("{} Cleaning up encoding failure command {}", logPrefix(), failed);
}
}
logLevel = InternalLogLevel.DEBUG;

try {
command.completeExceptionally(cause);
} catch (Exception ex) {
logger.warn("{} Unexpected exception during command completion exceptionally: {}", logPrefix, ex.toString(),
ex);
if (!stack.isEmpty()) {
RedisCommand<?, ?, ?> command = stack.poll();
if (debugEnabled) {
logger.debug("{} Storing exception in {}", logPrefix(), command);
}
logLevel = InternalLogLevel.DEBUG;

try {
command.completeExceptionally(cause);
} catch (Exception ex) {
logger.warn("{} Unexpected exception during command completion exceptionally: {}", logPrefix, ex.toString(),
ex);
}
}
}

Expand Down Expand Up @@ -672,6 +683,18 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
} else {

RedisCommand<?, ?, ?> command = stack.peek();
// Clean up encoding failures before processing valid responses
while (!stack.isEmpty() && stack.peek().hasEncodingError()) {
RedisCommand<?, ?, ?> failed = stack.poll();
if (debugEnabled) {
logger.debug("{} Cleaning up encoding failure command {}", logPrefix(), failed);
}
// Encoding failures were already completed exceptionally during encoding
if (!stack.isEmpty()) {
command = stack.peek();
}
}

if (debugEnabled) {
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/protocol/CommandWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ public void setOutput(CommandOutput<K, V, T> output) {
command.setOutput(output);
}

@Override
public void markEncodingError() {
command.markEncodingError();
}

@Override
public boolean hasEncodingError() {
return command.hasEncodingError();
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void onComplete(Consumer<? super T> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,14 @@ public void set(long integer) {

}

@Override
public void markEncodingError() {
// Default implementation - pristine fallback commands don't track encoding errors
}

@Override
public boolean hasEncodingError() {
return false; // Default implementation - assume no encoding errors
}

}
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/protocol/RedisCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ public interface RedisCommand<K, V, T> {
*/
boolean isDone();

/**
* Mark this command as having failed during encoding.
* This indicates the command was never successfully sent to Redis.
*
* @since 6.2.2-uber-0.5
*/
void markEncodingError();

/**
* Returns {@code true} if this command failed during encoding and was never sent to Redis.
* Commands with encoding errors should be cleaned up from the response queue without
* waiting for Redis responses.
*
* @return {@code true} if this command failed during encoding
* @since 6.2.2-uber-0.5
*/
boolean hasEncodingError();

/**
* Set a new output. Only possible as long as the command is not completed/cancelled.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.protocol;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;

import io.lettuce.core.RedisClient;
import io.lettuce.core.TestSupport;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.RedisCodec;
import io.netty.handler.codec.EncoderException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.internal.Futures;
import io.lettuce.test.LettuceExtension;

/**
* Integration tests for command encoding error scenarios with GET/SET commands
* against a Redis test instance.
*
* @author Lettuce Contributors
*/
@ExtendWith(LettuceExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CommandEncodingErrorIntegrationTests extends TestSupport {

private final RedisClient client;
private final StatefulRedisConnection<String, String> connection;

@Inject
CommandEncodingErrorIntegrationTests(RedisClient client, StatefulRedisConnection<String, String> connection) {
this.client = client;
this.connection = connection;
}

@BeforeEach
void setUp() {
this.connection.async().flushall();
}

@Test
void testCommandsWithCustomCodec() {
// Create a codec that fails during value encoding with "encoding_failure" keyword
RedisCodec<String, String> failingCodec = new RedisCodec<String, String>() {
@Override
public String decodeKey(ByteBuffer bytes) {
return StandardCharsets.UTF_8.decode(bytes).toString();
}

@Override
public String decodeValue(ByteBuffer bytes) {
return StandardCharsets.UTF_8.decode(bytes).toString();
}

@Override
public ByteBuffer encodeKey(String key) {
return StandardCharsets.UTF_8.encode(key);
}

@Override
public ByteBuffer encodeValue(String value) {
// Only throw exception for specific value to test selective encoding failure
if ("encoding_failure".equals(value)) {
throw new RuntimeException("Simulated encoding failure during value encoding");
}
return StandardCharsets.UTF_8.encode(value);
}
};

try (StatefulRedisConnection<String, String> customConnection = client.connect(failingCodec)) {
RedisCommands<String, String> customRedis = customConnection.sync();

// First, test that normal values work fine
String normalKey = "normal-key";
String normalValue = "normal-value";

String result = customRedis.set(normalKey, normalValue);
assertThat(result).isEqualTo("OK");

String retrieved = customRedis.get(normalKey);
assertThat(retrieved).isEqualTo(normalValue);

// Now test that the specific failure value throws an exception
String failingKey = "failing-key";
String failingValue = "encoding_failure";

assertThatThrownBy(() -> customRedis.set(failingKey, failingValue))
.isInstanceOf(EncoderException.class)
.hasMessageContaining("Cannot encode command");

// test that we can get correct response after encoding failure
retrieved = customRedis.get(normalKey);
assertThat(retrieved).isEqualTo(normalValue);
}
}
}
Loading