Bug 622875
Summary: | Message receiver stops receiving messages | ||||||
---|---|---|---|---|---|---|---|
Product: | Red Hat Enterprise MRG | Reporter: | Scott Spurrier <spurrier> | ||||
Component: | qpid-cpp | Assignee: | messaging-bugs <messaging-bugs> | ||||
Status: | CLOSED NOTABUG | QA Contact: | MRG Quality Engineering <mrgqe-bugs> | ||||
Severity: | medium | Docs Contact: | |||||
Priority: | medium | ||||||
Version: | 1.2 | CC: | gsim, tao | ||||
Target Milestone: | --- | ||||||
Target Release: | --- | ||||||
Hardware: | All | ||||||
OS: | Linux | ||||||
Whiteboard: | |||||||
Fixed In Version: | Doc Type: | Bug Fix | |||||
Doc Text: | Story Points: | --- | |||||
Clone Of: | Environment: | ||||||
Last Closed: | 2010-10-25 11:19:35 UTC | Type: | --- | ||||
Regression: | --- | Mount Type: | --- | ||||
Documentation: | --- | CRM: | |||||
Verified Versions: | Category: | --- | |||||
oVirt Team: | --- | RHEL 7.3 requirements from Atomic Host: | |||||
Cloudforms Team: | --- | Target Upstream Version: | |||||
Embargoed: | |||||||
Attachments: |
|
I can reproduce this problem with the customers provided code. When run the test it only receive one of the two messages and will print the string "Received message" once, at which point it will hang. In the customers test code, if line 41 is commented out, then the application functions expectd. Line 41: subscriptionSettings.flowControl = FlowControl::messageWindow(1); The issue here is that the releasing of all messages means that the 'auto-ack' is never triggered (there are never any messages to accept) which also means that the broker doesn't get informed of processed messages so that it can move the window forward. The application must therefore itself move the window forward. The simplest change to do this is probably the following: diff -up ./main.cpp.orig ./main.cpp --- ./main.cpp.orig 2010-08-10 17:24:34.124876657 +0100 +++ ./main.cpp 2010-08-10 17:26:28.454841536 +0100 @@ -48,6 +48,7 @@ class MyMessageListener : MessageListene { cout << "Received message" << endl; subscription_.release(message); + subscriptionManager_.getSession().markCompleted(message.getId(), true, true); } void start() an alternative is: diff -up ./main.cpp.orig ./main.cpp --- ./main.cpp.orig 2010-08-10 17:24:34.124876657 +0100 +++ ./main.cpp 2010-08-10 17:22:16.541966292 +0100 @@ -38,7 +38,7 @@ class MyMessageListener : MessageListene subscriptionSettings.autoAck = 100; // If line 41 is commented out, then the application functions as // I expect. - subscriptionSettings.flowControl = FlowControl::messageWindow(1); + subscriptionSettings.flowControl = FlowControl::messageWindow(2); subscriptionSettings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED; subscription_ = subscriptionManager_.subscribe(*this, queue, subscriptionSettings); @@ -48,6 +48,7 @@ class MyMessageListener : MessageListene { cout << "Received message" << endl; subscription_.release(message); + subscriptionManager_.getSession().sendCompletion(); } void start() though that only works for a window greater than 1. |
Created attachment 437936 [details] test code Description of problem: Using manual message acknowledgment along with a recommended solution (provided by Red Hat) for another problem I found a scenario in which the message listener stops receiving messages. We implemented the following solution that was recommended by Red Hat contained in Issue 501213: // Recommeneded solution subscriptions_ = new SubscriptionManager( session ); SubscriptionSettings settings; settings.flowControl = FlowControl::messageWindow(1000); settings.autoAck = 100; subscriptions_->subscribe( *this, "message_queue", settings ); std::cout << "Subscribed through failover mgr, calling run()!" << std::endl; subscriptions_->run(); --- This fixed the origonal problem from Issue 501213, but I noticed while performing some tests that if my message receiver receives 1000 message (the same value that was set for the message window) and rejects all 1000 messages that the message receiver no longer receives additional messages. I tried changing the message window to various sizes but I still received the same results after the same number of messages as was set for the window size were received and rejected sequentially. How reproducible: Steps to Reproduce: 1. Create a message listener with the following subscription settings: SubscriptionSettings settings; settings.flowControl = FlowControl::messageWindow(1); settings.autoAck = 100; settings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED; 2. In the message listeners received() method reject the message: Ex: class MyMessageListener : public MessageListener { private: Session& session_; public: MyMessageListener(Session& session) : session_(session) {} void MyMessageListener::received(Message& message) { cout << "Received message" << endl; session_.release(message); // reject the message } }; 3. Send two messages to the listener. Actual results: The message listener will only receive one of the two messages and will print the string "Received message" once, at which point it will hang. Expected results: The message receiver should continue to receive both messages indefinitely, causing the message "Received message" to be continuously printed to standard out.