Description of problem:
Send a batch of messages with a very low TTL to the broker; it passes as many as it can to the connected consumer and discards the expired ones. When the consumer sends the acknowledgements and they expire on the way to the broker, the broker still keeps the unconfirmed messages in the queue (while waiting for the acknowledgements), although their TTL has also expired and they should be discarded. In order to clear the queue you have to either wait for periodical purge or disconnect/reconnect the consumer.
Version-Release number of selected component (if applicable):
Steps to Reproduce:
1. create queue testqueue
2. create consumer "qpid-receive -a testqueue -b localhost:5672 -f | nl"
3. qpid-send -a testqueue -b localhost:5672 -m 100 --ttl 6 --content-string hello
Expected results:
queue-depth value in qpid-stat -q testqueue should be 0
Actual results:
queue-depth value in qpid-stat -q testqueue is not 0 (it usually equals the number of messages received by the consumer)
I don't understand exactly what you mean by "consumer sends the acknowledgements and they expire on a way to the broker". Acknowledgements don't expire. If a client acknowledges a message the message will be discarded regardless of ttl.
"In order to clear the queue you have to either wait for periodical purge or disconnect/reconnect the consumer. " - actually, the periodic purge will not purge expired messages that are acquired. Arguably *that* is a bug, though I would argue a minor one, as the messages would be discarded if the consumer acknowledges them or is disconnected.
Better scenario describing the problem (I *think* the broker does not update prefetch/capacity for a consumer that has just acknowledged a message that timed out while it was acquired):
1) Start a consumer (artificially slow, but acknowledging every message):
qpid-receive -a testqueue -b localhost:5672 -f --capacity=5 --ack-frequency=1 | nl
2) Send a large number of messages with a tiny TTL, followed by messages _without_ TTL:
qpid-send -a testqueue -b localhost:5672 -m 100000 --ttl 6 --content-string ttl; qpid-send -a testqueue -m10 --content-string=NOttl
3) Check what messages the consumer receives - it receives some ttl messages but _no_ message without TTL.
4) qpid-stat -q testqueue shows the queue has thousands of messages.
5) qpid-stat -u shows sessUnacked=0 for the consumer (so the consumer acknowledged all received messages)
Once queue-purge-interval cleans the queue, the queue shrinks to just the "NOttl" messages, which are still _not_ offered to the consumer.
So, something prevents the broker from traversing the queue (in the Queue::getNextMessage method?) to find messages to offer to the consumer - neither the expired ones (which should be purged instead of being sent to the consumer) nor the "NOttl" ones are handled.
Playing more on this:
- the problem is also present upstream
- it is unclear whether the bug is actually on the _client_ side: in a tcpdump, the last AMQP command/frame is the client sending message.accept, but there is _no_ session.flush, no message.set-flow-mode and, crucially, no message.flow
That suggests the client does acknowledge the message, but does not update flow/prefetch/capacity?
The problem is indeed on the client side and is a result of the change for QPID-5828. The client detects the expired message, acks it, then discards it without passing it on to the application. Unfortunately it also does not go through the code that adjusts the credit. I'll figure out how best to fix this.
Fixed upstream: https://svn.apache.org/r1741491 (test also added https://svn.apache.org/r1741505).
FYI, it is a regression between qpid 0.18 and 0.34 (I haven't tested the releases in between), since:
- qpid-receive using 0.18 library talking to 0.18 broker: no issue
- qpid-receive using 0.18 library talking to 0.34 broker: no issue
- qpid-receive using 0.34 library talking to 0.18 broker: reported issue
- qpid-receive using 0.34 library talking to 0.34 broker: reported issue
Python client not affected.
One regression found: closing a consumer with unacked expired messages causes a segfault with this backtrace:
#0 qpid::client::amqp0_10::IncomingMessages::process (this=0x627870, handler=0x0, duration=...)
#1 0x00007ffff7649e4c in qpid::client::amqp0_10::IncomingMessages::releasePending (this=0x627870, destination="q")
#2 0x00007ffff7652ce1 in qpid::client::amqp0_10::SessionImpl::releasePending (this=0x627820, name="q")
#3 0x00007ffff764e82a in qpid::client::amqp0_10::ReceiverImpl::closeImpl (this=0x628880)
#4 0x00007ffff764f9b4 in operator() (this=0x627820, f=...)
#5 qpid::client::amqp0_10::SessionImpl::execute<qpid::client::amqp0_10::ReceiverImpl::Close> (this=0x627820, f=...)
#6 0x00007ffff764e954 in execute<qpid::client::amqp0_10::ReceiverImpl::Close> (this=<value optimized out>)
#7 qpid::client::amqp0_10::ReceiverImpl::close (this=<value optimized out>)
#8 0x0000000000405e91 in main ()
qpid-send -a q -m 1000 --ttl=30 --content-size=10; ./drain_no_ack
Where drain_no_ack trivially sets the capacity to 10k and consumes messages from a receiver that is closed afterwards:
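Connection connection(options.url, options.connectionOptions);
connection.open();  // reconstructed: not shown in the original paste
Session session = connection.createSession();
Receiver receiver = session.createReceiver("q");
receiver.setCapacity(10000);  // capacity of 10k, per the description above
Message message;
while (receiver.fetch(message, Duration::SECOND)) {  // reconstructed loop; the timeout is an assumption
    std::cout << "Message(properties=" << message.getProperties() << " ";
    if (message.getTtl().getMilliseconds()) std::cout << "TTL: " << message.getTtl().getMilliseconds();
    std::cout << std::endl;
}
receiver.close();  // closed without acknowledging anything; frames #3-#7 of the backtrace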
(one might need to tune --ttl=30 for the VM used: the goal is simply that some messages reach the client before they expire, while the others expire in the client)
Sorry folks! My mistake. I've added a test case to cover this and a fix upstream: https://svn.apache.org/r1745195
From the git repo it appears that tag qpid-cpp-0.34-13 on 0.34-mrg includes the second fix.
Mike, can you confirm?
Correct, the complete fix is included in qpid-cpp-0.34-13 or later.
Since the problem described in this bug report should be
resolved in a recent advisory, it has been closed with a
resolution of ERRATA.
For information on the advisory, and where to find the updated
files, follow the link below.
If the solution does not work for you, open a new bug report.