diff --git a/README.md b/README.md index 0e5f2da4..b1e90500 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ Feel free to override any method of `TarantoolClientImpl`. For example, to hook all the results, you could override this: ```java -protected void complete(TarantoolPacket packet, TarantoolOp future); +protected void complete(TarantoolPacket packet, CompletableFuture future); ``` ### Client config options @@ -181,6 +181,55 @@ Supported options are follow: 14. `operationExpiryTimeMillis` is a default request timeout in ms. Default value is `1000` (1 second). +## String space/index resolution + +Each operation that requires space or index to be executed, can work with +number ID as well as string name of a space or an index. +Assume, we have `my_space` space with space ID `512` and its primary index +`primary` with index ID `0`. Then, for instance, `select` operations can be +performed using their names: + +```java +client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ); +// or using more convenient way +client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ); +``` + +Because _iproto_ has not yet supported string spaces and indexes, a client caches current server +schema in memory. The client relies on protocol SCHEMA_ID and sends each request with respect to +cached schema version. The schema is used primarily to resolve string names of spaces or indexes +against its integer IDs. + +### Schema update + +1. Just after a (re-)connection to the Tarantool instance. + The client cannot guarantee that new instance is the same and has same schema, + thus, the client drops the cached schema and fetches new one. +2. Receiving a schema version error as a response to our request. + It's possible some request can be rejected by server because of schema + mismatching between client and server. In this case the schema will be + reloaded and the refused request will be resent using the updated schema + version. +3. Sending a DDL request and receiving a new version in a response. +4. Sending a request against a non-existent space/index name. + The client cannot exactly know whether name was not found because of + it does not exist or it has not the latest schema version. A ping request + is sent in the case to check a schema version and then a client will reload + it if needed. The original request will be retried if a space / an index + name will be found in a new schema. + +### Schema support caveats + +1. Each schema reloading requires at least two extra requests to fetch spaces and + indexes metadata respectively. There is also a ping request followed by reloading + of the schema to check whether the client has outdated version (see point 4 in + [Schema update](#schema-update)). +2. In some circumstance, requests can be rejected several times until both client's + and server's versions matches. It may take significant amount of time or even be + a cause of request timeout. +3. The client guarantees an order of synchronous requests per thread. Other cases such + as asynchronous or multi-threaded requests may be out of order before the execution. + ## Spring NamedParameterJdbcTemplate usage example The JDBC driver uses `TarantoolClient` implementation to provide a communication with server. diff --git a/src/main/java/org/tarantool/AbstractTarantoolOps.java b/src/main/java/org/tarantool/AbstractTarantoolOps.java index 8cefb379..bdef668d 100644 --- a/src/main/java/org/tarantool/AbstractTarantoolOps.java +++ b/src/main/java/org/tarantool/AbstractTarantoolOps.java @@ -1,62 +1,204 @@ package org.tarantool; +import static org.tarantool.TarantoolRequestArgumentFactory.cacheLookupValue; +import static org.tarantool.TarantoolRequestArgumentFactory.value; -public abstract class AbstractTarantoolOps - implements TarantoolClientOps { +import org.tarantool.schema.TarantoolSchemaMeta; + +import java.util.List; + +public abstract class AbstractTarantoolOps + implements TarantoolClientOps, Object, Result> { private Code callCode = Code.CALL; - protected abstract Result exec(Code code, Object... args); + protected abstract Result exec(TarantoolRequest request); + + protected abstract TarantoolSchemaMeta getSchemaMeta(); + + public Result select(Integer space, Integer index, List key, int offset, int limit, Iterator iterator) { + return select(space, index, key, offset, limit, iterator.getValue()); + } - public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) { + @Override + public Result select(String space, String index, List key, int offset, int limit, Iterator iterator) { return select(space, index, key, offset, limit, iterator.getValue()); } - public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) { + @Override + public Result select(Integer space, Integer index, List key, int offset, int limit, int iterator) { + return exec( + new TarantoolRequest( + Code.SELECT, + value(Key.SPACE), value(space), + value(Key.INDEX), value(index), + value(Key.KEY), value(key), + value(Key.ITERATOR), value(iterator), + value(Key.LIMIT), value(limit), + value(Key.OFFSET), value(offset) + ) + ); + } + + @Override + public Result select(String space, String index, List key, int offset, int limit, int iterator) { + return exec( + new TarantoolRequest( + Code.SELECT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.INDEX), cacheLookupValue(() -> getSchemaMeta().getSpaceIndex(space, index).getId()), + value(Key.KEY), value(key), + value(Key.ITERATOR), value(iterator), + value(Key.LIMIT), value(limit), + value(Key.OFFSET), value(offset) + ) + ); + } + + @Override + public Result insert(Integer space, List tuple) { + return exec(new TarantoolRequest( + Code.INSERT, + value(Key.SPACE), value(space), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result insert(String space, List tuple) { + return exec( + new TarantoolRequest( + Code.INSERT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result replace(Integer space, List tuple) { + return exec( + new TarantoolRequest( + Code.REPLACE, + value(Key.SPACE), value(space), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result replace(String space, List tuple) { return exec( - Code.SELECT, - Key.SPACE, space, - Key.INDEX, index, - Key.KEY, key, - Key.ITERATOR, iterator, - Key.LIMIT, limit, - Key.OFFSET, offset + new TarantoolRequest( + Code.REPLACE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.TUPLE), value(tuple) + ) ); } - public Result insert(Space space, Tuple tuple) { - return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result update(Integer space, List key, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPDATE, + value(Key.SPACE), value(space), + value(Key.KEY), value(key), + value(Key.TUPLE), value(operations) + ) + ); } - public Result replace(Space space, Tuple tuple) { - return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result update(String space, List key, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPDATE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key), + value(Key.TUPLE), value(operations) + ) + ); + } + + @Override + public Result upsert(Integer space, List key, List defTuple, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPSERT, + value(Key.SPACE), value(space), + value(Key.KEY), value(key), + value(Key.TUPLE), value(defTuple), + value(Key.UPSERT_OPS), value(operations) + ) + ); } - public Result update(Space space, Tuple key, Operation... args) { - return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args); + @Override + public Result upsert(String space, List key, List defTuple, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPSERT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key), + value(Key.TUPLE), value(defTuple), + value(Key.UPSERT_OPS), value(operations) + ) + ); } - public Result upsert(Space space, Tuple key, Tuple def, Operation... args) { - return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args); + @Override + public Result delete(Integer space, List key) { + return exec( + new TarantoolRequest( + Code.DELETE, + value(Key.SPACE), value(space), + value(Key.KEY), value(key) + ) + ); } - public Result delete(Space space, Tuple key) { - return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key); + @Override + public Result delete(String space, List key) { + return exec( + new TarantoolRequest( + Code.DELETE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key) + ) + ); } + @Override public Result call(String function, Object... args) { - return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args); + return exec( + new TarantoolRequest( + callCode, + value(Key.FUNCTION), value(function), + value(Key.TUPLE), value(args) + ) + ); } + @Override public Result eval(String expression, Object... args) { - return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args); + return exec( + new TarantoolRequest( + Code.EVAL, + value(Key.EXPRESSION), value(expression), + value(Key.TUPLE), value(args) + ) + ); } + @Override public void ping() { - exec(Code.PING); + exec(new TarantoolRequest(Code.PING)); } public void setCallCode(Code callCode) { this.callCode = callCode; } + } diff --git a/src/main/java/org/tarantool/Iterator.java b/src/main/java/org/tarantool/Iterator.java index 4452a744..c013530a 100644 --- a/src/main/java/org/tarantool/Iterator.java +++ b/src/main/java/org/tarantool/Iterator.java @@ -1,5 +1,7 @@ package org.tarantool; +import java.util.Arrays; + // Iterator info was taken from here https://github.com/tarantool/tarantool/blob/f66584c3bcdffe61d6d99a4868a9b72d62338a11/src/box/iterator_type.h#L62 public enum Iterator { EQ(0), // key == x ASC order @@ -24,4 +26,12 @@ public enum Iterator { public int getValue() { return value; } + + public static Iterator valueOf(int value) { + return Arrays.stream(Iterator.values()) + .filter(v -> value == v.getValue()) + .findFirst() + .orElseThrow(IllegalArgumentException::new); + } + } diff --git a/src/main/java/org/tarantool/TarantoolBase.java b/src/main/java/org/tarantool/TarantoolBase.java index c74647ae..26656112 100644 --- a/src/main/java/org/tarantool/TarantoolBase.java +++ b/src/main/java/org/tarantool/TarantoolBase.java @@ -6,10 +6,9 @@ import java.io.IOException; import java.net.Socket; import java.nio.channels.SocketChannel; -import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { +public abstract class TarantoolBase extends AbstractTarantoolOps { protected String serverVersion; protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE; protected AtomicLong syncId = new AtomicLong(); @@ -42,16 +41,6 @@ protected void closeChannel(SocketChannel channel) { } } - protected void validateArgs(Object[] args) { - if (args != null) { - for (int i = 0; i < args.length; i += 2) { - if (args[i + 1] == null) { - throw new NullPointerException(((Key) args[i]).name() + " should not be null"); - } - } - } - } - public void setInitialRequestSize(int initialRequestSize) { this.initialRequestSize = initialRequestSize; } diff --git a/src/main/java/org/tarantool/TarantoolClient.java b/src/main/java/org/tarantool/TarantoolClient.java index 2ad0c84c..d560d682 100644 --- a/src/main/java/org/tarantool/TarantoolClient.java +++ b/src/main/java/org/tarantool/TarantoolClient.java @@ -1,5 +1,7 @@ package org.tarantool; +import org.tarantool.schema.TarantoolSchemaMeta; + import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; @@ -29,4 +31,6 @@ public interface TarantoolClient { boolean waitAlive(long timeout, TimeUnit unit) throws InterruptedException; + TarantoolSchemaMeta getSchemaMeta(); + } diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index 22922733..ae1d7360 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -2,40 +2,50 @@ import org.tarantool.logging.Logger; import org.tarantool.logging.LoggerFactory; +import org.tarantool.protocol.ProtoConstants; import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.ReadableViaSelectorChannel; import org.tarantool.protocol.TarantoolGreeting; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolMetaSpacesCache; +import org.tarantool.schema.TarantoolSchemaException; +import org.tarantool.schema.TarantoolSchemaMeta; import org.tarantool.util.StringUtils; +import org.tarantool.util.TupleTwo; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.time.Duration; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient { private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClientImpl.class); protected TarantoolClientConfig config; - protected long operationTimeout; + protected Duration operationTimeout; /** * External. @@ -46,7 +56,12 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected volatile Exception thumbstone; - protected Map> futures; + protected ScheduledExecutorService workExecutor; + + protected StampedLock schemaLock = new StampedLock(); + protected BlockingQueue delayedOperationsQueue; + + protected Map futures; protected AtomicInteger pendingResponsesCount = new AtomicInteger(); /** @@ -66,6 +81,7 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected SyncOps syncOps; protected FireAndForgetOps fireAndForgetOps; protected ComposableAsyncOps composableAsyncOps; + protected UnsafeSchemaOps unsafeSchemaOps; /** * Inner. @@ -75,6 +91,8 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected Thread reader; protected Thread writer; + protected TarantoolSchemaMeta schemaMeta = new TarantoolMetaSpacesCache(this); + protected Thread connector = new Thread(new Runnable() { @Override public void run() { @@ -106,10 +124,13 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) { this.config = config; this.initialRequestSize = config.defaultRequestSize; - this.operationTimeout = config.operationExpiryTimeMillis; + this.operationTimeout = Duration.ofMillis(config.operationExpiryTimeMillis); this.socketProvider = socketProvider; this.stats = new TarantoolClientStats(); this.futures = new ConcurrentHashMap<>(config.predictedFutures); + this.delayedOperationsQueue = new PriorityBlockingQueue<>(128); + this.workExecutor = + Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantool-worker")); this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize); this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity()); this.connector.setDaemon(true); @@ -117,6 +138,7 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.syncOps = new SyncOps(); this.composableAsyncOps = new ComposableAsyncOps(); this.fireAndForgetOps = new FireAndForgetOps(); + this.unsafeSchemaOps = new UnsafeSchemaOps(); if (!config.useNewCall) { setCallCode(Code.OLD_CALL); this.syncOps.setCallCode(Code.OLD_CALL); @@ -203,6 +225,7 @@ protected void connect(final SocketChannel channel) throws Exception { } this.thumbstone = null; startThreads(channel.socket().getRemoteSocketAddress().toString()); + updateSchema(); } protected void startThreads(String threadName) throws InterruptedException { @@ -214,7 +237,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { readThread(); } finally { - state.release(StateHelper.READING); + state.release(StateHelper.READING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -228,7 +251,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { writeThread(); } finally { - state.release(StateHelper.WRITING); + state.release(StateHelper.WRITING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -251,61 +274,86 @@ protected void configureThreads(String threadName) { reader.setPriority(config.readerThreadPriority); } + @Override + public TarantoolSchemaMeta getSchemaMeta() { + return schemaMeta; + } + /** * Executes an operation with default timeout. * - * @param code operation code - * @param args operation arguments + * @param request operation data * * @return deferred result * * @see #setOperationTimeout(long) */ - protected Future exec(Code code, Object... args) { - return doExec(operationTimeout, code, args); + @Override + protected Future exec(TarantoolRequest request) { + return doExec(request).getResult(); + } + + protected TarantoolOperation doExec(TarantoolRequest request) { + long stamp = schemaLock.readLock(); + try { + if (request.getTimeout() == null) { + request.setTimeout(operationTimeout); + } + TarantoolOperation operation = request.toOperation(syncId.incrementAndGet(), schemaMeta.getSchemaVersion()); + // space or index names could not be found in the cache + if (!operation.isSerializable()) { + delayedOperationsQueue.add(operation); + // It's possible the client keeps the outdated schema. + // Send a preflight ping request to check the schema + // version and refresh it if one is obsolete + if (isSchemaLoaded()) { + TarantoolOperation ping = new TarantoolRequest(Code.PING) + .toPreflightOperation(syncId.incrementAndGet(), schemaMeta.getSchemaVersion(), operation); + registerOperation(ping); + } + return operation; + } + // postpone operation if the schema is not ready + if (!isSchemaLoaded()) { + delayedOperationsQueue.add(operation); + return operation; + } + return registerOperation(operation); + } finally { + schemaLock.unlockRead(stamp); + } } /** - * Executes an operation with the given timeout. - * {@code timeoutMillis} will override the default - * timeout. 0 means the limitless operation. + * Checks whether the schema is fully cached. * - * @param timeoutMillis operation timeout - * @param code operation code - * @param args operation arguments - * - * @return deferred result + * @return {@literal true} if the schema is loaded */ - protected Future exec(long timeoutMillis, Code code, Object... args) { - return doExec(timeoutMillis, code, args); + private boolean isSchemaLoaded() { + return schemaMeta.isInitialized() && !state.isStateSet(StateHelper.SCHEMA_UPDATING); } - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - - if (isDead(future)) { - return future; + protected TarantoolOperation registerOperation(TarantoolOperation operation) { + if (isDead(operation)) { + return operation; } - futures.put(sid, future); - if (isDead(future)) { - futures.remove(sid); - return future; + futures.put(operation.getId(), operation); + if (isDead(operation)) { + futures.remove(operation.getId()); + return operation; } try { - write(code, sid, null, args); + write( + operation.getCode(), + operation.getId(), + operation.getSentSchemaId(), + operation.getArguments().toArray() + ); } catch (Exception e) { - futures.remove(sid); - fail(future, e); + futures.remove(operation.getId()); + fail(operation, e); } - return future; - } - - protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, Object[] args) { - return new TarantoolOp<>(sid, code, args) - .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + return operation; } protected synchronized void die(String message, Exception cause) { @@ -315,16 +363,22 @@ protected synchronized void die(String message, Exception cause) { final CommunicationException error = new CommunicationException(message, cause); this.thumbstone = error; while (!futures.isEmpty()) { - Iterator>> iterator = futures.entrySet().iterator(); + Iterator> iterator = futures.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry> elem = iterator.next(); + Map.Entry elem = iterator.next(); if (elem != null) { - TarantoolOp future = elem.getValue(); - fail(future, error); + TarantoolOperation operation = elem.getValue(); + fail(operation, error); } iterator.remove(); } } + + TarantoolOperation operation; + while ((operation = delayedOperationsQueue.poll()) != null) { + fail(operation, error); + } + pendingResponsesCount.set(0); bufferLock.lock(); @@ -337,8 +391,9 @@ protected synchronized void die(String message, Exception cause) { stopIO(); } + @Override public void ping() { - syncGet(exec(Code.PING)); + syncGet(exec(new TarantoolRequest(Code.PING))); } protected void write(Code code, Long syncId, Long schemaId, Object... args) @@ -430,10 +485,10 @@ protected void readThread() { Map headers = packet.getHeaders(); Long syncId = (Long) headers.get(Key.SYNC.getId()); - TarantoolOp future = futures.remove(syncId); + TarantoolOperation request = futures.remove(syncId); stats.received++; pendingResponsesCount.decrementAndGet(); - complete(packet, future); + complete(packet, request); } catch (Exception e) { die("Cant read answer", e); return; @@ -473,33 +528,119 @@ protected void writeThread() { } } - protected void fail(TarantoolOp future, Exception e) { - future.completeExceptionally(e); + protected void fail(TarantoolOperation operation, Exception e) { + operation.getResult().completeExceptionally(e); } - protected void complete(TarantoolPacket packet, TarantoolOp future) { - if (future != null) { - long code = packet.getCode(); - if (code == 0) { - if (future.getCode() == Code.EXECUTE) { - completeSql(future, packet); - } else { - ((TarantoolOp) future).complete(packet.getBody().get(Key.DATA.getId())); + protected void complete(TarantoolPacket packet, TarantoolOperation operation) { + CompletableFuture result = operation.getResult(); + if (result.isDone()) { + return; + } + + long code = packet.getCode(); + long schemaId = packet.getSchemaId(); + boolean isPreflightPing = operation.getDependedOperation() != null; + if (code == ProtoConstants.SUCCESS) { + operation.setCompletedSchemaId(schemaId); + if (isPreflightPing) { + // the schema wasn't changed + // try to evaluate an unserializable target operation + // in order to complete the operation exceptionally. + TarantoolOperation target = operation.getDependedOperation(); + delayedOperationsQueue.remove(target); + try { + target.getArguments(); + } catch (TarantoolSchemaException cause) { + fail(target, cause); } + } else if (operation.getCode() == Code.EXECUTE) { + completeSql(operation, packet); } else { - Object error = packet.getBody().get(Key.ERROR.getId()); - fail(future, serverError(code, error)); + ((CompletableFuture) result).complete(packet.getData()); + } + } else if (code == ProtoConstants.ERR_WRONG_SCHEMA_VERSION) { + if (schemaId > schemaMeta.getSchemaVersion()) { + delayedOperationsQueue.add(operation); + } else { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + registerOperation(operation); + } + } else { + Object error = packet.getError(); + fail(operation, serverError(code, error)); + } + + if (operation.getSentSchemaId() == 0) { + return; + } + // it's possible to receive bigger version than current + // i.e. after DDL operation or wrong schema version response + if (schemaId > schemaMeta.getSchemaVersion()) { + updateSchema(); + } + } + + private void updateSchema() { + performSchemaAction(() -> { + if (state.acquire(StateHelper.SCHEMA_UPDATING)) { + workExecutor.execute(createUpdateSchemaTask()); + } + }); + } + + private Runnable createUpdateSchemaTask() { + return () -> { + try { + schemaMeta.refresh(); + } catch (Exception cause) { + workExecutor.schedule(createUpdateSchemaTask(), 300L, TimeUnit.MILLISECONDS); + return; + } + performSchemaAction(() -> { + try { + rescheduleDelayedOperations(); + } finally { + state.release(StateHelper.SCHEMA_UPDATING); + } + }); + }; + } + + private void rescheduleDelayedOperations() { + TarantoolOperation operation; + while ((operation = delayedOperationsQueue.poll()) != null) { + CompletableFuture result = operation.getResult(); + if (!result.isDone()) { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + registerOperation(operation); } } } - protected void completeSql(TarantoolOp future, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { Long rowCount = SqlProtoUtils.getSQLRowCount(pack); + CompletableFuture result = operation.getResult(); if (rowCount != null) { - ((TarantoolOp) future).complete(rowCount); + ((CompletableFuture) result).complete(rowCount); } else { List> values = SqlProtoUtils.readSqlResult(pack); - ((TarantoolOp) future).complete(values); + ((CompletableFuture) result).complete(values); + } + } + + /** + * Convenient guard scope that executes given runnable + * inside schema write lock. + * + * @param action to be executed + */ + protected void performSchemaAction(Runnable action) { + long stamp = schemaLock.writeLock(); + try { + action.run(); + } finally { + schemaLock.unlockWrite(stamp); } } @@ -535,6 +676,9 @@ public void close() { protected void close(Exception e) { if (state.close()) { + if (workExecutor != null) { + workExecutor.shutdownNow(); + } connector.interrupt(); die(e.getMessage(), e); } @@ -563,7 +707,7 @@ protected void stopIO() { * @return timeout in millis */ public long getOperationTimeout() { - return operationTimeout; + return operationTimeout.toMillis(); } /** @@ -572,17 +716,17 @@ public long getOperationTimeout() { * @param operationTimeout timeout in millis */ public void setOperationTimeout(long operationTimeout) { - this.operationTimeout = operationTimeout; + this.operationTimeout = Duration.ofMillis(operationTimeout); } @Override public boolean isAlive() { - return state.getState() == StateHelper.ALIVE && thumbstone == null; + return state.isStateSet(StateHelper.ALIVE) && thumbstone == null; } @Override public boolean isClosed() { - return state.getState() == StateHelper.CLOSED; + return state.isStateSet(StateHelper.CLOSED); } @Override @@ -615,17 +759,31 @@ public TarantoolClientOps, Object, Long> fireAndForgetOps() { return fireAndForgetOps; } + public TarantoolClientOps, Object, TupleTwo, Long>> unsafeSchemaOps() { + return unsafeSchemaOps; + } + + protected TarantoolRequest makeSqlRequest(String sql, List bind) { + return new TarantoolRequest( + Code.EXECUTE, + TarantoolRequestArgumentFactory.value(Key.SQL_TEXT), + TarantoolRequestArgumentFactory.value(sql), + TarantoolRequestArgumentFactory.value(Key.SQL_BIND), + TarantoolRequestArgumentFactory.value(bind) + ); + } + @Override public TarantoolSQLOps>> sqlSyncOps() { return new TarantoolSQLOps>>() { @Override public Long update(String sql, Object... bind) { - return (Long) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind)); + return (Long) syncGet(exec(makeSqlRequest(sql, Arrays.asList(bind)))); } @Override public List> query(String sql, Object... bind) { - return (List>) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind)); + return (List>) syncGet(exec(makeSqlRequest(sql, Arrays.asList(bind)))); } }; } @@ -635,39 +793,32 @@ public TarantoolSQLOps, Future>>> return new TarantoolSQLOps, Future>>>() { @Override public Future update(String sql, Object... bind) { - return (Future) exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); + return (Future) exec(makeSqlRequest(sql, Arrays.asList(bind))); } @Override public Future>> query(String sql, Object... bind) { - return (Future>>) exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); + return (Future>>) exec(makeSqlRequest(sql, Arrays.asList(bind))); } }; } - protected class SyncOps extends AbstractTarantoolOps, Object, List> { + protected class SyncOps extends BaseClientOps> { @Override - public List exec(Code code, Object... args) { - return (List) syncGet(TarantoolClientImpl.this.exec(code, args)); - } - - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); + protected List exec(TarantoolRequest request) { + return (List) syncGet(TarantoolClientImpl.this.exec(request)); } } - protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> { + protected class FireAndForgetOps extends BaseClientOps { @Override - public Long exec(Code code, Object... args) { + protected Long exec(TarantoolRequest request) { if (thumbstone == null) { try { - long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); - write(code, syncId, null, args); - return syncId; + return doExec(request).getId(); } catch (Exception e) { throw new CommunicationException("Execute failed", e); } @@ -676,117 +827,16 @@ public Long exec(Code code, Object... args) { } } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } - protected boolean isDead(TarantoolOp future) { + protected boolean isDead(TarantoolOperation operation) { if (this.thumbstone != null) { - fail(future, new CommunicationException("Connection is dead", thumbstone)); + fail(operation, new CommunicationException("Connection is dead", thumbstone)); return true; } return false; } - protected static class TarantoolOp extends CompletableFuture { - - /** - * A task identifier used in {@link TarantoolClientImpl#futures}. - */ - private final long id; - - /** - * Tarantool binary protocol operation code. - */ - private final Code code; - - /** - * Arguments of operation. - */ - private final Object[] args; - - public TarantoolOp(long id, Code code, Object[] args) { - this.id = id; - this.code = code; - this.args = args; - } - - public long getId() { - return id; - } - - public Code getCode() { - return code; - } - - public Object[] getArgs() { - return args; - } - - @Override - public String toString() { - return "TarantoolOp{" + - "id=" + id + - ", code=" + code + - '}'; - } - - /** - * Missed in jdk8 CompletableFuture operator to limit execution - * by time. - * - * @param timeout execution timeout - * @param unit measurement unit for given timeout value - * - * @return a future on which the method is called - */ - public TarantoolOp orTimeout(long timeout, TimeUnit unit) { - if (timeout < 0) { - throw new IllegalArgumentException("Timeout cannot be negative"); - } - if (unit == null) { - throw new IllegalArgumentException("Time unit cannot be null"); - } - if (timeout == 0 || isDone()) { - return this; - } - ScheduledFuture abandonByTimeoutAction = TimeoutScheduler.EXECUTOR.schedule( - () -> { - if (!this.isDone()) { - this.completeExceptionally(new TimeoutException()); - } - }, - timeout, unit - ); - whenComplete( - (ignored, error) -> { - if (error == null && !abandonByTimeoutAction.isDone()) { - abandonByTimeoutAction.cancel(false); - } - } - ); - return this; - } - - /** - * Runs timeout operation as a delayed task. - */ - static class TimeoutScheduler { - - static final ScheduledThreadPoolExecutor EXECUTOR; - - static { - EXECUTOR = - new ScheduledThreadPoolExecutor(1, new TarantoolThreadDaemonFactory("tarantoolTimeout")); - EXECUTOR.setRemoveOnCancelPolicy(true); - } - } - - } - /** * A subclass may use this as a trigger to start retries. * This method is called when state becomes ALIVE. @@ -810,10 +860,11 @@ protected final class StateHelper { static final int UNINITIALIZED = 0; static final int READING = 1; - static final int WRITING = 2; + static final int WRITING = 1 << 1; static final int ALIVE = READING | WRITING; - static final int RECONNECT = 4; - static final int CLOSED = 8; + static final int SCHEMA_UPDATING = 1 << 2; + static final int RECONNECT = 1 << 3; + static final int CLOSED = 1 << 4; private final AtomicInteger state; @@ -842,6 +893,10 @@ protected int getState() { return state.get(); } + boolean isStateSet(int mask) { + return (getState() & mask) == mask; + } + /** * Set CLOSED state, drop RECONNECT state. * @@ -852,12 +907,12 @@ protected boolean close() { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if (isStateSet(CLOSED)) { return false; } - /* Drop RECONNECT, set CLOSED. */ - if (compareAndSet(currentState, (currentState & ~RECONNECT) | CLOSED)) { + /* Clear all states and set CLOSED. */ + if (compareAndSet(currentState, CLOSED)) { return true; } } @@ -877,7 +932,7 @@ protected boolean acquire(int mask) { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if ((isStateSet(CLOSED))) { return false; } @@ -887,8 +942,8 @@ protected boolean acquire(int mask) { } /* Cannot move from a state to the same state. */ - if ((currentState & mask) != 0) { - throw new IllegalStateException("State is already " + mask); + if (isStateSet(mask)) { + return false; } /* Set acquired state. */ @@ -912,7 +967,8 @@ protected boolean compareAndSet(int expect, int update) { return false; } - if (update == ALIVE) { + boolean wasAlreadyAlive = (expect & ALIVE) == ALIVE; + if (!wasAlreadyAlive && (update & ALIVE) == ALIVE) { CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1)); latch.countDown(); onReconnect(); @@ -951,13 +1007,13 @@ private CountDownLatch getStateLatch(int state) { return closedLatch; } if (state == ALIVE) { - if (getState() == CLOSED) { + if (isStateSet(CLOSED)) { throw new IllegalStateException("State is CLOSED."); } CountDownLatch latch = nextAliveLatch.get(); /* It may happen so that an error is detected but the state is still alive. Wait for the 'next' alive state in such cases. */ - return (getState() == ALIVE && thumbstone == null) ? null : latch; + return (isStateSet(ALIVE) && thumbstone == null) ? null : latch; } return null; } @@ -970,7 +1026,7 @@ private CountDownLatch getStateLatch(int state) { private void awaitReconnection() throws InterruptedException { connectorLock.lock(); try { - while (getState() != StateHelper.RECONNECT) { + while (!isStateSet(RECONNECT)) { reconnectRequired.await(); } } finally { @@ -996,12 +1052,11 @@ private void trySignalForReconnection() { } - protected class ComposableAsyncOps - extends AbstractTarantoolOps, Object, CompletionStage>> { + protected class ComposableAsyncOps extends BaseClientOps>> { @Override - public CompletionStage> exec(Code code, Object... args) { - return (CompletionStage>) TarantoolClientImpl.this.exec(code, args); + protected CompletionStage> exec(TarantoolRequest request) { + return (CompletionStage>) TarantoolClientImpl.this.exec(request); } @Override @@ -1011,4 +1066,32 @@ public void close() { } + /** + * Used by internal services to ignore schema ID issues. + */ + protected class UnsafeSchemaOps extends BaseClientOps, Long>> { + + protected TupleTwo, Long> exec(TarantoolRequest request) { + long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); + TarantoolOperation operation = request.toOperation(syncId, 0L); + List result = (List) syncGet(registerOperation(operation).getResult()); + return TupleTwo.of(result, operation.getCompletedSchemaId()); + } + + } + + protected abstract class BaseClientOps extends AbstractTarantoolOps { + + @Override + protected TarantoolSchemaMeta getSchemaMeta() { + return TarantoolClientImpl.this.getSchemaMeta(); + } + + @Override + public void close() { + throw new IllegalStateException("You should close TarantoolClient instead."); + } + + } + } diff --git a/src/main/java/org/tarantool/TarantoolClientOps.java b/src/main/java/org/tarantool/TarantoolClientOps.java index 69ab3a9e..9a466c44 100644 --- a/src/main/java/org/tarantool/TarantoolClientOps.java +++ b/src/main/java/org/tarantool/TarantoolClientOps.java @@ -1,20 +1,41 @@ package org.tarantool; +/** + * Provides a set of typical operations with data in Tarantool. + * + * @param represents space/index identifiers (not used anymore) + * @param represents tuple keys and/or tuples + * @param

represents tuples and/or update operations + * @param represents an operation result + */ +public interface TarantoolClientOps { + R select(Integer space, Integer index, O key, int offset, int limit, int iterator); -public interface TarantoolClientOps { - R select(T space, T index, O key, int offset, int limit, int iterator); + R select(String space, String index, O key, int offset, int limit, int iterator); - R select(T space, T index, O key, int offset, int limit, Iterator iterator); + R select(Integer space, Integer index, O key, int offset, int limit, Iterator iterator); - R insert(T space, O tuple); + R select(String space, String index, O key, int offset, int limit, Iterator iterator); - R replace(T space, O tuple); + R insert(Integer space, O tuple); - R update(T space, O key, P... tuple); + R insert(String space, O tuple); - R upsert(T space, O key, O defTuple, P... ops); + R replace(Integer space, O tuple); - R delete(T space, O key); + R replace(String space, O tuple); + + R update(Integer space, O key, P... tuple); + + R update(String space, O key, P... tuple); + + R upsert(Integer space, O key, O defTuple, P... ops); + + R upsert(String space, O key, O defTuple, P... ops); + + R delete(Integer space, O key); + + R delete(String space, O key); R call(String function, Object... args); diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index e3697a91..fff54e95 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -16,7 +16,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; @@ -39,14 +38,13 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /** * Discovery activity. */ - private ScheduledExecutorService instancesDiscoveryExecutor; private Runnable instancesDiscovererTask; private StampedLock discoveryLock = new StampedLock(); /** * Collection of operations to be retried. */ - private ConcurrentHashMap> retries = new ConcurrentHashMap<>(); + private ConcurrentHashMap retries = new ConcurrentHashMap<>(); /** * Constructs a new cluster client. @@ -74,14 +72,12 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) { this.instancesDiscovererTask = createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this)); - this.instancesDiscoveryExecutor - = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer")); int delay = config.clusterDiscoveryDelayMillis > 0 ? config.clusterDiscoveryDelayMillis : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; // todo: it's better to start a job later (out of ctor) - this.instancesDiscoveryExecutor.scheduleWithFixedDelay( + this.workExecutor.scheduleWithFixedDelay( this.instancesDiscovererTask, 0, delay, @@ -91,73 +87,50 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel } @Override - protected boolean isDead(TarantoolOp future) { + protected boolean isDead(TarantoolOperation operation) { if ((state.getState() & StateHelper.CLOSED) != 0) { - future.completeExceptionally(new CommunicationException("Connection is dead", thumbstone)); + operation.getResult().completeExceptionally(new CommunicationException("Connection is dead", thumbstone)); return true; } Exception err = thumbstone; if (err != null) { - return checkFail(future, err); + return checkFail(operation, err); } return false; } - @Override - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - return registerOperation(future); - } - /** * Registers a new async operation which will be resolved later. * Registration is discovery-aware in term of synchronization and * it may be blocked util the discovery finishes its work. * - * @param future operation to be performed + * @param operation operation to be performed * * @return registered operation */ - private TarantoolOp registerOperation(TarantoolOp future) { + @Override + protected TarantoolOperation registerOperation(TarantoolOperation operation) { long stamp = discoveryLock.readLock(); try { - if (isDead(future)) { - return future; - } - futures.put(future.getId(), future); - if (isDead(future)) { - futures.remove(future.getId()); - return future; - } - - try { - write(future.getCode(), future.getId(), null, future.getArgs()); - } catch (Exception e) { - futures.remove(future.getId()); - fail(future, e); - } - - return future; + return super.registerOperation(operation); } finally { discoveryLock.unlock(stamp); } } @Override - protected void fail(TarantoolOp future, Exception e) { - checkFail(future, e); + protected void fail(TarantoolOperation operation, Exception cause) { + checkFail(operation, cause); } - protected boolean checkFail(TarantoolOp future, Exception e) { - if (!isTransientError(e)) { - future.completeExceptionally(e); + protected boolean checkFail(TarantoolOperation operation, Exception cause) { + if (!isTransientError(cause)) { + operation.getResult().completeExceptionally(cause); return true; } else { assert retries != null; - retries.put(future.getId(), future); - LOGGER.trace("Request {0} was delayed because of {1}", future, e); + retries.put(operation.getId(), operation); + LOGGER.trace("Request {0} was delayed because of {1}", operation, cause); return false; } } @@ -166,17 +139,13 @@ protected boolean checkFail(TarantoolOp future, Exception e) { protected void close(Exception e) { super.close(e); - if (instancesDiscoveryExecutor != null) { - instancesDiscoveryExecutor.shutdownNow(); - } - if (retries == null) { // May happen within constructor. return; } - for (TarantoolOp op : retries.values()) { - op.completeExceptionally(e); + for (TarantoolOperation operation : retries.values()) { + operation.getResult().completeExceptionally(e); } } @@ -199,23 +168,24 @@ protected void onReconnect() { // First call is before the constructor finished. Skip it. return; } - Collection> delayed = new ArrayList<>(retries.values()); - Collection> reissued = new ArrayList<>(retries.size()); + Collection delayed = new ArrayList<>(retries.values()); + Collection reissued = new ArrayList<>(retries.size()); retries.clear(); - for (final TarantoolOp future : delayed) { - if (!future.isDone()) { - executor.execute(() -> registerOperation(future)); - reissued.add(future); + for (final TarantoolOperation operation : delayed) { + if (!operation.getResult().isDone()) { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + executor.execute(() -> registerOperation(operation)); + reissued.add(operation); } } - for (final TarantoolOp future : reissued) { - LOGGER.trace("{0} was re-issued after reconnection", future); + for (final TarantoolOperation operation : reissued) { + LOGGER.trace("{0} was re-issued after reconnection", operation); } } @Override - protected void complete(TarantoolPacket packet, TarantoolOp future) { - super.complete(packet, future); + protected void complete(TarantoolPacket packet, TarantoolOperation operation) { + super.complete(packet, operation); RefreshableSocketProvider provider = getRefreshableSocketProvider(); if (provider != null) { renewConnectionIfRequired(provider.getAddresses()); @@ -288,7 +258,6 @@ public synchronized void run() { onInstancesRefreshed(lastInstances); } } catch (Exception ignored) { - ignored.getCause(); // no-op } } diff --git a/src/main/java/org/tarantool/TarantoolConnection.java b/src/main/java/org/tarantool/TarantoolConnection.java index 09883bc0..0bdca220 100644 --- a/src/main/java/org/tarantool/TarantoolConnection.java +++ b/src/main/java/org/tarantool/TarantoolConnection.java @@ -2,6 +2,8 @@ import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolSchemaException; +import org.tarantool.schema.TarantoolSchemaMeta; import java.io.IOException; import java.io.InputStream; @@ -27,11 +29,17 @@ public TarantoolConnection(String username, String password, Socket socket) thro } @Override - protected List exec(Code code, Object... args) { - TarantoolPacket responsePacket = writeAndRead(code, args); + protected List exec(TarantoolRequest request) { + Object[] args = request.getArguments().toArray(); + TarantoolPacket responsePacket = writeAndRead(request.getCode(), args); return (List) responsePacket.getBody().get(Key.DATA.getId()); } + @Override + protected TarantoolSchemaMeta getSchemaMeta() { + throw new TarantoolSchemaException("Schema operations are not supported."); + } + protected TarantoolPacket writeAndRead(Code code, Object... args) { try { ByteBuffer packet = ProtoUtils.createPacket(initialRequestSize, msgPackLite, @@ -44,7 +52,7 @@ protected TarantoolPacket writeAndRead(Code code, Object... args) { Long c = responsePacket.getCode(); if (c != 0) { - throw serverError(c, responsePacket.getBody().get(Key.ERROR.getId())); + throw serverError(c, responsePacket.getError()); } return responsePacket; diff --git a/src/main/java/org/tarantool/TarantoolException.java b/src/main/java/org/tarantool/TarantoolException.java index e7d38e82..10afdfb8 100644 --- a/src/main/java/org/tarantool/TarantoolException.java +++ b/src/main/java/org/tarantool/TarantoolException.java @@ -1,5 +1,11 @@ package org.tarantool; +import static org.tarantool.protocol.ProtoConstants.ERR_LOADING; +import static org.tarantool.protocol.ProtoConstants.ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_READONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_TIMEOUT; +import static org.tarantool.protocol.ProtoConstants.ERR_WRONG_SCHEMA_VERSION; + /** * A remote server error with error code and message. * @@ -7,11 +13,6 @@ * @version $Id: $ */ public class TarantoolException extends RuntimeException { - /* taken from src/box/errcode.h */ - public static final int ERR_READONLY = 7; - public static final int ERR_TIMEOUT = 78; - public static final int ERR_LOADING = 116; - public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; private static final long serialVersionUID = 1L; long code; @@ -60,6 +61,7 @@ public boolean isTransient() { switch ((int) code) { case ERR_READONLY: case ERR_TIMEOUT: + case ERR_WRONG_SCHEMA_VERSION: case ERR_LOADING: case ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY: return true; diff --git a/src/main/java/org/tarantool/TarantoolOperation.java b/src/main/java/org/tarantool/TarantoolOperation.java new file mode 100644 index 00000000..0707d920 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolOperation.java @@ -0,0 +1,183 @@ +package org.tarantool; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Describes an internal state of a registered request. + */ +public class TarantoolOperation implements Comparable { + + /** + * A operation identifier. + */ + private final long id; + + /** + * Tarantool binary protocol operation code. + */ + private final Code code; + + /** + * Schema ID when this operation was registered. + */ + private long sentSchemaId; + + /** + * Schema ID when this operation was completed. + */ + private long completedSchemaId; + + /** + * Arguments of operation. + */ + private final List arguments; + + /** + * Future request result. + */ + private final CompletableFuture result = new CompletableFuture<>(); + + /** + * Operation timeout. + */ + private final Duration timeout; + + /** + * Optional operation which is used for + * schema synchronization purposes. + */ + private TarantoolOperation dependedOperation; + + public TarantoolOperation(Code code, + List arguments, + long id, + long schemaId, + Duration timeout) { + this.id = id; + this.sentSchemaId = schemaId; + this.code = Objects.requireNonNull(code); + this.arguments = new ArrayList<>(arguments); + this.timeout = timeout; + setupTimeout(timeout); + } + + public TarantoolOperation(Code code, + List arguments, + long id, + long schemaId, + Duration timeout, + TarantoolOperation dependedOperation) { + this.id = id; + this.sentSchemaId = schemaId; + this.code = Objects.requireNonNull(code); + this.arguments = new ArrayList<>(arguments); + this.timeout = timeout; + this.dependedOperation = dependedOperation; + setupTimeout(timeout); + } + + public long getId() { + return id; + } + + public long getSentSchemaId() { + return sentSchemaId; + } + + public void setSentSchemaId(long sentSchemaId) { + this.sentSchemaId = sentSchemaId; + } + + public long getCompletedSchemaId() { + return completedSchemaId; + } + + public void setCompletedSchemaId(long completedSchemaId) { + this.completedSchemaId = completedSchemaId; + } + + public CompletableFuture getResult() { + return result; + } + + public Code getCode() { + return code; + } + + public TarantoolOperation getDependedOperation() { + return dependedOperation; + } + + public Duration getTimeout() { + return timeout; + } + + /** + * Serializability means this requests is capable being + * translated in a binary packet according to {@code iproto} + * protocol. + * + * @return {@literal true} if this request is serializable + */ + public boolean isSerializable() { + return arguments.stream().allMatch(TarantoolRequestArgument::isSerializable); + } + + public List getArguments() { + return arguments.stream().map(TarantoolRequestArgument::getValue).collect(Collectors.toList()); + } + + @Override + public int compareTo(TarantoolOperation other) { + return Long.compareUnsigned(this.id, other.id); + } + + private void setupTimeout(Duration duration) { + if (duration == null) { + return; + } + if (duration.isNegative()) { + throw new IllegalArgumentException("Timeout cannot be negative"); + } + if (duration.isZero() || result.isDone()) { + return; + } + ScheduledFuture abandonByTimeoutAction = TimeoutScheduler.EXECUTOR.schedule( + () -> { + if (!result.isDone()) { + result.completeExceptionally(new TimeoutException()); + } + }, + duration.toMillis(), TimeUnit.MILLISECONDS + ); + result.whenComplete((ignored, error) -> { + if (error == null && !abandonByTimeoutAction.isDone()) { + abandonByTimeoutAction.cancel(false); + } + }); + } + + /** + * Runs timeout operation as a delayed task. + */ + static class TimeoutScheduler { + static final ScheduledThreadPoolExecutor EXECUTOR; + + static { + EXECUTOR = new ScheduledThreadPoolExecutor( + 1, new TarantoolThreadDaemonFactory("tarantoolTimeout") + ); + EXECUTOR.setRemoveOnCancelPolicy(true); + } + } + +} diff --git a/src/main/java/org/tarantool/TarantoolRequest.java b/src/main/java/org/tarantool/TarantoolRequest.java new file mode 100644 index 00000000..190f7dae --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequest.java @@ -0,0 +1,86 @@ +package org.tarantool; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Describes a static request parameters. + */ +public class TarantoolRequest { + + /** + * Tarantool binary protocol operation code. + */ + private Code code; + + /** + * Arguments of operation. + */ + private List arguments; + + /** + * Request timeout start just after initialization. + */ + private Duration timeout; + + public TarantoolRequest(Code code) { + this.code = code; + this.arguments = new ArrayList<>(); + } + + public TarantoolRequest(Code code, TarantoolRequestArgument... arguments) { + this.code = code; + this.arguments = Arrays.asList(arguments); + } + + /** + * Initializes an operation and starts its timer. + * + * @param sid internal request id + * @param schemaId schema version + */ + TarantoolOperation toOperation(long sid, long schemaId) { + return new TarantoolOperation(code, arguments, sid, schemaId, timeout); + } + + /** + * Initializes a preflight operation that + * will be processed before the dependent. + * + * @param sid internal request id + * @param schemaId schema version + * @param operation depended operation + */ + TarantoolOperation toPreflightOperation(long sid, long schemaId, TarantoolOperation operation) { + return new TarantoolOperation(code, arguments, sid, schemaId, timeout, operation); + } + + + public Code getCode() { + return code; + } + + public void setCode(Code code) { + this.code = code; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public List getArguments() { + return arguments.stream().map(TarantoolRequestArgument::getValue).collect(Collectors.toList()); + } + + public void addArguments(TarantoolRequestArgument... arguments) { + this.arguments.addAll(Arrays.asList(arguments)); + } + +} diff --git a/src/main/java/org/tarantool/TarantoolRequestArgument.java b/src/main/java/org/tarantool/TarantoolRequestArgument.java new file mode 100644 index 00000000..6ff11c21 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequestArgument.java @@ -0,0 +1,23 @@ +package org.tarantool; + +/** + * Holds a request argument value. + */ +public interface TarantoolRequestArgument { + + /** + * Flag indicating that held value can be + * represented as bytes supported by iproto. + * + * @return {@literal true} if value is {@code iproto} serializable + */ + boolean isSerializable(); + + /** + * Gets a held value. + * + * @return wrapped value + */ + Object getValue(); + +} diff --git a/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java b/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java new file mode 100644 index 00000000..8d602e79 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java @@ -0,0 +1,79 @@ +package org.tarantool; + +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Request argument factory. + * + * @see TarantoolRequestArgument + */ +public class TarantoolRequestArgumentFactory { + + private TarantoolRequestArgumentFactory() { + } + + public static TarantoolRequestArgument value(Object value) { + return new SimpleArgument(value); + } + + public static TarantoolRequestArgument cacheLookupValue(Supplier supplier) { + return new LookupArgument(supplier); + } + + /** + * Simple wrapper that holds the original value. + */ + private static class SimpleArgument implements TarantoolRequestArgument { + + private Object value; + + SimpleArgument(Object value) { + Objects.requireNonNull(value); + this.value = value; + } + + @Override + public boolean isSerializable() { + return true; + } + + @Override + public Object getValue() { + return value; + } + + } + + /** + * Wrapper that evaluates the value each time + * it is requested. + *

+ * It works like a function, where {@code argument = f(key)}. + */ + private static class LookupArgument implements TarantoolRequestArgument { + + Supplier lookup; + + LookupArgument(Supplier lookup) { + this.lookup = Objects.requireNonNull(lookup); + } + + @Override + public boolean isSerializable() { + try { + lookup.get(); + } catch (Exception ignored) { + return false; + } + return true; + } + + @Override + public synchronized Object getValue() { + return lookup.get(); + } + + } + +} diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java index 3b79819e..052ff4ba 100644 --- a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java @@ -1,11 +1,12 @@ package org.tarantool.cluster; -import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClientOps; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.logging.Logger; import org.tarantool.logging.LoggerFactory; import org.tarantool.util.StringUtils; +import org.tarantool.util.TupleTwo; import java.util.LinkedHashSet; import java.util.List; @@ -14,34 +15,37 @@ /** * A cluster nodes discoverer based on calling a predefined function * which returns list of nodes. - * + *

* The function has to have no arguments and return list of * the strings which follow host[:port] format + *

+ * This class is not a part of public API. */ public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer { private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClusterStoredFunctionDiscoverer.class); - private TarantoolClient client; + private TarantoolClientImpl client; private String entryFunction; - public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) { + public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, + TarantoolClientImpl client) { this.client = client; this.entryFunction = clientConfig.clusterDiscoveryEntryFunction; } @Override public Set getInstances() { - TarantoolClientOps, Object, List> syncOperations = client.syncOps(); + TarantoolClientOps, Object, TupleTwo, Long>> syncOperations = client.unsafeSchemaOps(); - List list = syncOperations.call(entryFunction); + TupleTwo, Long> result = syncOperations.call(entryFunction); // discoverer expects a single array result from the function now; // in order to protect this contract the discoverer does a strict // validation against the data returned; // this strict-mode allows us to extend the contract in a non-breaking // way for old clients just reserve an extra return value in // terms of Lua multi-result support.; - return checkAndFilterAddresses(list); + return checkAndFilterAddresses(result.getFirst()); } /** diff --git a/src/main/java/org/tarantool/jdbc/SQLConnection.java b/src/main/java/org/tarantool/jdbc/SQLConnection.java index 15f3258a..327d0d69 100644 --- a/src/main/java/org/tarantool/jdbc/SQLConnection.java +++ b/src/main/java/org/tarantool/jdbc/SQLConnection.java @@ -1,12 +1,12 @@ package org.tarantool.jdbc; -import org.tarantool.Code; import org.tarantool.CommunicationException; -import org.tarantool.Key; import org.tarantool.SocketChannelProvider; import org.tarantool.SqlProtoUtils; import org.tarantool.TarantoolClientConfig; import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolOperation; +import org.tarantool.TarantoolRequest; import org.tarantool.protocol.TarantoolPacket; import org.tarantool.util.JdbcConstants; import org.tarantool.util.SQLStates; @@ -32,6 +32,8 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -538,8 +541,8 @@ public SQLBatchResultHolder executeBatch(long timeout, List quer checkNotClosed(); SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps(); SQLBatchResultHolder batchResult = useNetworkTimeout(timeout) - ? sqlOps.executeBatch(queries) - : sqlOps.executeBatch(timeout, queries); + ? sqlOps.executeBatch(queries) + : sqlOps.executeBatch(timeout, queries); return batchResult; } @@ -734,13 +737,13 @@ private static String formatError(SQLQueryHolder query) { static class SQLTarantoolClientImpl extends TarantoolClientImpl { private Future executeQuery(SQLQueryHolder queryHolder) { - return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()); + return exec(makeSqlRequest(queryHolder.getQuery(), queryHolder.getParams())); } private Future executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) { - return exec( - timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams() - ); + TarantoolRequest request = makeSqlRequest(queryHolder.getQuery(), queryHolder.getParams()); + request.setTimeout(Duration.of(timeoutMillis, ChronoUnit.MILLIS)); + return exec(request); } final SQLRawOps sqlRawOps = new SQLRawOps() { @@ -809,12 +812,12 @@ SQLRawOps sqlRawOps() { } @Override - protected void completeSql(TarantoolOp future, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { Long rowCount = SqlProtoUtils.getSQLRowCount(pack); SQLResultHolder result = (rowCount == null) ? SQLResultHolder.ofQuery(SqlProtoUtils.getSQLMetadata(pack), SqlProtoUtils.getSQLData(pack)) : SQLResultHolder.ofUpdate(rowCount.intValue(), SqlProtoUtils.getSQLAutoIncrementIds(pack)); - ((TarantoolOp) future).complete(result); + ((CompletableFuture) operation.getResult()).complete(result); } interface SQLRawOps { diff --git a/src/main/java/org/tarantool/protocol/ProtoConstants.java b/src/main/java/org/tarantool/protocol/ProtoConstants.java new file mode 100644 index 00000000..daee4dd7 --- /dev/null +++ b/src/main/java/org/tarantool/protocol/ProtoConstants.java @@ -0,0 +1,19 @@ +package org.tarantool.protocol; + +public class ProtoConstants { + + private ProtoConstants() { + } + + public static final long ERROR_TYPE_MARKER = 0x8000; + + public static final long SUCCESS = 0x0; + + /* taken from src/box/errcode.h */ + public static final int ERR_READONLY = 7; + public static final int ERR_TIMEOUT = 78; + public static final int ERR_WRONG_SCHEMA_VERSION = 109; + public static final int ERR_LOADING = 116; + public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; + +} diff --git a/src/main/java/org/tarantool/protocol/ProtoUtils.java b/src/main/java/org/tarantool/protocol/ProtoUtils.java index 7b1a12be..51481d38 100644 --- a/src/main/java/org/tarantool/protocol/ProtoUtils.java +++ b/src/main/java/org/tarantool/protocol/ProtoUtils.java @@ -213,7 +213,7 @@ private static void assertCorrectWelcome(String firstLine, SocketAddress remoteA private static void assertNoErrCode(TarantoolPacket authResponse) { Long code = (Long) authResponse.getHeaders().get(Key.CODE.getId()); if (code != 0) { - Object error = authResponse.getBody().get(Key.ERROR.getId()); + Object error = authResponse.getError(); String errorMsg = error instanceof String ? (String) error : new String((byte[]) error); throw new TarantoolException(code, errorMsg); } @@ -305,7 +305,22 @@ public static ByteBuffer createPacket(int initialRequestSize, return buffer; } + /** + * Extracts an error code. + * + * @param code in 0x8XXX format + * + * @return actual error code (which is a XXX part) + */ + public static long extractErrorCode(long code) { + if ((code & ProtoConstants.ERROR_TYPE_MARKER) == 0) { + throw new IllegalArgumentException(String.format("Code %h does not follow 0x8XXX format", code)); + } + return (~ProtoConstants.ERROR_TYPE_MARKER & code); + } + private static class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { + public ByteArrayOutputStream(int size) { super(size); } @@ -313,6 +328,7 @@ public ByteArrayOutputStream(int size) { ByteBuffer toByteBuffer() { return ByteBuffer.wrap(buf, 0, count); } + } } diff --git a/src/main/java/org/tarantool/protocol/TarantoolPacket.java b/src/main/java/org/tarantool/protocol/TarantoolPacket.java index ca131177..ca661ed9 100644 --- a/src/main/java/org/tarantool/protocol/TarantoolPacket.java +++ b/src/main/java/org/tarantool/protocol/TarantoolPacket.java @@ -29,8 +29,9 @@ public Long getCode() { potenticalCode != null ? potenticalCode.getClass().toString() : "null" ); } + Long code = (Long) potenticalCode; - return (Long) potenticalCode; + return code == 0 ? code : ProtoUtils.extractErrorCode(code); } public Long getSync() { @@ -48,4 +49,16 @@ public Map getBody() { public boolean hasBody() { return body != null && body.size() > 0; } + + public long getSchemaId() { + return (Long) headers.get(Key.SCHEMA_ID.getId()); + } + + public Object getData() { + return hasBody() ? body.get(Key.DATA.getId()) : null; + } + + public Object getError() { + return hasBody() ? body.get(Key.ERROR.getId()) : null; + } } diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java new file mode 100644 index 00000000..cb468d7e --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java @@ -0,0 +1,128 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Keeps a space index metadata. + */ +public class TarantoolIndexMeta { + + public static final int VINDEX_IID_FIELD_NUMBER = 1; + public static final int VINDEX_NAME_FIELD_NUMBER = 2; + public static final int VINDEX_TYPE_FIELD_NUMBER = 3; + public static final int VINDEX_OPTIONS_FIELD_NUMBER = 4; + public static final int VINDEX_PARTS_FIELD_NUMBER = 5; + + public static final int VINDEX_PART_FIELD = 0; + public static final int VINDEX_PART_TYPE = 1; + + private final int id; + private final String name; + private final String type; + private final IndexOptions options; + private final List parts; + + public TarantoolIndexMeta(int id, + String name, + String type, + IndexOptions options, + List parts) { + this.id = id; + this.name = name; + this.type = type; + this.options = options; + this.parts = parts; + } + + public static TarantoolIndexMeta fromTuple(List tuple) { + Map optionsMap = (Map) tuple.get(VINDEX_OPTIONS_FIELD_NUMBER); + + List parts = Collections.emptyList(); + List partsTuple = (List) tuple.get(VINDEX_PARTS_FIELD_NUMBER); + if (!partsTuple.isEmpty()) { + // simplified index parts as an array + // (when the parts don't use collation and is_nullable options) + if (partsTuple.get(0) instanceof List) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart( + (Integer) part.get(VINDEX_PART_FIELD), + (String) part.get(VINDEX_PART_TYPE) + ) + ) + .collect(Collectors.toList()); + } else if (partsTuple.get(0) instanceof Map) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get("field"), (String) part.get("type"))) + .collect(Collectors.toList()); + } + } + + return new TarantoolIndexMeta( + (Integer) tuple.get(VINDEX_IID_FIELD_NUMBER), + (String) tuple.get(VINDEX_NAME_FIELD_NUMBER), + (String) tuple.get(VINDEX_TYPE_FIELD_NUMBER), + new IndexOptions((Boolean) optionsMap.get("unique")), + parts + ); + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public IndexOptions getOptions() { + return options; + } + + public List getParts() { + return parts; + } + + public static class IndexOptions { + + private final boolean unique; + + public IndexOptions(boolean unique) { + this.unique = unique; + } + + public boolean isUnique() { + return unique; + } + + } + + public static class IndexPart { + + private final int fieldNumber; + private final String type; + + public IndexPart(int fieldNumber, String type) { + this.fieldNumber = fieldNumber; + this.type = type; + } + + public int getFieldNumber() { + return fieldNumber; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java new file mode 100644 index 00000000..49406fae --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java @@ -0,0 +1,16 @@ +package org.tarantool.schema; + +public class TarantoolIndexNotFoundException extends TarantoolSchemaException { + + private final String indexName; + + public TarantoolIndexNotFoundException(String targetSpace, String indexName) { + super(targetSpace); + this.indexName = indexName; + } + + public String getIndexName() { + return indexName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java b/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java new file mode 100644 index 00000000..8a800e38 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java @@ -0,0 +1,122 @@ +package org.tarantool.schema; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClientOps; +import org.tarantool.util.TupleTwo; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * In-memory schema cache. + *

+ * Caches meta spaces {@code _vspace} and {@code _vindex}. + *

+ * This class is not a part of public API. + */ +public class TarantoolMetaSpacesCache implements TarantoolSchemaMeta { + + private static final int VSPACE_ID = 281; + private static final int VSPACE_ID_INDEX_ID = 0; + + private static final int VINDEX_ID = 289; + private static final int VINDEX_ID_INDEX_ID = 0; + + /** + * Describes the theoretical maximum tuple size + * which is (2^31 - 1) (box.schema.SPACE_MAX) + */ + private static final int MAX_TUPLES = 2_147_483_647; + + private TarantoolClientImpl client; + + private volatile Map cachedSpaces = Collections.emptyMap(); + private volatile long schemaVersion; + + public TarantoolMetaSpacesCache(TarantoolClientImpl client) { + this.client = client; + } + + @Override + public TarantoolSpaceMeta getSpace(String spaceName) { + TarantoolSpaceMeta space = cachedSpaces.get(spaceName); + if (space == null) { + throw new TarantoolSpaceNotFoundException(spaceName); + } + return space; + } + + @Override + public TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName) { + TarantoolIndexMeta index = getSpace(spaceName).getIndex(indexName); + if (index == null) { + throw new TarantoolIndexNotFoundException(spaceName, indexName); + } + return index; + } + + @Override + public long getSchemaVersion() { + return schemaVersion; + } + + @Override + public synchronized long refresh() { + TupleTwo, Long> result = fetchSpaces(); + cachedSpaces = result.getFirst() + .stream() + .collect( + Collectors.toConcurrentMap( + TarantoolSpaceMeta::getName, + Function.identity(), + (oldValue, newValue) -> newValue, + ConcurrentHashMap::new + ) + ); + return schemaVersion = result.getSecond(); + } + + @Override + public boolean isInitialized() { + return schemaVersion != 0; + } + + private TupleTwo, Long> fetchSpaces() { + TarantoolClientOps, Object, TupleTwo, Long>> clientOps = client.unsafeSchemaOps(); + + long firstRequestSchema = -1; + long secondRequestSchema = 0; + List spaces = null; + List indexes = null; + while (firstRequestSchema != secondRequestSchema) { + TupleTwo, Long> spacesResult = clientOps + .select(VSPACE_ID, VSPACE_ID_INDEX_ID, Collections.emptyList(), 0, Integer.MAX_VALUE, Iterator.ALL); + TupleTwo, Long> indexesResult = clientOps + .select(VINDEX_ID, VINDEX_ID_INDEX_ID, Collections.emptyList(), 0, Integer.MAX_VALUE, Iterator.ALL); + spaces = spacesResult.getFirst(); + indexes = indexesResult.getFirst(); + firstRequestSchema = spacesResult.getSecond(); + secondRequestSchema = indexesResult.getSecond(); + } + + Map>> indexesBySpace = indexes.stream() + .map(tuple -> (List) tuple) + .collect(Collectors.groupingBy(tuple -> (Integer) tuple.get(0))); + + List cachedMeta = spaces.stream() + .map(tuple -> (List) tuple) + .map(tuple -> TarantoolSpaceMeta.fromTuple( + tuple, + indexesBySpace.getOrDefault((Integer) tuple.get(0), Collections.emptyList())) + ) + .collect(Collectors.toList()); + + return TupleTwo.of(cachedMeta, firstRequestSchema); + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaException.java b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java new file mode 100644 index 00000000..877f6c6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java @@ -0,0 +1,15 @@ +package org.tarantool.schema; + +public class TarantoolSchemaException extends RuntimeException { + + private final String schemaName; + + public TarantoolSchemaException(String schemaName) { + this.schemaName = schemaName; + } + + public String getSchemaName() { + return schemaName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java new file mode 100644 index 00000000..bb66b27f --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java @@ -0,0 +1,48 @@ +package org.tarantool.schema; + +/** + * Provides Tarantool instance schema info. + */ +public interface TarantoolSchemaMeta { + + /** + * Finds a space by name if any. + * + * @param spaceName name of target space + * + * @return found space + */ + TarantoolSpaceMeta getSpace(String spaceName); + + /** + * Finds a space index by name if any. + * + * @param spaceName name of target space + * @param indexName name of target index + * + * @return found index meta + */ + TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName); + + /** + * Gets current schema version that is cached. + * + * @return current version + */ + long getSchemaVersion(); + + /** + * Fetches schema metadata. + * + * @return fetched schema metadata version + */ + long refresh(); + + /** + * Checks whether a schema fully cached or not. + * + * @return {@literal true} if the schema is cached at least once + */ + boolean isInitialized(); + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java new file mode 100644 index 00000000..0fd4b214 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java @@ -0,0 +1,99 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Keeps a space metadata. + */ +public class TarantoolSpaceMeta { + + public static final int VSPACE_ID_FIELD_NUMBER = 0; + public static final int VSPACE_NAME_FIELD_NUMBER = 2; + public static final int VSPACE_ENGINE_FIELD_NUMBER = 3; + public static final int VSPACE_FORMAT_FIELD_NUMBER = 6; + + private final int id; + private final String name; + private final String engine; + private final List format; + private final Map indexes; + + public static TarantoolSpaceMeta fromTuple(List spaceTuple, List> indexTuples) { + List fields = ((List>) spaceTuple.get(VSPACE_FORMAT_FIELD_NUMBER)).stream() + .map(field -> new SpaceField(field.get("name").toString(), field.get("type").toString())) + .collect(Collectors.toList()); + + Map indexesMap = indexTuples.stream() + .map(TarantoolIndexMeta::fromTuple) + .collect(Collectors.toMap(TarantoolIndexMeta::getName, Function.identity())); + + return new TarantoolSpaceMeta( + (Integer) spaceTuple.get(VSPACE_ID_FIELD_NUMBER), + spaceTuple.get(VSPACE_NAME_FIELD_NUMBER).toString(), + spaceTuple.get(VSPACE_ENGINE_FIELD_NUMBER).toString(), + Collections.unmodifiableList(fields), + Collections.unmodifiableMap(indexesMap) + ); + } + + public TarantoolSpaceMeta(int id, + String name, + String engine, + List format, + Map indexes) { + this.id = id; + this.name = name; + this.engine = engine; + this.format = format; + this.indexes = indexes; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getEngine() { + return engine; + } + + public List getFormat() { + return format; + } + + public Map getIndexes() { + return indexes; + } + + public TarantoolIndexMeta getIndex(String indexName) { + return indexes.get(indexName); + } + + public static class SpaceField { + + private final String name; + private final String type; + + public SpaceField(String name, String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java new file mode 100644 index 00000000..28498e6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java @@ -0,0 +1,9 @@ +package org.tarantool.schema; + +public class TarantoolSpaceNotFoundException extends TarantoolSchemaException { + + public TarantoolSpaceNotFoundException(String spaceName) { + super(spaceName); + } + +} diff --git a/src/main/java/org/tarantool/util/TupleTwo.java b/src/main/java/org/tarantool/util/TupleTwo.java index b124e287..9bad7059 100644 --- a/src/main/java/org/tarantool/util/TupleTwo.java +++ b/src/main/java/org/tarantool/util/TupleTwo.java @@ -8,12 +8,12 @@ public class TupleTwo { private final T first; private final U second; - private TupleTwo(T first, U second) { + TupleTwo(T first, U second) { this.first = first; this.second = second; } - public static TupleTwo of(T first, U second) { + public static TupleTwo of(T first, U second) { return new TupleTwo<>(first, second); } diff --git a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java index 087a4760..20ead6a3 100644 --- a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java +++ b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java @@ -7,6 +7,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.tarantool.TestAssertions.checkRawTupleResult; +import org.tarantool.schema.TarantoolIndexNotFoundException; +import org.tarantool.schema.TarantoolSpaceNotFoundException; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -27,7 +30,7 @@ /** * Class with test cases for asynchronous operations - * + *

* NOTE: Parametrized tests can be simplified after * https://github.com/junit-team/junit5/issues/878 */ @@ -120,22 +123,43 @@ void testAsyncError(AsyncOpsProvider provider) { @MethodSource("getAsyncOps") void testOperations(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { - TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); + testHelper.executeLua( + "box.space.basic_test:insert{10, '10'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{30, '30'}", + "box.space.basic_test:insert{40, '40'}", + "box.space.basic_test:insert{50, '50'}" + ); + TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); List>> futures = new ArrayList<>(); - futures.add(ops.insert(spaceId, Arrays.asList(10, "10"))); futures.add(ops.delete(spaceId, Collections.singletonList(10))); - futures.add(ops.insert(spaceId, Arrays.asList(10, "10"))); - futures.add(ops.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); - - futures.add(ops.replace(spaceId, Arrays.asList(20, "20"))); - futures.add(ops.upsert(spaceId, Collections.singletonList(20), Arrays.asList(20, "twenty"), - Arrays.asList("=", 1, "twenty"))); - - futures.add(ops.insert(spaceId, Arrays.asList(30, "30"))); - futures.add(ops.call("box.space.basic_test:delete", Collections.singletonList(30))); + futures.add(ops.insert(spaceId, Arrays.asList(60, "60"))); + + futures.add(ops.update(spaceId, Collections.singletonList(50), Arrays.asList("=", 1, "fifty"))); + + futures.add(ops.replace(spaceId, Arrays.asList(30, "thirty"))); + futures.add(ops.replace(spaceId, Arrays.asList(70, "70"))); + + futures.add( + ops.upsert( + spaceId, + Collections.singletonList(20), + Arrays.asList(20, "20"), + Arrays.asList("=", 1, "twenty") + ) + ); + futures.add( + ops.upsert( + spaceId, + Collections.singletonList(80), + Arrays.asList(80, "80"), + Arrays.asList("=", 1, "eighty") + ) + ); + futures.add(ops.call("box.space.basic_test:delete", Collections.singletonList(40))); // Wait completion of all operations. for (Future> f : futures) { @@ -143,10 +167,14 @@ void testOperations(AsyncOpsProvider provider) } // Check the effects. - checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + assertEquals(Collections.emptyList(), consoleSelect(10)); checkRawTupleResult(consoleSelect(20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(30), Collections.emptyList()); - + checkRawTupleResult(consoleSelect(30), Arrays.asList(30, "thirty")); + assertEquals(Collections.emptyList(), consoleSelect(40)); + checkRawTupleResult(consoleSelect(50), Arrays.asList(50, "fifty")); + checkRawTupleResult(consoleSelect(60), Arrays.asList(60, "60")); + checkRawTupleResult(consoleSelect(70), Arrays.asList(70, "70")); + checkRawTupleResult(consoleSelect(80), Arrays.asList(80, "80")); provider.close(); } @@ -185,16 +213,225 @@ void testCall(AsyncOpsProvider provider) throws ExecutionException, InterruptedE provider.close(); } + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringSelect(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringInsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringReplace(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringDelete(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + Future> resultOne = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(1)); + + Future> resultTwenty = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(20)); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwenty.get(TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpdate(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(1), Arrays.asList(1, "001"), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(2), Arrays.asList(2, "002"), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(10), Arrays.asList(10, "010"), + Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringMultipleIndirectChanges(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(2, "two")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(3, "three")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testUnknownSpace(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .update("basic_test_unknown", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Exception exception = assertThrows(Exception.class, () -> resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue(exception.getCause() instanceof TarantoolSpaceNotFoundException); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testUnknownSpaceIndex(AsyncOpsProvider provider) { + Future> resultOne = provider.getAsyncOps() + .select("basic_test", "pk_unknown", Collections.singletonList(3), 0, 1, Iterator.EQ); + + Exception exception = assertThrows(Exception.class, () -> resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue(exception.getCause() instanceof TarantoolIndexNotFoundException); + + provider.close(); + } + private List consoleSelect(Object key) { return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); } private interface AsyncOpsProvider { + TarantoolClientOps, Object, Future>> getAsyncOps(); TarantoolClient getClient(); void close(); + } private static class ClientAsyncOpsProvider implements AsyncOpsProvider { @@ -236,7 +473,7 @@ public TarantoolClient getClient() { @Override public void close() { - composableOps.close(); + client.close(); } } @@ -257,6 +494,11 @@ public Future> select(Integer space, Integer index, List key, int off return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, String index, List key, int offset, int limit, int iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> select(Integer space, Integer index, @@ -267,31 +509,66 @@ public Future> select(Integer space, return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, + String index, + List key, + int offset, + int limit, + Iterator iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> insert(Integer space, List tuple) { return originOps.insert(space, tuple).toCompletableFuture(); } + @Override + public Future> insert(String space, List tuple) { + return originOps.insert(space, tuple).toCompletableFuture(); + } + @Override public Future> replace(Integer space, List tuple) { return originOps.replace(space, tuple).toCompletableFuture(); } + @Override + public Future> replace(String space, List tuple) { + return originOps.replace(space, tuple).toCompletableFuture(); + } + @Override public Future> update(Integer space, List key, Object... tuple) { return originOps.update(space, key, tuple).toCompletableFuture(); } + @Override + public Future> update(String space, List key, Object... tuple) { + return originOps.update(space, key, tuple).toCompletableFuture(); + } + @Override public Future> upsert(Integer space, List key, List defTuple, Object... ops) { return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); } + @Override + public Future> upsert(String space, List key, List defTuple, Object... ops) { + return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); + } + @Override public Future> delete(Integer space, List key) { return originOps.delete(space, key).toCompletableFuture(); } + @Override + public Future> delete(String space, List key) { + return originOps.delete(space, key).toCompletableFuture(); + } + @Override public Future> call(String function, Object... args) { return originOps.call(function, args).toCompletableFuture(); @@ -311,6 +588,7 @@ public void ping() { public void close() { originOps.close(); } + } } diff --git a/src/test/java/org/tarantool/ClientOperationsIT.java b/src/test/java/org/tarantool/ClientOperationsIT.java index 1f7e1826..762c3dfb 100644 --- a/src/test/java/org/tarantool/ClientOperationsIT.java +++ b/src/test/java/org/tarantool/ClientOperationsIT.java @@ -2,6 +2,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.tarantool.TestAssertions.checkRawTupleResult; + +import org.tarantool.schema.TarantoolIndexNotFoundException; +import org.tarantool.schema.TarantoolSpaceNotFoundException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -9,6 +14,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * Tests for synchronous operations of {@link TarantoolClientImpl} class. * @@ -34,11 +43,18 @@ public static void tearDownEnv() { @BeforeEach public void setUp() { + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); } @AfterEach public void tearDown() { + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); client.close(); } @@ -48,4 +64,173 @@ public void testClose() { assertEquals(e.getMessage(), "You should close TarantoolClient instead."); } + @Test + void testStringSelect() { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + List result = client.syncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals(Collections.singletonList(Arrays.asList(1, "one")), result); + } + + @Test + void testStringInsert() { + client.syncOps().insert("basic_test", Arrays.asList(1, "one")); + client.syncOps().insert("basic_test", Arrays.asList(10, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringReplace() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + client.syncOps().replace("basic_test", Arrays.asList(1, "one")); + client.syncOps().replace("basic_test", Arrays.asList(10, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringDelete() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + client.syncOps().delete("basic_test", Collections.singletonList(1)); + client.syncOps().delete("basic_test", Collections.singletonList(20)); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + } + + @Test + void testStringUpdate() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + clientOps.update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + clientOps.update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + clientOps.update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringUpsert() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + TarantoolClientOps, Object, List> ops = client.syncOps(); + ops.upsert( + "basic_test", Collections.singletonList(1), + Arrays.asList(1, "001"), Arrays.asList("=", 1, "one") + ); + ops.upsert( + "basic_test", Collections.singletonList(2), + Arrays.asList(2, "002"), Arrays.asList("=", 1, "two") + ); + ops.upsert( + "basic_test", Collections.singletonList(10), + Arrays.asList(10, "010"), Arrays.asList("=", 1, "ten") + ); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringMultipleIndirectChanges() { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + List result = client.syncOps().select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(1, "one")), result); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + result = client.syncOps().select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(2, "two")), result); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + result = client.syncOps().select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(3, "three")), result); + } + + @Test + void testUnknownSpace() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps.select("base_test_unknown", "pk", Collections.singletonList(12), 0, 1, Iterator.EQ) + ); + + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + } + + @Test + void testUnknownSpaceIndex() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps.select("basic_test", "pk_unknown", Collections.singletonList(12), 0, 1, Iterator.EQ) + ); + + assertTrue(error.getCause() instanceof TarantoolIndexNotFoundException); + } + + @Test + void testCreateSpaceAfterFailedRequest() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps + .select("base_test_unknown", "pk", Collections.emptyList(), 0, 10, Iterator.ALL) + ); + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + + testHelper.executeLua( + "box.schema.space.create('base_test_unknown', { format = { { name = 'id', type = 'integer' } } })", + "box.space.base_test_unknown:create_index('pk', { type = 'TREE', parts = {'id'} } )", + "box.space.base_test_unknown:insert{ 5 }" + ); + List result = clientOps + .select("base_test_unknown", "pk", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Collections.singletonList(5), result.get(0)); + + error = assertThrows( + Exception.class, + () -> clientOps + .select("base_test_unknown1", "pk", Collections.emptyList(), 0, 10, Iterator.ALL) + ); + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + + testHelper.executeLua("box.space.base_test_unknown:drop()"); + } + + private List consoleSelect(Object key) { + return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); + } + } diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index 6e1d005d..32e96963 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.tarantool.TestUtils.findCause; import static org.tarantool.TestUtils.makeDefaultClusterClientConfig; import static org.tarantool.TestUtils.makeDiscoveryFunction; @@ -24,9 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -90,7 +91,7 @@ public void tearDownTest() { @Test @DisplayName("requests were re-issued after reconnection") public void testRetriesOnReconnect() throws ExecutionException, InterruptedException { - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); config.operationExpiryTimeMillis = 3_000; TarantoolClusterClient client = new TarantoolClusterClient( @@ -104,7 +105,7 @@ public void testRetriesOnReconnect() throws ExecutionException, InterruptedExcep @Override protected void reconnect(Throwable lastError) { if (notFirst) { - tryAwait(barrier); + tryAwait(phaser, 0); } notFirst = true; super.reconnect(lastError); @@ -119,7 +120,7 @@ protected void reconnect(Throwable lastError) { futures.add(client.asyncOps().eval("return 1+3")); futures.add(client.asyncOps().eval("return 1+4")); - tryAwait(barrier); + phaser.arrive(); for (Future future : futures) { future.get(); @@ -168,7 +169,7 @@ void testUpdateExtendedNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = @@ -179,7 +180,7 @@ void testUpdateExtendedNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -187,7 +188,7 @@ void testUpdateExtendedNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1, srv2 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1, srv2 } expectConnected(client, spaceId, pkId); @@ -213,7 +214,7 @@ void testUpdateNarrowNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service1Address)); @@ -223,7 +224,7 @@ void testUpdateNarrowNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address, service2Address ); @@ -232,7 +233,7 @@ void testUpdateNarrowNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1, srv2 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1, srv2 }; wait for { srv1 } expectConnected(client, spaceId, pkId); @@ -397,7 +398,7 @@ void testDelayFunctionResultFetch() { String service2Address = "127.0.0.1:" + PORTS[1]; String service3Address = "localhost:" + PORTS[2]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddressesFunction"; String functionBody = Stream.of(service1Address, service2Address) @@ -415,7 +416,7 @@ void testDelayFunctionResultFetch() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 3000, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -423,16 +424,16 @@ void testDelayFunctionResultFetch() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1 } expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv1 }; wait for { srv2 } + tryAwait(phaser, 1); // client = { srv1 }; wait for { srv2 } stopInstancesAndAwait(SRV1); expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv2 }; wait for { srv3 } + tryAwait(phaser, 2); // client = { srv2 }; wait for { srv3 } stopInstancesAndAwait(SRV2); expectConnected(client, spaceId, pkId); @@ -508,11 +509,11 @@ void testRoundRobinSocketProviderRefusedAfterConnect() { assertEquals(origin, client.getThumbstone()); } - private void tryAwait(CyclicBarrier barrier) { + private void tryAwait(Phaser phaser, int phase) { try { - barrier.await(6000, TimeUnit.MILLISECONDS); + phaser.awaitAdvanceInterruptibly(phase, 6000, TimeUnit.MILLISECONDS); } catch (Throwable e) { - e.printStackTrace(); + fail(e); } } @@ -574,6 +575,7 @@ private TarantoolClusterClient makeClientWithDiscoveryFeature(String entryFuncti Consumer> consumer, String... addresses) { TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); + config.operationExpiryTimeMillis = 3000; config.clusterDiscoveryEntryFunction = entryFunction; config.clusterDiscoveryDelayMillis = entryDelayMillis; diff --git a/src/test/java/org/tarantool/ClientReconnectIT.java b/src/test/java/org/tarantool/ClientReconnectIT.java index af2afc86..ad8a94f3 100644 --- a/src/test/java/org/tarantool/ClientReconnectIT.java +++ b/src/test/java/org/tarantool/ClientReconnectIT.java @@ -15,7 +15,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; import java.net.ConnectException; import java.nio.channels.SocketChannel; @@ -159,13 +158,7 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) { client.close(); - ExecutionException e = assertThrows(ExecutionException.class, new Executable() { - @Override - public void execute() throws Throwable { - res.get(); - } - }); - assertEquals("Connection is closed.", e.getCause().getMessage()); + ExecutionException e = assertThrows(ExecutionException.class, res::get); } /** @@ -189,16 +182,12 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) thro testHelper.stopInstance(); - assertThrows(ExecutionException.class, new Executable() { - @Override - public void execute() throws Throwable { - mustFail.get(); - } - }); + ExecutionException executionException = assertThrows(ExecutionException.class, mustFail::get); + assertEquals(executionException.getCause().getClass(), CommunicationException.class); + writeEnabled.set(true); testHelper.startInstance(); - writeEnabled.set(true); try { client.waitAlive(RESTART_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java index 97cf697a..09844e11 100644 --- a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java +++ b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java @@ -91,21 +91,45 @@ public void execute() throws Throwable { @Test public void testFireAndForgetOperations() { + testHelper.executeLua( + "box.space.basic_test:insert{1, '1'}", + "box.space.basic_test:insert{5, '5'}", + "box.space.basic_test:insert{10, '10'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{30, '30'}" + ); + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); - Set syncIds = new HashSet(); + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.delete(spaceId, Collections.singletonList(1))); + + syncIds.add(ffOps.insert(spaceId, Arrays.asList(2, "2"))); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); - syncIds.add(ffOps.delete(spaceId, Collections.singletonList(10))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(3, "3"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(5, "five"))); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); syncIds.add(ffOps.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); - syncIds.add(ffOps.replace(spaceId, Arrays.asList(20, "20"))); - syncIds.add(ffOps.upsert(spaceId, Collections.singletonList(20), Arrays.asList(20, "twenty"), - Arrays.asList("=", 1, "twenty"))); + syncIds.add( + ffOps.upsert( + spaceId, + Collections.singletonList(20), + Arrays.asList(20, "twenty"), + Arrays.asList("=", 1, "twenty") + ) + ); + + syncIds.add( + ffOps.upsert( + spaceId, + Collections.singletonList(25), + Arrays.asList(25, "25"), + Arrays.asList("=", 1, "twenty five") + ) + ); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(30, "30"))); syncIds.add(ffOps.call("box.space.basic_test:delete", Collections.singletonList(30))); // Check the syncs. @@ -117,9 +141,66 @@ public void testFireAndForgetOperations() { client.syncOps().ping(); // Check the effects + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 1)); + checkRawTupleResult(consoleSelect(SPACE_NAME, 2), Arrays.asList(2, "2")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3), Arrays.asList(3, "3")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 5), Arrays.asList(5, "five")); checkRawTupleResult(consoleSelect(SPACE_NAME, 10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(SPACE_NAME, 30), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 25), Arrays.asList(25, "25")); + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 30)); + } + + @Test + public void testFireAndForgetStringOperations() { + testHelper.executeLua( + "box.space.basic_test:insert{2, '2'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{200, '200'}", + "box.space.basic_test:insert{2000, '2000'}" + ); + + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.delete(SPACE_NAME, Collections.singletonList(2))); + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(3, "3"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(2000, "2k"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(3000, "3k"))); + syncIds.add(ffOps.update(SPACE_NAME, Collections.singletonList(20), Arrays.asList("=", 1, "twenty"))); + syncIds.add( + ffOps.upsert( + SPACE_NAME, + Collections.singletonList(200), + Arrays.asList(200, "200"), + Arrays.asList("=", 1, "two hundred") + ) + ); + syncIds.add( + ffOps.upsert( + SPACE_NAME, + Collections.singletonList(400), + Arrays.asList(400, "400"), + Arrays.asList("=", 1, "four hundred") + ) + ); + + // Check the syncs. + assertFalse(syncIds.contains(0L)); + assertEquals(7, syncIds.size()); + + // The reply for synchronous ping will + // indicate to us that previous fire & forget operations are completed. + client.syncOps().ping(); + + // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 2), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3), Arrays.asList(3, "3")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 200), Arrays.asList(200, "two hundred")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 400), Arrays.asList(400, "400")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 2000), Arrays.asList(2000, "2k")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3000), Arrays.asList(3000, "3k")); } private List consoleSelect(String spaceName, Object key) { diff --git a/src/test/java/org/tarantool/IteratorTest.java b/src/test/java/org/tarantool/IteratorTest.java deleted file mode 100644 index 7fd68b61..00000000 --- a/src/test/java/org/tarantool/IteratorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.tarantool; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -class IteratorTest { - protected class MockOps extends AbstractTarantoolOps, Object, List> { - - @Override - public List exec(Code code, Object... args) { - return null; - } - - @Override - public void close() { - throw new UnsupportedOperationException(); - } - } - - @Test - void testSelectWithIteratorInsteadOfInteger() { - MockOps ops = new MockOps(); - MockOps spyOps = spy(ops); - - spyOps.select(1, 1, new ArrayList(), 0, 1, Iterator.EQ); - - verify(spyOps, times(1)).select(1, 1, new ArrayList(), 0, 1, 0); - } -} diff --git a/src/test/java/org/tarantool/TarantoolClientOpsIT.java b/src/test/java/org/tarantool/TarantoolClientOpsIT.java index ebaca9f5..a8964b7b 100644 --- a/src/test/java/org/tarantool/TarantoolClientOpsIT.java +++ b/src/test/java/org/tarantool/TarantoolClientOpsIT.java @@ -580,12 +580,10 @@ public void execute() throws Throwable { @MethodSource("getClientOps") public void testInsertDuplicateKey(SyncOpsProvider provider) { final List tup = Arrays.asList(1, "uno"); - TarantoolException ex = assertThrows(TarantoolException.class, new Executable() { - @Override - public void execute() throws Throwable { - provider.getClientOps().insert(spaceId, tup); - } - }); + TarantoolException ex = assertThrows( + TarantoolException.class, + () -> provider.getClientOps().insert(spaceId, tup) + ); assertEquals("Duplicate key exists in unique index 'pk' in space 'basic_test'", ex.getMessage()); // Check the tuple stayed intact. diff --git a/src/test/java/org/tarantool/TestUtils.java b/src/test/java/org/tarantool/TestUtils.java index cb2b73e6..f144526b 100644 --- a/src/test/java/org/tarantool/TestUtils.java +++ b/src/test/java/org/tarantool/TestUtils.java @@ -276,7 +276,7 @@ public static TarantoolClusterClientConfig makeDefaultClusterClientConfig() { config.username = TarantoolTestHelper.USERNAME; config.password = TarantoolTestHelper.PASSWORD; config.initTimeoutMillis = 2000; - config.operationExpiryTimeMillis = 1000; + config.operationExpiryTimeMillis = 2000; config.sharedBufferSize = 128; config.executor = null; return config; diff --git a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java index 4e940033..858e2143 100644 --- a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java +++ b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java @@ -9,7 +9,6 @@ import static org.tarantool.TestUtils.makeDiscoveryFunction; import org.tarantool.CommunicationException; -import org.tarantool.TarantoolClient; import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.TarantoolException; @@ -36,7 +35,7 @@ public class ClusterServiceStoredFunctionDiscovererIT { private static TarantoolTestHelper testHelper; private TarantoolClusterClientConfig clusterConfig; - private TarantoolClient client; + private TarantoolClientImpl client; @BeforeAll public static void setupEnv() { diff --git a/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java b/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java index 7f4d2b3f..9f503f79 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java +++ b/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java @@ -7,6 +7,7 @@ import org.tarantool.ServerVersion; import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolOperation; import org.tarantool.TarantoolTestHelper; import org.tarantool.protocol.TarantoolPacket; @@ -51,7 +52,7 @@ void setUp() throws SQLException { protected SQLTarantoolClientImpl makeSqlClient(String address, TarantoolClientConfig config) { return new SQLTarantoolClientImpl(address, config) { @Override - protected void completeSql(TarantoolOp operation, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { try { Thread.sleep(LONG_ENOUGH_TIMEOUT); } catch (InterruptedException ignored) { diff --git a/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java b/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java new file mode 100644 index 00000000..ad4018eb --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java @@ -0,0 +1,94 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.tarantool.TestUtils.makeDefaultClusterClientConfig; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClusterClient; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.TarantoolTestHelper; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class ClientReconnectSchemaIT { + + private static final String[] SRVS = { "srv-schema-it-1", "srv-schema-it-2" }; + private static final int[] PORTS = { 3401, 3402 }; + + private static TarantoolTestHelper firstTestHelper; + private static TarantoolTestHelper secondTestHelper; + + @BeforeAll + public static void setupEnv() { + firstTestHelper = new TarantoolTestHelper(SRVS[0]); + firstTestHelper.createInstance(TarantoolTestHelper.LUA_FILE, PORTS[0], PORTS[0] + 1000); + firstTestHelper.startInstance(); + + secondTestHelper = new TarantoolTestHelper(SRVS[1]); + secondTestHelper.createInstance(TarantoolTestHelper.LUA_FILE, PORTS[1], PORTS[1] + 1000); + secondTestHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + firstTestHelper.stopInstance(); + secondTestHelper.stopInstance(); + } + + @Test + @DisplayName("got a result from another node after the current node had disappeared") + public void testSameNamedSpaceAfterReconnection() { + String[] firstSpace = { + "box.schema.space.create('string_space1', { format = { {name = 'id', type = 'integer'} } })", + "box.space.string_space1:create_index('primary', { type = 'TREE', parts = {'id'} })" + }; + String[] secondSpace = { + "box.schema.space.create('string_space2', { format = { {name = 'id', type = 'integer'} } })", + "box.space.string_space2:create_index('primary', { type = 'TREE', parts = {'id'} })" + }; + + // create spaces on two instances with an inverted order + // as a result, instances have same schema version but spaces have unequal IDs + firstTestHelper.executeLua(firstSpace); + firstTestHelper.executeLua(secondSpace); + firstTestHelper.executeLua("box.space.string_space1:insert{100}"); + secondTestHelper.executeLua(secondSpace); + secondTestHelper.executeLua(firstSpace); + secondTestHelper.executeLua("box.space.string_space1:insert{200}"); + assertEquals(firstTestHelper.getInstanceVersion(), secondTestHelper.getInstanceVersion()); + + int firstSpaceIdFirstInstance = firstTestHelper.evaluate("box.space.string_space1.id"); + int firstSpaceIdSecondInstance = secondTestHelper.evaluate("box.space.string_space1.id"); + assertNotEquals(firstSpaceIdFirstInstance, firstSpaceIdSecondInstance); + + final TarantoolClientImpl client = makeClusterClient( + "localhost:" + PORTS[0], + "127.0.0.1:" + PORTS[1] + ); + + List result = client.syncOps() + .select("string_space1", "primary", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Arrays.asList(100), result.get(0)); + firstTestHelper.stopInstance(); + + result = client.syncOps() + .select("string_space1", "primary", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Arrays.asList(200), result.get(0)); + secondTestHelper.stopInstance(); + } + + private TarantoolClusterClient makeClusterClient(String... addresses) { + TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); + return new TarantoolClusterClient(config, addresses); + } + +} diff --git a/src/test/java/org/tarantool/schema/ClientSchemaIT.java b/src/test/java/org/tarantool/schema/ClientSchemaIT.java new file mode 100644 index 00000000..214c599c --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientSchemaIT.java @@ -0,0 +1,247 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; +import static org.tarantool.TestUtils.makeDefaultClientConfig; + +import org.tarantool.ServerVersion; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TestAssumptions; +import org.tarantool.schema.TarantoolIndexMeta.IndexOptions; +import org.tarantool.schema.TarantoolIndexMeta.IndexPart; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +@DisplayName("A schema meta") +public class ClientSchemaIT { + + private static TarantoolTestHelper testHelper; + + private TarantoolClientImpl client; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @BeforeEach + public void setup() { + TarantoolClientConfig config = makeDefaultClientConfig(); + + client = new TarantoolClientImpl( + TarantoolTestHelper.HOST + ":" + TarantoolTestHelper.PORT, + config + ); + } + + @AfterEach + public void tearDown() { + client.close(); + testHelper.executeLua("box.space.count_space and box.space.count_space:drop()"); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchSpaces() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + List spaceFormat = space.getFormat(); + assertEquals(2, spaceFormat.size()); + assertEquals("id", spaceFormat.get(0).getName()); + assertEquals("integer", spaceFormat.get(0).getType()); + assertEquals("counts", spaceFormat.get(1).getName()); + assertEquals("integer", spaceFormat.get(1).getType()); + + TarantoolIndexMeta primaryIndex = space.getIndex("pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("fetched newly created spaces and indexes") + void testFetchNewSpaces() { + // add count_space + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + assertThrows(TarantoolSpaceNotFoundException.class, () -> meta.getSpace("count_space_2")); + + // add count_space_2 + testHelper.executeLua( + "box.schema.space.create('count_space_2', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + meta.refresh(); + space = meta.getSpace("count_space_2"); + assertNotNull(space); + assertEquals("count_space_2", space.getName()); + assertThrows(TarantoolIndexNotFoundException.class, () -> meta.getSpaceIndex("count_space_2", "pk")); + + // add a primary index for count_space_2 + testHelper.executeLua( + "box.space.count_space_2:create_index('pk', { unique = true, type = 'TREE', parts = {'id'} } )" + ); + meta.refresh(); + TarantoolIndexMeta spaceIndex = meta.getSpaceIndex("count_space_2", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, spaceIndex); + } + + @Test + @DisplayName("fetched space indexes of a space") + void testFetchIndexes() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua( + "box.space.count_space:create_index('pk', { type = 'HASH', parts = {'id'} } )", + "box.space.count_space:create_index('c_index', { unique = false, type = 'TREE', parts = {'counts'} } )" + ); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("count_space", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "HASH", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + + TarantoolIndexMeta secondaryIndex = meta.getSpaceIndex("count_space", "c_index"); + TarantoolIndexMeta expectedSecondaryIndex = new TarantoolIndexMeta( + 1, "c_index", "TREE", + new IndexOptions(false), + Collections.singletonList(new IndexPart(1, "integer")) + ); + assertIndex(expectedSecondaryIndex, secondaryIndex); + } + + @Test + @DisplayName("fetched sql table primary index") + void testFetchSqlIndexes() { + TestAssumptions.assumeMinimalServerVersion(testHelper.getInstanceVersion(), ServerVersion.V_2_1); + testHelper.executeSql("create table my_table (id int primary key, val varchar(100))"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("MY_TABLE", "pk_unnamed_MY_TABLE_1"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk_unnamed_MY_TABLE_1", "tree", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("got an error with a wrong space name") + void tesGetUnknownSpace() { + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolSpaceNotFoundException exception = assertThrows( + TarantoolSpaceNotFoundException.class, + () -> meta.getSpace("unknown_space") + ); + assertEquals("unknown_space", exception.getSchemaName()); + } + + @Test + @DisplayName("got an error with a wrong space index name") + void testGetUnknownSpaceIndex() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + assertEquals("count_space", meta.getSpace("count_space").getName()); + TarantoolIndexNotFoundException exception = assertThrows( + TarantoolIndexNotFoundException.class, + () -> meta.getSpaceIndex("count_space", "wrong_pk") + ); + assertEquals("wrong_pk", exception.getIndexName()); + } + + private void assertIndex(TarantoolIndexMeta expectedIndex, TarantoolIndexMeta actualIndex) { + assertEquals(expectedIndex.getId(), actualIndex.getId()); + assertEquals(expectedIndex.getName(), actualIndex.getName()); + assertEquals(expectedIndex.getType(), actualIndex.getType()); + assertEqualsOptions(expectedIndex.getOptions(), actualIndex.getOptions()); + assertEqualsParts(expectedIndex.getParts(), actualIndex.getParts()); + } + + private void assertEqualsOptions(IndexOptions expected, IndexOptions actual) { + assertEquals(expected.isUnique(), actual.isUnique()); + } + + private void assertEqualsParts(List expected, List actual) { + if (expected.size() != actual.size()) { + fail("Part lists have different sizes"); + } + for (int i = 0; i < expected.size(); i++) { + IndexPart expectedPart = expected.get(i); + IndexPart actualPart = actual.get(i); + assertEquals(expectedPart.getFieldNumber(), actualPart.getFieldNumber()); + assertEquals(expectedPart.getType(), actualPart.getType()); + } + } + +} diff --git a/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java b/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java new file mode 100644 index 00000000..e8c8e04b --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java @@ -0,0 +1,105 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.tarantool.TestUtils.makeDefaultClientConfig; +import static org.tarantool.TestUtils.makeTestClient; + +import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientOps; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TarantoolThreadDaemonFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@DisplayName("A client") +public class ClientThreadSafeSchemaIT { + + private static TarantoolTestHelper testHelper; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-thread-safe-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @Test + @DisplayName("executed many DML/DDL string-operations from several threads simultaneously") + void testFetchSpaces() { + testHelper.executeLua( + makeCreateSpaceFunction(), + makeDropSpaceFunction() + ); + + TarantoolClientConfig config = makeDefaultClientConfig(); + config.operationExpiryTimeMillis = 2000; + TarantoolClient client = makeTestClient(config, 500); + + int threadsNumber = 16; + int iterations = 100; + final CountDownLatch latch = new CountDownLatch(threadsNumber); + ExecutorService executor = Executors.newFixedThreadPool( + threadsNumber, + new TarantoolThreadDaemonFactory("testWorkers") + ); + + // multiple threads can cause schema invalidation simultaneously + // but it hasn't to affect other threads + for (int i = 0; i < threadsNumber; i++) { + int threadNumber = i; + executor.submit(() -> { + String spaceName = "my_space" + threadNumber; + for (int k = 0; k < iterations; k++) { + TarantoolClientOps, Object, List> ops = client.syncOps(); + ops.call("makeSpace", spaceName); + ops.insert(spaceName, Arrays.asList(k, threadNumber)); + ops.call("dropSpace", spaceName); + } + latch.countDown(); + }); + } + + try { + assertTrue(latch.await(20, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail(e); + } finally { + executor.shutdownNow(); + client.close(); + } + } + + private String makeCreateSpaceFunction() { + return "function makeSpace(spaceName) " + + "box.schema.space.create(spaceName, { format = " + + "{ {name = 'id', type = 'integer'}, " + + " {name = 'counts', type = 'integer'} } " + + "}); " + + "box.space[spaceName]:create_index('pk', { type = 'TREE', parts = {'id'} } ) " + + "end"; + } + + private String makeDropSpaceFunction() { + return "function dropSpace(spaceName) " + + "box.space[spaceName]:drop() " + + "end"; + } + +}