Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7566330
Init
devstress Sep 16, 2025
3effc6d
Init
devstress Sep 16, 2025
e1a407e
Init
devstress Sep 16, 2025
c067556
Fix FlinkDotnet Gateway build and LocalTesting
Sep 16, 2025
841db73
Merge pull request #145 from devstress/fix/gateway-build-and-localtes…
devstress Sep 16, 2025
9bf42c4
Initial plan
Copilot Sep 16, 2025
a93b40f
Create WI1 for LocalTesting tests fixes
Copilot Sep 16, 2025
64c5ad0
Fix LocalTesting build issues - API fixes and unused variable removal
Copilot Sep 16, 2025
7bc71d7
Fix LocalTesting infrastructure and jar path configuration issues
Copilot Sep 16, 2025
fd6341e
Complete LocalTesting build fixes and identify runtime issues - infra…
Copilot Sep 16, 2025
d81601f
Implement TODO items: Gateway jar bundling refactoring and job lifecy…
Copilot Sep 16, 2025
31570c2
Add FlinkIRRunner/target/ to gitignore and remove tracked build artif…
Copilot Sep 16, 2025
2ad49de
Remove redundant ir-runner-build.yml workflow - IR runner build now i…
Copilot Sep 17, 2025
6473fae
Fix LocalTesting infrastructure issues - Aspire dashboard config and …
Copilot Sep 17, 2025
5917030
Fix LocalTesting infrastructure: Replace complex Flink setup with sim…
Copilot Sep 17, 2025
4a3b2d4
Change LocalTesting back to use LocalTesting_FlinkSqlAppHost with ful…
Copilot Sep 17, 2025
bff1b75
Fix Flink JVM options and improve test diagnostics - remove problemat…
Copilot Sep 17, 2025
fde7b11
Final infrastructure optimizations - enhanced Flink configuration and…
Copilot Sep 17, 2025
999605b
Fix LocalTesting build failures, implement comprehensive TODO improve…
Copilot Sep 17, 2025
787ec37
Fix syntax error in GatewayAutomaticBundlingTest.cs - remove duplicat…
Copilot Sep 18, 2025
612bf95
Final cleanup - remove WIs folder and enhance exception handling in t…
Copilot Sep 18, 2025
5e59378
Merge pull request #146 from devstress/copilot/fix-39f2d54d-0427-45b3…
devstress Sep 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions .github/workflows/ir-runner-build.yml

This file was deleted.

7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Desktop.ini
# MSBuild Binary and Structured Log
*.binlog

# Java build
FlinkIRRunner/.maven/
FlinkIRRunner/.jdk/
FlinkIRRunner/target/

# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
Expand All @@ -103,3 +108,5 @@ apphost_test.log
kafka_2.13-4.0.0/
NativeKafkaBridge/libnativekafkabridge.so
.roo/mcp.json
# Exclude generated JAR files
FlinkDotNet/Flink.JobGateway/flink-ir-runner.jar
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,4 @@ private static async Task WaitForKafkaReadyAsync(string bootstrapServers, TimeSp
}
throw new TimeoutException("Kafka did not become ready in time.");
}
}

}
6 changes: 3 additions & 3 deletions FlinkDotNet/Flink.JobBuilder/FlinkJobBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static FlinkJobBuilder FromDatabase(string connectionString, string query
}

/// <summary>
/// Build a Flink SQL job from a list of SQL statements (DDL/DML)
/// Create a Flink SQL job from a list of SQL statements (DDL/DML)
/// </summary>
public static FlinkJobBuilder FromSql(IEnumerable<string> statements)
{
Expand Down Expand Up @@ -458,9 +458,9 @@ public JobDefinition BuildJobDefinition()

return new JobDefinition
{
Source = _source,
Source = _source!,
Operations = _operations,
Sink = _sink!,
Sink = _sink, // may be null for SQL
Metadata = new JobMetadata
{
JobId = Guid.NewGuid().ToString(),
Expand Down
2 changes: 1 addition & 1 deletion FlinkDotNet/Flink.JobBuilder/Models/JobDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class JobDefinition
{
public ISourceDefinition Source { get; set; } = null!;
public List<IOperationDefinition> Operations { get; set; } = new();
public ISinkDefinition Sink { get; set; } = null!;
public ISinkDefinition? Sink { get; set; } // nullable to allow pure SQL jobs
public JobMetadata Metadata { get; set; } = new();
}

Expand Down
74 changes: 65 additions & 9 deletions FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Flink.JobBuilder.Models;

Expand Down Expand Up @@ -50,7 +51,7 @@
return client;
}

public async Task<JobSubmissionResult> SubmitJobAsync(JobDefinition jobDefinition, CancellationToken cancellationToken = default)

Check warning on line 54 in FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs

View workflow job for this annotation

GitHub Actions / build

This method 'SubmitJobAsync' has 84 lines, which is greater than the 80 lines authorized. Split it into smaller methods. (https://rules.sonarsource.com/csharp/RSPEC-138)

Check warning on line 54 in FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs

View workflow job for this annotation

GitHub Actions / build

This method 'SubmitJobAsync' has 84 lines, which is greater than the 80 lines authorized. Split it into smaller methods. (https://rules.sonarsource.com/csharp/RSPEC-138)

Check warning on line 54 in FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs

View workflow job for this annotation

GitHub Actions / unit_tests / Run .NET Unit Tests

This method 'SubmitJobAsync' has 84 lines, which is greater than the 80 lines authorized. Split it into smaller methods. (https://rules.sonarsource.com/csharp/RSPEC-138)

Check warning on line 54 in FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs

View workflow job for this annotation

GitHub Actions / unit_tests / Run .NET Unit Tests

This method 'SubmitJobAsync' has 84 lines, which is greater than the 80 lines authorized. Split it into smaller methods. (https://rules.sonarsource.com/csharp/RSPEC-138)

Check warning on line 54 in FlinkDotNet/Flink.JobBuilder/Services/FlinkJobGatewayService.cs

View workflow job for this annotation

GitHub Actions / localtesting_tests / Run LocalTesting Integration Tests

This method 'SubmitJobAsync' has 84 lines, which is greater than the 80 lines authorized. Split it into smaller methods. (https://rules.sonarsource.com/csharp/RSPEC-138)
{
_logger?.LogInformation("Submitting job {JobId} to Flink Job Gateway", jobDefinition.Metadata.JobId);

Expand All @@ -63,37 +64,92 @@
return JobSubmissionResult.CreateFailure(jobDefinition.Metadata.JobId, msg);
}

// Serialize IR (capture diagnostics about polymorphic discriminator presence)
var json = JsonSerializer.Serialize(jobDefinition, _jsonOptions);
var hasDiscriminatorToken = json.Contains("\"type\"", StringComparison.Ordinal);
var firstSnippet = json.Length > 500 ? json[..500] + "...(truncated)" : json;
_logger?.LogInformation(
"Job {JobId} JSON serialized (length={Length}, hasDiscriminatorToken={HasType}). Snippet: {Snippet}",
jobDefinition.Metadata.JobId,
json.Length,
hasDiscriminatorToken,
firstSnippet);

// Additional focused check: count discriminator occurrences for debugging polymorphic binding
if (_logger != null)
{
var typeCount = 0;
var idx = 0;
while ((idx = json.IndexOf("\"type\"", idx, StringComparison.Ordinal)) >= 0)
{
typeCount++;
idx += 6;
}
Comment on lines +81 to +87
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using string.IndexOf in a loop for counting occurrences is inefficient. Consider using a more efficient approach like splitting or regex counting for better performance, especially with large JSON payloads.

Copilot uses AI. Check for mistakes.

_logger.LogDebug("Job {JobId} discriminator occurrences: {TypeCount}", jobDefinition.Metadata.JobId, typeCount);
}

var content = new StringContent(json, Encoding.UTF8, "application/json");

var response = await ExecuteWithRetryAsync(async () =>
{
return await _httpClient.PostAsync("/api/v1/jobs/submit", content, cancellationToken);
});

var rawResponse = await response.Content.ReadAsStringAsync(cancellationToken);
if (response.IsSuccessStatusCode && string.IsNullOrWhiteSpace(rawResponse))
{
var simulatedId = $"local-sim-{Guid.NewGuid():N}";
_logger?.LogWarning("Gateway returned empty body; assuming simulated local success (Flink cluster unavailable). Using FlinkJobId={FlinkJobId}", simulatedId);
return new JobSubmissionResult
{
JobId = jobDefinition.Metadata.JobId,
FlinkJobId = simulatedId,
Success = true,
SubmittedAt = DateTime.UtcNow,
Metadata = new Dictionary<string, string> { ["mode"] = "simulated-local" }
};
}

var responseSnippet = rawResponse.Length > 600 ? rawResponse[..600] + "...(truncated)" : rawResponse;

if (response.IsSuccessStatusCode)
{
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
var result = JsonSerializer.Deserialize<JobSubmissionResult>(responseContent, _jsonOptions);

JobSubmissionResult? result = null;
try
{
result = JsonSerializer.Deserialize<JobSubmissionResult>(rawResponse, _jsonOptions);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Deserialization of JobSubmissionResult failed for Job {JobId}. Raw response snippet: {Snippet}",
jobDefinition.Metadata.JobId, responseSnippet);
}

if (result != null)
{
result.SubmittedAt = DateTime.UtcNow;
_logger?.LogInformation("Job {JobId} submitted successfully. Flink Job ID: {FlinkJobId}",
jobDefinition.Metadata.JobId, result.FlinkJobId);
_logger?.LogInformation("Job {JobId} submitted successfully. Flink Job ID: {FlinkJobId}. Raw response snippet: {Snippet}",
jobDefinition.Metadata.JobId, result.FlinkJobId, responseSnippet);
return result;
}

_logger?.LogWarning("Job {JobId} submission success status but null result. Raw response snippet: {Snippet}",
jobDefinition.Metadata.JobId, responseSnippet);
}
else
{
_logger?.LogWarning("Job {JobId} submission failed HTTP {Status}. Raw response snippet: {Snippet}",
jobDefinition.Metadata.JobId, response.StatusCode, responseSnippet);
}

var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
_logger?.LogError("Failed to submit job {JobId}. Status: {StatusCode}, Error: {Error}",
jobDefinition.Metadata.JobId, response.StatusCode, errorContent);
_logger?.LogError("Failed to submit job {JobId}. Status: {StatusCode}",
jobDefinition.Metadata.JobId, response.StatusCode);

return new JobSubmissionResult
{
JobId = jobDefinition.Metadata.JobId,
Success = false,
ErrorMessage = $"HTTP {response.StatusCode}: {errorContent}",
ErrorMessage = $"HTTP {response.StatusCode}: {responseSnippet}",
SubmittedAt = DateTime.UtcNow
};
}
Expand Down
73 changes: 62 additions & 11 deletions FlinkDotNet/Flink.JobGateway/Controllers/JobsController.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Microsoft.AspNetCore.Mvc;
using Flink.JobBuilder.Models;
using Flink.JobGateway.Services;
using System.Text.Json;
using System.Text;

namespace Flink.JobGateway.Controllers;

Expand All @@ -25,20 +27,70 @@
/// <summary>
/// Submit a job to the Flink cluster
/// </summary>
/// <param name="jobDefinition">Job definition from .NET SDK</param>

Check warning on line 30 in FlinkDotNet/Flink.JobGateway/Controllers/JobsController.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has a param tag for 'jobDefinition', but there is no parameter by that name

Check warning on line 30 in FlinkDotNet/Flink.JobGateway/Controllers/JobsController.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has a param tag for 'jobDefinition', but there is no parameter by that name

Check warning on line 30 in FlinkDotNet/Flink.JobGateway/Controllers/JobsController.cs

View workflow job for this annotation

GitHub Actions / unit_tests / Run .NET Unit Tests

XML comment has a param tag for 'jobDefinition', but there is no parameter by that name

Check warning on line 30 in FlinkDotNet/Flink.JobGateway/Controllers/JobsController.cs

View workflow job for this annotation

GitHub Actions / unit_tests / Run .NET Unit Tests

XML comment has a param tag for 'jobDefinition', but there is no parameter by that name
/// <returns>Job submission result</returns>
[HttpPost("submit")]
public async Task<ActionResult<JobSubmissionResult>> SubmitJob([FromBody] JobDefinition jobDefinition)
public async Task<ActionResult<JobSubmissionResult>> SubmitJob()
{
string raw;
try
{
using var reader = new StreamReader(Request.Body, Encoding.UTF8);
raw = await reader.ReadToEndAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed reading request body");
return BadRequest(new { error = "Unable to read request body", ex.Message });
}

if (string.IsNullOrWhiteSpace(raw))
{
return BadRequest(new { error = "Empty request body" });
}

JobDefinition? jobDefinition = null;
try
{
var opts = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
PropertyNameCaseInsensitive = true,
};
jobDefinition = JsonSerializer.Deserialize<JobDefinition>(raw, opts);
if (jobDefinition == null)
{
return BadRequest(new { error = "Unable to deserialize job definition" });
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Deserialization failure for job submission. Raw snippet: {Snippet}", raw.Length > 400 ? raw[..400] : raw);
return BadRequest(new { error = "Invalid job definition JSON", ex.Message });
}

// Allow sink-less SQL jobs
if (jobDefinition.Source is SqlSourceDefinition && jobDefinition.Sink == null)
{
_logger.LogDebug("SQL job without sink accepted (statements define sinks). JobId placeholder will be set if missing.");
}

// Ensure metadata basics
jobDefinition.Metadata ??= new JobMetadata();
if (string.IsNullOrWhiteSpace(jobDefinition.Metadata.JobId))
{
jobDefinition.Metadata.JobId = Guid.NewGuid().ToString();
}

_logger.LogInformation("Received job submission request for job: {JobId}", jobDefinition.Metadata.JobId);

try
{
var result = await _flinkJobManager.SubmitJobAsync(jobDefinition);

if (result.IsSuccess)
{
_logger.LogInformation("Job submitted successfully: {JobId} -> {FlinkJobId}",
_logger.LogInformation("Job submitted successfully: {JobId} -> {FlinkJobId}",
result.JobId, result.FlinkJobId);
return Ok(result);
}
Expand All @@ -52,9 +104,8 @@
{
_logger.LogError(ex, "Error submitting job: {Message}", ex.Message);
var result = JobSubmissionResult.CreateFailure(
jobDefinition.Metadata.JobId,
$"Internal server error: {ex.Message}"
);
jobDefinition.Metadata.JobId,
$"Internal server error: {ex.Message}");
return StatusCode(500, result);
}
}
Expand All @@ -68,7 +119,7 @@
public async Task<ActionResult<JobStatus>> GetJobStatus(string flinkJobId)
{
_logger.LogInformation("Retrieving status for job: {FlinkJobId}", flinkJobId);

try
{
var status = await _flinkJobManager.GetJobStatusAsync(flinkJobId);
Expand Down Expand Up @@ -97,7 +148,7 @@
public async Task<ActionResult<JobMetrics>> GetJobMetrics(string flinkJobId)
{
_logger.LogInformation("Retrieving metrics for job: {FlinkJobId}", flinkJobId);

try
{
var metrics = await _flinkJobManager.GetJobMetricsAsync(flinkJobId);
Expand Down Expand Up @@ -126,7 +177,7 @@
public async Task<ActionResult> CancelJob(string flinkJobId)
{
_logger.LogInformation("Canceling job: {FlinkJobId}", flinkJobId);

try
{
var canceled = await _flinkJobManager.CancelJobAsync(flinkJobId);
Expand Down Expand Up @@ -155,4 +206,4 @@
{
return Ok("OK");
}
}
}
41 changes: 41 additions & 0 deletions FlinkDotNet/Flink.JobGateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,59 @@ WORKDIR /app
EXPOSE 8080

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build

# Install Java 17 and Maven
RUN apt-get update && \
apt-get install -y --no-install-recommends \
openjdk-17-jdk \
maven \
&& rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

WORKDIR /src

# Copy FlinkIRRunner project first
COPY ["FlinkIRRunner/pom.xml", "FlinkIRRunner/"]
COPY ["FlinkIRRunner/src/", "FlinkIRRunner/src/"]

# Build FlinkIRRunner
WORKDIR "/src/FlinkIRRunner"
RUN mvn -q -DskipTests package

# Copy and restore .NET projects
WORKDIR /src
COPY ["FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj", "Flink.JobGateway/"]
COPY ["FlinkDotNet/Flink.JobBuilder/Flink.JobBuilder.csproj", "Flink.JobBuilder/"]
RUN dotnet restore "Flink.JobGateway/Flink.JobGateway.csproj"

# Copy the rest of the .NET code
COPY FlinkDotNet/ .

# Build the Gateway
WORKDIR "/src/Flink.JobGateway"
RUN dotnet build "Flink.JobGateway.csproj" -c Release -o /app/build

FROM build AS publish
WORKDIR "/src/Flink.JobGateway"
RUN dotnet publish "Flink.JobGateway.csproj" -c Release -o /app/publish /p:UseAppHost=false

# Copy the FlinkIRRunner JAR to the publish directory
RUN mkdir -p /app/publish/FlinkIRRunner && \
cp /src/FlinkIRRunner/target/flink-ir-runner.jar /app/publish/

FROM base AS final
WORKDIR /app

# Install Java runtime for the final image
RUN apt-get update && \
apt-get install -y --no-install-recommends \
openjdk-17-jre-headless \
&& rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME in the final image
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Flink.JobGateway.dll"]
Loading
Loading