FlinkDotNet is a comprehensive .NET framework that enables developers to build and submit streaming jobs to Apache Flink 2.1.0 clusters using a fluent C# API. It provides extensive compatibility with Apache Flink 2.1.0 features including dynamic scaling, adaptive scheduling, reactive mode, and enterprise-scale multi-cluster orchestration.
In today's data-driven enterprise landscape, choosing the right messaging and stream processing architecture is critical for scalability, reliability, and maintainability. This section provides a comprehensive analysis of why Kafka + FlinkDotNet + Temporal represents the optimal choice for modern real-time data processing at enterprise scale.
Criteria | Apache Kafka | Amazon Kinesis | Azure Service Bus | Amazon SQS | Azure Event Hubs |
---|---|---|---|---|---|
Best Use Case | High-throughput streaming, event sourcing | AWS-native streaming | Enterprise messaging | Simple queuing | Big data ingestion |
Throughput | Millions/sec | Thousands/sec per shard (scales with shard count) | Thousands/sec | ~3,000/sec (FIFO); near-unlimited (Standard) | Millions/sec |
Message Retention | Configurable (days to years) | 24 hours default, up to 365 days | 14 days max | 14 days max | 1-7 days |
Message Ordering | Per-partition | Per-shard | Session-based | FIFO queues only | Per-partition |
Multi-Region | Self-managed | Native | Native | Native | Native |
Cost Model | Infrastructure + ops | Per shard/hour | Per message + storage | Per message | Per throughput unit |
Ecosystem | Rich (Kafka Connect, etc.) | AWS-specific | Azure-specific | Limited | Azure-specific |
Schema Evolution | Yes (Schema Registry) | Limited | No | No | Limited |
Complexity | High | Medium | Low | Very Low | Medium |
Note: When evaluating stream processing solutions, it's important to distinguish between Kafka (the message broker) and complete streaming solutions (Kafka + Kafka Streams or Apache Flink-based solutions).
- Choose Kafka alone when you need: Message queuing, event storage, basic pub/sub, data pipeline transport
- Choose Kafka + Kafka Streams when you need: High throughput stream processing, Java/Scala ecosystem, tight Kafka integration, stream topologies, local state management
- Choose Kinesis when you have: AWS-only environment, moderate throughput needs, integrated AWS services requirement
- Choose Service Bus when you need: Enterprise messaging patterns, complex routing, Azure-native integration
- Choose SQS when you need: Simple queuing, AWS integration, low operational overhead
- Choose Event Hubs when you need: Azure-native big data ingestion, moderate complexity
When choosing between streaming architectures, it's important to compare complete solutions. This section compares Kafka + Kafka Streams (the complete Kafka ecosystem) with FlinkDotNet + Temporal for enterprise-scale stream processing:
Capability | Kafka + Kafka Streams | FlinkDotNet + Temporal |
---|---|---|
Stream Processing | Kafka Streams provides rich processing (windowing, joins, aggregations) | FlinkDotNet provides equivalent stream processing with Apache Flink 2.1.0 features |
Fault Tolerance | At-least-once processing, exactly-once within Kafka topics | Exactly-once guarantees with Apache Flink checkpointing + Temporal workflows |
State Management | Local state stores with changelog topics for fault tolerance | FlinkDotNet savepoints + Temporal durable state persistence |
Scaling | Horizontal scaling via Kafka partitions, manual rebalancing | FlinkDotNet adaptive scheduler + automatic scaling with Temporal orchestration |
Complex Workflows | Limited to stream processing topologies | Temporal workflows handle long-running, multi-step business processes |
Error Handling | Stream-level error handling and dead letter queues | Temporal's advanced retry policies, compensation patterns, and workflow recovery |
Cross-System Coordination | Limited to Kafka ecosystem, requires external orchestration | Temporal natively coordinates across Kafka + databases + APIs + external systems |
Language Ecosystem | Java/Scala native, limited .NET support | Full .NET integration with C# APIs and .NET ecosystem |
Choose Kafka + Kafka Streams when:
- Your team has strong Java/Scala expertise
- You need tight integration with the Kafka ecosystem
- Your processing requirements fit well within stream processing topologies
- You want to minimize infrastructure complexity (single technology stack)
- Your use cases are primarily stream transformations and aggregations
Choose FlinkDotNet + Temporal when:
- Your team uses .NET and C# as primary languages
- You need complex, long-running business process orchestration
- You require advanced fault tolerance and workflow recovery capabilities
- You need to coordinate across multiple external systems and APIs
- You want Apache Flink 2.1.0 features like adaptive scheduling and reactive mode
Kafka + Kafka Streams Architecture:
Kafka (Message Broker) + Kafka Streams (Stream Processing)
        ↓                            ↓
Message Topics        →   Stream Topologies
Partitioned Data      →   Stateful Processing
At-least-once         →   Local State Stores
FlinkDotNet + Temporal Architecture:
Kafka (Data Highway) + FlinkDotNet (Processing Engine) + Temporal (Orchestration Brain)
        ↓                         ↓                               ↓
Stream Transport      →   Real-time Processing       →   Durable Coordination
Partitioned Topics    →   Windowing/Aggregations     →   Multi-step Workflows
At-least-once         →   Exactly-once Processing    →   Workflow Guarantees
- Traditional ESB: Monolithic, vendor lock-in, limited scalability, expensive licensing
- Our Stack: Microservices-friendly, open-source, elastic scaling, cloud-native
- Serverless: Vendor lock-in, cold starts, limited processing time, complex local development
- Our Stack: Multi-cloud, consistent performance, unlimited processing time, local development with Aspire
- Big Data: Batch-oriented, complex cluster management, high latency, Java-centric
- Our Stack: Stream-first with batch capability, managed scaling, low latency, .NET ecosystem
- Pulsar + Flink + Airflow: Java-centric ecosystem, complex multi-system integration, separate orchestration layer
- Our Stack: .NET-native APIs with unified Flink integration, simplified operations via Temporal workflows
The Kafka + FlinkDotNet + Temporal architecture excels in scenarios requiring reusable patterns across diverse business cases within the same enterprise infrastructure:
Scenario: Real-time trade processing, risk calculation, and regulatory reporting
// Reusable pattern: Event-driven processing with orchestration
var tradingWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("trades")
.FlinkProcess(env => env
.FromKafka("raw-trades")
.Map(trade => trade.EnrichWithMarketData())
.Rebalance() // Dynamic scaling
.Filter(trade => trade.PassesRiskChecks())
.ToKafka("validated-trades"))
.OrchestrateLongRunning(async () => {
await settleTradeAsync();
await updatePortfolioAsync();
await generateRegulatoryReportAsync();
});
Business Cases Served by Same Architecture:
- Real-time Trading: Low-latency order processing
- Risk Management: Continuous position monitoring
- Regulatory Reporting: End-of-day compliance workflows
- Customer Notifications: Trade confirmations and alerts
- Data Analytics: Real-time dashboards and ML model feeding
Scenario: Orders from web, mobile, in-store processed through unified pipeline
// Reusable pattern: Multi-channel aggregation with coordination
var orderWorkflow = Temporal.WorkflowBuilder
.OnMultipleKafkaEvents("web-orders", "mobile-orders", "pos-orders")
.FlinkProcess(env => env
.UnionStreams("web-orders", "mobile-orders", "pos-orders")
.Map(order => order.Normalize())
.KeyBy(order => order.CustomerId)
.Window(TimeWindow.Of(Time.Minutes(5))) // Order bundling
.Aggregate(orders => orders.Combine())
.ToKafka("unified-orders"))
.OrchestrateLongRunning(async (order) => {
await inventoryCheckAsync(order);
await paymentProcessingAsync(order);
await fulfillmentCoordinationAsync(order);
await customerNotificationAsync(order);
});
Business Cases Served by Same Architecture:
- Order Processing: Multi-channel order aggregation
- Inventory Management: Real-time stock updates
- Payment Processing: Fraud detection and authorization
- Fulfillment: Warehouse and shipping coordination
- Customer Experience: Real-time order tracking
- Analytics: Customer behavior analysis and recommendations
Scenario: Production line monitoring, predictive maintenance, quality control
// Reusable pattern: IoT data processing with ML integration
var manufacturingWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("sensor-data")
.FlinkProcess(env => env
.FromKafka("iot-sensors")
.KeyBy(reading => reading.MachineId)
.Window(SlidingTimeWindow.Of(Time.Minutes(10), Time.Minutes(1)))
.Aggregate(readings => readings.CalculateMetrics())
.Filter(metrics => metrics.AnomalyScore > 0.8)
.ToKafka("anomaly-alerts"))
.OrchestrateLongRunning(async (anomaly) => {
var prediction = await callMLModelAsync(anomaly);
await scheduleMaintenanceAsync(prediction);
await notifyTechniciansAsync(anomaly);
await adjustProductionParametersAsync(prediction);
});
Business Cases Served by Same Architecture:
- Predictive Maintenance: Equipment failure prediction
- Quality Control: Real-time defect detection
- Production Optimization: Throughput maximization
- Supply Chain: Just-in-time inventory
- Energy Management: Power consumption optimization
- Compliance: Environmental and safety monitoring
Scenario: Continuous patient monitoring, care team coordination, emergency response
// Reusable pattern: Critical event processing with care coordination
var healthcareWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("patient-vitals")
.FlinkProcess(env => env
.FromKafka("vital-signs")
.KeyBy(vital => vital.PatientId)
.Map(vital => vital.AnalyzeWithAI()) // Real-time AI analysis
.Filter(analysis => analysis.RequiresIntervention)
.Rebalance() // Load balancing for critical alerts
.ToKafka("critical-alerts"))
.OrchestrateLongRunning(async (alert) => {
await notifyNursingStationAsync(alert);
await escalateToPhysicianAsync(alert);
await prepareEmergencyProtocolsAsync(alert);
await updatePatientRecordAsync(alert);
});
Business Cases Served by Same Architecture:
- Patient Monitoring: Continuous vital sign analysis
- Emergency Response: Critical event escalation
- Care Coordination: Multi-provider workflow
- Medical Records: Real-time documentation
- Resource Management: Staff and equipment allocation
- Compliance: HIPAA audit trails
Scenario: Live streaming, content moderation, audience engagement
// Reusable pattern: Media processing with real-time engagement
var mediaWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("content-streams")
.FlinkProcess(env => env
.FromKafka("live-content")
.Map(content => content.ExtractMetadata())
.Filter(content => content.PassesModeration()) // AI content moderation
.PartitionCustom((content, partitions) =>
    Math.Abs(content.ContentType.GetHashCode() % partitions))
.ToKafka("moderated-content"))
.OrchestrateLongRunning(async (content) => {
await generateThumbnailsAsync(content);
await createSubtitlesAsync(content); // AI-powered
await distributeToChannelsAsync(content);
await trackEngagementMetricsAsync(content);
});
Business Cases Served by Same Architecture:
- Content Processing: Real-time transcoding and optimization
- Content Moderation: AI-powered safety checks
- Audience Engagement: Real-time interactions and comments
- Analytics: Viewing patterns and recommendations
- Monetization: Dynamic ad insertion
- Distribution: Multi-platform content delivery
// Pattern: Real-time AI inference with fallback strategies
var aiWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("inference-requests")
.FlinkProcess(env => env
.FromKafka("ai-requests")
.Map(request => request.PreprocessForModel())
.KeyBy(request => request.ModelType) // Route by AI model
.Rebalance() // Distribute load across AI workers
.ToKafka("preprocessed-requests"))
.OrchestrateLongRunning(async (request) => {
try {
var result = await callPrimaryAIModelAsync(request);
await cacheResultAsync(result);
return result;
} catch (ModelUnavailableException) {
return await callFallbackModelAsync(request);
}
});
AI/GenAI Use Cases:
- Document Processing: PDF/image extraction → LLM analysis → structured data
- Customer Support: Real-time chat → sentiment analysis → automated responses
- Content Generation: User input → GPT processing → personalized content
- Fraud Detection: Transaction streams → ML models → risk scoring
- Predictive Analytics: Historical data → AI models → future predictions
The architecture provides unified patterns for both business applications and DevOps workflows:
// Same patterns for CI/CD as business workflows
var buildWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("git-commits")
.FlinkProcess(env => env
.FromKafka("code-changes")
.Filter(change => change.AffectsProduction())
.Map(change => change.DetermineTestStrategy())
.ToKafka("build-requests"))
.OrchestrateLongRunning(async (buildRequest) => {
await runTestSuiteAsync(buildRequest);
await buildArtifactsAsync(buildRequest);
await deployToStagingAsync(buildRequest);
await runIntegrationTestsAsync(buildRequest);
await deployToProductionAsync(buildRequest);
});
DevOps Benefits:
- Unified Architecture: Same infrastructure for business and DevOps
- Observability: Consistent monitoring across all workflows
- Scalability: Elastic CI/CD that scales with development team
- Reliability: Temporal's workflow guarantees for deployments
- Cost Efficiency: Shared infrastructure reduces operational overhead
Note: These are estimated costs based on typical enterprise deployments and may vary significantly based on specific requirements, scale, and implementation choices.
Solution | Initial Setup | Annual Operations | 3-Year TCO | Vendor Lock-in Risk |
---|---|---|---|---|
Our Stack | Medium | Low (open-source) | $2.5M+ | Low |
Full AWS | Low | High (per-message) | $4.2M+ | High |
Full Azure | Low | High (per-message) | $3.8M+ | High |
Traditional ESB | High | Very High (licensing) | $6.1M+ | Very High |
Note: These metrics represent potential improvements and will vary based on team experience, project complexity, and implementation quality.
- Time to Production: Potentially 60% faster with reusable patterns
- Developer Onboarding: .NET developers can be productive immediately
- Maintenance Overhead: Potential 70% reduction with unified architecture
- Bug Resolution: Potentially faster debugging with consistent patterns
FlinkDotNet implements extensive Apache Flink 2.1.0 feature support for .NET developers, including:
- Dynamic Scaling: Change job parallelism without stopping jobs
- Adaptive Scheduler: Intelligent resource management and automatic parallelism adjustment
- Reactive Mode: Automatic adaptation to available cluster resources
- Advanced Partitioning: Rebalance, rescale, forward, shuffle, broadcast, and custom partitioning
- Savepoint-based Scaling: Scale jobs using savepoints for state consistency
- Fine-grained Resource Management: Slot sharing groups and resource profiles
- Temporal Multi-cluster Orchestration: Enterprise-scale coordination across thousands of clusters
FlinkDotNet provides comprehensive support for Apache Flink 2.1.0's dynamic scaling capabilities:
var env = Flink.GetExecutionEnvironment();
var dataStream = env.FromCollection(new[] { 1, 2, 3, 4, 5 });
// Rebalance: Uniformly distribute data across all parallel operators
var rebalanced = dataStream
.Map(x => x * 2)
.Rebalance() // Apache Flink 2.1.0 rebalance operation
.Filter(x => x > 5);
// Rescale: Distribute to subset of operators (more efficient for different parallelisms)
var rescaled = dataStream
.Map(x => x * 3)
.Rescale() // Apache Flink 2.1.0 rescale operation
.Filter(x => x > 10);
// Forward: Direct forwarding (same parallelism required)
var forwarded = dataStream
.Forward() // Apache Flink 2.1.0 forward partitioning
.Map(x => x + 1);
// Shuffle: Random distribution
var shuffled = dataStream
.Shuffle() // Apache Flink 2.1.0 shuffle partitioning
.Map(x => x * 2);
// Broadcast: Send to all operators
var broadcasted = dataStream
.Broadcast() // Apache Flink 2.1.0 broadcast partitioning
.Map(x => x + 10);
// Custom partitioning
var customPartitioned = dataStream
.PartitionCustom(
(key, numPartitions) => Math.Abs(key % numPartitions), // Custom partitioner (guards against negative keys)
x => x.GetHashCode() // Key selector
);
await env.ExecuteAsync("Dynamic Partitioning Example");
var env = Flink.GetExecutionEnvironment();
// Configure parallelism and scaling parameters
env.SetParallelism(8) // Default parallelism
.SetMaxParallelism(128) // Maximum parallelism for scaling
.EnableAdaptiveScheduler() // Apache Flink 2.1.0 adaptive scheduler
.EnableReactiveMode(); // Apache Flink 2.1.0 reactive mode
var dataStream = env.FromCollection(data)
.SetParallelism(4) // Operator-specific parallelism
.SetMaxParallelism(64) // Operator-specific max parallelism
.SlotSharingGroup("data-processing") // Fine-grained resource management
.Map(x => processData(x))
.Rebalance() // Dynamic rebalancing
.SetParallelism(8); // Scale specific operation
await env.ExecuteAsync("Scalable Processing Job");
// Start job from savepoint for scaling
var env = Flink.GetExecutionEnvironment()
.FromSavepoint("/path/to/savepoint") // Restore from savepoint
.SetParallelism(16); // New parallelism
// Execute job asynchronously to get JobClient
var jobClient = await env.ExecuteAsyncJob("Scaled Job");
// Trigger savepoint for scaling
var savepointResult = await jobClient.TriggerSavepointAsync("/path/to/new/savepoint");
// Stop job with savepoint for clean scaling
var stopResult = await jobClient.StopWithSavepointAsync("/path/to/scaling/savepoint", drain: true);
// Cancel with savepoint (alternative approach)
var cancelResult = await jobClient.CancelWithSavepointAsync();
// Monitor job status during scaling
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Job {status.JobName}: {status.State}, Parallelism: {status.Parallelism}/{status.MaxParallelism}");
FlinkDotNet provides a comprehensive, multi-layered architecture supporting everything from single jobs to enterprise-scale orchestration:
// Modern DataStream API (Apache Flink 2.1.0 compatible)
var env = Flink.GetExecutionEnvironment();
env.SetParallelism(4)
.EnableAdaptiveScheduler()
.EnableReactiveMode();
var dataStream = env.FromCollection(new[] { 1, 2, 3, 4, 5 });
dataStream
.Map(x => x * 2)
.Rebalance() // Rebalance across all operators
.Filter(x => x > 5)
.Rescale() // Rescale to subset
.Print();
await env.ExecuteAsync("My Job");
// JobBuilder API (Alternative fluent approach)
var job = Flink.JobBuilder
.FromKafka("orders")
.Where("Amount > 100")
.GroupBy("Region")
.Aggregate("SUM", "Amount")
.ToKafka("high-value-orders");
await job.Submit("Processing Job");
// Enterprise-scale FlinkDotNet.Orchestration for thousands of clusters
var orchestra = new FlinkOrchestra(logger);
// Provision clusters with auto-scaling
await orchestra.ProvisionClusterAsync(new ClusterConfiguration
{
Name = "production-cluster",
TaskSlots = 8,
TaskManagers = 4
});
// Submit jobs with intelligent placement
var result = await orchestra.SubmitJobAsync(jobDefinition, SubmissionStrategy.BestFit);
// Start Temporal orchestration workflows
await orchestra.StartOrchestrationWorkflowAsync(new OrchestrationRequest
{
TargetClusters = 1000,
MinClusters = 10,
MaxClusters = 5000
});
var config = new ExecutionConfig()
.SetParallelism(8)
.SetMaxParallelism(128)
.EnableAdaptiveScheduler() // Apache Flink 2.1.0 intelligent scheduling
.EnableReactiveMode() // Apache Flink 2.1.0 elastic scaling
.SetRestartStrategy("exponential-delay") // Advanced fault tolerance
.EnableSlotSharing() // Resource optimization
.EnableObjectReuse() // Performance optimization
.SetAutoWatermarkInterval(200); // Event time processing
var env = Flink.GetExecutionEnvironment(config);
FlinkDotNet provides a complete enterprise-scale integration solution with multi-layered architecture:
- .NET SDK (FlinkDotNet.DataStream): Complete Apache Flink 2.1.0 streaming API
- JobBuilder SDK (Flink.JobBuilder): Fluent C# DSL for rapid development
- Intermediate Representation (IR): JSON-based job definitions
- Job Gateway: HTTP service that bridges .NET applications with Apache Flink clusters
- FlinkDotNet.Orchestration: Multi-cluster job orchestration with intelligent placement strategies
- FlinkDotNet.ClusterManager: Actor-based cluster lifecycle management
- FlinkDotNet.Temporal: Temporal.io workflow definitions for durable orchestration
- FlinkDotNet.Resilience: Circuit breakers, retry policies, and health checkers
┌─────────────────────────────────────────────────────────────────────────────┐
│                          FlinkDotNet.Orchestration                          │
│   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐           │
│   │    Cluster A    │   │    Cluster B    │   │    Cluster N    │  ...      │
│   │  (Actor-based)  │   │  (Actor-based)  │   │  (Actor-based)  │           │
│   └─────────────────┘   └─────────────────┘   └─────────────────┘           │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                        FlinkDotNet.Temporal Workflows                       │
│          ┌──────────────────────┐    ┌──────────────────────┐               │
│          │     Auto-scaling     │    │   Job Distribution   │               │
│          │      Workflows       │    │      Workflows       │               │
│          └──────────────────────┘    └──────────────────────┘               │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                      Apache Flink 2.1.0 Compatible APIs                     │
│         ┌───────────────────────┐    ┌───────────────────────┐              │
│         │      FlinkDotNet      │    │   Flink.JobBuilder    │              │
│         │      .DataStream      │    │     (Fluent DSL)      │              │
│         │  (Apache Flink 2.1.0  │    │        (Rapid         │              │
│         │    compatible API)    │    │      Development)     │              │
│         └───────────────────────┘    └───────────────────────┘              │
└─────────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                         Apache Flink 2.1.0 Clusters                         │
│   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐           │
│   │  JobManager +   │   │  JobManager +   │   │  JobManager +   │  ...      │
│   │  TaskManagers   │   │  TaskManagers   │   │  TaskManagers   │           │
│   └─────────────────┘   └─────────────────┘   └─────────────────┘           │
└─────────────────────────────────────────────────────────────────────────────┘
The FlinkDotNet.Gateway acts as a bridge between .NET applications and Apache Flink 2.1.0 clusters, supporting advanced scaling features (a code sketch of the submission flow follows the list):
- Job Submission: .NET applications submit job definitions via HTTP to the gateway
- IR Translation: Gateway translates JSON IR to Flink JobGraph
- Cluster Communication: Gateway communicates with Flink JobManager via REST API
- Status Monitoring: Gateway provides job status and metrics back to .NET applications
- Orchestra Coordination: FlinkOrchestra manages job distribution across thousands of clusters
- Actor-based Management: Each cluster is managed by an independent ClusterActor
- Temporal Workflows: Long-running orchestration processes with exactly-once guarantees
- Intelligent Placement: Jobs routed to optimal clusters based on health, capacity, and locality
- Auto-scaling: Dynamic cluster provisioning and decommissioning based on demand
- Adaptive Scheduling: Apache Flink 2.1.0 adaptive scheduler integration
- Reactive Scaling: Automatic adaptation to available resources
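To make the submission flow concrete, here is a minimal client-side sketch of step 1 (job submission over HTTP). It assumes hypothetical details for illustration: the /api/v1/jobs endpoint, the JobIr payload shape, and the JobSubmissionResponse type are stand-ins, not the gateway's published contract.
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;
// Hypothetical JSON IR payload and response shapes -- field names are assumptions.
public record JobIr(string JobName, int Parallelism, int MaxParallelism, object[] Operators);
public record JobSubmissionResponse(string JobId, string State);
public static class GatewayClientSketch
{
    // Step 1: POST the JSON IR to the gateway; the gateway then translates it to a
    // Flink JobGraph and forwards it to the JobManager REST API (steps 2-3 above).
    public static async Task<JobSubmissionResponse?> SubmitAsync(HttpClient http, JobIr job)
    {
        var response = await http.PostAsJsonAsync("http://localhost:8080/api/v1/jobs", job);
        response.EnsureSuccessStatusCode();
        // Step 4: the gateway reports job status back to the .NET application.
        return await response.Content.ReadFromJsonAsync<JobSubmissionResponse>();
    }
}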
┌─────────────────┐   HTTP    ┌─────────────────┐    Orchestration     ┌─────────────────┐
│    .NET App     │──────────▶│   FlinkDotNet   │─────────────────────▶│   FlinkDotNet   │
│                 │           │     Gateway     │                      │    Orchestra    │
│   DataStream/   │◀──────────│                 │                      │ (Multi-cluster) │
│ JobBuilder APIs │  JSON IR  └─────────────────┘   Job Distribution   └─────────────────┘
└─────────────────┘                   │                                         │
                                      ▼                                         ▼
                            ┌─────────────────┐                       ┌─────────────────┐
                            │  Apache Flink   │                       │ ClusterManager  │
                            │   JobManager    │◀─────────────────────▶│     Actors      │
                            │    (Single)     │  REST APIs + Scaling  │   (Thousands)   │
                            └─────────────────┘                       └─────────────────┘
The gateway and orchestra handle the following concerns (a resilience sketch follows this list):
- Authentication & Authorization: Secure access to Flink clusters
- Load Balancing: Distribute jobs across multiple Flink clusters
- Monitoring & Metrics: Real-time job status and performance metrics across all clusters
- Error Handling: Graceful error recovery and retry logic with circuit breakers
- Auto-scaling: Intelligent cluster provisioning and capacity management
- Health Aggregation: Cross-cluster health monitoring and issue detection
- Dynamic Scaling: Apache Flink 2.1.0 savepoint-based scaling workflows
- Adaptive Scheduling: Integration with Flink 2.1.0 adaptive scheduler
- Reactive Mode: Automatic parallelism adjustment based on cluster resources
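As a sketch of the error-handling bullet above, the retry-with-circuit-breaker pattern can be illustrated as follows. SimpleCircuitBreaker and its thresholds are hypothetical stand-ins for the pattern that FlinkDotNet.Resilience names; they are not the library's actual types.
using System;
using System.Threading.Tasks;
// Hypothetical illustration of retry + circuit breaking; not the FlinkDotNet.Resilience API.
public sealed class SimpleCircuitBreaker
{
    private int _consecutiveFailures;
    private DateTime _openUntil = DateTime.MinValue;
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    public SimpleCircuitBreaker(int failureThreshold, TimeSpan openDuration) =>
        (_failureThreshold, _openDuration) = (failureThreshold, openDuration);
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action, int maxRetries = 3)
    {
        if (DateTime.UtcNow < _openUntil)
            throw new InvalidOperationException("Circuit open: calls to this cluster are suspended.");
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                var result = await action();
                _consecutiveFailures = 0; // A success closes the circuit again
                return result;
            }
            catch when (attempt < maxRetries)
            {
                // Exponential backoff between retries: 100ms, 200ms, 400ms, ...
                await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt)));
            }
            catch
            {
                // Repeated failures trip the breaker so callers fail fast for a while
                if (++_consecutiveFailures >= _failureThreshold)
                    _openUntil = DateTime.UtcNow + _openDuration;
                throw;
            }
        }
    }
}
A health checker in the same spirit would periodically probe each cluster's REST endpoint and feed the breaker state into job placement decisions.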
FlinkDotNet/
├── FlinkDotNet.Common/              # Core types and configuration
│   ├── Configuration                # Configuration, ExecutionConfig with Flink 2.1.0 features
│   ├── TypeInfo                     # Types, TypeInformation
│   └── JobManagement                # JobClient with scaling capabilities
├── FlinkDotNet.DataStream/          # Apache Flink 2.1.0 compatible streaming API
│   ├── StreamExecutionEnvironment   # Main entry point with adaptive/reactive modes
│   ├── DataStream                   # Core streaming API with partitioning strategies
│   ├── Functions                    # User functions
│   └── Connectors                   # Sources and sinks
├── FlinkDotNet.Orchestration/       # Multi-cluster orchestration
│   ├── Services                     # FlinkOrchestra, ClusterActorBridge
│   ├── Models                       # ClusterStatus, JobSubmissionResult
│   └── Interfaces                   # IFlinkOrchestra, IFlinkClusterActor
├── FlinkDotNet.ClusterManager/      # Individual cluster management
│   ├── Actors                       # FlinkClusterActor (actor-based lifecycle)
│   ├── Models                       # ClusterConfiguration, ClusterMetrics
│   └── Interfaces                   # IFlinkClusterActor
├── FlinkDotNet.Temporal/            # Temporal.io workflow definitions
│   ├── Workflows                    # ClusterOrchestrationWorkflow
│   ├── Activities                   # Cluster management activities
│   └── Models                       # Workflow request/response models
├── FlinkDotNet.Resilience/          # Fault tolerance patterns
│   ├── CircuitBreakers              # Prevent cascade failures
│   ├── RetryPolicies                # Exponential backoff strategies
│   └── HealthCheckers               # Cluster health validation
├── Flink.JobBuilder/                # Fluent DSL for rapid development
│   ├── FlinkJobBuilder              # Main fluent DSL
│   ├── Models                       # JobDefinition, IR models
│   └── Extensions                   # Extension methods
├── FlinkDotNet.Table/               # Table API (future)
├── FlinkDotNet.Testing/             # Testing utilities
├── FlinkDotNet.Util/                # Utility classes
└── FlinkDotNet/                     # Main unified API entry point
var env = Flink.GetExecutionEnvironment();
// Configure Apache Flink 2.1.0 features
env.SetParallelism(4)
.SetMaxParallelism(128) // Enable dynamic scaling
.EnableAdaptiveScheduler() // Automatic parallelism adjustment
.EnableReactiveMode() // Adapt to cluster resources
.EnableCheckpointing(5000); // Checkpointing for fault tolerance
var numbers = env.FromCollection(Enumerable.Range(1, 1000));
var result = numbers
.Filter(x => x % 2 == 0) // Even numbers only
.Map(x => x * x) // Square them
.Rebalance() // Apache Flink 2.1.0 rebalancing
.SetParallelism(8) // Scale this operation
.Sum(); // Sum the results
await env.ExecuteAsync("Even Squares with Dynamic Scaling");
var env = Flink.GetExecutionEnvironment();
env.EnableAdaptiveScheduler()
.EnableReactiveMode();
var dataStream = env.FromCollection(generateData());
// Demonstrate all Apache Flink 2.1.0 partitioning strategies
var processed = dataStream
.Map(x => processData(x))
.SetParallelism(4)
.SlotSharingGroup("data-processing") // Fine-grained resource management
// Rebalance: Uniform distribution across all operators
.Rebalance()
.Map(x => enrichData(x))
.SetParallelism(8)
// Rescale: Efficient distribution for different parallelisms
.Rescale()
.Filter(x => x.IsValid)
.SetParallelism(4)
// Forward: Direct forwarding (same parallelism)
.Forward()
.Map(x => finalProcessing(x))
.SetParallelism(4)
// Custom partitioning based on business logic
.PartitionCustom(
(key, numPartitions) => Math.Abs(key.GetHashCode() % numPartitions), // Guard against negative hash codes
x => x.CustomerId
)
.SlotSharingGroup("customer-processing");
await env.ExecuteAsync("Advanced Partitioning Example");
var job = Flink.JobBuilder
.FromKafka("input-topic", config => {
config.BootstrapServers = "localhost:9092";
config.GroupId = "processing-group";
})
.Map("processed = transform(data)")
.Where("processed.isValid")
.ToKafka("output-topic");
// Configure Apache Flink 2.1.0 features for the job
await job.Configure(config => {
config.EnableAdaptiveScheduler()
.EnableReactiveMode()
.SetParallelism(8)
.SetMaxParallelism(128);
}).Submit("Kafka Processing with Auto-Scaling");
var job = Flink.JobBuilder
.FromKafka("events")
.GroupBy("userId")
.Window("TUMBLING", 5, "MINUTES")
.Aggregate("COUNT", "*")
.ToKafka("user-activity");
await job.Configure(config => {
config.EnableReactiveMode() // Adapt to cluster resources
.SetRestartStrategy("exponential-delay") // Advanced fault tolerance
.EnableSlotSharing(); // Resource optimization
}).Submit("User Activity with Reactive Scaling");
var orchestra = new FlinkOrchestra(logger);
// Provision a new cluster with Apache Flink 2.1.0 features
var cluster = await orchestra.ProvisionClusterAsync(new ClusterConfiguration
{
Name = "production-west",
TaskSlots = 16,
TaskManagers = 8,
Region = "us-west-2",
HighAvailability = true,
AdaptiveSchedulerEnabled = true, // Enable Apache Flink 2.1.0 adaptive scheduler
ReactiveModeEnabled = true // Enable reactive mode
});
// Get cluster health across all clusters
var health = await orchestra.GetClusterHealthAsync();
Console.WriteLine($"Overall Health Score: {health.OverallHealthScore:F1}%");
Console.WriteLine($"Total Clusters: {health.TotalClusters}");
Console.WriteLine($"Healthy: {health.HealthyClusters}, Critical: {health.CriticalClusters}");
// Define a job with Apache Flink 2.1.0 configuration
var jobDefinition = new FlinkJobDefinition
{
JobId = "analytics-pipeline",
JobName = "Real-time Analytics",
JobGraph = "...", // Generated from DataStream/JobBuilder
Parallelism = 8,
MaxParallelism = 128, // Enable dynamic scaling
AdaptiveSchedulerEnabled = true, // Intelligent resource management
ReactiveModeEnabled = true, // Automatic adaptation
Priority = JobPriority.High
};
// Submit with intelligent placement
var result = await orchestra.SubmitJobAsync(jobDefinition, SubmissionStrategy.BestFit);
if (result.Success)
{
Console.WriteLine($"Job {result.JobId} submitted to cluster {result.ClusterId}");
Console.WriteLine($"Flink Job ID: {result.FlinkJobId}");
// Monitor scaling behavior
var jobClient = result.JobClient;
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Current Parallelism: {status.Parallelism}/{status.MaxParallelism}");
}
// Execute job with scaling capabilities
var jobClient = await env.ExecuteAsyncJob("Scalable Analytics Job");
// Monitor and scale using savepoints
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Initial Parallelism: {status.Parallelism}");
// Create savepoint for scaling
var savepointResult = await jobClient.TriggerSavepointAsync("/savepoints/scaling-point");
if (savepointResult.Success)
{
Console.WriteLine($"Savepoint created at: {savepointResult.SavepointPath}");
// Stop job gracefully for scaling
var stopResult = await jobClient.StopWithSavepointAsync(savepointPath: savepointResult.SavepointPath, drain: true);
if (stopResult.Success)
{
// Restart with new parallelism
var scaledEnv = Flink.GetExecutionEnvironment()
.FromSavepoint(stopResult.SavepointPath) // Restore from savepoint
.SetParallelism(16) // New parallelism
.SetMaxParallelism(256) // New max parallelism
.EnableAdaptiveScheduler()
.EnableReactiveMode();
// Re-execute with scaled configuration
var scaledJobClient = await scaledEnv.ExecuteAsyncJob("Scaled Analytics Job");
var scaledStatus = await scaledJobClient.GetJobStatusAsync();
Console.WriteLine($"Scaled Parallelism: {scaledStatus.Parallelism}");
}
}
// Start long-running orchestration workflow with Apache Flink 2.1.0 features
var workflowId = await orchestra.StartOrchestrationWorkflowAsync(new OrchestrationRequest
{
RequestId = "scaling-request-1",
TargetClusters = 500,
MinClusters = 50,
MaxClusters = 2000,
ScalingPolicy = "demand-based",
AdaptiveSchedulerEnabled = true, // Enable intelligent scheduling across clusters
ReactiveModeEnabled = true // Enable reactive scaling
});
Console.WriteLine($"Started orchestration workflow: {workflowId}");
// Monitor and scale dynamically
var scalingResult = await orchestra.ScaleOrchestraAsync(targetCapacity: 750);
Console.WriteLine($"Scaled from {scalingResult.PreviousCapacity} to {scalingResult.NewCapacity} clusters");
FlinkDotNet includes built-in backpressure support with Apache Flink 2.1.0 enhancements to ensure system stability:
using Flink.JobBuilder.Backpressure;
// Configure rate limiter with adaptive behavior
var rateLimiter = new TokenBucketRateLimiter(
rateLimit: 1000.0, // 1000 operations per second
burstCapacity: 2000.0 // Handle bursts up to 2000
);
// Use in your application with automatic backpressure handling
if (rateLimiter.TryAcquire())
{
await ProcessMessage(message);
}
else
{
// Apache Flink 2.1.0 handles backpressure automatically
// This provides additional application-level control
await Task.Delay(100); // Wait and retry
}
// Configure backpressure in execution environment
var env = Flink.GetExecutionEnvironment();
env.GetConfig()
.SetProperty("taskmanager.network.memory.max-buffers-per-channel", "10")
.SetProperty("taskmanager.network.memory.buffers-per-channel", "2")
.EnableObjectReuse(); // Reduce GC pressure
FlinkDotNet includes comprehensive testing capabilities with Apache Flink 2.1.0 integration:
[Fact]
public async Task TestStreamProcessingWithScaling()
{
var env = Flink.GetExecutionEnvironment();
env.EnableAdaptiveScheduler()
.EnableReactiveMode()
.SetMaxParallelism(128);
var testData = new[] { 1, 2, 3, 4, 5 };
var result = env.FromCollection(testData)
.Map(x => x * 2)
.Rebalance() // Test Apache Flink 2.1.0 rebalancing
.SetParallelism(4) // Test dynamic parallelism
.CollectAsync();
var expected = new[] { 2, 4, 6, 8, 10 };
var actual = (await result).OrderBy(x => x).ToArray(); // Rebalance does not preserve order
Assert.Equal(expected, actual);
}
[Fact]
public async Task TestSavepointBasedScaling()
{
var jobClient = await env.ExecuteAsyncJob("Test Scaling Job");
// Test savepoint creation
var savepointResult = await jobClient.TriggerSavepointAsync();
Assert.True(savepointResult.Success);
// Test graceful stopping with savepoint
var stopResult = await jobClient.StopWithSavepointAsync(drain: true);
Assert.True(stopResult.Success);
Assert.True(stopResult.Drained);
}
The project includes comprehensive stress tests that validate the following (a minimal sketch appears after the list):
- High-throughput processing (1M+ messages)
- Backpressure handling with Apache Flink 2.1.0 improvements
- Fault tolerance and recovery with adaptive scheduling
- Dynamic scaling scenarios and savepoint-based workflows
- Reactive mode adaptation to resource changes
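A scaled-down local variant of such a test might look like the sketch below, reusing the DataStream API from the earlier testing examples; the element count and the CollectAsync-based assertions are illustrative assumptions, not the project's actual stress-test code.
[Fact]
public async Task TestHighVolumeProcessing()
{
    var env = Flink.GetExecutionEnvironment();
    env.EnableAdaptiveScheduler()
       .EnableReactiveMode()
       .SetMaxParallelism(128);
    // Push one million elements through a rebalanced pipeline
    var result = env.FromCollection(Enumerable.Range(1, 1_000_000))
        .Map(x => x * 2)
        .Rebalance()        // Exercise redistribution under load
        .SetParallelism(8)
        .CollectAsync();
    // Every element should arrive exactly once, regardless of ordering
    var collected = (await result).ToList();
    Assert.Equal(1_000_000, collected.Count);
    Assert.Equal(1_000_000, collected.Distinct().Count());
}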
FlinkDotNet integrates with .NET Aspire for local development with Apache Flink 2.1.0 features.
Platform-Specific Setup: Before using Aspire locally, ensure the workload is installed:
- Windows/macOS: Aspire is typically included with .NET SDK (.NET 8+)
- Linux: Manual installation required:
dotnet workload install aspire
// LocalTesting/Program.cs
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
// Apache Flink 2.1.0 cluster with advanced features
var flink = builder.AddContainer("flink", "flink:2.1.0")
.WithEnvironment("FLINK_PROPERTIES",
"scheduler-mode: adaptive\n" +
"scheduler.adaptive.scaling-enabled: true\n" +
"scheduler.adaptive.resource.wait-timeout: 60s\n" +
"execution.checkpointing.interval: 5s\n" +
"parallelism.default: 4\n" +
"parallelism.default.sink: 8\n" +
"taskmanager.numberOfTaskSlots: 8");
var gateway = builder.AddProject<Projects.FlinkDotNet_Gateway>("gateway")
.WithReference(flink);
var testApp = builder.AddProject<Projects.TestApp>("testapp")
.WithReference(gateway)
.WithReference(kafka);
builder.Build().Run();
FlinkDotNet implements comprehensive build and test validation to ensure code quality and prevent build failures.
# Verify .NET 9.0 requirement
dotnet --version # Must show 9.0.x
# Run complete validation
./scripts/validate-build-and-tests.ps1
# Quick build check (skip tests)
./scripts/validate-build-and-tests.ps1 -SkipTests
- ✅ ALL builds MUST pass before commits/merges
- ✅ .NET 9.0.x required for all development
- ✅ Three solutions validated: FlinkDotNet, Sample, LocalTesting
- ✅ Automated blocking of build failures via GitHub Actions
# Always run before committing
./scripts/pre-commit-validation.ps1
- Complete Guide - Detailed enforcement rules and troubleshooting
- Quick Start - 2-minute developer setup guide
Important: Build failures are automatically blocked. Fix build errors before proceeding with any development work.
- Clone and Build FlinkDotNet Repository
  # TODO: NuGet packages are not yet published. Use repository for now.
  # Future: A single FlinkDotNet NuGet package will be available that includes everything.
  git clone https://github.com/devstress/FlinkDotnet.git
  cd FlinkDotnet
  dotnet build FlinkDotNet/FlinkDotNet.sln --configuration Release
- Set up Apache Flink 2.1.0 cluster
  - Download and install Apache Flink 2.1.0
  - Start JobManager and TaskManager with adaptive scheduler enabled
  - Configure reactive mode if desired
- Deploy FlinkDotNet.Gateway
  - Configure connection to your Flink cluster
  - Deploy as web service or container
  - Enable Apache Flink 2.1.0 feature support
- Build and submit your first job with scaling capabilities
  // Apache Flink 2.1.0 compatible approach (DataStream API)
  var env = Flink.GetExecutionEnvironment();
  env.EnableAdaptiveScheduler()
     .EnableReactiveMode()
     .SetMaxParallelism(128);
  var stream = env.FromCollection(new[] { 1, 2, 3 })
      .Rebalance()
      .SetParallelism(4);
  await env.ExecuteAsync("My First Scaling Job");

  // Alternative approach (JobBuilder for rapid development)
  var job = Flink.JobBuilder
      .FromKafka("source")
      .Map("value = process(data)")
      .ToKafka("destination");
  await job.Configure(config => config.EnableAdaptiveScheduler())
      .Submit("My First JobBuilder Job");
- Use FlinkDotNet Repository (All Components Included)
  # TODO: NuGet packages are not yet published. Use repository for now.
  # Future: A single FlinkDotNet NuGet package will include all orchestration components.
  # The repository already contains all enterprise components:
  # - FlinkDotNet.Orchestration, ClusterManager, Temporal, Resilience
  cd FlinkDotnet
  dotnet build --configuration Release
- Set up Temporal Server
  # Using Docker
  docker run -p 7233:7233 -p 8233:8233 temporalio/auto-setup:latest
- Initialize Orchestra service with Apache Flink 2.1.0 features
  var services = new ServiceCollection();
  services.AddLogging();
  services.AddSingleton<IFlinkOrchestra, FlinkOrchestra>();
  var provider = services.BuildServiceProvider();
  var orchestra = provider.GetRequiredService<IFlinkOrchestra>();
- Start with cluster provisioning and scaling
  // Provision your first cluster with Apache Flink 2.1.0 features
  var cluster = await orchestra.ProvisionClusterAsync(new ClusterConfiguration
  {
      Name = "starter-cluster",
      TaskSlots = 4,
      TaskManagers = 2,
      AdaptiveSchedulerEnabled = true, // Enable intelligent scheduling
      ReactiveModeEnabled = true       // Enable automatic adaptation
  });

  // Check overall health and scaling capabilities
  var health = await orchestra.GetClusterHealthAsync();
  Console.WriteLine($"Health Score: {health.OverallHealthScore:F1}%");
  Console.WriteLine($"Adaptive Scheduler: {cluster.AdaptiveSchedulerEnabled}");
  Console.WriteLine($"Reactive Mode: {cluster.ReactiveModeEnabled}");
- Dynamic Scaling and Rebalancing Guide
- Adaptive Scheduler Configuration
- Reactive Mode Implementation
- Savepoint-based Scaling Workflows
- Fine-grained Resource Management
FlinkDotNet includes comprehensive observability testing that validates message-per-second metrics across all system layers with 1 million message processing.
View Observability Test Runs - Monitor real-time observability metrics test execution with 1 million messages
The observability tests process 1 million messages to validate:
- Kafka Producer Metrics: Messages-per-second rates across topics and partitions
- Flink Processing Metrics: Real-time stream processing throughput and latency
- Temporal Workflow Metrics: Workflow execution rates and completion times
- End-to-End Flow Metrics: Complete pipeline throughput from Kafka → Flink → Temporal
During test execution, the GitHub Actions workflow displays real-time metrics:
Kafka Producer Metrics:
  producer-1: 15,247.5 msg/sec
Flink Processing Metrics:
  Input Rate - job-1: 15,200 msg/sec
  Output Rate - job-1: 15,195 msg/sec
End-to-End Flow Metrics:
  Kafka → Flink: 15,247.5 msg/sec
  End-to-End Total: 15,180.2 msg/sec
The 1 million message test validates:
- ✅ High-Volume Processing: System handles enterprise-scale message volumes
- ✅ Metrics Collection: All observability metrics are captured under load
- ✅ Performance Monitoring: Real-time throughput tracking across all layers
- ✅ System Stability: Infrastructure remains stable during high-throughput processing
Run locally:
dotnet test IntegrationTests/IntegrationTests.sln \
--filter "Category=observability" \
--configuration Release
FlinkDotNet provides complete Apache Flink 2.1.0 compatibility including:
- Adaptive Scheduler: Automatic parallelism adjustment based on workload characteristics
- Reactive Mode: Elastic scaling that adapts to available cluster resources
- Dynamic Scaling: Change job parallelism without stopping jobs using savepoints
- Advanced Partitioning: All Apache Flink 2.1.0 partitioning strategies (rebalance, rescale, forward, shuffle, broadcast, custom)
- Fine-grained Resource Management: Slot sharing groups and resource profiles
- Enhanced Fault Tolerance: Advanced restart strategies and checkpointing
// Enable all Apache Flink 2.1.0 features
var env = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler() // Intelligent resource management
.EnableReactiveMode() // Elastic scaling
.SetMaxParallelism(256) // Dynamic scaling support
.EnableCheckpointing(5000); // Enhanced fault tolerance
var scalableStream = env.FromCollection(data)
.Rebalance() // Apache Flink 2.1.0 rebalancing
.SetParallelism(8) // Dynamic parallelism
.SlotSharingGroup("processing"); // Fine-grained resources
FlinkDotNet supports multiple scaling approaches:
- Reactive Mode Scaling (Automatic)
  env.EnableReactiveMode(); // Automatically adapts to cluster resources
- Adaptive Scheduler (Intelligent)
  env.EnableAdaptiveScheduler(); // Automatic parallelism adjustment by the adaptive scheduler
- Savepoint-based Scaling (Manual)
  var jobClient = await env.ExecuteAsyncJob("My Job");
  var savepoint = await jobClient.TriggerSavepointAsync();
  // Restart with new parallelism from the savepoint
- Runtime Partitioning (Dynamic)
  dataStream
      .Rebalance()  // Redistribute uniformly
      .Rescale()    // Efficient subset distribution
      .Forward()    // Direct forwarding
      .Shuffle();   // Random distribution
Choose based on your use case:
- DataStream API: Use for Apache Flink 2.1.0 compatibility, complex stream processing, and when you need full control over scaling and partitioning
- JobBuilder API: Use for rapid development, simple pipelines, and when you prefer fluent syntax
- Orchestra API: Use for enterprise-scale multi-cluster deployments with thousands of jobs
Example decision matrix:
// Complex processing with scaling requirements
var env = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler()
.EnableReactiveMode();
var stream = env.FromCollection(data)
.Rebalance()
.SetParallelism(8);
// Simple pipeline with fluent syntax
var job = Flink.JobBuilder
.FromKafka("input")
.Map("process(data)")
.ToKafka("output");
// Enterprise multi-cluster orchestration
var orchestra = new FlinkOrchestra(logger);
await orchestra.SubmitJobAsync(jobDef, SubmissionStrategy.BestFit);
FlinkDotNet maintains full compatibility while adding Apache Flink 2.1.0 features:
- Keep existing code: All existing DataStream and JobBuilder code continues to work
- Add Apache Flink 2.1.0 features gradually: Enable adaptive scheduler, reactive mode, and advanced partitioning as needed
- Scale incrementally: Start with single cluster, add orchestration layer when needed
- Optimize performance: Use new partitioning strategies and resource management features
Migration example:
// Existing code (still works)
var env = Flink.GetExecutionEnvironment();
var stream = env.FromCollection(data).Map(x => x * 2);
// Enhanced with Apache Flink 2.1.0 features
var enhancedEnv = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler() // Add intelligent scheduling
.EnableReactiveMode() // Add elastic scaling
.SetMaxParallelism(128); // Enable dynamic scaling
var enhancedStream = enhancedEnv.FromCollection(data)
.Map(x => x * 2)
.Rebalance() // Add efficient rebalancing
.SetParallelism(8); // Set optimal parallelism
The architecture is designed for incremental adoption - you can start with basic features and scale to enterprise levels with Apache Flink 2.1.0 capabilities as your requirements grow.
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
See CONTRIBUTING.md for detailed guidelines.
MIT License - see LICENSE file for details.