Stream single active consumer coordinator crashes with plain SAC consumer and a stream consumer at the same time #13835

Closed
Gsantomaggio opened this issue Apr 30, 2025 · 0 comments · Fixed by #13857

Gsantomaggio commented Apr 30, 2025

Describe the bug

A stream does not support a plain single active consumer (SAC) and a super stream SAC consumer at the same time when both use the same name on the same stream.
The rabbit_stream_sac_coordinator crashes with this stack trace:

2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> handle_leader err {'EXIT',
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                    {function_clause,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                     [{rabbit_stream_sac_coordinator,evaluate_active_consumer,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{group,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                         [{consumer,<0.6910.0>,0,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                           <<"192.168.65.1:31201 -> 172.17.0.2:5552">>,true},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                          {consumer,<0.6910.0>,1,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                           <<"192.168.65.1:31201 -> 172.17.0.2:5552">>,false}],
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                         -1}],
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"rabbit_stream_sac_coordinator.erl"},{line,721}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {rabbit_stream_sac_coordinator,do_register_consumer,8,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"rabbit_stream_sac_coordinator.erl"},{line,545}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {rabbit_stream_coordinator,apply,3,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"rabbit_stream_coordinator.erl"},{line,565}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {ra_server,apply_with,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"src/ra_server.erl"},{line,2609}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {ra_log,fold,5,[{file,"src/ra_log.erl"},{line,371}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {ra_server,apply_to,5,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"src/ra_server.erl"},{line,2549}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {ra_server,handle_leader,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"src/ra_server.erl"},{line,606}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                      {ra_server_proc,handle_leader,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0>                       [{file,"src/ra_server_proc.erl"},{line,1127}]}]}}
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** State machine rabbit_stream_coordinator terminating
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** Last event = {info,{ra_log_event,{written,{341,341,1}}}}
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** When server state  = [{id,{rabbit_stream_coordinator,rabbit@02688c1643de}},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {opt,terminate},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {raft_state,leader},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {leader_last_seen,undefined},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {num_pending_commands,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {num_low_priority_commands,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {num_pending_applied_notifications,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {election_timeout_set,false},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                          {ra_server_state,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                           #{id =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                              {rabbit_stream_coordinator,rabbit@02688c1643de},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                             machine =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                              #{num_monitors => 3,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                single_active_consumer =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                 #{groups =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                    #{{<<"/">>,<<"super-stream-issue-easy-0">>,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                       <<"my-group">>} =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                       #{num_consumers => 1,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                         partition_index => -1}},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                   num_groups => 1},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                num_streams => 2,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                streams =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0>                                 #{"__super-stream-issue-0_1746003194949165113" =>

Reproduction steps

Per conversation with @kjnilsson

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Start;

public class SuperStreamIssueEasy
{
    public static async Task Start()
    {
        Console.WriteLine("Super Stream Issue");
        const string streamSystemName = "super-stream-issue-easy";
        var streamSystem = await StreamSystem.Create(new StreamSystemConfig()
        {
            ConnectionPoolConfig = new ConnectionPoolConfig()
            {
                ConsumersPerConnection = 200
            }
        });
        var factory = new ConnectionFactory();
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        channel.ExchangeDeclare(streamSystemName, "direct", true, false, null);
        var channelQ = connection.CreateModel();
        channelQ.QueueDeclare($"{streamSystemName}-0", true, false, false,
            new Dictionary<string, object>()
            {
                { "x-queue-type", "stream" },
            });
        channelQ.QueueBind($"{streamSystemName}-0", streamSystemName, "0");

        var config = new ConsumerConfig(streamSystem, $"{streamSystemName}-0")
        {
            OffsetSpec = new OffsetTypeFirst(),
            IsSuperStream = false,
            IsSingleActiveConsumer = true,
            Reference = "my-group",
            ConsumerUpdateListener =
                (s, s1, arg3) => { return Task.FromResult<IOffsetType>(new OffsetTypeNext()); },
            MessageHandler = async (stream, consumerSource, context, message) =>
            {
                Console.WriteLine(
                    $"body: {Encoding.UTF8.GetString(message.Data.Contents)}");
                await Task.CompletedTask;
            }
        };

        var consumer = await Consumer.Create(config);


        var configSuper = new ConsumerConfig(streamSystem, streamSystemName)
        {
            OffsetSpec = new OffsetTypeFirst(),
            IsSuperStream = true,
            IsSingleActiveConsumer = true,
            Reference = "my-group",
            ConsumerUpdateListener =
                (s, s1, arg3) => { return Task.FromResult<IOffsetType>(new OffsetTypeNext()); },
            MessageHandler = async (stream, consumerSource, context, message) =>
            {
                Console.WriteLine(
                    $"body: {Encoding.UTF8.GetString(message.Data.Contents)}");
                await Task.CompletedTask;
            }
        };

        var consumerSuper = await Consumer.Create(configSuper);
    }
}

Expected behavior

Stream should support both a plain SAC consumer and a super stream consumer simultaneously.

Additional context

The same issue occurs with RabbitMQ 3.13.x and 4.1.x.

@kjnilsson kjnilsson added this to the 4.1.1 milestone Apr 30, 2025
@acogoluegnes acogoluegnes self-assigned this May 6, 2025
acogoluegnes added a commit that referenced this issue May 6, 2025
Consumers with the same name, consuming from the same stream, should have
the same partition index. This commit adds a check to enforce this rule
and makes the subscription fail if it does not comply.

Fixes #13835
mergify bot pushed a commit that referenced this issue May 6, 2025

(cherry picked from commit cad8b70)
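
The rule stated in the commit message (consumers with the same name on the same stream must share one partition index) can be modelled with a short sketch. This is only an illustration in C#, the language of the reproduction above; the actual check lives in the broker's Erlang code and may differ in detail. The class `SacGroupRegistry` and its method `TryRegister` are hypothetical names. The group key mirrors the `{vhost, stream, name}` tuple visible in the crash log, and `-1` is the partition index the log shows for the group created by the plain SAC consumer. The index `0` mentioned below for the super stream consumer is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the rule from the commit message. It is not the
// broker's implementation, only a sketch of the consistency check.
public sealed class SacGroupRegistry
{
    // One partition index is remembered per (vhost, stream, name) group.
    private readonly Dictionary<(string VHost, string Stream, string Name), int> _partitionIndexByGroup =
        new Dictionary<(string VHost, string Stream, string Name), int>();

    // Returns true when the consumer may join the group, and false when the
    // group already exists with a different partition index (the case that
    // should make the subscription fail instead of crashing the coordinator).
    public bool TryRegister(string vhost, string stream, string name, int partitionIndex)
    {
        var key = (vhost, stream, name);
        if (_partitionIndexByGroup.TryGetValue(key, out var existingIndex))
        {
            return existingIndex == partitionIndex;
        }

        // First consumer of the group: remember its partition index.
        _partitionIndexByGroup[key] = partitionIndex;
        return true;
    }
}
```

With this model, registering `("/", "super-stream-issue-easy-0", "my-group", -1)` for the plain SAC consumer succeeds, and a later registration with the same key but partition index `0` (assumed here for the super stream consumer) is rejected. Using a different name for one of the two consumers would place them in separate groups in this model.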