Skip to content

Wrong SQL result processing #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/org/tarantool/SqlProtoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public abstract class SqlProtoUtils {
public static List<Map<String, Object>> readSqlResult(TarantoolPacket pack) {
List<List<?>> data = (List<List<?>>) pack.getBody().get(Key.DATA.getId());

List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(data.size());
List<Map<String, Object>> values = new ArrayList<>(data.size());
List<TarantoolBase.SQLMetaData> metaData = getSQLMetadata(pack);
LinkedHashMap<String, Object> value = new LinkedHashMap<String, Object>();
for (List row : data) {
LinkedHashMap<String, Object> value = new LinkedHashMap<>();
for (int i = 0; i < row.size(); i++) {
value.put(metaData.get(i).getName(), row.get(i));
}
Expand Down
72 changes: 44 additions & 28 deletions src/main/java/org/tarantool/TarantoolClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import org.tarantool.protocol.ProtoUtils;
import org.tarantool.protocol.ReadableViaSelectorChannel;
import org.tarantool.protocol.TarantoolPacket;
import org.tarantool.protocol.TarantoolGreeting;
import org.tarantool.protocol.TarantoolPacket;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -36,7 +36,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
protected SocketChannelProvider socketProvider;
protected volatile Exception thumbstone;

protected Map<Long, CompletableFuture<?>> futures;
protected Map<Long, TarantoolOp<?>> futures;
protected AtomicInteger wait = new AtomicInteger();
/**
* Write properties
Expand Down Expand Up @@ -216,38 +216,38 @@ protected Future<?> exec(Code code, Object... args) {
protected CompletableFuture<?> doExec(Code code, Object[] args) {
validateArgs(args);
long sid = syncId.incrementAndGet();
CompletableFuture<?> q = new CompletableFuture<>();
TarantoolOp<?> future = new TarantoolOp<>(code);

if (isDead(q)) {
return q;
if (isDead(future)) {
return future;
}
futures.put(sid, q);
if (isDead(q)) {
futures.put(sid, future);
if (isDead(future)) {
futures.remove(sid);
return q;
return future;
}
try {
write(code, sid, null, args);
} catch (Exception e) {
futures.remove(sid);
fail(q, e);
fail(future, e);
}
return q;
return future;
}

protected synchronized void die(String message, Exception cause) {
if (thumbstone != null) {
return;
}
final CommunicationException err = new CommunicationException(message, cause);
this.thumbstone = err;
final CommunicationException error = new CommunicationException(message, cause);
this.thumbstone = error;
while (!futures.isEmpty()) {
Iterator<Map.Entry<Long, CompletableFuture<?>>> iterator = futures.entrySet().iterator();
Iterator<Map.Entry<Long, TarantoolOp<?>>> iterator = futures.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, CompletableFuture<?>> elem = iterator.next();
Map.Entry<Long, TarantoolOp<?>> elem = iterator.next();
if (elem != null) {
CompletableFuture<?> future = elem.getValue();
fail(future, err);
TarantoolOp<?> future = elem.getValue();
fail(future, error);
}
iterator.remove();
}
Expand Down Expand Up @@ -345,7 +345,7 @@ protected void readThread() {
Map<Integer, Object> headers = packet.getHeaders();

Long syncId = (Long) headers.get(Key.SYNC.getId());
CompletableFuture<?> future = futures.remove(syncId);
TarantoolOp<?> future = futures.remove(syncId);
stats.received++;
wait.decrementAndGet();
complete(packet, future);
Expand Down Expand Up @@ -395,30 +395,29 @@ protected void fail(CompletableFuture<?> q, Exception e) {
q.completeExceptionally(e);
}

protected void complete(TarantoolPacket packet, CompletableFuture<?> q) {
if (q != null) {
protected void complete(TarantoolPacket packet, TarantoolOp<?> future) {
if (future != null) {
long code = packet.getCode();
if (code == 0) {

if (code == Code.EXECUTE.getId()) {
completeSql(q, packet);
if (future.getCode() == Code.EXECUTE) {
completeSql(future, packet);
} else {
((CompletableFuture) q).complete(packet.getBody().get(Key.DATA.getId()));
((CompletableFuture) future).complete(packet.getBody().get(Key.DATA.getId()));
}
} else {
Object error = packet.getBody().get(Key.ERROR.getId());
fail(q, serverError(code, error));
fail(future, serverError(code, error));
}
}
}

protected void completeSql(CompletableFuture<?> q, TarantoolPacket pack) {
protected void completeSql(CompletableFuture<?> future, TarantoolPacket pack) {
Long rowCount = SqlProtoUtils.getSqlRowCount(pack);
if (rowCount!=null) {
((CompletableFuture) q).complete(rowCount);
if (rowCount != null) {
((CompletableFuture) future).complete(rowCount);
} else {
List<Map<String, Object>> values = SqlProtoUtils.readSqlResult(pack);
((CompletableFuture) q).complete(values);
((CompletableFuture) future).complete(values);
}
}

Expand Down Expand Up @@ -715,4 +714,21 @@ private CountDownLatch getStateLatch(int state) {
return null;
}
}

protected static class TarantoolOp<V> extends CompletableFuture<V> {

/**
* Tarantool binary protocol operation code.
*/
final private Code code;

public TarantoolOp(Code code) {
this.code = code;
}

public Code getCode() {
return code;
}
}

}
74 changes: 35 additions & 39 deletions src/main/java/org/tarantool/TarantoolClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
/**
* Basic implementation of a client that may work with the cluster
* of tarantool instances in fault-tolerant way.
*
* <p>
* Failed operations will be retried once connection is re-established
* unless the configured expiration time is over.
*/
Expand All @@ -25,21 +25,21 @@ public class TarantoolClusterClient extends TarantoolClientImpl {

/**
* @param config Configuration.
* @param addrs Array of addresses in the form of [host]:[port].
* @param addrs Array of addresses in the form of [host]:[port].
*/
public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) {
this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis));
}

/**
* @param provider Socket channel provider.
* @param config Configuration.
* @param config Configuration.
*/
public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) {
super(provider, config);

this.executor = config.executor == null ?
Executors.newSingleThreadExecutor() : config.executor;
Executors.newSingleThreadExecutor() : config.executor;
}

@Override
Expand All @@ -59,23 +59,23 @@ protected boolean isDead(CompletableFuture<?> q) {
protected CompletableFuture<?> doExec(Code code, Object[] args) {
validateArgs(args);
long sid = syncId.incrementAndGet();
CompletableFuture<?> q = makeFuture(sid, code, args);
ExpirableOp<?> future = makeFuture(sid, code, args);

if (isDead(q)) {
return q;
if (isDead(future)) {
return future;
}
futures.put(sid, q);
if (isDead(q)) {
futures.put(sid, future);
if (isDead(future)) {
futures.remove(sid);
return q;
return future;
}
try {
write(code, sid, null, args);
} catch (Exception e) {
futures.remove(sid);
fail(q, e);
fail(future, e);
}
return q;
return future;
}

@Override
Expand All @@ -85,12 +85,12 @@ protected void fail(CompletableFuture<?> q, Exception e) {

protected boolean checkFail(CompletableFuture<?> q, Exception e) {
assert q instanceof ExpirableOp<?>;
if (!isTransientError(e) || ((ExpirableOp<?>)q).hasExpired(System.currentTimeMillis())) {
if (!isTransientError(e) || ((ExpirableOp<?>) q).hasExpired(System.currentTimeMillis())) {
q.completeExceptionally(e);
return true;
} else {
assert retries != null;
retries.put(((ExpirableOp<?>) q).getId(), (ExpirableOp<?>)q);
retries.put(((ExpirableOp<?>) q).getId(), (ExpirableOp<?>) q);
return false;
}
}
Expand All @@ -114,12 +114,12 @@ protected boolean isTransientError(Exception e) {
return true;
}
if (e instanceof TarantoolException) {
return ((TarantoolException)e).isTransient();
return ((TarantoolException) e).isTransient();
}
return false;
}

protected CompletableFuture<?> makeFuture(long id, Code code, Object...args) {
private ExpirableOp<?> makeFuture(long id, Code code, Object... args) {
int expireTime = ((TarantoolClusterClientConfig) config).operationExpiryTimeMillis;
return new ExpirableOp(id, expireTime, code, args);
}
Expand All @@ -133,20 +133,20 @@ protected void onReconnect() {
// First call is before the constructor finished. Skip it.
return;
}
Collection<ExpirableOp<?>> futsToRetry = new ArrayList<ExpirableOp<?>>(retries.values());
Collection<ExpirableOp<?>> futuresToRetry = new ArrayList<ExpirableOp<?>>(retries.values());
retries.clear();
long now = System.currentTimeMillis();
for (final ExpirableOp<?> fut : futsToRetry) {
if (!fut.hasExpired(now)) {
for (final ExpirableOp<?> future : futuresToRetry) {
if (!future.hasExpired(now)) {
executor.execute(new Runnable() {
@Override
public void run() {
futures.put(fut.getId(), fut);
futures.put(future.getId(), future);
try {
write(fut.getCode(), fut.getId(), null, fut.getArgs());
write(future.getCode(), future.getId(), null, future.getArgs());
} catch (Exception e) {
futures.remove(fut.getId());
fail(fut, e);
futures.remove(future.getId());
fail(future, e);
}
}
});
Expand All @@ -157,8 +157,11 @@ public void run() {
/**
* Holds operation code and arguments for retry.
*/
private class ExpirableOp<V> extends CompletableFuture<V> {
/** Moment in time when operation is not considered for retry. */
private class ExpirableOp<V> extends TarantoolOp<V> {

/**
* Moment in time when operation is not considered for retry.
*/
final private long deadline;

/**
Expand All @@ -167,24 +170,20 @@ private class ExpirableOp<V> extends CompletableFuture<V> {
final private long id;

/**
* Tarantool binary protocol operation code.
* Arguments of operation.
*/
final private Code code;

/** Arguments of operation. */
final private Object[] args;

/**
*
* @param id Sync.
* @param id Sync.
* @param expireTime Expiration time (relative) in ms.
* @param code Tarantool operation code.
* @param args Operation arguments.
* @param code Tarantool operation code.
* @param args Operation arguments.
*/
ExpirableOp(long id, int expireTime, Code code, Object...args) {
ExpirableOp(long id, int expireTime, Code code, Object... args) {
super(code);
this.id = id;
this.deadline = System.currentTimeMillis() + expireTime;
this.code = code;
this.args = args;
}

Expand All @@ -196,12 +195,9 @@ public long getId() {
return id;
}

public Code getCode() {
return code;
}

public Object[] getArgs() {
return args;
}

}
}
Loading