Skip to content

Lwt awareness #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ public interface PreparedStatement {
@NonNull
ColumnDefinitions getResultSetDefinitions();

/**
* Informs if this is an LWT query.
*
* <p>Not guaranteed to return true for LWT queries (but guaranteed to return false for non-LWT
* ones). It can happen for several reasons, for example: using Cassandra instead of Scylla, using
* too old Scylla version, future changes in driver allowing channels to be created without
* sending OPTIONS request.
*
* <p>More information about LWT:
*
* @see <a href="https://docs.scylladb.com/using-scylla/lwt/">Docs about LWT</a>
*/
boolean isLWT();

/**
* Updates {@link #getResultMetadataId()} and {@link #getResultSetDefinitions()} atomically.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2022 ScyllaDB
*
* Modified by ScyllaDB
*/
package com.datastax.oss.driver.api.core.cql;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
Expand Down Expand Up @@ -516,6 +522,20 @@ default SelfT setNowInSeconds(int nowInSeconds) {
return (SelfT) this;
}

/**
* Informs if this is a prepared LWT query.
*
* <p>Not guaranteed to return true for prepared LWT queries (but guaranteed to return false for
* non-LWT ones). It can happen for several reasons, for example: using Cassandra instead of
* Scylla, using too old Scylla version, future changes in driver allowing channels to be created
* without sending OPTIONS request.
*
* <p>More information about LWT:
*
* @see <a href="https://docs.scylladb.com/using-scylla/lwt/">Docs about LWT</a>
*/
boolean isLWT();

/**
* Calculates the approximate size in bytes that the statement will have when encoded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.protocol.LwtInfo;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class DriverChannel {
AttributeKey.newInstance("options");
static final AttributeKey<ConnectionShardingInfo> SHARDING_INFO_KEY =
AttributeKey.newInstance("sharding_info");
static final AttributeKey<LwtInfo> LWT_INFO_KEY = AttributeKey.newInstance("lwt_info");

@SuppressWarnings("RedundantStringConstructorCall")
static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE");
Expand Down Expand Up @@ -154,6 +156,10 @@ public ShardingInfo getShardingInfo() {
: null;
}

public LwtInfo getLwtInfo() {
return channel.attr(LWT_INFO_KEY).get();
}

/**
* @return the number of available stream ids on the channel; more precisely, this is the number
* of {@link #preAcquireId()} calls for which the id has not been released yet. This is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
*/
package com.datastax.oss.driver.internal.core.channel;

import static com.datastax.oss.driver.internal.core.channel.DriverChannel.LWT_INFO_KEY;

import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.InvalidKeyspaceException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
Expand All @@ -36,6 +38,7 @@
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder;
import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder;
import com.datastax.oss.driver.internal.core.protocol.LwtInfo;
import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder;
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
Expand All @@ -61,7 +64,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
Expand All @@ -88,6 +93,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
private String logPrefix;
private ChannelHandlerContext ctx;
private final boolean querySupportedOptions;
private LwtInfo lwtInfo;

/**
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
Expand Down Expand Up @@ -181,7 +187,11 @@ Message getRequest() {
case OPTIONS:
return request = Options.INSTANCE;
case STARTUP:
return request = new Startup(context.getStartupOptions());
Map<String, String> startupOptions = new HashMap<>(context.getStartupOptions());
if (lwtInfo != null) {
lwtInfo.addOption(startupOptions);
}
return request = new Startup(startupOptions);
case GET_CLUSTER_NAME:
return request = CLUSTER_NAME_QUERY;
case SET_KEYSPACE:
Expand Down Expand Up @@ -212,9 +222,13 @@ void onResponse(Message response) {
if (step == Step.OPTIONS && response instanceof Supported) {
channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options);
Supported res = (Supported) response;
ConnectionShardingInfo info = ShardingInfo.parseShardingInfo(res.options);
if (info != null) {
channel.attr(DriverChannel.SHARDING_INFO_KEY).set(info);
ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(res.options);
if (shardingInfo != null) {
channel.attr(DriverChannel.SHARDING_INFO_KEY).set(shardingInfo);
}
lwtInfo = LwtInfo.parseLwtInfo(res.options);
if (lwtInfo != null) {
channel.attr(LWT_INFO_KEY).set(lwtInfo);
}
step = Step.STARTUP;
send();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.data.ValuesHelper;
import com.datastax.oss.driver.internal.core.metadata.PartitionerFactory;
import com.datastax.oss.driver.internal.core.protocol.LwtInfo;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.primitives.Ints;
Expand Down Expand Up @@ -363,7 +364,7 @@ public static ColumnDefinitions getResultDefinitions(
}

public static DefaultPreparedStatement toPreparedStatement(
Prepared response, PrepareRequest request, InternalDriverContext context) {
Prepared response, PrepareRequest request, InternalDriverContext context, LwtInfo lwtInfo) {
ColumnDefinitions variableDefinitions =
toColumnDefinitions(response.variablesMetadata, context);

Expand Down Expand Up @@ -402,7 +403,8 @@ public static DefaultPreparedStatement toPreparedStatement(
request.getSerialConsistencyLevelForBoundStatements(),
request.areBoundStatementsTracing(),
context.getCodecRegistry(),
context.getProtocolVersion());
context.getProtocolVersion(),
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags));
}

public static ColumnDefinitions toColumnDefinitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,13 @@ private void recordError(Node node, Throwable error) {
errorsSnapshot.add(new AbstractMap.SimpleEntry<>(node, error));
}

private void setFinalResult(PrepareRequest request, Prepared response) {
private void setFinalResult(PrepareRequest request, Prepared response, DriverChannel channel) {

// Whatever happens below, we're done with this stream id
throttler.signalSuccess(this);

DefaultPreparedStatement preparedStatement =
Conversions.toPreparedStatement(response, request, context);
Conversions.toPreparedStatement(response, request, context, channel.getLwtInfo());

session
.getRepreparePayloads()
Expand Down Expand Up @@ -375,7 +375,7 @@ public void onResponse(Frame responseFrame) {
Message responseMessage = responseFrame.message;
if (responseMessage instanceof Prepared) {
LOG.trace("[{}] Got result, completing", logPrefix);
setFinalResult(request, (Prepared) responseMessage);
setFinalResult(request, (Prepared) responseMessage, channel);
} else if (responseMessage instanceof Error) {
LOG.trace("[{}] Got error response, processing", logPrefix);
processErrorResponse((Error) responseMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -195,15 +197,47 @@ public void onThrottleReady(boolean wasDelayed) {
System.nanoTime() - startTimeNanos,
TimeUnit.NANOSECONDS);
}
Queue<Node> queryPlan =
this.initialStatement.getNode() != null
? new SimpleQueryPlan(this.initialStatement.getNode())
: context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(initialStatement, executionProfile.getName(), session);
Queue<Node> queryPlan;
if (this.initialStatement.getNode() != null) {
queryPlan = new SimpleQueryPlan(this.initialStatement.getNode());
} else if (this.initialStatement.isLWT()) {
queryPlan =
getReplicas(
session.getKeyspace().orElse(null),
this.initialStatement,
context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(initialStatement, executionProfile.getName(), session));
} else {
queryPlan =
context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(initialStatement, executionProfile.getName(), session);
}

sendRequest(initialStatement, null, queryPlan, 0, 0, true);
}

private Queue<Node> getReplicas(
CqlIdentifier loggedKeyspace, Statement<?> statement, Queue<Node> fallback) {
Token routingToken = getRoutingToken(statement);
CqlIdentifier keyspace = statement.getKeyspace();
if (keyspace == null) {
keyspace = statement.getRoutingKeyspace();
if (keyspace == null) {
keyspace = loggedKeyspace;
}
}

TokenMap tokenMap = context.getMetadataManager().getMetadata().getTokenMap().orElse(null);
if (routingToken == null || keyspace == null || tokenMap == null) {
return fallback;
}

Set<Node> replicas = tokenMap.getReplicas(keyspace, routingToken);
return new ConcurrentLinkedQueue<>(replicas);
}

public CompletionStage<AsyncResultSet> handle() {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2022 ScyllaDB
*
* Modified by ScyllaDB
*/
package com.datastax.oss.driver.internal.core.cql;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
Expand Down Expand Up @@ -783,4 +789,9 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
node,
newNowInSeconds);
}

@Override
public boolean isLWT() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -770,4 +770,9 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) {
node,
newNowInSeconds);
}

@Override
public boolean isLWT() {
return this.getPreparedStatement().isLWT();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class DefaultPreparedStatement implements PreparedStatement {
private final ConsistencyLevel serialConsistencyLevelForBoundStatements;
private final Duration timeoutForBoundStatements;
private final Partitioner partitioner;
private final boolean isLWT;

public DefaultPreparedStatement(
ByteBuffer id,
Expand All @@ -91,7 +92,8 @@ public DefaultPreparedStatement(
ConsistencyLevel serialConsistencyLevelForBoundStatements,
boolean areBoundStatementsTracing,
CodecRegistry codecRegistry,
ProtocolVersion protocolVersion) {
ProtocolVersion protocolVersion,
boolean isLWT) {
this.id = id;
this.partitionKeyIndices = partitionKeyIndices;
// It's important that we keep a reference to this object, so that it only gets evicted from
Expand All @@ -117,6 +119,7 @@ public DefaultPreparedStatement(

this.codecRegistry = codecRegistry;
this.protocolVersion = protocolVersion;
this.isLWT = isLWT;
}

@NonNull
Expand Down Expand Up @@ -159,6 +162,11 @@ public ColumnDefinitions getResultSetDefinitions() {
return resultMetadata.resultSetDefinitions;
}

@Override
public boolean isLWT() {
return isLWT;
}

@Override
public void setResultMetadata(
@NonNull ByteBuffer newResultMetadataId, @NonNull ColumnDefinitions newResultSetDefinitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2022 ScyllaDB
*
* Modified by ScyllaDB
*/
package com.datastax.oss.driver.internal.core.cql;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
Expand Down Expand Up @@ -741,6 +747,11 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) {
newNowInSeconds);
}

@Override
public boolean isLWT() {
return false;
}

public static Map<CqlIdentifier, Object> wrapKeys(Map<String, Object> namedValues) {
NullAllowingImmutableMap.Builder<CqlIdentifier, Object> builder =
NullAllowingImmutableMap.builder();
Expand Down
Loading