diff --git a/src/main/java/org/tarantool/SqlProtoUtils.java b/src/main/java/org/tarantool/SqlProtoUtils.java index f84b406b..af7d809d 100644 --- a/src/main/java/org/tarantool/SqlProtoUtils.java +++ b/src/main/java/org/tarantool/SqlProtoUtils.java @@ -11,10 +11,10 @@ public abstract class SqlProtoUtils { public static List> readSqlResult(TarantoolPacket pack) { List> data = (List>) pack.getBody().get(Key.DATA.getId()); - List> values = new ArrayList>(data.size()); + List> values = new ArrayList<>(data.size()); List metaData = getSQLMetadata(pack); - LinkedHashMap value = new LinkedHashMap(); for (List row : data) { + LinkedHashMap value = new LinkedHashMap<>(); for (int i = 0; i < row.size(); i++) { value.put(metaData.get(i).getName(), row.get(i)); } diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index bd7a993d..e0f1412f 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -2,8 +2,8 @@ import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.ReadableViaSelectorChannel; -import org.tarantool.protocol.TarantoolPacket; import org.tarantool.protocol.TarantoolGreeting; +import org.tarantool.protocol.TarantoolPacket; import java.io.IOException; import java.nio.ByteBuffer; @@ -36,7 +36,7 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected SocketChannelProvider socketProvider; protected volatile Exception thumbstone; - protected Map> futures; + protected Map> futures; protected AtomicInteger wait = new AtomicInteger(); /** * Write properties @@ -216,38 +216,38 @@ protected Future exec(Code code, Object... args) { protected CompletableFuture doExec(Code code, Object[] args) { validateArgs(args); long sid = syncId.incrementAndGet(); - CompletableFuture q = new CompletableFuture<>(); + TarantoolOp future = new TarantoolOp<>(code); - if (isDead(q)) { - return q; + if (isDead(future)) { + return future; } - futures.put(sid, q); - if (isDead(q)) { + futures.put(sid, future); + if (isDead(future)) { futures.remove(sid); - return q; + return future; } try { write(code, sid, null, args); } catch (Exception e) { futures.remove(sid); - fail(q, e); + fail(future, e); } - return q; + return future; } protected synchronized void die(String message, Exception cause) { if (thumbstone != null) { return; } - final CommunicationException err = new CommunicationException(message, cause); - this.thumbstone = err; + final CommunicationException error = new CommunicationException(message, cause); + this.thumbstone = error; while (!futures.isEmpty()) { - Iterator>> iterator = futures.entrySet().iterator(); + Iterator>> iterator = futures.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry> elem = iterator.next(); + Map.Entry> elem = iterator.next(); if (elem != null) { - CompletableFuture future = elem.getValue(); - fail(future, err); + TarantoolOp future = elem.getValue(); + fail(future, error); } iterator.remove(); } @@ -345,7 +345,7 @@ protected void readThread() { Map headers = packet.getHeaders(); Long syncId = (Long) headers.get(Key.SYNC.getId()); - CompletableFuture future = futures.remove(syncId); + TarantoolOp future = futures.remove(syncId); stats.received++; wait.decrementAndGet(); complete(packet, future); @@ -395,30 +395,29 @@ protected void fail(CompletableFuture q, Exception e) { q.completeExceptionally(e); } - protected void complete(TarantoolPacket packet, CompletableFuture q) { - if (q != null) { + protected void complete(TarantoolPacket packet, TarantoolOp future) { + if (future != null) { long code = packet.getCode(); if (code == 0) { - - if (code == Code.EXECUTE.getId()) { - completeSql(q, packet); + if (future.getCode() == Code.EXECUTE) { + completeSql(future, packet); } else { - ((CompletableFuture) q).complete(packet.getBody().get(Key.DATA.getId())); + ((CompletableFuture) future).complete(packet.getBody().get(Key.DATA.getId())); } } else { Object error = packet.getBody().get(Key.ERROR.getId()); - fail(q, serverError(code, error)); + fail(future, serverError(code, error)); } } } - protected void completeSql(CompletableFuture q, TarantoolPacket pack) { + protected void completeSql(CompletableFuture future, TarantoolPacket pack) { Long rowCount = SqlProtoUtils.getSqlRowCount(pack); - if (rowCount!=null) { - ((CompletableFuture) q).complete(rowCount); + if (rowCount != null) { + ((CompletableFuture) future).complete(rowCount); } else { List> values = SqlProtoUtils.readSqlResult(pack); - ((CompletableFuture) q).complete(values); + ((CompletableFuture) future).complete(values); } } @@ -715,4 +714,21 @@ private CountDownLatch getStateLatch(int state) { return null; } } + + protected static class TarantoolOp extends CompletableFuture { + + /** + * Tarantool binary protocol operation code. + */ + final private Code code; + + public TarantoolOp(Code code) { + this.code = code; + } + + public Code getCode() { + return code; + } + } + } diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index 3a6c243a..42cf1db3 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -12,7 +12,7 @@ /** * Basic implementation of a client that may work with the cluster * of tarantool instances in fault-tolerant way. - * + *

* Failed operations will be retried once connection is re-established * unless the configured expiration time is over. */ @@ -25,7 +25,7 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /** * @param config Configuration. - * @param addrs Array of addresses in the form of [host]:[port]. + * @param addrs Array of addresses in the form of [host]:[port]. */ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) { this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis)); @@ -33,13 +33,13 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... add /** * @param provider Socket channel provider. - * @param config Configuration. + * @param config Configuration. */ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) { super(provider, config); this.executor = config.executor == null ? - Executors.newSingleThreadExecutor() : config.executor; + Executors.newSingleThreadExecutor() : config.executor; } @Override @@ -59,23 +59,23 @@ protected boolean isDead(CompletableFuture q) { protected CompletableFuture doExec(Code code, Object[] args) { validateArgs(args); long sid = syncId.incrementAndGet(); - CompletableFuture q = makeFuture(sid, code, args); + ExpirableOp future = makeFuture(sid, code, args); - if (isDead(q)) { - return q; + if (isDead(future)) { + return future; } - futures.put(sid, q); - if (isDead(q)) { + futures.put(sid, future); + if (isDead(future)) { futures.remove(sid); - return q; + return future; } try { write(code, sid, null, args); } catch (Exception e) { futures.remove(sid); - fail(q, e); + fail(future, e); } - return q; + return future; } @Override @@ -85,12 +85,12 @@ protected void fail(CompletableFuture q, Exception e) { protected boolean checkFail(CompletableFuture q, Exception e) { assert q instanceof ExpirableOp; - if (!isTransientError(e) || ((ExpirableOp)q).hasExpired(System.currentTimeMillis())) { + if (!isTransientError(e) || ((ExpirableOp) q).hasExpired(System.currentTimeMillis())) { q.completeExceptionally(e); return true; } else { assert retries != null; - retries.put(((ExpirableOp) q).getId(), (ExpirableOp)q); + retries.put(((ExpirableOp) q).getId(), (ExpirableOp) q); return false; } } @@ -114,12 +114,12 @@ protected boolean isTransientError(Exception e) { return true; } if (e instanceof TarantoolException) { - return ((TarantoolException)e).isTransient(); + return ((TarantoolException) e).isTransient(); } return false; } - protected CompletableFuture makeFuture(long id, Code code, Object...args) { + private ExpirableOp makeFuture(long id, Code code, Object... args) { int expireTime = ((TarantoolClusterClientConfig) config).operationExpiryTimeMillis; return new ExpirableOp(id, expireTime, code, args); } @@ -133,20 +133,20 @@ protected void onReconnect() { // First call is before the constructor finished. Skip it. return; } - Collection> futsToRetry = new ArrayList>(retries.values()); + Collection> futuresToRetry = new ArrayList>(retries.values()); retries.clear(); long now = System.currentTimeMillis(); - for (final ExpirableOp fut : futsToRetry) { - if (!fut.hasExpired(now)) { + for (final ExpirableOp future : futuresToRetry) { + if (!future.hasExpired(now)) { executor.execute(new Runnable() { @Override public void run() { - futures.put(fut.getId(), fut); + futures.put(future.getId(), future); try { - write(fut.getCode(), fut.getId(), null, fut.getArgs()); + write(future.getCode(), future.getId(), null, future.getArgs()); } catch (Exception e) { - futures.remove(fut.getId()); - fail(fut, e); + futures.remove(future.getId()); + fail(future, e); } } }); @@ -157,8 +157,11 @@ public void run() { /** * Holds operation code and arguments for retry. */ - private class ExpirableOp extends CompletableFuture { - /** Moment in time when operation is not considered for retry. */ + private class ExpirableOp extends TarantoolOp { + + /** + * Moment in time when operation is not considered for retry. + */ final private long deadline; /** @@ -167,24 +170,20 @@ private class ExpirableOp extends CompletableFuture { final private long id; /** - * Tarantool binary protocol operation code. + * Arguments of operation. */ - final private Code code; - - /** Arguments of operation. */ final private Object[] args; /** - * - * @param id Sync. + * @param id Sync. * @param expireTime Expiration time (relative) in ms. - * @param code Tarantool operation code. - * @param args Operation arguments. + * @param code Tarantool operation code. + * @param args Operation arguments. */ - ExpirableOp(long id, int expireTime, Code code, Object...args) { + ExpirableOp(long id, int expireTime, Code code, Object... args) { + super(code); this.id = id; this.deadline = System.currentTimeMillis() + expireTime; - this.code = code; this.args = args; } @@ -196,12 +195,9 @@ public long getId() { return id; } - public Code getCode() { - return code; - } - public Object[] getArgs() { return args; } + } } diff --git a/src/test/java/org/tarantool/AbstractTarantoolSQLConnectorIT.java b/src/test/java/org/tarantool/AbstractTarantoolSQLConnectorIT.java new file mode 100644 index 00000000..a4fcb301 --- /dev/null +++ b/src/test/java/org/tarantool/AbstractTarantoolSQLConnectorIT.java @@ -0,0 +1,142 @@ +package org.tarantool; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.tarantool.TestUtils.makeInstanceEnv; + +/** + * Abstract test. Provides environment control and frequently used functions which are related to SQL. + */ +public abstract class AbstractTarantoolSQLConnectorIT { + + protected static final String HOST = System.getProperty("tntHost", "localhost"); + protected static final int PORT = Integer.parseInt(System.getProperty("tntPort", "3301")); + protected static final int CONSOLE_PORT = Integer.parseInt(System.getProperty("tntConsolePort", "3313")); + protected static final String USERNAME = System.getProperty("tntUser", "test_admin"); + protected static final String PASSWORD = System.getProperty("tntPass", "4pWBZmLEgkmKK5WP"); + + protected static final String LUA_FILE = "jdk-testing.lua"; + protected static final int LISTEN = 3301; + protected static final int ADMIN = 3313; + protected static final int TIMEOUT = 500; + protected static final int RESTART_TIMEOUT = 2000; + + protected static final SocketChannelProvider socketChannelProvider = new TestSocketChannelProvider( + HOST, PORT, RESTART_TIMEOUT + ); + + protected static TarantoolControl control; + protected static TarantoolConsole console; + + protected static final String TABLE_NAME = "sql_test"; + + private static final String[] setupScript = new String[]{ + "\\set language sql", + "\\set delimiter ;", + + "CREATE TABLE sql_test (id INTEGER PRIMARY KEY, val VARCHAR(100));", + "CREATE UNIQUE INDEX sql_test_val_index_unique ON sql_test (val);", + + "INSERT INTO sql_test VALUES (1, 'A');", + "INSERT INTO sql_test VALUES (2, 'B');", + "INSERT INTO sql_test VALUES (3, 'C');", + }; + + private static final String[] cleanScript = new String[]{ + "DROP TABLE sql_test;" + }; + + @BeforeAll + public static void setupEnv() { + control = new TarantoolControl(); + control.createInstance("jdk-testing", LUA_FILE, makeInstanceEnv(LISTEN, ADMIN)); + startTarantool("jdk-testing"); + + console = openConsole(); + + executeLua(setupScript); + } + + @AfterAll + public static void cleanupEnv() { + try { + executeLua(cleanScript); + console.close(); + } finally { + stopTarantool("jdk-testing"); + } + } + + private static void executeLua(String[] exprs) { + for (String expr : exprs) { + console.exec(expr); + } + } + + protected void checkTupleResult(List> expected, List> actual) { + assertNotNull(expected); + assertEquals(expected, actual); + } + + protected List> asResult(Object[][] tuples) { + List> result = new ArrayList<>(); + if (tuples != null) { + for (int i = 0; i < tuples.length; i++) { + Object[] tuple = tuples[i]; + if (tuple.length % 2 != 0) { + continue; + } + Map row = new HashMap<>(); + for (int j = 0; j <= tuple.length / 2; j += 2) { + row.put(tuple[j].toString(), tuple[j + 1]); + } + result.add(row); + } + } + return result; + } + + protected TarantoolClient makeClient() { + return new TarantoolClientImpl(socketChannelProvider, makeClientConfig()); + } + + protected TarantoolClient makeClient(SocketChannelProvider provider) { + return new TarantoolClientImpl(provider, makeClientConfig()); + } + + protected static TarantoolClientConfig makeClientConfig() { + TarantoolClientConfig config = new TarantoolClientConfig(); + config.username = USERNAME; + config.password = PASSWORD; + config.initTimeoutMillis = RESTART_TIMEOUT; + config.sharedBufferSize = 128; + return config; + } + + protected static TarantoolConsole openConsole() { + return TarantoolConsole.open(HOST, CONSOLE_PORT); + } + + protected static TarantoolConsole openConsole(String instance) { + return TarantoolConsole.open(control.tntCtlWorkDir, instance); + } + + protected static void stopTarantool(String instance) { + control.stop(instance); + control.waitStopped("jdk-testing"); + } + + protected static void startTarantool(String instance) { + control.start(instance); + control.waitStarted("jdk-testing"); + } + +} diff --git a/src/test/java/org/tarantool/AbstractTarantoolSQLOpsIT.java b/src/test/java/org/tarantool/AbstractTarantoolSQLOpsIT.java new file mode 100644 index 00000000..347c7709 --- /dev/null +++ b/src/test/java/org/tarantool/AbstractTarantoolSQLOpsIT.java @@ -0,0 +1,56 @@ +package org.tarantool; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests operations available in {@link TarantoolSQLOps} interface. + */ +public abstract class AbstractTarantoolSQLOpsIT extends AbstractTarantoolSQLConnectorIT { + + protected abstract TarantoolSQLOps>> getSQLOps(); + + @Test + public void testSelectOne() { + List> result = getSQLOps().query("SELECT id, val FROM sql_test WHERE id = 1"); + checkTupleResult( + asResult(new Object[][] { {"ID", 1, "VAL", "A"} }), + result + ); + } + + @Test + public void testSelectMany() { + List> result = getSQLOps().query("SELECT id, val FROM sql_test WHERE id = 1 or val = 'B'"); + checkTupleResult( + asResult(new Object[][] { {"ID", 1, "VAL", "A"}, {"ID", 2, "VAL", "B"} }), + result + ); + } + + @Test + public void testSelectEmpty() { + List> result = getSQLOps().query("SELECT id, val FROM sql_test WHERE val = 'AB'"); + checkTupleResult( + asResult(new Object[][] { }), + result + ); + } + + @Test + public void testInsertOneRecord() { + Long rowsAffected = getSQLOps().update("INSERT INTO sql_test VALUES (27, 'Z');"); + assertEquals(1L, (long) rowsAffected); + } + + @Test + public void testInsertDuplication() { + assertThrows(TarantoolException.class, () -> getSQLOps().update("INSERT INTO sql_test VALUES (1, 'A');")); + } + +} diff --git a/src/test/java/org/tarantool/SQLOperationsIT.java b/src/test/java/org/tarantool/SQLOperationsIT.java new file mode 100644 index 00000000..d1fa2e81 --- /dev/null +++ b/src/test/java/org/tarantool/SQLOperationsIT.java @@ -0,0 +1,33 @@ +package org.tarantool; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; + +/** + * Tests for synchronous operations of {@link TarantoolClientImpl#sqlSyncOps()} implementation. + * + * Actual tests reside in base class. + */ +public class SQLOperationsIT extends AbstractTarantoolSQLOpsIT { + + private TarantoolClient client; + + @BeforeEach + public void setup() { + client = makeClient(); + } + + @AfterEach + public void tearDown() { + client.close(); + } + + @Override + protected TarantoolSQLOps>> getSQLOps() { + return client.sqlSyncOps(); + } + +}