Problem description:

I have two Python processes connected to RabbitMQ via pika. Each consumes a set of topics which the other publishes as a response. One uses the SelectConnection and the other uses the TornadoConnection.

Both are currently just test programs which simulate a conversation between a user and my server, and each program's on_message() is simply hard-coded to branch on the routing_key received and publish the appropriate response to its counterpart.

Originally, after a random amount of time, typically no longer than 2 minutes, I'd get an error like:

UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

After searching through numerous posts here on Stack Overflow and elsewhere, I've come to understand that this error has to do with a race condition in which something is consumed before a basic_publish has completed.

I've changed my code so that, rather than doing an immediate basic_publish(), I pass a callback to connection.add_timeout() with a delay of 1 second. Since making this change, I've had numerous runs in which the two processes hold a "conversation" with each other for more than an hour without reproducing the error.

My question is, is this just a hack which works only because I'm simulating one user? Do I need to have 2 separate channels for consuming and publishing?

    def on_message(self, unused_channel, basic_deliver, properties, body):
        if self._sibling_app_id == properties.app_id:
            self.dispatch_message(basic_deliver, properties, body)

    def dispatch_message(self, basic_deliver, properties, body):
        (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)
        if "login-response" == msg_type:
            print body
        elif "gid-assignment" == msg_type:
            print body
        elif "tutor-logout" == msg_type:
            print body
        elif "tutor-turn" == msg_type:
            message = "i don't know"
            routing_key = "%s.input" % user_id
            callback = self.delayed_publish_message(routing_key, message)
            self.schedule_next_message(callback, 1)
        elif "nlu" == msg_type:
            message = "dnk"
            routing_key = "%s.nlu-response" % user_id
            callback = self.delayed_publish_message(routing_key, message)
            self.schedule_next_message(callback, 1)
        else:
            print "invalid message-type: %s" % msg_type
            print body

    def delayed_publish_message(self, routing_key, message):
        """returns a callback which can be passed to schedule_next_message()"""
        def delayed_publish_cb():
            self.publish_message(routing_key, message)
        return delayed_publish_cb

    def schedule_next_message(self, cb, publish_interval=None):
        if self._stopping:
            return
        if publish_interval is None:
            publish_interval = self.PUBLISH_INTERVAL
        if -1 == publish_interval:
            return
        self._connection.add_timeout(publish_interval, cb)

    def publish_message(self, routing_key, message):
        if self._stopping:
            return
        properties = pika.BasicProperties(app_id=self._app_id,
                                          content_type='text/plain')
        self._channel.basic_publish(self.EXCHANGE, routing_key,
                                    message, properties)

Answer:

A channel is meant to be used unidirectionally. The AMQP 1.0 specification is explicit about this:

An AMQP Session correlates two unidirectional Channels to form a bidirectional, sequential conversation between two Containers. A single Connection may have multiple independent Sessions active simultaneously, up to the negotiated Channel limit. Both Connections and Sessions are modeled by each peer as endpoints that store local and last known remote state regarding the Connection or Session in question.

Thus you are supposed to use an input and an output channel for your application.
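One way to follow this advice is sketched below, as an illustration rather than a definitive implementation. It opens two channels on a single asynchronous connection and keeps consuming and publishing apart. The class name TwoChannelClient and its attributes are hypothetical. The connection object is assumed to behave like pika's SelectConnection or TornadoConnection, and the basic_consume keywords assume pika 1.x (0.9.x releases take the arguments in a different form).

```python
class TwoChannelClient(object):
    """Hypothetical helper: one connection, one channel per direction."""

    def __init__(self, connection, exchange, queue, on_message):
        self._connection = connection
        self._exchange = exchange
        self._queue = queue
        self._on_message = on_message
        self.consume_channel = None
        self.publish_channel = None

    def open_channels(self):
        # On pika's asynchronous adapters each callback fires once the
        # broker has confirmed that the channel is open.
        self._connection.channel(
            on_open_callback=self._on_consume_channel_open)
        self._connection.channel(
            on_open_callback=self._on_publish_channel_open)

    def _on_consume_channel_open(self, channel):
        self.consume_channel = channel
        # Assumption: pika 1.x keyword names for basic_consume.
        channel.basic_consume(queue=self._queue,
                              on_message_callback=self._on_message)

    def _on_publish_channel_open(self, channel):
        self.publish_channel = channel

    def publish(self, routing_key, body, properties=None):
        # Refuse to publish until the dedicated channel is open.
        if self.publish_channel is None:
            return False
        self.publish_channel.basic_publish(
            self._exchange, routing_key, body, properties)
        return True
```

With this layout the on_message() handler would call publish() instead of using the channel the delivery arrived on, so incoming deliveries and outgoing frames never share a channel.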

Answer:

I did my commits, was about to go to bed, and suddenly I figured it out. It turns out that the Python tutorials on rabbitmq.com still say to install pika with:

 sudo pip install pika==0.9.8

While 0.9.8 came out sometime in 2012, I think the fix was added after that release, with 0.9.9 coming out sometime in 2013.

So, I did:

sudo pip uninstall pika

followed by the installation instructions on the pika site:

sudo pip install pika

Then I replaced all of my connection.add_timeout(1, delayed_publish_cb) calls with basic_publish(), crossed my fingers, and ran it. My two processes exchanged about 200,000 messages with each other in less than 5 minutes without any problems.

Good to know that the bug fix from back in 2012 still works.

I'll have to let the rabbitmq folks know to update their tutorial.
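To catch a stale install like this earlier, a small version check could run at startup. The sketch below is only an illustration: the helper names are hypothetical, and the cutoff of 0.9.8 is an assumption taken from my experience above, not from pika's changelog.

```python
def parse_version(text):
    """Turn a dotted version such as '0.9.8' into a tuple of ints."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


# Assumption: releases up to and including 0.9.8 show the problem.
LAST_AFFECTED = (0, 9, 8)


def needs_upgrade(version_text):
    """Return True if the given pika version is at or below the cutoff."""
    return parse_version(version_text) <= LAST_AFFECTED
```

In the application this could be called as needs_upgrade(pika.__version__) right after importing pika, logging a warning when it returns True.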
