问题描述:

Here's the MT 2.9.9 version that works:

public sealed class DiagnosticConsumer : Consumes<DiagnosticMessage>.All

{

public void Consume(DiagnosticMessage message)

{

Console.WriteLine("Got {0} with timestamp {1}", message.Message, message.Timestamp);

}

}

Bus setup:

var bus = ServiceBusFactory.New(sbc =>

{

sbc.UseRabbitMq(r => r.ConfigureHost(new Uri("rabbitmq://localhost/notifications/tests"),

c =>

{

c.SetUsername("test_user");

c.SetPassword("testuser123");

}));

sbc.ReceiveFrom("rabbitmq://localhost/notifications/tests");

sbc.Subscribe(s =>

{

s.Consumer<DiagnosticConsumer>();

});

});

using (bus)

{

bus.Publish(new DiagnosticMessage { Message = "Test msg", Timestamp = DateTimeOffset.Now });

Console.WriteLine("Published!");

Console.ReadLine();

}

That works as expected and when using RabbitMQ Management I can see that the following:

Now, using MT 3.0.1-alpha along with MassTransit.RabbitMQ 3.0.1-alpha and the following setup:

public sealed class DiagnosticConsumer : IConsumer<DiagnosticMessage>

{

public async Task Consume(ConsumeContext<DiagnosticMessage> context)

{

Console.WriteLine("Got message: {0}", context.Message);

}

}

Bus configuration:

public static async Task RunQueue()

{

var bus = Bus.Factory

.CreateUsingRabbitMq(c =>

{

var host =

c.Host(

new Uri(

"rabbitmq://localhost/notifications"),

conf =>

{

conf.Username("test_user");

conf.Password("testuser123");

});

c.ReceiveEndpoint(host, "tests", conf =>

{

conf.Consumer<DiagnosticConsumer>();

});

});

using (var handle = await bus.Start())

{

await bus.Publish(new DiagnosticMessage{ Message = "Pinging!", Timestamp = DateTimeOffset.Now});

Console.WriteLine("Waiting to finish...");

Console.ReadLine();

await handle.Stop();

}

}

In this scenario, nothing really happens, consumer never gets the message and the management console also tells a different story:

In both cases, it's the exact same queue and setup, which in itself is really simple. Is there anything wrong on my side that I would have to fix to use latest version of MassTransit, or is it just alpha-version bug?

网友答案:

This is the second report of such a problem, can you check the bindings of the message type to the queue? I believe the alpha has an issue where the published message types are not bound to the queue properly by default. Another user reported the same thing last week, it's on my list to investigate with a clean virtual host.

Since Send works, and Publish doesn't, that's likely the issue.

网友答案:

I found a half-answer to my own question.

Apparently publishing works if it's done via ISendEndpoint:

var address = new Uri("rabbitmq://localhost/notifications/tests");
var endpoint = await bus.GetSendEndpoint(address);
await endpoint.Send(new DiagnosticMessage{ Message = "Pinging!", Timestamp = DateTimeOffset.Now});

In this case, message is delivered successfully.

That being said, I'm not sure what the point of Publish is in its current form or why messages pushed through it are not delivered, especially inside the consumer when using ConsumeContext<>. It also seems to cause problems if you want to schedule messages, since they're being scheduled using Publish.

Until the behavior is changed or someone else has a better answer as to why all this happens, I guess I found the solution to my immediate problem.

[Edit]

As Chris Patterson noted in his answer, the exchange type for the message was not bound to the queue. Setting that up manually via the management console got Publish to work.

相关阅读:
Top