Masstransit error when try to connect rabbitmq cluster on k8s

2/29/2020

I have rabbitmq cluster on k8s. I have an application that uses MassTransit with rabbitmq. When I deploy my app to k8s on startup MassTransit writes error. I am sure rabbitmq is reachable because I have also added rabbitmq health check which returns healthy. I shared also service yaml and host information.

Also I can view identity exchange is created from rabbitmq management ui

Startup.cs

        var rabbitConfig = new RabbitMqConfig();
        rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
        rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
        rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
        rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");

        services.AddSingleton(rabbitConfig);

        var factory = new ConnectionFactory()
        {
            HostName = rabbitConfig.Host,
            Password = rabbitConfig.Password,
            UserName = rabbitConfig.UserName,
            VirtualHost = "/",
            Port = rabbitConfig.Port,
            AutomaticRecoveryEnabled = true
        };

        var connection = factory.CreateConnection();
        services.AddSingleton<IConnection>(connection);

        string connectionString =  Configuration.GetValue<string>("CONNECTION_STRING");

        services.AddDbContext<DataContext>(options =>
        {
            options.UseMySql(connectionString);
        }); 

        services.AddHealthChecks()
            .AddMySql(connectionString, "MySQL")
            .AddRabbitMQ(name: "Rabbit");

        services.AddMassTransit( x=> {   

            x.AddConsumers(typeof(Startup).Assembly);

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => {
                Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
                {
                    h.Username(rabbitConfig.UserName);
                    h.Password(rabbitConfig.Password);
                };
                cfg.Host(new Uri(
quot;rabbitmq://{rabbitConfig.Host}"),"/", configure)
; cfg.ReceiveEndpoint("identity", e=> { e.UseMessageRetry(x => x.Interval(2, 100)); }); })); }); services.AddSingleton<IHostedService, BusService>();

Service.yaml

kind: Service
apiVersion: v1
metadata:
  namespace:  rabbitmq
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
   - name: rabbitmq-mgmt-port
     protocol: TCP
     port: 15672
     targetPort: 15672
   - name: rabbitmq-amqp-port
     protocol: TCP
     port: 5672
     targetPort: 5672

host:

rabbitmq.rabbitmq.svc.cluster.local

When I deploy my application to k8s cluster MassTransit writes error like below.

MassTransit[0] Operation interrupted: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text='INTERNAL_ERROR', classId=0, methodId=0 at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary2 arguments) at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary2 arguments) at MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext.<>c__DisplayClass19_0.b__0() at System.Threading.Tasks.Task1.InnerInvoke() at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) --- End of stack trace from previous location where exception was thrown --- at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread) --- End of stack trace from previous location where exception was thrown --- at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter1.ConfigureTopology(ModelContext context) at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func2 setupMethod, PayloadFactory1 payloadFactory) at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter1.GreenPipes.IFilter.Send(ModelContext context, IPipe1 next) at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe1 next) at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send(ConnectionContext context, IPipe1 next) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send(IPipe1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send(IPipe1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken) at MassTransit.RabbitMqTransport.Transport.RabbitMqReceiveTransport.b__12_0()

-- ozman
asp.net-core
kubernetes
masstransit
rabbitmq

0 Answers