Deployable table config #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 12 commits into base: main
4 changes: 4 additions & 0 deletions .github/workflows/flink-sql-runner.yaml
@@ -20,6 +20,10 @@ jobs:
       with:
         distribution: temurin
         java-version-file: .tool-versions
+      - name: Run tests with Maven
+        run: |
+          mvn test
+        working-directory: .
       - name: Build with Maven
         run: |
           mvn clean package
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
+{
+  "java.configuration.updateBuildConfiguration": "automatic"
+}
32 changes: 29 additions & 3 deletions pom.xml
@@ -10,6 +10,7 @@
   <properties>
     <flink.version>1.18.1</flink.version>
     <flink.major-version>1.18</flink.major-version>
+    <scala.binary.version>2.12</scala.binary.version>
     <hive.version>3.1.3</hive.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.source>1.8</maven.compiler.source>
@@ -42,12 +43,29 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>

+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Required as per https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#running-and-packaging -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-loader</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>

     <!-- Add connector dependencies here. They must be in the default scope (compile). -->
     <dependency>
       <groupId>org.apache.flink</groupId>
@@ -125,6 +143,11 @@
       <version>2.3</version>
     </dependency>

+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>20210307</version>
+    </dependency>

     <!-- Test -->
     <dependency>
@@ -138,9 +161,12 @@
   <build>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <trimStackTrace>false</trimStackTrace>
+        </configuration>
       </plugin>

       <plugin>
110 changes: 88 additions & 22 deletions src/main/java/io/ecraft/SqlRunner.java
@@ -7,21 +7,25 @@
 import java.io.*;
 import java.nio.file.*;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Environment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogStore;
 import org.apache.flink.table.catalog.FileCatalogStore;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.velocity.VelocityContext;
 import org.apache.velocity.app.Velocity;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -35,39 +39,27 @@ public class SqlRunner {
   private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";

   public static void main(String[] args) throws Exception {
+    ParameterTool parameters = ParameterTool.fromArgs(args);

-    if (args.length != 1) {
-      throw new Exception("Exactly one argument is expected.");
+    // Debug log the keys and values of the parameters
+    for (String key : parameters.toMap().keySet()) {
+      LOG.debug("Parameter: {} = {}", key, parameters.get(key));
     }

-    EnvironmentSettings settings = EnvironmentSettings
-        .newInstance()
-        .inStreamingMode()
-        .build();
-    TableEnvironment tableEnv = TableEnvironment.create(settings);
+    String archiveUri = parameters.getRequired("archiveUri");
+    String environment = parameters.getRequired("environment");

     String name = "hive";
     String defaultDatabase = "default";
     String hiveConfDir = "/conf/hive-conf";

     HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
-    tableEnv.registerCatalog(name, hive);
-
-    // set the HiveCatalog as the current catalog of the session
-    tableEnv.useCatalog(name);
-
-    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-
-    LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
-    LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
-    LOG.debug("Available tables:");
-
-    for (String t: tableEnv.listTables()) {
-      LOG.debug(" - {}", t);
-    }
+    Path remoteArchivePath = new Path(archiveUri);

     // Read the tar file from azure blob store to a local file
-    Path remoteArchivePath = new Path(args[0]);
     FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem();
     FSDataInputStream remoteArchiveStream = remoteArchiveFs.open(remoteArchivePath);
     // We name everything after the full name of the archive without extension (including hashes)
@@ -90,14 +82,47 @@ public static void main(String[] args) throws Exception {
       InputStream zipInputStream = zipFile.getInputStream(entry);
       transferTo(zipInputStream, zipEntryOutputStream);
     }
+    zipFile.close();

+    // Read the json file
+    String jsonName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".json";
+    Path jsonPath = new Path("/tmp/" + jobName + "/" + jsonName);
+    FileSystem jsonFs = jsonPath.getFileSystem();
+    FSDataInputStream jsonInputStream = jsonFs.open(jsonPath);
+    BufferedReader jsonStreamReader = new BufferedReader(new InputStreamReader(jsonInputStream, "UTF-8"));
+    StringBuilder responseStrBuilder = new StringBuilder();
+
+    String inputStr;
+    while ((inputStr = jsonStreamReader.readLine()) != null)
+      responseStrBuilder.append(inputStr);
+    JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString());
+
+    EnvironmentSettings settings = configureEnvironmentSettings(environment, deployableConfiguration, EnvironmentSettings.newInstance()).build();
+    TableEnvironment tableEnv = TableEnvironment.create(settings);
+    tableEnv.registerCatalog(name, hive);
+
+    // set the HiveCatalog as the current catalog of the session
+    tableEnv.useCatalog(name);
+
+    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+
+    LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
+    LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
+    LOG.debug("Available tables:");
+
+    for (String t: tableEnv.listTables()) {
+      LOG.debug(" - {}", t);
+    }
+    configureTableEnvironment(environment, deployableConfiguration, tableEnv);

     // Read the sql file
     String sqlName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".sql";
     Path sqlPath = new Path("/tmp/" + jobName + "/" + sqlName);
     FileSystem sqlFs = sqlPath.getFileSystem();
     FSDataInputStream sqlInputStream = sqlFs.open(sqlPath);
     InputStreamReader reader = new InputStreamReader(sqlInputStream);
-    String script = new BufferedReader(reader).lines().parallel().collect(Collectors.joining("\n"));
+    BufferedReader scriptReader = new BufferedReader(reader);
+    String script = scriptReader.lines().parallel().collect(Collectors.joining("\n"));

     List<String> statements = parseStatements(script, SqlRunner.loadEnvironment());
     for (String statement : statements) {
@@ -107,6 +132,47 @@ public static void main(String[] args) throws Exception {
     }
   }

+  public static EnvironmentSettings.Builder configureEnvironmentSettings(String currentEnv, JSONObject deployableConfiguration, EnvironmentSettings.Builder builder) {
+    if (deployableConfiguration.has("environments")) {
+      JSONObject environments = deployableConfiguration.getJSONObject("environments");
+      if (environments.has(currentEnv)) {
+        JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
+        if (currentEnvironment.has("mode")) {
+          String mode = currentEnvironment.getString("mode");
+          if (mode.equals("batch")) {
+            builder.inBatchMode();
+          } else if (mode.equals("streaming")) {
+            builder.inStreamingMode();
+          } else {
+            throw new RuntimeException("Invalid deployable configuration: '" + mode + "' is not a valid mode");
+          }
+        }
+      }
+    }
+
+    return builder;
+  }
+
+  public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) {
+    TableConfig tableConfig = tableEnvironment.getConfig();
+
+    if (deployableConfiguration.has("environments")) {
+      JSONObject environments = deployableConfiguration.getJSONObject("environments");
+      if (environments.has(currentEnv)) {
+        JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
+        if (currentEnvironment.has("tableConfig")) {
+          JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig");
+          for (String key : tableConfigJson.keySet()) {
+            // Use get(...).toString() rather than getString(...): org.json's getString
+            // throws for non-string values, and the fixture uses a JSON boolean.
+            String value = tableConfigJson.get(key).toString();
+            tableConfig.getConfiguration().setString(key, value);
+
+            LOG.debug("Setting table config {} to {}", key, value);
+          }
+        }
+      }
+    }
+  }
+
   public static void transferTo(InputStream input, OutputStream output) throws IOException {
     try {
       byte[] buffer = new byte[1024];
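
The two new public helpers are designed to compose: one picks the execution mode before the TableEnvironment exists, the other mutates the session configuration afterwards. A minimal, self-contained sketch of that flow (the class name and the inline JSON literal are illustrative only, not part of this PR; it assumes the org.json and Flink table-API dependencies added to pom.xml above):

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;
  import org.json.JSONObject;

  import io.ecraft.SqlRunner;

  public class DeployableConfigSketch {
    public static void main(String[] args) {
      // Same shape as the test fixture below: per-environment mode plus tableConfig.
      JSONObject config = new JSONObject(
          "{\"environments\": {\"dev\": {\"mode\": \"streaming\","
          + " \"tableConfig\": {\"table.exec.source.idle-timeout\": \"5 min\"}}}}");

      // 1. Choose batch vs. streaming for the current environment.
      EnvironmentSettings settings = SqlRunner
          .configureEnvironmentSettings("dev", config, EnvironmentSettings.newInstance())
          .build();
      TableEnvironment tableEnv = TableEnvironment.create(settings);

      // 2. Apply the environment's tableConfig entries to the session.
      SqlRunner.configureTableEnvironment("dev", config, tableEnv);
    }
  }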
56 changes: 56 additions & 0 deletions src/test/java/io/ecraft/SqlRunnerTest.java
@@ -1,12 +1,20 @@
 package io.ecraft;

+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.json.JSONObject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.HashMap;

@@ -32,4 +40,52 @@ public void testTemplating() throws Exception {

     assertEquals(sql, "SELECT * FROM T;\n");
   }

+  @Test
+  public void testTableConfig() throws Exception {
+    EnvironmentSettings settings = EnvironmentSettings
+        .newInstance()
+        .inStreamingMode()
+        .build();
+    TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+    String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json";
+    JSONObject jsonConfig = readJsonFile(filePath);
+
+    SqlRunner.configureTableEnvironment("dev", jsonConfig, tableEnv);
+
+    // Define the ConfigOption for table.exec.mini-batch.enabled
+    ConfigOption<Boolean> miniBatchEnabled = ConfigOptions.key("table.exec.mini-batch.enabled")
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("Enable mini-batch execution.");
+
+    assertEquals(true, tableEnv.getConfig().getConfiguration().get(miniBatchEnabled));
+
+    // Define the ConfigOption for the table.exec.source.idle-timeout duration
+    ConfigOption<String> sourceIdleTimeout = ConfigOptions.key("table.exec.source.idle-timeout")
+        .stringType()
+        .defaultValue("0")
+        .withDescription("The time that a source will wait for new data before shutting down.");
+
+    assertEquals("5 min", tableEnv.getConfig().getConfiguration().get(sourceIdleTimeout));
+  }
+
+  @Test
+  public void testEnvironmentConfig() throws Exception {
+    EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
+
+    String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json";
+    JSONObject jsonConfig = readJsonFile(filePath);
+
+    EnvironmentSettings settings = SqlRunner.configureEnvironmentSettings("dev", jsonConfig, builder).build();
+
+    assertEquals(true, settings.isStreamingMode());
+  }
+
+  public static JSONObject readJsonFile(String filePath) throws IOException {
+    String content = new String(Files.readAllBytes(Paths.get(filePath)));
+    return new JSONObject(content);
+  }
+
 }
13 changes: 13 additions & 0 deletions src/test/java/io/ecraft/fixtures/deployableconfig1.json
@@ -0,0 +1,13 @@
+{
+  "name": "test01",
+  "type": "sql",
+  "environments": {
+    "dev": {
+      "mode": "streaming",
+      "tableConfig": {
+        "table.exec.mini-batch.enabled": true,
+        "table.exec.source.idle-timeout": "5 min"
+      }
+    }
+  }
+}
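
This fixture exercises both helpers: testEnvironmentConfig checks that "mode": "streaming" selects streaming execution, and testTableConfig checks that both tableConfig entries land in the session configuration. Note also that the runner is now started with named parameters rather than a single positional archive path, e.g. --archiveUri <archive-uri> --environment dev (ParameterTool parses --key value pairs; the exact archive URI depends on the deployment and is not shown in this PR).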