Bug 622875

Summary: Message receiver stops receiving messages
Product: Red Hat Enterprise MRG Reporter: Scott Spurrier <spurrier>
Component: qpid-cppAssignee: messaging-bugs <messaging-bugs>
Status: CLOSED NOTABUG QA Contact: MRG Quality Engineering <mrgqe-bugs>
Severity: medium Docs Contact:
Priority: medium    
Version: 1.2CC: gsim, tao
Target Milestone: ---   
Target Release: ---   
Hardware: All   
OS: Linux   
Whiteboard:
Fixed In Version: Doc Type: Bug Fix
Doc Text:
Story Points: ---
Clone Of: Environment:
Last Closed: 2010-10-25 11:19:35 UTC Type: ---
Regression: --- Mount Type: ---
Documentation: --- CRM:
Verified Versions: Category: ---
oVirt Team: --- RHEL 7.3 requirements from Atomic Host:
Cloudforms Team: --- Target Upstream Version:
Embargoed:
Attachments:
Description Flags
test code none

Description Scott Spurrier 2010-08-10 16:02:35 UTC
Created attachment 437936 [details]
test code

Description of problem: Using manual message acknowledgment along with a recommended solution (provided by Red Hat) for another problem I found a scenario in which the message listener stops receiving messages.

We implemented the following solution that was recommended by Red Hat contained in Issue 501213:

// Recommeneded solution
subscriptions_ = new SubscriptionManager( session );
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageWindow(1000);
settings.autoAck = 100;
subscriptions_->subscribe( *this, "message_queue",
                          settings );
std::cout << "Subscribed through failover mgr, calling run()!" << std::endl;
subscriptions_->run();
---
This fixed the origonal problem from Issue 501213, but
I noticed while performing some tests that if my message receiver receives 1000 message (the same value that was set for the message window) and rejects all 1000 messages that the message receiver no longer receives additional messages.  I tried changing the message window to various sizes but I still received the same results after the same number of messages as was set for the window size were received and rejected sequentially.

How reproducible:

Steps to Reproduce:
1. Create a message listener with the following subscription settings:

SubscriptionSettings settings;
settings.flowControl = FlowControl::messageWindow(1);
settings.autoAck = 100;
settings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED;

2. In the message listeners received() method reject the message: Ex:

class MyMessageListener : public MessageListener
{
   private:
       Session& session_;

   public:
   MyMessageListener(Session& session)
       : session_(session) {}
 
   void
   MyMessageListener::received(Message& message)
   {
       cout << "Received message" << endl;
       session_.release(message); // reject the message
   }
};

3. Send two messages to the listener.

Actual results:
The message listener will only receive one of the two messages and will print the string "Received message" once, at which point it will hang.

Expected results:
The message receiver should continue to receive both messages indefinitely, causing the message "Received message" to be continuously printed to standard out.

Comment 1 Scott Spurrier 2010-08-10 16:03:09 UTC
I can reproduce this problem with the customers provided code.
When run the test it only receive one of the two messages and will print the string "Received message" once, at which point it will hang.

In the customers test code, if line 41 is commented out, then the application functions expectd.
Line 41:            
           subscriptionSettings.flowControl = FlowControl::messageWindow(1);

Comment 2 Gordon Sim 2010-08-10 16:33:36 UTC
The issue here is that the releasing of all messages means that the 'auto-ack' is never triggered (there are never any messages to accept) which also means that the broker doesn't get informed of processed messages so that it can move the window forward. The application must therefore itself move the window forward.

The simplest change to do this is probably the following:

diff -up ./main.cpp.orig ./main.cpp
--- ./main.cpp.orig	2010-08-10 17:24:34.124876657 +0100
+++ ./main.cpp	2010-08-10 17:26:28.454841536 +0100
@@ -48,6 +48,7 @@ class MyMessageListener : MessageListene
         {
             cout << "Received message" << endl;
             subscription_.release(message);
+            subscriptionManager_.getSession().markCompleted(message.getId(), true, true);
         }
 
         void start()

an alternative is:


diff -up ./main.cpp.orig ./main.cpp
--- ./main.cpp.orig	2010-08-10 17:24:34.124876657 +0100
+++ ./main.cpp	2010-08-10 17:22:16.541966292 +0100
@@ -38,7 +38,7 @@ class MyMessageListener : MessageListene
             subscriptionSettings.autoAck = 100;
 // If line 41 is commented out, then the application functions as
 // I expect.            
-            subscriptionSettings.flowControl = FlowControl::messageWindow(1);
+            subscriptionSettings.flowControl = FlowControl::messageWindow(2);
             subscriptionSettings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED;
             
             subscription_ = subscriptionManager_.subscribe(*this, queue, subscriptionSettings);
@@ -48,6 +48,7 @@ class MyMessageListener : MessageListene
         {
             cout << "Received message" << endl;
             subscription_.release(message);
+            subscriptionManager_.getSession().sendCompletion();
         }
 
         void start()

though that only works for a window greater than 1.