| Summary: | qpid c++ high-level api receivers receiving concurrently from same address/queue can see queue empty in case it is not [get confused by rcv.fetch(msg, timeout=0)] | | |
|---|---|---|---|
| Product: | Red Hat Enterprise MRG | Reporter: | Frantisek Reznicek <freznice> |
| Component: | qpid-cpp | Assignee: | messaging-bugs <messaging-bugs> |
| Status: | NEW --- | QA Contact: | MRG Quality Engineering <mrgqe-bugs> |
| Severity: | unspecified | Docs Contact: | |
| Priority: | medium | | |
| Version: | Development | CC: | gsim, iboverma, jross, kgiusti |
| Target Milestone: | --- | | |
| Target Release: | --- | | |
| Hardware: | Unspecified | | |
| OS: | Unspecified | | |
| Whiteboard: | | | |
| Fixed In Version: | | Doc Type: | Bug Fix |
| Doc Text: | | Story Points: | --- |
| Clone Of: | | Environment: | |
| Last Closed: | | Type: | --- |
| Regression: | --- | Mount Type: | --- |
| Documentation: | --- | CRM: | |
| Verified Versions: | | Category: | --- |
| oVirt Team: | --- | RHEL 7.3 requirements from Atomic Host: | |
| Cloudforms Team: | --- | Target Upstream Version: | |
My guess is this is related to prefetch and can be avoided by setting --capacity to 0 on qpid-receive. By default the capacity is 1000, meaning that more messages can be prefetched by one receiver than it intends to consume. Those messages *will* be returned to the queue when that receiver completes, but that may happen after some other receiver has been unable to get any more. |
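For illustration, a minimal sketch of the prefetch effect described above, using the Python qpid.messaging API (which mirrors the C++ high-level API behind qpid-receive). The broker address, queue name, capacity value and message counts are assumptions, not taken from the test:

# Sketch only: assumes a broker on localhost:5672 and a queue 'tq' that holds
# fewer messages than rcv_a's capacity. Python 2 / qpid.messaging style to
# match the test code further below.
from qpid.messaging import Connection, Empty

conn_a = Connection.establish("localhost:5672")
conn_b = Connection.establish("localhost:5672")
try:
    rcv_a = conn_a.session().receiver("tq")
    rcv_a.capacity = 1000          # qpid-receive default: prefetch up to 1000 messages
    rcv_a.fetch(timeout=1)         # consumes one message, but the rest of the queue
                                   # now sits in rcv_a's local prefetch buffer
    rcv_b = conn_b.session().receiver("tq")
    try:
        rcv_b.fetch(timeout=0)     # nothing left on the broker to hand out, although
                                   # rcv_a has not really consumed the messages
    except Empty:
        print "queue looks empty to rcv_b although messages remain unconsumed"
finally:
    conn_a.close()                 # messages still held by rcv_a are released back
    conn_b.close()                 # to the queue only at this point

Running every receiver with capacity 0 (qpid-receive --capacity 0) keeps messages on the broker until a receiver actually asks for one, so no receiver can hold messages that another one is waiting for.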
Description of problem:

qpid C++ high-level API receivers (qpid-receive) receiving concurrently from the same address/queue can see the address as empty when it is not [they get confused by rcv.fetch(msg, timeout=0)].

Testing scenario (focused on cluster flow control):

# phase 1 - start cluster in maximal width (if needed)
# phase 2 - remove the queue (if needed)
# phase 3 - send the messages to broker A until checkpoint (on fg)
#           all clients in sequence to the same address / queue
# phase 4 - check that address exists and check msg depth and flow flag
# phase 5 - start sending rest of msgs to broker A (on bg - new thread)
#           in parallel to one address / queue
# phase 6 - wait for particular flow control flag flowStopped (QMF) to B
# phase 7 - doublecheck that sender[s] has not finished yet
#           at least one sender has to be blocked
# phase 8 - receive part of the messages on node B (to get f/c stopped)
#           serial/sequence reception by clients from the same address
# phase 9 - wait for particular f/c flag T->F on broker B
# phase 10 - doublecheck that sender[s] is not blocked anymore (finished)
#            join sender[s] & report
# phase 12 - finish reception on node A
#            receivers in sequence from the same queue
# phase 13 - check of messages on receiver side
# phase 14 - check existence of the empty address / queue (QMF)
# phase 15 - empty address / queue clean-up
# phase Z - check cluster topology
# phase LAST - shutdown cluster (if --maximize_broker_uptime not given)

The observed behavior is the following:

In phases 8 and/or 12, multiple qpid-receive clients receive from the same address, each instructed to receive a fixed number of messages with fetch's timeout equal to 0. The receivers are run either sequentially or threaded in parallel. It was reliably observed that some of the receivers received the expected number of messages, while others received LESS than that amount and still exited with exit code 0.

I reviewed the scenario with Ken, who suggested using a fetch timeout > 0. After that change the receivers always received the expected number of messages (see the sketch below).

Based on these observations I believe there may be glitches in the internal "queue is empty" information that rcv.fetch() relies on. The scenario is designed so that a receiver is never left waiting on a genuinely empty queue, so fetch should not report the address as empty.
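For a concrete picture of the fetch-timeout difference mentioned above, a hedged sketch in Python qpid.messaging (mirroring qpid-receive's use of the C++ Receiver::fetch); the broker names, queue name, counts and helper function below are illustrative assumptions only:

# Sketch only: receive a fixed number of messages the way qpid-receive does,
# parameterised by the fetch timeout. Names below are made up.
from qpid.messaging import Connection, Empty

def receive_fixed(broker, queue, expected, timeout):
    conn = Connection.establish(broker)
    try:
        rcv = conn.session().receiver(queue)
        got = 0
        while got < expected:
            try:
                msg = rcv.fetch(timeout=timeout)  # timeout=0: "hand me a message right now or give up"
            except Empty:
                break                             # with timeout=0 this was seen to trigger
                                                  # even though messages were still queued
            rcv.session.acknowledge(msg)
            got += 1
        return got
    finally:
        conn.close()

print receive_fixed("nodeA:5672", "test_q", 100, 0)   # intermittently returned < 100 in the scenario above
print receive_fixed("nodeA:5672", "test_q", 100, 5)   # with a non-zero timeout always returned 100

The only difference between the two calls is the fetch timeout, which matches the observation that switching qpid-receive to a non-zero timeout made the missing-message symptom disappear.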
Version-Release number of selected component (if applicable):

python-qpid-0.10-1.el5.noarch
python-qpid-qmf-0.10-10.el5.x86_64
qpid-cpp-client-0.10-7.el5.x86_64
qpid-cpp-client-devel-0.10-7.el5.x86_64
qpid-cpp-client-devel-docs-0.10-7.el5.x86_64
qpid-cpp-client-ssl-0.10-7.el5.x86_64
qpid-cpp-mrg-debuginfo-0.10-7.el5.x86_64
qpid-cpp-server-0.10-7.el5.x86_64
qpid-cpp-server-cluster-0.10-7.el5.x86_64
qpid-cpp-server-devel-0.10-7.el5.x86_64
qpid-cpp-server-ssl-0.10-7.el5.x86_64
qpid-cpp-server-store-0.10-7.el5.x86_64
qpid-cpp-server-xml-0.10-7.el5.x86_64
qpid-java-client-0.10-6.el5.noarch
qpid-java-common-0.10-6.el5.noarch
qpid-java-example-0.10-6.el5.noarch
qpid-qmf-0.10-10.el5.x86_64
qpid-qmf-debuginfo-0.10-10.el5.x86_64
qpid-qmf-devel-0.10-10.el5.x86_64
qpid-tools-0.10-5.el5.noarch
sesame-0.10-1.el5.x86_64
sesame-debuginfo-0.10-1.el5.x86_64

How reproducible:
>70%

Steps to Reproduce:
See the scenario above and the Additional info section.

Actual results:
rcv.fetch(msg, timeout=0) can claim the address is empty when it is not.

Expected results:
rcv.fetch(msg, timeout=0) should not claim the address is empty when it is not.

Additional info:

The detailed cluster test unit with the scenario:

# -------------------------------------------------------------------------
# basic cluster flow control test N senders to N receivers (1 address)
# -------------------------------------------------------------------------
def test_cluster_flow_control_NtoNvia1(self):
    for i_loop in range(self.d_opts['test_loop_cnt']):  # loop the test core
        self.log_msg("TEST '%s' loop %d/%d started. [tst_cnt:%d, err_cnt:%d]" %
                     (inspect.stack()[0][3], i_loop+1, self.d_opts['test_loop_cnt'],
                      self.get_tst_cnt(), self.get_err_cnt()));

        # constants
        # possible sender/receiver capacities
        possible_capacities = [ 0, 10, 100, 1000 ];
        cli_cnt = random.randint(2, 4);            # nr. of sequence/concurrent clients
        stop_cnt = 1000;                           # queue stop count threshold
        #stop_cnt = 100;                           # queue stop count threshold
        resume_cnt = int(stop_cnt * 0.7);          # queue resume count threshold
        msg_cnt = int(stop_cnt * 1.2 / cli_cnt);   # total message count per client
        # tx & rx two phases/steps per client
        msg_cnt_check_point = int(stop_cnt * 0.6 / cli_cnt);
        # msg_cnt correction (if needed due to above rounding float->int)
        if ( msg_cnt != (msg_cnt_check_point + (msg_cnt - msg_cnt_check_point)) ):
            msg_cnt = msg_cnt_check_point + (msg_cnt - msg_cnt_check_point);
        msg_layout_check = "msg_";
        msg_layout = 'msg_%06d';
        #print cli_cnt, stop_cnt, resume_cnt, msg_cnt, msg_cnt_check_point;
        q_name = "%s_mq_%d" % (inspect.stack()[0][3], i_loop);
        durability = 'no';
        if ((i_loop % 2) == 1):
            durability = 'yes';
        capacities = [ ];
        if (i_loop >= len(possible_capacities)):
            # different capacities a client from possible capacities
            for j in range(cli_cnt):
                capacities.append(possible_capacities[random.randint(0, len(possible_capacities)-1)]);
        else:
            # same capacity a client from possible capacities
            capacities = [ possible_capacities[i_loop] ] * cli_cnt;
        self.log_msg(("config: msg_cnt:%d, msg_layout:%s, thresholds:%d/%d, " +
                      "cli_cnt: %d, q_name:%s, durability: %s, capacities:%s") %
                     (msg_cnt, msg_layout, stop_cnt, resume_cnt, cli_cnt, q_name,
                      durability, capacities));

        # phase 1 - start cluster in maximal width (if needed)
        # ---------------------------------------------------------------------
        r = self.d_cluster.goto_state(in_cluster_config = self.d_cluster.get_node_cnt(),
                                      in_order_from_back = False,
                                      in_step_by_step_ena = True);
        self.add_check(in_desc = 'cluster topology change (ecode: %s exp. True)' % r,
                       in_result = (r == True));
        self.report();
        self.log_msg("Cluster state: %d of %d up and running" %
                     (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()));
        self.add_check(in_desc = 'Cluster is fully up (%d of %d up)' %
                                 (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()),
                       in_result = (self.d_cluster.get_width() == self.d_cluster.get_node_cnt()));
        self.report();

        # phase 2 - remove the queue (if needed)
        # ---------------------------------------------------------------------
        i_exec = mrg_texec(in_threading_ena = False);
        cmd = "qpid-config queues | grep %s && qpid-config del queue %s --force";
        i_exec.run_and_wait(cmd % (q_name, q_name), in_to = 10,
                            host = self.d_cluster.get_node('A').get_host(),
                            user = self.d_opts['ssh_user'],
                            password = self.d_opts['ssh_pass'],
                            in_killena = False);
        self.add_run_and_report(i_exec, None);

        # phase 3 - send the messages to broker A until checkpoint (on fg)
        #           all clients in sequence to the same address / queue
        # ---------------------------------------------------------------------
        for j in range(cli_cnt):  # browse clients / addresses
            i_exec_tx = mrg_texec(in_threading_ena = False);
            cmd_tx = '~/%s/qpid-send -b "%s:5672" --failover-updates --content-string "%s" --content-index-offset %d -m %d --connection-options="{ username: guest, password:guest }" --address "%s; {create: sender, delete:receiver, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %sL, qpid.flow_resume_count: %dL}}}}" --durable %s --sequence yes --capacity %d';
            # launch & no wait for the finish at the moment
            i_exec_tx.run_and_wait(cmd_tx % (self.d_rmachine_test_datadir,
                                             self.d_cluster.get_node('A').get_host(),
                                             msg_layout, j * msg_cnt_check_point,
                                             msg_cnt_check_point, q_name,
                                             (durability == 'yes'), stop_cnt, resume_cnt,
                                             durability, capacities[j]),
                                   in_to = 120,
                                   host = self.d_cluster.get_node('A').get_host(),
                                   user = self.d_opts['ssh_user'],
                                   password = self.d_opts['ssh_pass'],
                                   in_killena = True);
            self.add_run_and_report(i_exec_tx, 0);

        # phase 4 - check that address exists and check msg depth and flow flag
        # ---------------------------------------------------------------------
        self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);
        i_qmf = mrg_qmf_console(in_url = 'amqp://guest/guest@%s' %
                                         (self.d_cluster.get_node('B').get_host()));
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
        ts = time.time();  # get current time snap
        while True:
            # update the object
            tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
            if ((tqueue != None) and (type(tqueue.flowStopped) == types.BooleanType)):
                break;  # flag detected, stop
            if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):  # wait max timeout
                self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" %
                             (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
                break;
        self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' %
                                 (q_name, tqueue.flowStopped),
                       in_result = (tqueue.flowStopped == False));
        self.report();
        self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' %
                                 (q_name, tqueue.msgDepth, msg_cnt_check_point * cli_cnt),
                       in_result = (tqueue.msgDepth == (msg_cnt_check_point * cli_cnt)));
        self.report();

        # phase 5 - start sending rest of msgs to broker A (on bg - new thread)
        #           in parallel to one address / queue
        # ---------------------------------------------------------------------
        i_exec_txs = [ ];
        for j in range(cli_cnt):  # browse clients / addresses
            i_exec_tx = mrg_texec(in_threading_ena = True);
            cmd_tx = '~/%s/qpid-send -b "%s:5672" --failover-updates --content-string "%s" --content-index-offset %d -m %d --connection-options="{ username: guest, password:guest }" --address "%s; {create: sender, delete:receiver, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %sL, qpid.flow_resume_count: %dL}}}}" --durable %s --sequence yes --capacity %d'
            # launch & no wait for the finish at the moment
            i_exec_tx.start(cmd_tx % (self.d_rmachine_test_datadir,
                                      self.d_cluster.get_node('A').get_host(),
                                      msg_layout,
                                      (msg_cnt_check_point * cli_cnt) + j*(msg_cnt-msg_cnt_check_point),
                                      msg_cnt-msg_cnt_check_point, q_name,
                                      (durability == 'yes'), stop_cnt, resume_cnt,
                                      durability, capacities[j]),
                            in_to = 120,
                            host = self.d_cluster.get_node('A').get_host(),
                            user = self.d_opts['ssh_user'],
                            password = self.d_opts['ssh_pass'],
                            in_killena = True);
            i_exec_txs.append(i_exec_tx);

        # phase 6 - wait for particular flow control flag flowStopped (QMF) to B
        # ---------------------------------------------------------------------
        self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);
        # reusing i_qmf from step 4
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
        ts = time.time();  # get current time snap
        while True:
            # update the object
            tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
            if ((tqueue != None) and (tqueue.flowStopped == True)):
                break;  # flag detected, stop
            if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):  # wait max timeout
                self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" %
                             (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
                break;
        self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. True' %
                                 (q_name, tqueue.flowStopped),
                       in_result = (tqueue.flowStopped == True));
        self.report();
        m, M = get_fc_stop_range(in_stop_cnt_thr = stop_cnt,
                                 in_capacity = sum(capacities),
                                 in_max_cnt = msg_cnt * cli_cnt);
        self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d ... %d' %
                                 (q_name, tqueue.msgDepth, m, M),
                       in_result = ((tqueue.msgDepth >= m) and (tqueue.msgDepth <= M)));
        self.report();

        # phase 7 - doublecheck that sender[s] has not finished yet
        #           at least one sender has to be blocked
        # ---------------------------------------------------------------------
        blocked_cnt = 0;
        for j in range(cli_cnt):  # browse clients / addresses
            if (i_exec_txs[j].finished() == False):
                blocked_cnt += 1;
        self.add_check(in_desc = 'A sender has to be blocked atm (blocked_cnt:%d)' % blocked_cnt,
                       in_result = (blocked_cnt > 0));
        self.report();

        # phase 8 - receive part of the messages on node B (to get f/c stopped)
        #           serial/sequence reception by clients from the same address
        # ---------------------------------------------------------------------
        i_exec_rxs1 = [ ];
        for j in range(cli_cnt):  # browse clients / addresses
            i_exec_rx = mrg_texec(in_threading_ena = False);
            cmd_rx = '~/%s/qpid-receive -b "%s:5672" --failover-updates --connection-options="{ username: guest, password:guest }" --address "%s ; {create: sender, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %dL, qpid.flow_resume_count: %dL}}}}" --ignore-duplicates -m %d'
            i_exec_rx.run_and_wait(cmd_rx % (self.d_rmachine_test_datadir,
                                             self.d_cluster.get_node('B').get_host(),
                                             q_name, (durability == 'yes'),
                                             stop_cnt, resume_cnt,
                                             (msg_cnt - msg_cnt_check_point)),
                                   in_to = 120,
                                   host = self.d_cluster.get_node('B').get_host(),
                                   user = self.d_opts['ssh_user'],
                                   password = self.d_opts['ssh_pass'],
                                   in_killena = True);
            i_exec_rxs1.append(i_exec_rx);
            self.add_run_and_report(i_exec_rx, 0);
            # check number of received messages per receiver
            i_tp = mrg_tp(i_exec_rx.get_stdout());
            i_tp.read_input();
            i_tp.do_filter(msg_layout_check);
            self.add_check(in_desc = 'Receiver 1-%d data check (%d exp. %d)' %
                                     (j, len(i_tp.get_strlist()), (msg_cnt - msg_cnt_check_point)),
                           in_result = (len(i_tp.get_strlist()) == (msg_cnt - msg_cnt_check_point)),
                           in_stdout = i_exec_rx.get_stdout(),
                           in_stderr = i_exec_rx.get_stderr());
            self.report();

        # phase 9 - wait for particular f/c flag T->F on broker B
        # ---------------------------------------------------------------------
        self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);
        # reusing i_qmf from step 4
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
        ts = time.time();  # get current time snap
        while True:
            # update the object
            tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
            if ((tqueue != None) and (tqueue.flowStopped == False)):
                break;  # flag detected, stop
            if ((time.time() - ts) > (self.d_opts['qmf_data_timeout']/2)):
                for k in i_exec_rxs1:
                    print k, k.finished(),
                    if (k.finished()):
                        print self.format_str(k.get_stdout());
                print tqueue.msgDepth, tqueue.flowStopped;
            if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):  # wait max timeout
                self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" %
                             (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
                break;
        self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' %
                                 (q_name, tqueue.flowStopped),
                       in_result = (tqueue.flowStopped == False));
        self.report();
        self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' %
                                 (q_name, tqueue.msgDepth, msg_cnt_check_point*cli_cnt),
                       in_result = (tqueue.msgDepth == msg_cnt_check_point*cli_cnt));
        self.report();

        # phase 10 - doublecheck that sender[s] is not blocked anymore (finished)
        #            join sender[s] & report
        # ---------------------------------------------------------------------
        for j in range(cli_cnt):  # browse clients / addresses
            self.add_check(in_desc = 'Sender has to be finished atm (finished:%s)' %
                                     i_exec_txs[j].finished(),
                           in_result = (i_exec_txs[j].finished() == True));
            self.report();
            if (i_exec_txs[j].finished() == True):
                i_exec_txs[j].join();
                # report the (finished & joined) sender
                self.add_run_and_report(i_exec_txs[j], 0);

        # phase 12 - finish reception on node A
        #            receivers in sequence from the same queue
        # ---------------------------------------------------------------------
        i_exec_rxs2 = [ ];
        for j in range(cli_cnt):  # browse clients / addresses
            i_exec_rx = mrg_texec(in_threading_ena = False);
            cmd_rx = '~/%s/qpid-receive -b "%s:5672" --failover-updates --connection-options="{ username: guest, password:guest }" --address "%s ; {create: sender, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %dL, qpid.flow_resume_count: %dL}}}}" --ignore-duplicates -m %d'
            i_exec_rx.run_and_wait(cmd_rx % (self.d_rmachine_test_datadir,
                                             self.d_cluster.get_node('A').get_host(),
                                             q_name, (durability == 'yes'),
                                             stop_cnt, resume_cnt, msg_cnt_check_point),
                                   in_to = 120,
                                   host = self.d_cluster.get_node('A').get_host(),
                                   user = self.d_opts['ssh_user'],
                                   password = self.d_opts['ssh_pass'],
                                   in_killena = True);
            self.add_run_and_report(i_exec_rx, 0);
            i_exec_rxs2.append(i_exec_rx);
            # check number of received messages per receiver
            i_tp = mrg_tp(i_exec_rx.get_stdout());
            i_tp.read_input();
            i_tp.do_filter(msg_layout_check);
            self.add_check(in_desc = 'Receiver 2-%d data check (%d exp. %d)' %
                                     (j, len(i_tp.get_strlist()), msg_cnt_check_point),
                           in_result = (len(i_tp.get_strlist()) == msg_cnt_check_point),
                           in_stdout = i_exec_rx.get_stdout(),
                           in_stderr = i_exec_rx.get_stderr());
            self.report();

        # phase 13 - check of messages on receiver side
        # ---------------------------------------------------------------------
        lstdout = [ ];
        lstderr = [ ];
        for j in range(cli_cnt):  # browse clients / addresses
            lstdout = lstdout + i_exec_rxs1[j].get_stdout();
            lstdout = lstdout + i_exec_rxs2[j].get_stdout();
            lstderr = lstderr + i_exec_rxs1[j].get_stderr();
            lstderr = lstderr + i_exec_rxs2[j].get_stderr();
        i_tp = mrg_tp(lstdout);
        i_tp.read_input();
        i_tp.do_filter(msg_layout_check);
        self.add_check(in_desc = 'Receiver data check (%d exp. %d)' %
                                 (len(i_tp.get_strlist()), msg_cnt*cli_cnt),
                       in_result = (len(i_tp.get_strlist()) == msg_cnt*cli_cnt),
                       in_stdout = lstdout, in_stderr = lstderr);
        self.report();

        # phase 14 - check existence of the empty address / queue (QMF)
        # ---------------------------------------------------------------------
        self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);
        # reusing i_qmf from step 4
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
        ts = time.time();  # get current time snap
        while True:
            # update the object
            tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
            if ((tqueue != None) and (tqueue.flowStopped == False) and (tqueue.msgDepth == 0)):
                break;  # flag detected, stop
            if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):  # wait max timeout
                self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" %
                             (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
                break;
        self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' %
                                 (q_name, tqueue.flowStopped),
                       in_result = (tqueue.flowStopped == False));
        self.report();
        self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' %
                                 (q_name, tqueue.msgDepth, 0),
                       in_result = (tqueue.msgDepth == 0));
        self.report();

        # phase 15 - empty address / queue clean-up
        # ---------------------------------------------------------------------
        i_exec = mrg_texec(in_threading_ena = False);
        cmd = "qpid-config del queue %s --force";
        i_exec.run_and_wait(cmd % (q_name), in_to = 10,
                            host = self.d_cluster.get_node('B').get_host(),
                            user = self.d_opts['ssh_user'],
                            password = self.d_opts['ssh_pass'],
                            in_killena = False);
        self.add_run_and_report(i_exec, 0);

        # phase Z - check cluster topology
        # ---------------------------------------------------------------------
        self.add_check(in_desc = 'Cluster should be fully up (%d of %d up)' %
                                 (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()),
                       in_result = (self.d_cluster.get_width() == self.d_cluster.get_node_cnt()));
        self.report();

        # stop the cluster (conditioned)
        if (self.d_opts['maximize_broker_uptime'] == False):
            self.log_msg("Shutdown cluster from back");
            self.d_cluster.goto_state(in_cluster_config = 0,
                                      in_order_from_back = True,  # stop from back
                                      in_step_by_step_ena = True);
        # final state
        self.log_msg("Cluster state: %d of %d up and running" %
                     (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()));

        self.log_msg("TEST '%s' loop %d/%d finished. [tst_cnt:%d, err_cnt:%d]" %
                     (inspect.stack()[0][3], i_loop+1, self.d_opts['test_loop_cnt'],
                      self.get_tst_cnt(), self.get_err_cnt()));