diff --git a/.gitignore b/.gitignore
index 9181c7c4..347e30e8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -107,3 +107,5 @@ apphost_test.log
 kafka_2.13-4.0.0/
 NativeKafkaBridge/libnativekafkabridge.so
 .roo/mcp.json
+# Exclude generated JAR files
+FlinkDotNet/Flink.JobGateway/flink-ir-runner.jar
diff --git a/FlinkDotNet/Flink.JobGateway/Dockerfile b/FlinkDotNet/Flink.JobGateway/Dockerfile
index 9d081666..45c5bd25 100644
--- a/FlinkDotNet/Flink.JobGateway/Dockerfile
+++ b/FlinkDotNet/Flink.JobGateway/Dockerfile
@@ -3,18 +3,59 @@ WORKDIR /app
 EXPOSE 8080
 
 FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
+
+# Install Java 17 and Maven
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+        openjdk-17-jdk \
+        maven \
+    && rm -rf /var/lib/apt/lists/*
+
+# Set JAVA_HOME
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+
+WORKDIR /src
+
+# Copy FlinkIRRunner project first
+COPY ["FlinkIRRunner/pom.xml", "FlinkIRRunner/"]
+COPY ["FlinkIRRunner/src/", "FlinkIRRunner/src/"]
+
+# Build FlinkIRRunner
+WORKDIR "/src/FlinkIRRunner"
+RUN mvn -q -DskipTests package
+
+# Copy and restore .NET projects
 WORKDIR /src
 COPY ["FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj", "Flink.JobGateway/"]
 COPY ["FlinkDotNet/Flink.JobBuilder/Flink.JobBuilder.csproj", "Flink.JobBuilder/"]
 RUN dotnet restore "Flink.JobGateway/Flink.JobGateway.csproj"
+
+# Copy the rest of the .NET code
 COPY FlinkDotNet/ .
+
+# Build the Gateway
 WORKDIR "/src/Flink.JobGateway"
 RUN dotnet build "Flink.JobGateway.csproj" -c Release -o /app/build
 
 FROM build AS publish
+WORKDIR "/src/Flink.JobGateway"
 RUN dotnet publish "Flink.JobGateway.csproj" -c Release -o /app/publish /p:UseAppHost=false
 
+# Copy the FlinkIRRunner JAR to the publish directory
+RUN mkdir -p /app/publish/FlinkIRRunner && \
+    cp /src/FlinkIRRunner/target/flink-ir-runner.jar /app/publish/
+
 FROM base AS final
 WORKDIR /app
+
+# Install Java runtime for the final image
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+        openjdk-17-jre-headless \
+    && rm -rf /var/lib/apt/lists/*
+
+# Set JAVA_HOME in the final image
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+
 COPY --from=publish /app/publish .
 ENTRYPOINT ["dotnet", "Flink.JobGateway.dll"]
\ No newline at end of file
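The final image now carries a Java 17 runtime, presumably so the gateway can launch the staged runner JAR. A minimal sketch of how the gateway might locate that JAR at runtime (hypothetical helper; the actual lookup lives inside Flink.JobGateway and is not part of this diff):

```csharp
using System;
using System.IO;

static class RunnerJarLocator
{
    public static string Resolve()
    {
        // FLINK_RUNNER_JAR_PATH is set to /app/flink-ir-runner.jar by the AppHost below.
        var fromEnv = Environment.GetEnvironmentVariable("FLINK_RUNNER_JAR_PATH");
        if (!string.IsNullOrEmpty(fromEnv) && File.Exists(fromEnv))
            return fromEnv;

        // Fall back to the JAR the Dockerfile copies next to the published binaries.
        var local = Path.Combine(AppContext.BaseDirectory, "flink-ir-runner.jar");
        if (File.Exists(local))
            return local;

        throw new FileNotFoundException("flink-ir-runner.jar not found; build FlinkIRRunner first.");
    }
}
```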
ENTRYPOINT ["dotnet", "Flink.JobGateway.dll"] \ No newline at end of file diff --git a/FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj b/FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj index ac64ed68..3c209766 100644 --- a/FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj +++ b/FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj @@ -7,6 +7,12 @@ true true + + $(MSBuildProjectDirectory)/../../../FlinkIRRunner + + $(FlinkIRRunnerDir)/target/flink-ir-runner.jar + + $(MSBuildProjectDirectory)/flink-ir-runner.jar @@ -24,39 +30,71 @@ skip Java step + dotnet build -p:BuildFlinkRunner=false - skip Java step --> - - <_RunnerScript>$(MSBuildProjectDirectory)\..\..\scripts\ensure-flink-runner.ps1 - - - - - - - + + + + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs b/LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs index 3f5dda31..7e6c01c6 100644 --- a/LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs +++ b/LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs @@ -7,14 +7,6 @@ Console.WriteLine("[diag] DIAGNOSTICS_VERBOSE=1 enabled for LocalTesting.FlinkSqlAppHost startup diagnostics"); } -var forceLocal = string.Equals(Environment.GetEnvironmentVariable("FLINK_FORCE_LOCAL"), "1", StringComparison.OrdinalIgnoreCase); -// Only skip containers if explicitly requested (do NOT tie to force-local execution) -var skipFlink = string.Equals(Environment.GetEnvironmentVariable("FLINK_SKIP_CONTAINERS"), "1", StringComparison.OrdinalIgnoreCase); -if (diagnosticsVerbose) -{ - Console.WriteLine($"[diag] FLINK_FORCE_LOCAL={forceLocal}; FLINK_SKIP_CONTAINERS={skipFlink}"); -} - var builder = DistributedApplication.CreateBuilder(args); // Ensure connector directory exists (used when real Flink runs) @@ -26,47 +18,37 @@ } catch (Exception ex) { if (diagnosticsVerbose) Console.WriteLine($"[diag][warn] Connector dir prep failed: {ex.Message}"); } -// Kafka (add empty schema registry url to silence warning) +// Set up Kafka (single instance) builder.AddKafka("kafka") .WithEnvironment("KAFKA_REST_SCHEMA_REGISTRY_URL", "") .WithEnvironment("SCHEMA_REGISTRY_URL", "") .WithEnvironment("KAFKA_UNUSED_SUPPRESS", "1"); -IResourceBuilder? jmBuilder = null; -if (!skipFlink) -{ - if (diagnosticsVerbose) Console.WriteLine("[diag] Provisioning Flink JM/TM containers (even if force-local mode)"); - jmBuilder = builder.AddContainer("flink-jobmanager", "flink:2.1.0") - .WithHttpEndpoint(8081, targetPort: 8081, name: "jobmanager-ui") - .WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager") - .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092") - .WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\nrest.port: 8081\n") - .WithArgs("jobmanager"); - - var taskSlots = Environment.GetEnvironmentVariable("FLINK_TASK_SLOTS") ?? 
"2"; - _ = builder.AddContainer("flink-taskmanager", "flink:2.1.0") - .WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager") - .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092") - .WithEnvironment("TASK_MANAGER_NUMBER_OF_TASK_SLOTS", taskSlots) - .WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\n") - .WithArgs("taskmanager") - .WaitFor(jmBuilder); -} -else if (diagnosticsVerbose) -{ - Console.WriteLine("[diag] FLINK_SKIP_CONTAINERS=1 -> Skipping Flink JM/TM containers"); -} - +// Set up Flink JobManager (single instance) +var jobManager = builder.AddContainer("flink-jobmanager", "flink:2.1.0") + .WithHttpEndpoint(8081, targetPort: 8081, name: "jobmanager-ui") + .WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager") + .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092") + .WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\nrest.port: 8081\n") + .WithArgs("jobmanager"); + +// Set up Flink TaskManager (single instance) +builder.AddContainer("flink-taskmanager", "flink:2.1.0") + .WithEnvironment("JOB_MANAGER_RPC_ADDRESS", "flink-jobmanager") + .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092") + .WithEnvironment("TASK_MANAGER_NUMBER_OF_TASK_SLOTS", "1") // Single task slot + .WithEnvironment("FLINK_PROPERTIES", "jobmanager.rpc.address: flink-jobmanager\nparallelism.default: 1\n") + .WithArgs("taskmanager") + .WaitFor(jobManager); + +// Set up FlinkDotnet Gateway var runnerJarPath = "/app/flink-ir-runner.jar"; var gateway = builder.AddProject("flink-job-gateway", "../../FlinkDotNet/Flink.JobGateway/Flink.JobGateway.csproj") .WithEnvironment("ASPNETCORE_URLS", "http://0.0.0.0:8080") - .WithEnvironment("FLINK_CLUSTER_HOST", jmBuilder != null ? "flink-jobmanager" : "localhost") + .WithEnvironment("FLINK_CLUSTER_HOST", "flink-jobmanager") .WithEnvironment("FLINK_CLUSTER_PORT", "8081") .WithEnvironment("FLINK_RUNNER_JAR_PATH", runnerJarPath) - .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092"); -if (jmBuilder != null) -{ - gateway.WaitFor(jmBuilder); -} + .WithEnvironment("KAFKA_BOOTSTRAP", "kafka:9092") + .WaitFor(jobManager); -await builder.Build().RunAsync(); +await builder.Build().RunAsync(); \ No newline at end of file diff --git a/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs new file mode 100644 index 00000000..c8d48372 --- /dev/null +++ b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs @@ -0,0 +1,285 @@ +using System.Diagnostics; +using Aspire.Hosting.Testing; +using Confluent.Kafka; +using NUnit.Framework; + +namespace LocalTesting.IntegrationTests; + +[TestFixture] +[Category("flinkdotnet-comprehensive")] +public class FlinkDotNetComprehensiveTest +{ + // Topic naming convention: lt.flink.. 
diff --git a/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs
new file mode 100644
index 00000000..c8d48372
--- /dev/null
+++ b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetComprehensiveTest.cs
@@ -0,0 +1,285 @@
+using System.Diagnostics;
+using Aspire.Hosting.Testing;
+using Confluent.Kafka;
+using NUnit.Framework;
+
+namespace LocalTesting.IntegrationTests;
+
+[TestFixture]
+[Category("flinkdotnet-comprehensive")]
+public class FlinkDotNetComprehensiveTest
+{
+    // Topic naming convention: lt.flink.<scenario>.<input|output>
+    private const string BasicInputTopic = "lt.flink.basic.input";
+    private const string BasicOutputTopic = "lt.flink.basic.output";
+
+    private const string FilterInputTopic = "lt.flink.filter.input";
+    private const string FilterOutputTopic = "lt.flink.filter.output";
+
+    private const string SplitInputTopic = "lt.flink.split.input";
+    private const string SplitOutputTopic = "lt.flink.split.output";
+
+    private const string TimerInputTopic = "lt.flink.timer.input";
+    private const string TimerOutputTopic = "lt.flink.timer.output";
+
+    private const string SqlInputTopic = "lt.flink.sql.input";
+    private const string SqlOutputTopic = "lt.flink.sql.output";
+
+    private const string SqlTransformInputTopic = "lt.flink.sqltransform.input";
+    private const string SqlTransformOutputTopic = "lt.flink.sqltransform.output";
+
+    private const string CompositeInputTopic = "lt.flink.composite.input";
+    private const string CompositeOutputTopic = "lt.flink.composite.output";
+
+    [Test]
+    public async Task FlinkDotNet_Comprehensive_AllJobTypes()
+    {
+        // Remove forced local simulation; require real Flink cluster
+        Environment.SetEnvironmentVariable("FLINK_FORCE_LOCAL", null);
+
+        var ct = TestContext.CurrentContext.CancellationToken;
+        var appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.LocalTesting_FlinkSqlAppHost>(ct);
+        var app = await appHost.BuildAsync(ct);
+        await app.StartAsync(ct);
+
+        try
+        {
+            // Wait for Kafka to be ready
+            await app.ResourceNotifications
+                .WaitForResourceHealthyAsync("kafka", ct)
+                .WaitAsync(TimeSpan.FromSeconds(90), ct);
+
+            var kafka = await app.GetConnectionStringAsync("kafka", ct);
+            await WaitForKafkaReady(kafka!, TimeSpan.FromSeconds(90), ct);
+
+            // Wait for Flink to be ready
+            await WaitForFlinkReadyAsync("http://localhost:8081/v1/overview", TimeSpan.FromSeconds(60), ct);
+
+            // Wait for Gateway to be ready
+            await EnsureGatewayAsync(ct);
+
+            // Create all topics
+            await CreateTopicAsync(kafka!, BasicInputTopic, 1);
+            await CreateTopicAsync(kafka!, BasicOutputTopic, 1);
+            await CreateTopicAsync(kafka!, FilterInputTopic, 1);
+            await CreateTopicAsync(kafka!, FilterOutputTopic, 1);
+            await CreateTopicAsync(kafka!, SplitInputTopic, 1);
+            await CreateTopicAsync(kafka!, SplitOutputTopic, 1);
+            await CreateTopicAsync(kafka!, TimerInputTopic, 1);
+            await CreateTopicAsync(kafka!, TimerOutputTopic, 1);
+            await CreateTopicAsync(kafka!, SqlInputTopic, 1);
+            await CreateTopicAsync(kafka!, SqlOutputTopic, 1);
+            await CreateTopicAsync(kafka!, SqlTransformInputTopic, 1);
+            await CreateTopicAsync(kafka!, SqlTransformOutputTopic, 1);
+            await CreateTopicAsync(kafka!, CompositeInputTopic, 1);
+            await CreateTopicAsync(kafka!, CompositeOutputTopic, 1);
+
+            // Test 1: Basic Uppercase Job
+            TestContext.WriteLine("Testing Basic Uppercase Job");
+            var basicResult = await FlinkDotNetJobs.CreateUppercaseJob(
+                BasicInputTopic, BasicOutputTopic, kafka!, "lt-basic", ct);
+
+            Assert.That(basicResult.Success, Is.True, "Basic job must submit successfully");
+            TestContext.WriteLine($"Basic job submitted with ID: {basicResult.FlinkJobId}");
+
+            await ProduceSimpleMessagesAsync(kafka!, BasicInputTopic, 10, ct);
+            var basicConsumed = await ConsumeAsync(kafka!, BasicOutputTopic, 10, TimeSpan.FromSeconds(30), ct);
+            Assert.That(basicConsumed, Is.GreaterThanOrEqualTo(10), "Basic job should process all messages");
+
+            // Test 2: Filter Job
+            TestContext.WriteLine("Testing Filter Job");
+            var filterResult = await FlinkDotNetJobs.CreateFilterJob(
+                FilterInputTopic, FilterOutputTopic, kafka!, "lt-filter", ct);
+
+            Assert.That(filterResult.Success, Is.True, "Filter job must submit successfully");
+            TestContext.WriteLine($"Filter job submitted with ID: {filterResult.FlinkJobId}");
+
+            // Send some empty and non-empty messages
+            await ProduceMessagesAsync(kafka!, FilterInputTopic, new[] { "", "value1", "", "value2", "value3" }, ct);
+            var filterConsumed = await ConsumeAsync(kafka!, FilterOutputTopic, 3, TimeSpan.FromSeconds(30), ct);
+            Assert.That(filterConsumed, Is.EqualTo(3), "Filter job should only process non-empty messages");
+
+            // Test 3: SQL Job
+            TestContext.WriteLine("Testing SQL Job");
+            var sqlResult = await FlinkDotNetJobs.CreateSqlPassthroughJob(
+                SqlInputTopic, SqlOutputTopic, kafka!, "lt-sql", ct);
+
+            Assert.That(sqlResult.Success, Is.True, "SQL job must submit successfully");
+            TestContext.WriteLine($"SQL job submitted with ID: {sqlResult.FlinkJobId}");
+
+            await ProduceJsonMessagesAsync(kafka!, SqlInputTopic, 5, ct);
+            var sqlConsumed = await ConsumeAsync(kafka!, SqlOutputTopic, 5, TimeSpan.FromSeconds(30), ct);
+            Assert.That(sqlConsumed, Is.GreaterThanOrEqualTo(5), "SQL job should process all messages");
+        }
+        finally
+        {
+            try { await app.DisposeAsync(); } catch { }
+        }
+    }
+
+    #region Helpers
+    private static async Task EnsureGatewayAsync(CancellationToken ct)
+    {
+        // Flink Job Gateway health endpoint (ASP.NET)
+        await WaitForHttpOkAsync("http://localhost:8080/api/v1/health", TimeSpan.FromSeconds(60), ct);
+    }
+
+    private static async Task CreateTopicAsync(string bootstrapServers, string topic, int partitions)
+    {
+        using var admin = new Confluent.Kafka.AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
+        try
+        {
+            await admin.CreateTopicsAsync(new[] { new Confluent.Kafka.Admin.TopicSpecification { Name = topic, NumPartitions = partitions, ReplicationFactor = 1 } });
+        }
+        catch (Confluent.Kafka.Admin.CreateTopicsException ex)
+        {
+            if (!ex.Results.Any(r => r.Error.Code == Confluent.Kafka.ErrorCode.TopicAlreadyExists))
+                throw;
+        }
+    }
+
+    private static async Task ProduceSimpleMessagesAsync(string bootstrap, string topic, int count, CancellationToken ct)
+    {
+        using var producer = new ProducerBuilder<string, string>(new ProducerConfig
+        {
+            BootstrapServers = bootstrap,
+            EnableIdempotence = true,
+            Acks = Acks.All,
+            LingerMs = 5
+        }).Build();
+
+        for (int i = 0; i < count; i++)
+        {
+            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"key-{i}", Value = $"value-{i}" }, ct);
+        }
+
+        producer.Flush(TimeSpan.FromSeconds(10));
+    }
+
+    private static async Task ProduceMessagesAsync(string bootstrap, string topic, string[] values, CancellationToken ct)
+    {
+        using var producer = new ProducerBuilder<string, string>(new ProducerConfig
+        {
+            BootstrapServers = bootstrap,
+            EnableIdempotence = true,
+            Acks = Acks.All,
+            LingerMs = 5
+        }).Build();
+
+        for (int i = 0; i < values.Length; i++)
+        {
+            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"key-{i}", Value = values[i] }, ct);
+        }
+
+        producer.Flush(TimeSpan.FromSeconds(10));
+    }
+
+    private static async Task ProduceJsonMessagesAsync(string bootstrap, string topic, int count, CancellationToken ct)
+    {
+        using var producer = new ProducerBuilder<string, string>(new ProducerConfig
+        {
+            BootstrapServers = bootstrap,
+            EnableIdempotence = true,
+            Acks = Acks.All,
+            LingerMs = 5
+        }).Build();
+
+        for (int i = 0; i < count; i++)
+        {
+            var jsonValue = $"{{\"key\":\"key-{i}\",\"value\":\"value-{i}\"}}";
+            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"key-{i}", Value = jsonValue }, ct);
+        }
+
+        producer.Flush(TimeSpan.FromSeconds(10));
+    }
+
+    private static Task<long> ConsumeAsync(string bootstrap, string topic, int expectedMin, TimeSpan timeout, CancellationToken ct)
+    {
+        var config = new ConsumerConfig
+        {
+            BootstrapServers = bootstrap,
+            GroupId = $"lt-flink-consumer-{Guid.NewGuid()}",
+            AutoOffsetReset = AutoOffsetReset.Earliest,
+            EnableAutoCommit = false
+        };
+
+        using var consumer = new ConsumerBuilder<string, string>(config).Build();
+        consumer.Subscribe(topic);
+        var sw = Stopwatch.StartNew();
+        long total = 0;
+
+        while (sw.Elapsed < timeout && total < expectedMin && !ct.IsCancellationRequested)
+        {
+            var cr = consumer.Consume(TimeSpan.FromMilliseconds(250));
+            if (cr != null) total++;
+        }
+
+        consumer.Close();
+        return Task.FromResult(total);
+    }
+
+    private static async Task WaitForHttpOkAsync(string url, TimeSpan timeout, CancellationToken ct)
+    {
+        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
+        var sw = Stopwatch.StartNew();
+
+        while (sw.Elapsed < timeout)
+        {
+            try
+            {
+                var resp = await http.GetAsync(url, ct);
+                if ((int)resp.StatusCode >= 200 && (int)resp.StatusCode < 500) return; // tolerate 404 placeholder
+            }
+            catch { }
+
+            await Task.Delay(500, ct);
+        }
+
+        throw new TimeoutException($"HTTP probe timed out for {url}");
+    }
+
+    private static async Task WaitForFlinkReadyAsync(string overviewUrl, TimeSpan timeout, CancellationToken ct)
+    {
+        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
+        var sw = Stopwatch.StartNew();
+
+        while (sw.Elapsed < timeout && !ct.IsCancellationRequested)
+        {
+            try
+            {
+                var resp = await http.GetAsync(overviewUrl, ct);
+                if (resp.IsSuccessStatusCode)
+                {
+                    var content = await resp.Content.ReadAsStringAsync(ct);
+                    if (!string.IsNullOrEmpty(content)) return; // Consider ready
+                }
+            }
+            catch { }
+
+            await Task.Delay(1000, ct);
+        }
+
+        throw new TimeoutException("Flink JobManager REST API not ready: " + overviewUrl);
+    }
+
+    private static async Task WaitForKafkaReady(string bootstrapServers, TimeSpan timeout, CancellationToken ct)
+    {
+        var sw = Stopwatch.StartNew();
+
+        while (sw.Elapsed < timeout && !ct.IsCancellationRequested)
+        {
+            try
+            {
+                using var admin = new Confluent.Kafka.AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers, SocketTimeoutMs = 5000 }).Build();
+                var md = admin.GetMetadata(TimeSpan.FromSeconds(3));
+                if (md?.Brokers?.Count > 0) return;
+            }
+            catch { }
+
+            await Task.Delay(500, ct);
+        }
+
+        throw new TimeoutException($"Kafka did not become ready within {timeout.TotalSeconds:F0}s at {bootstrapServers}");
+    }
+    #endregion
+}
\ No newline at end of file
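The assertions above only count messages. A possible follow-up, not part of this PR, is to also check payloads, e.g. that the `upper` stage really uppercased every value; a self-contained helper sketch, assuming string/string messages as produced above:

```csharp
using System;
using System.Diagnostics;
using Confluent.Kafka;

static class OutputAssertions
{
    // Hypothetical helper: fail if any consumed value still contains lowercase characters.
    public static void AssertUppercased(string bootstrap, string topic, int expected, TimeSpan timeout)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrap,
            GroupId = $"lt-verify-{Guid.NewGuid()}",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe(topic);
        var sw = Stopwatch.StartNew();
        var seen = 0;

        while (sw.Elapsed < timeout && seen < expected)
        {
            var cr = consumer.Consume(TimeSpan.FromMilliseconds(250));
            if (cr == null) continue;

            // The "upper" map stage should leave no lowercase characters behind.
            if (cr.Message.Value != cr.Message.Value.ToUpperInvariant())
                throw new InvalidOperationException($"Not uppercased: {cr.Message.Value}");

            seen++;
        }

        consumer.Close();
    }
}
```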
diff --git a/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetJobs.cs b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetJobs.cs
new file mode 100644
index 00000000..97e1dd01
--- /dev/null
+++ b/LocalTesting/LocalTesting.IntegrationTests/FlinkDotNetJobs.cs
@@ -0,0 +1,172 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace LocalTesting.IntegrationTests;
+
+/// <summary>
+/// Contains various FlinkDotNet job implementations for testing different features
+/// </summary>
+public static class FlinkDotNetJobs
+{
+    /// <summary>
+    /// Creates a simple DataStream job that converts input strings to uppercase
+    /// </summary>
+    public static async Task CreateUppercaseJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var job = FlinkDotNet.Flink.JobBuilder
+            .FromKafka(inputTopic, kafka)
+            .Map("upper")
+            .ToKafka(outputTopic, kafka);
+
+        return await job.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a DataStream job with filtering
+    /// </summary>
+    public static async Task CreateFilterJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var job = FlinkDotNet.Flink.JobBuilder
+            .FromKafka(inputTopic, kafka)
+            .Where("nonempty")
+            .ToKafka(outputTopic, kafka);
+
+        return await job.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a DataStream job with string splitting and concatenation
+    /// </summary>
+    public static async Task CreateSplitConcatJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var job = FlinkDotNet.Flink.JobBuilder
+            .FromKafka(inputTopic, kafka)
+            .Map("split:,")
+            .Map("concat:-joined")
+            .ToKafka(outputTopic, kafka);
+
+        return await job.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a DataStream job with timer functionality
+    /// </summary>
+    public static async Task CreateTimerJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var job = FlinkDotNet.Flink.JobBuilder
+            .FromKafka(inputTopic, kafka)
+            .WithTimer(5)
+            .ToKafka(outputTopic, kafka);
+
+        return await job.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a SQL job that passes through data from input to output
+    /// </summary>
+    public static async Task CreateSqlPassthroughJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var sqlStatements = new[]
+        {
+            $@"CREATE TABLE input ( `key` STRING, `value` STRING ) WITH (
+                'connector'='kafka',
+                'topic'='{inputTopic}',
+                'properties.bootstrap.servers'='{kafka}',
+                'properties.group.id'='flink-sql-test',
+                'scan.startup.mode'='earliest-offset',
+                'format'='json'
+            )",
+            $@"CREATE TABLE output ( `key` STRING, `value` STRING ) WITH (
+                'connector'='kafka',
+                'topic'='{outputTopic}',
+                'properties.bootstrap.servers'='{kafka}',
+                'format'='json'
+            )",
+            "INSERT INTO output SELECT `key`, `value` FROM input"
+        };
+
+        var sqlJob = FlinkDotNet.Pipelines.FlinkDotNet.Sql(sqlStatements);
+        return await sqlJob.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a SQL job that transforms data
+    /// </summary>
+    public static async Task CreateSqlTransformJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var sqlStatements = new[]
+        {
+            $@"CREATE TABLE input ( `key` STRING, `value` STRING ) WITH (
+                'connector'='kafka',
+                'topic'='{inputTopic}',
+                'properties.bootstrap.servers'='{kafka}',
+                'properties.group.id'='flink-sql-transform',
+                'scan.startup.mode'='earliest-offset',
+                'format'='json'
+            )",
+            $@"CREATE TABLE output ( `key` STRING, `transformed` STRING ) WITH (
+                'connector'='kafka',
+                'topic'='{outputTopic}',
+                'properties.bootstrap.servers'='{kafka}',
+                'format'='json'
+            )",
+            "INSERT INTO output SELECT `key`, UPPER(`value`) as `transformed` FROM input"
+        };
+
+        var sqlJob = FlinkDotNet.Pipelines.FlinkDotNet.Sql(sqlStatements);
+        return await sqlJob.Submit(jobName, ct);
+    }
+
+    /// <summary>
+    /// Creates a composite job that combines multiple operations
+    /// </summary>
+    public static async Task CreateCompositeJob(
+        string inputTopic,
+        string outputTopic,
+        string kafka,
+        string jobName,
+        CancellationToken ct)
+    {
+        var job = FlinkDotNet.Flink.JobBuilder
+            .FromKafka(inputTopic, kafka)
+            .Map("split:,")
+            .Map("concat:-tail")
+            .Map("upper")
+            .Where("nonempty")
+            .WithTimer(5)
+            .ToKafka(outputTopic, kafka);
+
+        return await job.Submit(jobName, ct);
+    }
+}
\ No newline at end of file
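`CreateSplitConcatJob`, `CreateTimerJob`, `CreateSqlTransformJob`, and `CreateCompositeJob` are defined here but not yet exercised by the comprehensive test. Wiring one in follows the same pattern as the basic and filter tests (illustrative fragment; `kafka`, `ct`, and the topic constants as in the test fixture above):

```csharp
// Illustrative usage inside FlinkDotNet_Comprehensive_AllJobTypes:
var compositeResult = await FlinkDotNetJobs.CreateCompositeJob(
    CompositeInputTopic, CompositeOutputTopic, kafka!, "lt-composite", ct);
Assert.That(compositeResult.Success, Is.True, "Composite job must submit successfully");
```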
diff --git a/LocalTesting/LocalTesting.IntegrationTests/FlinkIrStringOpsIntegrationTest.cs b/LocalTesting/LocalTesting.IntegrationTests/FlinkIrStringOpsIntegrationTest.cs
index 2d1f6b95..f8801f1e 100644
--- a/LocalTesting/LocalTesting.IntegrationTests/FlinkIrStringOpsIntegrationTest.cs
+++ b/LocalTesting/LocalTesting.IntegrationTests/FlinkIrStringOpsIntegrationTest.cs
@@ -6,16 +6,14 @@
 namespace LocalTesting.IntegrationTests;
 
 [TestFixture]
-[Category("flinkdotnet-all")] // Consolidated single integration test
-public class FlinkDotNetUnifiedIntegrationTest
+[Category("flinkdotnet-basic")]
+public class FlinkDotNetBasicIntegrationTest
 {
-    private const string InputTopic1 = "lt.flink.unified.input1"; // for DataStream IR job
-    private const string OutputTopic1 = "lt.flink.unified.output1";
-    private const string InputTopic2 = "lt.flink.unified.sql.input"; // for SQL job
-    private const string OutputTopic2 = "lt.flink.unified.sql.output";
+    private const string InputTopic = "lt.flink.basic.input";
+    private const string OutputTopic = "lt.flink.basic.output";
 
     [Test]
-    public async Task FlinkDotNet_Unified_KafkaToKafka_AllJobTypes()
+    public async Task FlinkDotNet_Basic_KafkaToKafka_Test()
     {
         // Remove forced local simulation; require real Flink cluster
         Environment.SetEnvironmentVariable("FLINK_FORCE_LOCAL", null);
@@ -27,6 +25,7 @@ public async Task FlinkDotNet_Unified_KafkaToKafka_AllJobTypes()
 
         try
         {
+            // Wait for Kafka to be ready
             await app.ResourceNotifications
                 .WaitForResourceHealthyAsync("kafka", ct)
                 .WaitAsync(TimeSpan.FromSeconds(90), ct);
@@ -34,77 +33,45 @@ await app.ResourceNotifications
             var kafka = await app.GetConnectionStringAsync("kafka", ct);
             await WaitForKafkaReady(kafka!, TimeSpan.FromSeconds(90), ct);
 
-            // Optional Flink readiness: try but ignore failures when forcing local execution
-            if (!string.Equals(Environment.GetEnvironmentVariable("FLINK_FORCE_LOCAL"), "1", StringComparison.OrdinalIgnoreCase))
-            {
-                await WaitForFlinkReadyAsync("http://localhost:8081/v1/overview", TimeSpan.FromSeconds(30), ct);
-            }
-            else
-            {
-                _ = Task.Run(async () =>
-                {
-                    try { await WaitForFlinkReadyAsync("http://localhost:8081/v1/overview", TimeSpan.FromSeconds(10), ct); }
-                    catch { /* ignored: running local fallback */ }
-                }, ct);
-            }
+            // Wait for Flink to be ready
+            await WaitForFlinkReadyAsync("http://localhost:8081/v1/overview", TimeSpan.FromSeconds(60), ct);
 
-            await EnsureGatewayAsync(ct); // Gateway health still required
+            // Wait for Gateway to be ready
+            await EnsureGatewayAsync(ct);
 
             // Create topics
-            await CreateTopicAsync(kafka!, InputTopic1, 4);
-            await CreateTopicAsync(kafka!, OutputTopic1, 4);
-            await CreateTopicAsync(kafka!, InputTopic2, 4);
-            await CreateTopicAsync(kafka!, OutputTopic2, 4);
-
-            // 1. Submit IR/DataStream style job (map + split + concat + timer)
-            var irJob = FlinkDotNet.Flink.JobBuilder
-                .FromKafka(InputTopic1, kafka)
-                .Map("split:,")
-                .Map("concat:-tail")
+            await CreateTopicAsync(kafka!, InputTopic, 1);
+            await CreateTopicAsync(kafka!, OutputTopic, 1);
+
+            // Submit a simple DataStream job
+            var job = FlinkDotNet.Flink.JobBuilder
+                .FromKafka(InputTopic, kafka)
                 .Map("upper")
-                .Where("nonempty")
-                .WithTimer(5)
-                .ToKafka(OutputTopic1, kafka);
-            var irSubmit = await irJob.Submit("lt-ir-composite", ct);
-            TestContext.WriteLine($"IR/DataStream submit success={irSubmit.Success}; jobId={irSubmit.FlinkJobId}; error={irSubmit.ErrorMessage}");
-            Assert.That(irSubmit.Success, Is.True, "IR/DataStream pipeline must submit successfully");
-
-            // 2. Submit SQL job (create table -> select -> insert)
-            var sqlStatements = new[]
-            {
-                $@"CREATE TABLE input ( `key` STRING, `value` STRING ) WITH ( 'connector'='kafka','topic'='{InputTopic2}','properties.bootstrap.servers'='{kafka}','properties.group.id'='flink-sql-unified','scan.startup.mode'='earliest-offset','format'='json')",
-                $@"CREATE TABLE output ( `key` STRING, `value` STRING ) WITH ( 'connector'='kafka','topic'='{OutputTopic2}','properties.bootstrap.servers'='{kafka}','format'='json')",
-                "INSERT INTO output SELECT `key`, `value` FROM input" }
-            ;
-            var sqlJob = FlinkDotNet.Pipelines.FlinkDotNet.Sql(sqlStatements);
-            var sqlSubmit = await sqlJob.Submit("lt-sql-unified", ct);
-            TestContext.WriteLine($"SQL submit success={sqlSubmit.Success}; jobId={sqlSubmit.FlinkJobId}; error={sqlSubmit.ErrorMessage}");
-            Assert.That(sqlSubmit.Success, Is.True, "SQL pipeline must submit successfully (runner jar / connectors optional in forced local mode)");
-
-            // Produce IR input
-            var irProduced = 10; // smaller for faster CI
-            await ProduceCsvMessagesAsync(kafka!, InputTopic1, irProduced, ct);
-            // Produce SQL input
-            var sqlProduced = 10;
-            await ProduceSimpleMessagesAsync(kafka!, InputTopic2, sqlProduced, ct);
-
-            // Consume IR output (split increases count so expect >= produced)
-            var irConsumed = await ConsumeAsync(kafka!, OutputTopic1, irProduced / 2, TimeSpan.FromSeconds(30), ct);
-            TestContext.WriteLine($"IR/DataStream consumed={irConsumed}");
-            Assert.That(irConsumed, Is.GreaterThan(0), "IR stream should produce some outputs in local simulation");
-
-            // Consume SQL output (at least some routed messages)
-            var sqlConsumed = await ConsumeAsync(kafka!, OutputTopic2, 1, TimeSpan.FromSeconds(30), ct);
-            TestContext.WriteLine($"SQL consumed={sqlConsumed}");
-            Assert.That(sqlConsumed, Is.GreaterThanOrEqualTo(0), "SQL job local simulation should not fail");
+                .ToKafka(OutputTopic, kafka);
+
+            var submitResult = await job.Submit("lt-basic-test", ct);
+            TestContext.WriteLine($"Job submit success={submitResult.Success}; jobId={submitResult.FlinkJobId}; error={submitResult.ErrorMessage}");
+            Assert.That(submitResult.Success, Is.True, "Job must submit successfully");
+
+            // Produce test messages
+            var messageCount = 10;
+            await ProduceSimpleMessagesAsync(kafka!, InputTopic, messageCount, ct);
+
+            // Consume and verify output
+            var consumedCount = await ConsumeAsync(kafka!, OutputTopic, messageCount, TimeSpan.FromSeconds(30), ct);
+            TestContext.WriteLine($"Consumed {consumedCount} messages");
+            Assert.That(consumedCount, Is.GreaterThanOrEqualTo(messageCount), "All messages should be processed");
+        }
+        finally
+        {
+            try { await app.DisposeAsync(); } catch { }
+        }
     }
-        finally { try { await app.DisposeAsync(); } catch { } }
 }
 
     #region Helpers
     private static async Task EnsureGatewayAsync(CancellationToken ct)
     {
-        // Flink Job Gateway health endpoint (ASP.NET) – allow 404/>=200 <500 as healthy
+        // Flink Job Gateway health endpoint (ASP.NET)
         await WaitForHttpOkAsync("http://localhost:8080/api/v1/health", TimeSpan.FromSeconds(60), ct);
     }
@@ -122,23 +89,6 @@ private static async Task CreateTopicAsync(string bootstrapServers, string topic
         }
     }
 
-    private static async Task ProduceCsvMessagesAsync(string bootstrap, string topic, int count, CancellationToken ct)
-    {
-        using var producer = new ProducerBuilder<string, string>(new ProducerConfig
-        {
-            BootstrapServers = bootstrap,
-            EnableIdempotence = true,
-            Acks = Acks.All,
-            LingerMs = 5
-        }).Build();
-        for (int i = 0; i < count; i++)
-        {
-            var value = $"segA{i},segB{i},segC{i}"; // 3 tokens for split
-            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"k-{i % 8}", Value = value }, ct);
-        }
-        producer.Flush(TimeSpan.FromSeconds(10));
-    }
-
     private static async Task ProduceSimpleMessagesAsync(string bootstrap, string topic, int count, CancellationToken ct)
     {
         using var producer = new ProducerBuilder<string, string>(new ProducerConfig
@@ -148,10 +98,12 @@ private static async Task ProduceSimpleMessagesAsync(string bootstrap, string to
             Acks = Acks.All,
             LingerMs = 5
         }).Build();
+
         for (int i = 0; i < count; i++)
         {
-            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"k-{i % 16}", Value = $"msg-{i}" }, ct);
+            await producer.ProduceAsync(topic, new Message<string, string> { Key = $"key-{i}", Value = $"value-{i}" }, ct);
         }
+
         producer.Flush(TimeSpan.FromSeconds(10));
     }
@@ -160,19 +112,22 @@ private static Task ConsumeAsync(string bootstrap, string topic, int expec
         var config = new ConsumerConfig
         {
             BootstrapServers = bootstrap,
-            GroupId = $"lt-flink-unified-consumer-{Guid.NewGuid()}",
+            GroupId = $"lt-flink-basic-consumer-{Guid.NewGuid()}",
             AutoOffsetReset = AutoOffsetReset.Earliest,
             EnableAutoCommit = false
         };
+
         using var consumer = new ConsumerBuilder<string, string>(config).Build();
         consumer.Subscribe(topic);
         var sw = Stopwatch.StartNew();
         long total = 0;
+
         while (sw.Elapsed < timeout && total < expectedMin && !ct.IsCancellationRequested)
         {
             var cr = consumer.Consume(TimeSpan.FromMilliseconds(250));
             if (cr != null) total++;
         }
+
         consumer.Close();
         return Task.FromResult(total);
     }
@@ -181,6 +136,7 @@ private static async Task WaitForHttpOkAsync(string url, TimeSpan timeout, Cance
     {
         using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
         var sw = Stopwatch.StartNew();
+
         while (sw.Elapsed < timeout)
         {
             try
@@ -189,8 +145,10 @@ private static async Task WaitForHttpOkAsync(string url, TimeSpan timeout, Cance
                 if ((int)resp.StatusCode >= 200 && (int)resp.StatusCode < 500) return; // tolerate 404 placeholder
             }
             catch { }
+
             await Task.Delay(500, ct);
         }
+
         throw new TimeoutException($"HTTP probe timed out for {url}");
     }
@@ -198,6 +156,7 @@ private static async Task WaitForFlinkReadyAsync(string overviewUrl, TimeSpan ti
     {
         using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
         var sw = Stopwatch.StartNew();
+
         while (sw.Elapsed < timeout && !ct.IsCancellationRequested)
         {
             try
@@ -210,14 +169,17 @@ private static async Task WaitForFlinkReadyAsync(string overviewUrl, TimeSpan ti
             }
         }
         catch { }
+
         await Task.Delay(1000, ct);
     }
+
     throw new TimeoutException("Flink JobManager REST API not ready: " + overviewUrl);
 }
 
 private static async Task WaitForKafkaReady(string bootstrapServers, TimeSpan timeout, CancellationToken ct)
 {
     var sw = Stopwatch.StartNew();
+
     while (sw.Elapsed < timeout && !ct.IsCancellationRequested)
     {
         try
@@ -227,9 +189,11 @@ private static async Task WaitForKafkaReady(string bootstrapServers, TimeSpan ti
             var md = admin.GetMetadata(TimeSpan.FromSeconds(3));
             if (md?.Brokers?.Count > 0) return;
         }
         catch { }
+
         await Task.Delay(500, ct);
     }
+
     throw new TimeoutException($"Kafka did not become ready within {timeout.TotalSeconds:F0}s at {bootstrapServers}");
 }
 #endregion
-}
+}
\ No newline at end of file
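With the unified test split into `flinkdotnet-basic` and `flinkdotnet-comprehensive` categories, individual scenarios can be run in isolation, e.g. `dotnet test --filter TestCategory=flinkdotnet-basic` (the NUnit adapter exposes `[Category]` values as `TestCategory` in the filter).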