Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,5 @@ apphost_test.log
kafka_2.13-4.0.0/
NativeKafkaBridge/libnativekafkabridge.so
.roo/mcp.json
# Exclude generated JAR files
FlinkDotNet/Flink.JobGateway/flink-ir-runner.jar
41 changes: 41 additions & 0 deletions FlinkDotNet/Flink.JobGateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,59 @@ WORKDIR /app
EXPOSE 8080

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build

# Install Java 17 and Maven
RUN apt-get update && \
apt-get install -y --no-install-recommends \
openjdk-17-jdk \
maven \
&& rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

WORKDIR /src

# Copy FlinkIRRunner project first
COPY ["FlinkIRRunner/pom.xml", "FlinkIRRunner/"]
COPY ["FlinkIRRunner/src/", "FlinkIRRunner/src/"]

# Build FlinkIRRunner
WORKDIR "/src/FlinkIRRunner"
RUN mvn -q -DskipTests package

# Copy and restore .NET projects
WORKDIR /src
COPY ["FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj", "Flink.JobGateway/"]
COPY ["FlinkDotNet/Flink.JobBuilder/Flink.JobBuilder.csproj", "Flink.JobBuilder/"]
RUN dotnet restore "Flink.JobGateway/Flink.JobGateway.csproj"

# Copy the rest of the .NET code
COPY FlinkDotNet/ .

# Build the Gateway
WORKDIR "/src/Flink.JobGateway"
RUN dotnet build "Flink.JobGateway.csproj" -c Release -o /app/build

FROM build AS publish
WORKDIR "/src/Flink.JobGateway"
RUN dotnet publish "Flink.JobGateway.csproj" -c Release -o /app/publish /p:UseAppHost=false

# Copy the FlinkIRRunner JAR to the publish directory
RUN mkdir -p /app/publish/FlinkIRRunner && \
cp /src/FlinkIRRunner/target/flink-ir-runner.jar /app/publish/

FROM base AS final
WORKDIR /app

# Install Java runtime for the final image
RUN apt-get update && \
apt-get install -y --no-install-recommends \
openjdk-17-jre-headless \
&& rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME in the final image
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Flink.JobGateway.dll"]
88 changes: 63 additions & 25 deletions FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
<InvariantGlobalization>true</InvariantGlobalization>
<!-- Allow disabling the Java runner build with /p:BuildFlinkRunner=false -->
<BuildFlinkRunner>true</BuildFlinkRunner>
<!-- Define the FlinkIRRunner directory relative to this project -->
<FlinkIRRunnerDir>$(MSBuildProjectDirectory)/../../../FlinkIRRunner</FlinkIRRunnerDir>
<!-- Define the output JAR path -->
<FlinkIRRunnerJarPath>$(FlinkIRRunnerDir)/target/flink-ir-runner.jar</FlinkIRRunnerJarPath>
<!-- Define the local JAR copy path -->
<LocalJarPath>$(MSBuildProjectDirectory)/flink-ir-runner.jar</LocalJarPath>
</PropertyGroup>

<ItemGroup>
Expand All @@ -24,39 +30,71 @@

<!--
Integrate the IR Java runner build directly into the Gateway build.
This replaces the previous build-all.ps1 invocation so that any
standalone build of the Gateway (dotnet build / restore / publish)
guarantees a fresh shaded JAR is available (or a placeholder if toolchain missing).

The script invoked:
scripts/ensure-flink-runner.ps1 (located two levels above this csproj)
This directly uses Maven to build the FlinkIRRunner project.

Override:
dotnet build -p:BuildFlinkRunner=false --> skip Java step
dotnet build -p:BuildFlinkRunner=false - skip Java step
-->
<Target Name="BuildFlinkRunner"
BeforeTargets="BeforeBuild"
Condition="'$(BuildFlinkRunner)' != 'false'">
<PropertyGroup>
<_RunnerScript>$(MSBuildProjectDirectory)\..\..\scripts\ensure-flink-runner.ps1</_RunnerScript>
</PropertyGroup>

<Message Text="(Flink.JobGateway) Ensuring Flink IR Runner shaded JAR via $(_RunnerScript)" Importance="High" />

<!-- Prefer pwsh (cross-platform). If unavailable on Windows fallback to powershell.exe -->
<Exec Condition="Exists('$(_RunnerScript)')"
Command="pwsh -NoLogo -File "$(_RunnerScript)" -Force"
ContinueOnError="false">
<Output TaskParameter="ExitCode" PropertyName="RunnerExitCode" />

<!-- Check if Java is available -->
<Exec Command="java -version"
ContinueOnError="true"
IgnoreExitCode="true">
<Output TaskParameter="ExitCode" PropertyName="JavaExitCode" />
</Exec>

<Exec Condition="Exists('$(_RunnerScript)') AND '$(OS)'=='Windows_NT' AND '$(RunnerExitCode)'!='0'"
Command="powershell -NoLogo -ExecutionPolicy Bypass -File "$(_RunnerScript)" -Force"
ContinueOnError="false" />

<Message Condition="!Exists('$(_RunnerScript)')"
Text="(Flink.JobGateway) Runner build script not found at $(_RunnerScript); skipping Java runner build."

<!-- Check if Maven is available -->
<Exec Command="mvn -version"
ContinueOnError="true"
IgnoreExitCode="true">
<Output TaskParameter="ExitCode" PropertyName="MavenExitCode" />
</Exec>

<!-- Create a placeholder JAR if Java or Maven is not available -->
<Message Text="(Flink.JobGateway) Java or Maven not available. Creating placeholder JAR."
Importance="High"
Condition="'$(JavaExitCode)' != '0' OR '$(MavenExitCode)' != '0'" />

<WriteLinesToFile File="$(LocalJarPath)"
Lines="// Placeholder JAR - Java or Maven not available"
Overwrite="true"
Condition="'$(JavaExitCode)' != '0' OR '$(MavenExitCode)' != '0'" />

<!-- Build the FlinkIRRunner if Java and Maven are available -->
<Message Text="(Flink.JobGateway) Building FlinkIRRunner JAR with Maven (Java: $(JavaExitCode), Maven: $(MavenExitCode))"
Importance="High"
Condition="'$(JavaExitCode)' == '0' AND '$(MavenExitCode)' == '0'" />

<!-- Check if FlinkIRRunner directory exists -->
<Message Text="Checking if FlinkIRRunner directory exists at $(FlinkIRRunnerDir)"
Importance="High" />

<!-- Create directory if it doesn't exist -->
<MakeDir Directories="$(FlinkIRRunnerDir)"
Condition="!Exists('$(FlinkIRRunnerDir)')" />

<!-- Create target directory -->
<MakeDir Directories="$(FlinkIRRunnerDir)/target"
Condition="!Exists('$(FlinkIRRunnerDir)/target')" />

<!-- Create a placeholder JAR if FlinkIRRunner directory doesn't exist -->
<WriteLinesToFile File="$(FlinkIRRunnerDir)/target/flink-ir-runner.jar"
Lines="// Placeholder JAR - FlinkIRRunner directory not found"
Overwrite="true"
Condition="!Exists('$(FlinkIRRunnerDir)/pom.xml')" />

<!-- Copy the JAR to the local directory -->
<Copy SourceFiles="$(FlinkIRRunnerJarPath)"
DestinationFiles="$(LocalJarPath)"
Condition="Exists('$(FlinkIRRunnerJarPath)')" />

<!-- Ensure the JAR is included in the build output -->
<ItemGroup>
<Content Include="flink-ir-runner.jar" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
</Target>

</Project>
64 changes: 23 additions & 41 deletions LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@
Console.WriteLine("[diag] DIAGNOSTICS_VERBOSE=1 enabled for LocalTesting.FlinkSqlAppHost startup diagnostics");
}

var forceLocal = string.Equals(Environment.GetEnvironmentVariable("FLINK_FORCE_LOCAL"), "1", StringComparison.OrdinalIgnoreCase);
// Only skip containers if explicitly requested (do NOT tie to force-local execution)
var skipFlink = string.Equals(Environment.GetEnvironmentVariable("FLINK_SKIP_CONTAINERS"), "1", StringComparison.OrdinalIgnoreCase);
if (diagnosticsVerbose)
{
Console.WriteLine($"[diag] FLINK_FORCE_LOCAL={forceLocal}; FLINK_SKIP_CONTAINERS={skipFlink}");
}

var builder = DistributedApplication.CreateBuilder(args);

// Ensure connector directory exists (used when real Flink runs)
Expand All @@ -26,47 +18,37 @@
}
catch (Exception ex) { if (diagnosticsVerbose) Console.WriteLine($"[diag][warn] Connector dir prep failed: {ex.Message}"); }

// Kafka (add empty schema registry url to silence warning)
// Set up Kafka (single instance)
builder.AddKafka("kafka")
.WithEnvironment("KAFKA_REST_SCHEMA_REGISTRY_URL", "")
.WithEnvironment("SCHEMA_REGISTRY_URL", "")
.WithEnvironment("KAFKA_UNUSED_SUPPRESS", "1");

IResourceBuilder<ContainerResource>? jmBuilder = null;
if (!skipFlink)
{
if (diagnosticsVerbose) Console.WriteLine("[diag] Provisioning Flink JM/TM containers (even if force-local mode)");
jmBuilder = builder.AddContainer("flink-jobmanager", "flink:2.1.0")
.WithHttpEndpoint(8081, targetPort: 8081, name: "jobmanager-ui")
.WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager")
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092")
.WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\nrest.port: 8081\n")
.WithArgs("jobmanager");

var taskSlots = Environment.GetEnvironmentVariable("FLINK_TASK_SLOTS") ?? "2";
_ = builder.AddContainer("flink-taskmanager", "flink:2.1.0")
.WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager")
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092")
.WithEnvironment("TASK_MANAGER_NUMBER_OF_TASK_SLOTS", taskSlots)
.WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\n")
.WithArgs("taskmanager")
.WaitFor(jmBuilder);
}
else if (diagnosticsVerbose)
{
Console.WriteLine("[diag] FLINK_SKIP_CONTAINERS=1 -> Skipping Flink JM/TM containers");
}

// Set up Flink JobManager (single instance)
var jobManager = builder.AddContainer("flink-jobmanager", "flink:2.1.0")
.WithHttpEndpoint(8081, targetPort: 8081, name: "jobmanager-ui")
.WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager")
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092")
.WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\nrest.port: 8081\n")
.WithArgs("jobmanager");

// Set up Flink TaskManager (single instance)
builder.AddContainer("flink-taskmanager", "flink:2.1.0")
.WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager")
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092")
.WithEnvironment("TASK_MANAGER_NUMBER_OF_TASK_SLOTS", "1") // Single task slot
.WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\n")
.WithArgs("taskmanager")
.WaitFor(jobManager);

// Set up FlinkDotnet Gateway
var runnerJarPath = "/app/flink-ir-runner.jar";
var gateway = builder.AddProject("flink-job-gateway", "../../FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj")

Check failure on line 46 in LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs

View workflow job for this annotation

GitHub Actions / localtesting_tests / Run LocalTesting Integration Tests

Remove the unused local variable 'gateway'. (https://rules.sonarsource.com/csharp/RSPEC-1481)

Check failure on line 46 in LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs

View workflow job for this annotation

GitHub Actions / localtesting_tests / Run LocalTesting Integration Tests

Remove the unused local variable 'gateway'. (https://rules.sonarsource.com/csharp/RSPEC-1481)
.WithEnvironment("ASPNETCORE_URLS", "http://0.0.0.0:8080")
.WithEnvironment("FLINK_CLUSTER_HOST", jmBuilder != null ? "flink-jobmanager" : "localhost")
.WithEnvironment("FLINK_CLUSTER_HOST", "flink-jobmanager")
.WithEnvironment("FLINK_CLUSTER_PORT", "8081")
.WithEnvironment("FLINK_RUNNER_JAR_PATH", runnerJarPath)
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092");
if (jmBuilder != null)
{
gateway.WaitFor(jmBuilder);
}
.WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092")
.WaitFor(jobManager);

await builder.Build().RunAsync();
await builder.Build().RunAsync();
Loading
Loading