Bug 622875 - Message receiver stops receiving messages
Status: CLOSED NOTABUG
Product: Red Hat Enterprise MRG
Classification: Red Hat
Component: qpid-cpp
Version: 1.2
Hardware: All Linux
Priority: medium
Severity: medium
Assigned To: messaging-bugs
QA Contact: MRG Quality Engineering

Reported: 2010-08-10 12:02 EDT by Scott Spurrier
Modified: 2010-10-25 07:19 EDT

Doc Type: Bug Fix
Last Closed: 2010-10-25 07:19:35 EDT

Attachments
test code (1.55 KB, application/x-gzip), 2010-08-10 12:02 EDT, Scott Spurrier

Description Scott Spurrier 2010-08-10 12:02:35 EDT
Created attachment 437936
test code

Description of problem: Using manual message acknowledgment together with a solution recommended by Red Hat for another problem, I found a scenario in which the message listener stops receiving messages.

We implemented the following solution, which Red Hat recommended in Issue 501213:

// Recommended solution
subscriptions_ = new SubscriptionManager( session );
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageWindow(1000);
settings.autoAck = 100;
subscriptions_->subscribe( *this, "message_queue",
                          settings );
std::cout << "Subscribed through failover mgr, calling run()!" << std::endl;
subscriptions_->run();
---
This fixed the original problem from Issue 501213. However, while performing some tests I noticed that if my message receiver receives 1000 messages (the same value that was set for the message window) and rejects all 1000 of them, it no longer receives any additional messages. I tried changing the message window to various sizes but got the same result: the receiver stalled once it had received and rejected, in sequence, as many messages as the window size.

How reproducible:

Steps to Reproduce:
1. Create a message listener with the following subscription settings:

SubscriptionSettings settings;
settings.flowControl = FlowControl::messageWindow(1);
settings.autoAck = 100;
settings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED;

2. In the message listener's received() method, reject the message. For example:

class MyMessageListener : public MessageListener
{
   private:
       Session& session_;

   public:
       MyMessageListener(Session& session)
           : session_(session) {}

       // Defined inside the class, so no MyMessageListener:: qualifier is needed.
       void received(Message& message)
       {
           cout << "Received message" << endl;
           session_.release(message); // reject the message
       }
};

3. Send two messages to the listener.

Actual results:
The message listener will only receive one of the two messages and will print the string "Received message" once, at which point it will hang.

Expected results:
The message receiver should continue to receive both messages indefinitely, causing the message "Received message" to be continuously printed to standard out.
Comment 1 Scott Spurrier 2010-08-10 12:03:09 EDT
I can reproduce this problem with the customer's provided code.
When the test is run, it receives only one of the two messages and prints the string "Received message" once, at which point it hangs.

In the customer's test code, if line 41 is commented out, the application functions as expected.
Line 41:            
           subscriptionSettings.flowControl = FlowControl::messageWindow(1);
Comment 2 Gordon Sim 2010-08-10 12:33:36 EDT
The issue here is that the releasing of all messages means that the 'auto-ack' is never triggered (there are never any messages to accept) which also means that the broker doesn't get informed of processed messages so that it can move the window forward. The application must therefore itself move the window forward.
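
The stall described above can be illustrated with a small toy model of a message window. This is only a sketch and does not use the qpid API: `ToyWindowBroker`, `tryDeliver`, `complete`, and `countDelivered` are hypothetical names, and the model assumes the broker simply stops delivering once the number of delivered-but-not-completed messages reaches the window size.

```cpp
#include <cassert>
#include <cstddef>

// Toy model of a message-window credit scheme.
// NOT the qpid API; every name here is hypothetical.
class ToyWindowBroker {
public:
    explicit ToyWindowBroker(std::size_t window)
        : window_(window), outstanding_(0) {
        assert(window > 0);
    }

    // Deliver one message if the window still has room.
    // Returns false when the window is exhausted.
    bool tryDeliver() {
        if (outstanding_ >= window_) return false;
        ++outstanding_;
        return true;
    }

    // The client reports one message as completed, freeing one slot.
    void complete() {
        if (outstanding_ > 0) --outstanding_;
    }

private:
    std::size_t window_;
    std::size_t outstanding_;
};

// Offer `offered` messages to a client that releases every message.
// When `completeAfterRelease` is true the client also reports completion
// after each release. Returns how many messages were delivered.
std::size_t countDelivered(std::size_t window, std::size_t offered,
                           bool completeAfterRelease) {
    ToyWindowBroker broker(window);
    std::size_t delivered = 0;
    for (std::size_t i = 0; i < offered; ++i) {
        if (!broker.tryDeliver()) break;  // window exhausted: receiver stalls
        ++delivered;
        // The release would happen here; on its own it does not move the window.
        if (completeAfterRelease) broker.complete();
    }
    return delivered;
}
```

In this model, a window of 1 with two offered messages delivers only one message when nothing is ever completed, which matches the reported hang, and delivers both once each release is followed by a completion. That is why the application has to move the window forward itself.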

The simplest change to do this is probably the following:

diff -up ./main.cpp.orig ./main.cpp
--- ./main.cpp.orig	2010-08-10 17:24:34.124876657 +0100
+++ ./main.cpp	2010-08-10 17:26:28.454841536 +0100
@@ -48,6 +48,7 @@ class MyMessageListener : MessageListene
         {
             cout << "Received message" << endl;
             subscription_.release(message);
+            subscriptionManager_.getSession().markCompleted(message.getId(), true, true);
         }
 
         void start()

An alternative is:

diff -up ./main.cpp.orig ./main.cpp
--- ./main.cpp.orig	2010-08-10 17:24:34.124876657 +0100
+++ ./main.cpp	2010-08-10 17:22:16.541966292 +0100
@@ -38,7 +38,7 @@ class MyMessageListener : MessageListene
             subscriptionSettings.autoAck = 100;
 // If line 41 is commented out, then the application functions as
 // I expect.            
-            subscriptionSettings.flowControl = FlowControl::messageWindow(1);
+            subscriptionSettings.flowControl = FlowControl::messageWindow(2);
             subscriptionSettings.acquireMode = qpid::client::ACQUIRE_MODE_PRE_ACQUIRED;
             
             subscription_ = subscriptionManager_.subscribe(*this, queue, subscriptionSettings);
@@ -48,6 +48,7 @@ class MyMessageListener : MessageListene
         {
             cout << "Received message" << endl;
             subscription_.release(message);
+            subscriptionManager_.getSession().sendCompletion();
         }
 
         void start()

Note that this alternative only works for a window greater than 1.
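
One possible reading of the "window greater than 1" caveat can be sketched with a toy model. The comment above does not explain the reason, so this rests on a loudly stated assumption: the message currently being handled is only counted as completed after the listener callback returns, so a completion sent from inside the callback can only cover earlier messages. The function `deliveredWithSendCompletion` and its counters are hypothetical and do not use the qpid API.

```cpp
#include <cassert>
#include <cstddef>

// Toy model, NOT the qpid API. ASSUMPTION: a message is marked completed
// only after its callback returns, so a completion sent from inside the
// callback reports earlier messages but not the current one.
// Returns how many of `offered` messages get delivered.
std::size_t deliveredWithSendCompletion(std::size_t window,
                                        std::size_t offered) {
    assert(window > 0);
    std::size_t delivered = 0;        // messages sent by the broker
    std::size_t markedCompleted = 0;  // completed locally on the client
    std::size_t reported = 0;         // completions the broker knows about

    // The broker delivers only while unreported messages fit in the window.
    while (delivered < offered && delivered - reported < window) {
        ++delivered;                  // callback starts for this message
        reported = markedCompleted;   // completion sent from inside the callback
        ++markedCompleted;            // callback returns: message now completed
    }
    return delivered;
}
```

Under that assumption, a window of 1 stalls after the first message because its completion is never reported, while a window of 2 leaves room for the next delivery, whose callback then reports the previous message.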
