I have a RabbitMQ cluster on k8s and an application that uses MassTransit with RabbitMQ. When I deploy the app to k8s, MassTransit logs an error on startup. I am sure RabbitMQ is reachable, because I have also added a RabbitMQ health check and it returns healthy. I have included the service YAML and the host name below.
I can also see in the RabbitMQ management UI that the identity exchange is created.
Startup.cs
var rabbitConfig = new RabbitMqConfig();
rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");
services.AddSingleton(rabbitConfig);
var factory = new ConnectionFactory()
{
    HostName = rabbitConfig.Host,
    Password = rabbitConfig.Password,
    UserName = rabbitConfig.UserName,
    VirtualHost = "/",
    Port = rabbitConfig.Port,
    AutomaticRecoveryEnabled = true
};
var connection = factory.CreateConnection();
services.AddSingleton<IConnection>(connection);
string connectionString = Configuration.GetValue<string>("CONNECTION_STRING");
services.AddDbContext<DataContext>(options =>
{
    options.UseMySql(connectionString);
});
services.AddHealthChecks()
    .AddMySql(connectionString, "MySQL")
    .AddRabbitMQ(name: "Rabbit");
services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Startup).Assembly);
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
        {
            h.Username(rabbitConfig.UserName);
            h.Password(rabbitConfig.Password);
        };
        cfg.Host(new Uri($"rabbitmq://{rabbitConfig.Host}"), "/", configure);
        cfg.ReceiveEndpoint("identity", e =>
        {
            e.UseMessageRetry(r => r.Interval(2, 100));
        });
    }));
});
services.AddSingleton<IHostedService, BusService>();
Service.yaml
kind: Service
apiVersion: v1
metadata:
  namespace: rabbitmq
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
    - name: rabbitmq-mgmt-port
      protocol: TCP
      port: 15672
      targetPort: 15672
    - name: rabbitmq-amqp-port
      protocol: TCP
      port: 5672
      targetPort: 5672
host:
rabbitmq.rabbitmq.svc.cluster.local
When I deploy the application to the k8s cluster, MassTransit logs the following error:
MassTransit[0] Operation interrupted: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text='INTERNAL_ERROR', classId=0, methodId=0
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext.<>c__DisplayClass19_0.b__0()
   at System.Threading.Tasks.Task`1.InnerInvoke()
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of stack trace from previous location where exception was thrown ---
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.ConfigureTopology(ModelContext context)
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func`2 setupMethod, PayloadFactory`1 payloadFactory)
   at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.GreenPipes.IFilter.Send(ModelContext context, IPipe`1 next)
   at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe`1 next)
   at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send(ConnectionContext context, IPipe`1 next)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at MassTransit.RabbitMqTransport.Transport.RabbitMqReceiveTransport.b__12_0()
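
The trace above shows the broker closing the connection while a QueueDeclare call is in flight, so one way to narrow the problem down might be to issue the same kind of declare through the plain RabbitMQ.Client connection, outside MassTransit. The following is only a minimal sketch: `QueueDeclareProbe` and `Run` are hypothetical names, and the durable, non-exclusive, non-auto-delete flags are an assumption about what the receive endpoint requests, not something confirmed by the trace.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

// Hypothetical diagnostic helper; not part of MassTransit or of the Startup.cs above.
public static class QueueDeclareProbe
{
    public static void Run(IConnection connection, string queueName = "identity")
    {
        try
        {
            // A dedicated channel, so a failure here does not affect other channels' state.
            using (IModel channel = connection.CreateModel())
            {
                // Assumption: flags chosen to resemble a durable, shared, persistent queue.
                channel.QueueDeclare(
                    queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                Console.WriteLine($"Queue '{queueName}' declared successfully.");
            }
        }
        catch (OperationInterruptedException ex)
        {
            // ShutdownReason carries the same fields that appear in the logged close-reason.
            ShutdownEventArgs reason = ex.ShutdownReason;
            Console.WriteLine(
                $"Declare failed: initiator={reason.Initiator}, " +
                $"code={reason.ReplyCode}, text={reason.ReplyText}");
        }
    }
}
```

It could be called once, right after `factory.CreateConnection()`, as `QueueDeclareProbe.Run(connection);`. If the plain client hits the same 541 reply, the failure is probably independent of the MassTransit configuration, and the RabbitMQ server log on the pod would be the next place to look. Note that a connection-level close also drops the shared `IConnection` until automatic recovery kicks in, so this is meant as a temporary check only.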