diff --git a/build.gradle b/build.gradle index 04490aa..52db858 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,8 @@ dependencies { compile "com.treasuredata.client:td-client:0.9.0" testCompile "junit:junit:4.+" + testCompile "org.embulk:embulk-standards:0.9.12" + testCompile "org.embulk:embulk-core:0.9.12:tests" } task classpath(type: Copy, dependsOn: ["jar"]) { diff --git a/src/main/java/org/embulk/input/td/writer/DoubleValueWriter.java b/src/main/java/org/embulk/input/td/writer/DoubleValueWriter.java index f2fdc5a..1a2edb3 100644 --- a/src/main/java/org/embulk/input/td/writer/DoubleValueWriter.java +++ b/src/main/java/org/embulk/input/td/writer/DoubleValueWriter.java @@ -13,6 +13,13 @@ public DoubleValueWriter(final Column column) { @Override public void writeNotNull(final Value v, final PageBuilder to) { - to.setDouble(index, v.asFloatValue().toDouble()); + if (v.isFloatValue()) { + to.setDouble(index, v.asFloatValue().toDouble()); + } else if (v.isStringValue()) { + // Support for DECIMAL type, which is packed / unpacked as String type in msgpack + to.setDouble(index, Double.parseDouble(v.asStringValue().toString())); + } else { + to.setNull(index); + } } } diff --git a/src/test/java/org/embulk/input/td/TestTdInputPlugin.java b/src/test/java/org/embulk/input/td/TestTdInputPlugin.java index e7a8975..397f297 100644 --- a/src/test/java/org/embulk/input/td/TestTdInputPlugin.java +++ b/src/test/java/org/embulk/input/td/TestTdInputPlugin.java @@ -1,4 +1,127 @@ package org.embulk.input.td; +import static org.embulk.spi.type.Types.BOOLEAN; +import static org.embulk.spi.type.Types.DOUBLE; +import static org.embulk.spi.type.Types.JSON; +import static org.embulk.spi.type.Types.LONG; +import static org.embulk.spi.type.Types.STRING; +import static org.embulk.spi.type.Types.TIMESTAMP; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assume.assumeNotNull; + +import java.util.ArrayList; +import java.util.List; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.input.td.TdInputPlugin; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; +import org.embulk.spi.TestPageBuilderReader.MockPageOutput; +import org.embulk.spi.util.Pages; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + + public class TestTdInputPlugin { + private static String EMBULK_TD_TEST_APIKEY; + private static String EMBULK_TD_TEST_DATABASE; + + /** + * This test case requires environment variables: + * EMBULK_TD_TEST_APIKEY + * EMBULK_TD_TEST_DATABASE + * If the variables not set, the test case is skipped. + */ + @BeforeClass + public static void initializeConstantVariables() { + EMBULK_TD_TEST_APIKEY = System.getenv("EMBULK_TD_TEST_APIKEY"); + EMBULK_TD_TEST_DATABASE = System.getenv("EMBULK_TD_TEST_DATABASE"); + assumeNotNull(EMBULK_TD_TEST_APIKEY, EMBULK_TD_TEST_DATABASE); + } + + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private ConfigSource config; + private InputPlugin runner; + private MockPageOutput output; + + /** + * Initialize the config, runner, and output + * Note: "query" needs to be updated in each test case + */ + @Before + public void createResources() { + config = runtime.getExec().newConfigSource() + .set("type", "td") + .set("apikey", EMBULK_TD_TEST_APIKEY) + .set("database", EMBULK_TD_TEST_DATABASE) + .set("query", ""); + runner = new TdInputPlugin(); + output = new MockPageOutput(); + } + + @Test + public void primitiveTest() { + String primitiveQuery = + "SELECT * FROM ( VALUES ( " + + "BOOLEAN 'true', " + + "INTEGER '0', " + + "BIGINT '1', " + + "DOUBLE '0.0', " + + "DECIMAL '0.5', " + + "VARCHAR '0123' " + + " ) )"; + Schema schema = Schema.builder() + .add("col_boolean", BOOLEAN) + .add("col_integer", LONG) + .add("col_bigint", LONG) + .add("col_double", DOUBLE) + .add("col_decimal", DOUBLE) + .add("col_varchar", STRING) + .build(); + + ConfigSource config = this.config.deepCopy().set("query", primitiveQuery); + ConfigDiff configDiff = runner.transaction(config, new Control(runner, output)); + + List records = Pages.toObjects(schema, output.pages); + assertEquals(1, records.size()); + { + Object[] record = records.get(0); + assertEquals(true, record[0]); + assertEquals(0L, record[1]); + assertEquals(1L, record[2]); + assertEquals(0.0, record[3]); + assertEquals(0.5, record[4]); + assertEquals("0123", record[5]); + } + } + + static class Control + implements InputPlugin.Control { + private InputPlugin runner; + private PageOutput output; + + Control(InputPlugin runner, PageOutput output) { + this.runner = runner; + this.output = output; + } + + @Override + public List run(TaskSource taskSource, Schema schema, int taskCount) { + List reports = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + reports.add(runner.run(taskSource, schema, i, output)); + } + return reports; + } + } }