Bug 710402 - qpid c++ high-level api receivers receiving concurrently from the same address/queue can see the queue as empty when it is not [they get confused by rcv.fetch(msg, timeout=0)]
Summary: qpid c++ high-level api receivers receiving concurrently from same address/qu...
Keywords:
Status: NEW
Alias: None
Product: Red Hat Enterprise MRG
Classification: Red Hat
Component: qpid-cpp
Version: Development
Hardware: Unspecified
OS: Unspecified
Priority: medium
Severity: unspecified
Target Milestone: ---
Assignee: messaging-bugs
QA Contact: MRG Quality Engineering
URL:
Whiteboard:
Depends On:
Blocks:
 
Reported: 2011-06-03 10:28 UTC by Frantisek Reznicek
Modified: 2021-03-16 12:46 UTC (History)
4 users

Fixed In Version:
Doc Type: Bug Fix
Doc Text:
Clone Of:
Environment:
Last Closed:
Target Upstream Version:


Attachments:

Description Frantisek Reznicek 2011-06-03 10:28:23 UTC
Description of problem:

qpid c++ high-level api receivers (qpid-receive) receiving concurrently from the same address/queue can see the address as empty when it is not [they get confused by rcv.fetch(msg, timeout=0)].

Testing scenario (focused on cluster flow control, f/c):

      # phase 1 - start cluster in maximal width (if needed)
      # ---------------------------------------------------------------------
      # phase 2 - remove the queue (if needed)
      # ---------------------------------------------------------------------
      # phase 3 - send the messages to broker A until checkpoint (on fg)
      #           all clients in sequence to the same address / queue
      # ---------------------------------------------------------------------
      # phase 4 - check that address exists and check msg depth and flow flag
      # ---------------------------------------------------------------------
      # phase 5 - start sending rest of msgs to broker A (on bg - new thread)
      #           in parallel to one address / queue
      # ---------------------------------------------------------------------
      # phase 6 - wait for particular flow control flag flowStopped (QMF) to B
      # ---------------------------------------------------------------------
      # phase 7 - doublecheck that sender[s] has not finished yet
      #           at least one sender has to be blocked
      # ---------------------------------------------------------------------
      # phase 8 - receive part of the messages on node B (to get f/c stopped)
      #           serial/sequence reception by clients from the same address
      # ---------------------------------------------------------------------
      # phase 9 - wait for particular f/c flag T->F on broker B
      # ---------------------------------------------------------------------
      # phase 10 - doublecheck that sender[s] is not blocked anymore (finished)
      #            join sender[s] & report
      # ---------------------------------------------------------------------
      # phase 12 - finish reception on node A
      #            receivers in sequence from the same queue
      # ---------------------------------------------------------------------
      # phase 13 - check of messages on receiver side
      # ---------------------------------------------------------------------
      # phase 14 - check existence of the empty address / queue (QMF)
      # ---------------------------------------------------------------------
      # phase 15 - empty address / queue clean-up
      # ---------------------------------------------------------------------
      # phase Z - check cluster topology
      # ---------------------------------------------------------------------
      # phase LAST - shutdown cluster (if --maximize_broker_uptime not given)
      # ---------------------------------------------------------------------

The observed behavior is the following:
In phases 8 and/or 12, multiple qpid-receive clients receive from the same address, each instructed to receive a fixed number of messages with fetch's timeout equal to 0.
The receivers are run either sequentially or in parallel threads.

It was reliably observed that some of the receivers received the expected number of messages, while others received LESS than that amount and still exited with exit code 0.

I reviewed the scenario with Ken, who suggested using a fetch timeout > 0.

After that change, the receivers always received the expected number of messages.

Based on these observations, I believe there may be glitches in the internal information about queue emptiness that the rcv.fetch() method reads.

The above scenario avoids the situation where the queue holds no messages while a receiver is still waiting for one.
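
For illustration, here is a minimal sketch of the receiver pattern exercised in phases 8/12, written against the qpid::messaging C++ API. It is not the actual qpid-receive implementation; the broker URL, address, and message count are placeholders:

  #include <qpid/messaging/Connection.h>
  #include <qpid/messaging/Session.h>
  #include <qpid/messaging/Receiver.h>
  #include <qpid/messaging/Message.h>
  #include <qpid/messaging/Duration.h>
  #include <iostream>

  using namespace qpid::messaging;

  int main() {
      // placeholders: broker on node B, shared test queue, per-client message count
      Connection connection("nodeB:5672", "{username: guest, password: guest}");
      connection.open();
      Session session = connection.createSession();
      Receiver receiver = session.createReceiver("test_queue");

      const int expected = 100;
      int received = 0;
      Message msg;
      // timeout = 0 (Duration::IMMEDIATE): fetch() returns false as soon as the
      // receiver believes the queue is empty - this is where the premature "empty"
      // is observed when several receivers compete for one queue.
      // Using a non-zero timeout, e.g. Duration(1000), avoided the problem.
      while (received < expected && receiver.fetch(msg, Duration::IMMEDIATE)) {
          ++received;
      }
      session.acknowledge();
      connection.close();
      std::cout << "received " << received << " of " << expected << std::endl;
      return 0;
  }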

Version-Release number of selected component (if applicable):
  python-qpid-0.10-1.el5.noarch
  python-qpid-qmf-0.10-10.el5.x86_64
  qpid-cpp-client-0.10-7.el5.x86_64
  qpid-cpp-client-devel-0.10-7.el5.x86_64
  qpid-cpp-client-devel-docs-0.10-7.el5.x86_64
  qpid-cpp-client-ssl-0.10-7.el5.x86_64
  qpid-cpp-mrg-debuginfo-0.10-7.el5.x86_64
  qpid-cpp-server-0.10-7.el5.x86_64
  qpid-cpp-server-cluster-0.10-7.el5.x86_64
  qpid-cpp-server-devel-0.10-7.el5.x86_64
  qpid-cpp-server-ssl-0.10-7.el5.x86_64
  qpid-cpp-server-store-0.10-7.el5.x86_64
  qpid-cpp-server-xml-0.10-7.el5.x86_64
  qpid-java-client-0.10-6.el5.noarch
  qpid-java-common-0.10-6.el5.noarch
  qpid-java-example-0.10-6.el5.noarch
  qpid-qmf-0.10-10.el5.x86_64
  qpid-qmf-debuginfo-0.10-10.el5.x86_64
  qpid-qmf-devel-0.10-10.el5.x86_64
  qpid-tools-0.10-5.el5.noarch
  sesame-0.10-1.el5.x86_64
  sesame-debuginfo-0.10-1.el5.x86_64


How reproducible:
>70%

Steps to Reproduce:
See the steps above and the additional info section.
  
Actual results:
rcv.fetch(msg, timeout=0) can claim the address is empty when in fact it is not.

Expected results:
rcv.fetch(msg, timeout=0) should not claim the address is empty when it is not.

Additional info:
The detailed cluster test unit with the scenario:
  # -------------------------------------------------------------------------
  # basic cluster flow control test N senders to N receivers (1 address)
  # -------------------------------------------------------------------------
  def test_cluster_flow_control_NtoNvia1(self):
    for i_loop in range(self.d_opts['test_loop_cnt']):
      # loop the test core
      self.log_msg("TEST '%s' loop %d/%d started. [tst_cnt:%d, err_cnt:%d]" % \
      (inspect.stack()[0][3], i_loop+1, self.d_opts['test_loop_cnt'],
       self.get_tst_cnt(), self.get_err_cnt()));
    
      # constants
      # possible sender/receiver capacities
      possible_capacities = [ 0, 10, 100, 1000 ];
      cli_cnt = random.randint(2, 4);       # nr. of sequence/concurrent clients
      stop_cnt = 1000;                         # queue stop count threshold
      #stop_cnt = 100;                         # queue stop count threshold
      resume_cnt = int(stop_cnt * 0.7);        # queue resume count threshold
      msg_cnt = int(stop_cnt * 1.2 / cli_cnt); # total message count per client
      # tx & rx two phases/steps per client
      msg_cnt_check_point = int(stop_cnt * 0.6 / cli_cnt);
      
      # msg_cnt correction (if needed due to above rounding float->int)
      if ( msg_cnt != (msg_cnt_check_point + (msg_cnt - msg_cnt_check_point)) ):
        msg_cnt = msg_cnt_check_point + (msg_cnt - msg_cnt_check_point);
      msg_layout_check = "msg_";
      msg_layout = 'msg_%06d';

      #print cli_cnt, stop_cnt, resume_cnt, msg_cnt, msg_cnt_check_point;
      
      q_name = "%s_mq_%d" % (inspect.stack()[0][3], i_loop);
      durability = 'no';
      if ((i_loop % 2) == 1):
        durability = 'yes';
      
      capacities = [ ];
      if (i_loop >= len(possible_capacities)):
        # different capacity per client, chosen from possible capacities
        for j in range(cli_cnt):
          capacities.append(possible_capacities[
                                random.randint(0, len(possible_capacities)-1)]);
      else:
        # same capacity for every client, chosen from possible capacities
        capacities = [ possible_capacities[i_loop] ] * cli_cnt;
      
      self.log_msg( ("config: msg_cnt:%d, msg_layout:%s, thresholds:%d/%d, " + \
                     "cli_cnt: %d, q_name:%s, durability: %s, capacities:%s" )%\
                     (msg_cnt, msg_layout, stop_cnt, resume_cnt, cli_cnt,
                      q_name, durability, capacities) );

      # phase 1 - start cluster in maximal width (if needed)
      # ---------------------------------------------------------------------
      r = self.d_cluster.goto_state(
                              in_cluster_config = self.d_cluster.get_node_cnt(),
                              in_order_from_back = False,
                              in_step_by_step_ena = True);
      self.add_check(in_desc =
                            'cluster topology change (ecode: %s exp. True)' % r,
                     in_result = (r == True) );
      self.report();
      self.log_msg("Cluster state: %d of %d up and running" % \
                   (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()));
      self.add_check(in_desc = 'Cluster is fully up (%d of %d up)' % \
                    (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()),
                     in_result = \
                (self.d_cluster.get_width() == self.d_cluster.get_node_cnt()) );
      self.report();

      # phase 2 - remove the queue (if needed)
      # ---------------------------------------------------------------------
      i_exec = mrg_texec(in_threading_ena = False);
      cmd = "qpid-config queues | grep %s && qpid-config del queue %s --force";
      i_exec.run_and_wait(cmd % (q_name, q_name),
                          in_to = 10,
                          host = self.d_cluster.get_node('A').get_host(),
                          user = self.d_opts['ssh_user'],
                          password = self.d_opts['ssh_pass'],
                          in_killena = False);
      self.add_run_and_report(i_exec, None);

      # phase 3 - send the messages to broker A until checkpoint (on fg)
      #           all clients in sequence to the same address / queue
      # ---------------------------------------------------------------------
      for j in range(cli_cnt):
        # browse clients / addresses
        i_exec_tx = mrg_texec(in_threading_ena = False);
        cmd_tx = '~/%s/qpid-send -b "%s:5672" --failover-updates --content-string "%s" --content-index-offset %d -m %d --connection-options="{ username: guest, password:guest }" --address "%s; {create: sender, delete:receiver, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %sL, qpid.flow_resume_count: %dL}}}}"  --durable %s --sequence yes --capacity %d';
        # launch & no wait for the finish at the moment
        i_exec_tx.run_and_wait(cmd_tx % (self.d_rmachine_test_datadir,
                                         self.d_cluster.get_node('A').get_host(),
                                         msg_layout, j * msg_cnt_check_point,
                                         msg_cnt_check_point,
                                         q_name, (durability == 'yes'),
                                         stop_cnt, resume_cnt, durability,
                                         capacities[j]),

                        in_to = 120,
                        host = self.d_cluster.get_node('A').get_host(),
                        user = self.d_opts['ssh_user'],
                        password = self.d_opts['ssh_pass'],
                        in_killena = True);
        self.add_run_and_report(i_exec_tx, 0);

      # phase 4 - check that address exists and check msg depth and flow flag
      # ---------------------------------------------------------------------
      self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);
      i_qmf = mrg_qmf_console(in_url = 'amqp://guest/guest@%s' % \
                                     (self.d_cluster.get_node('B').get_host()));
      tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);

      ts = time.time(); # get current time snap
      while True:
        # update the object
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);

        if ( (tqueue != None) and \
             (type(tqueue.flowStopped) == types.BooleanType) ):
          # flag detected stop
          break;

        if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):
          # wait max timeout
          self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" % \
                      (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
          break;

      self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' % \
                                                   (q_name, tqueue.flowStopped),
                     in_result = (tqueue.flowStopped == False) );
      self.report();
      self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' % \
                       (q_name, tqueue.msgDepth, msg_cnt_check_point * cli_cnt),
             in_result = (tqueue.msgDepth == (msg_cnt_check_point * cli_cnt)) );
      self.report();


      # phase 5 - start sending rest of msgs to broker A (on bg - new thread)
      #           in parallel to one address / queue
      # ---------------------------------------------------------------------
      i_exec_txs = [ ];
      for j in range(cli_cnt):
        # browse clients / addresses
        i_exec_tx = mrg_texec(in_threading_ena = True);
        cmd_tx = '~/%s/qpid-send -b "%s:5672" --failover-updates --content-string "%s" --content-index-offset %d -m %d --connection-options="{ username: guest, password:guest }" --address "%s; {create: sender, delete:receiver, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %sL, qpid.flow_resume_count: %dL}}}}" --durable %s --sequence yes --capacity %d'
        # launch & no wait for the finish at the moment
        i_exec_tx.start(cmd_tx % (self.d_rmachine_test_datadir,
                                  self.d_cluster.get_node('A').get_host(),
                                  msg_layout,
              (msg_cnt_check_point * cli_cnt) + j*(msg_cnt-msg_cnt_check_point),
                                  msg_cnt-msg_cnt_check_point, q_name,
                                  (durability == 'yes'), stop_cnt,
                                  resume_cnt, durability, capacities[j]),
                        in_to = 120,
                        host = self.d_cluster.get_node('A').get_host(),
                        user = self.d_opts['ssh_user'],
                        password = self.d_opts['ssh_pass'],
                        in_killena = True);
        i_exec_txs.append(i_exec_tx);
      
      # phase 6 - wait for particular flow control flag flowStopped (QMF) to B
      # ---------------------------------------------------------------------
      self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);

      # reusing i_qmf from step 4
      tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
      ts = time.time(); # get current time snap
      while True:
        # update the object
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);

        if ( (tqueue != None) and (tqueue.flowStopped == True)):
          # flag detected stop
          break;

        if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):
          # wait max timeout
          self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" % \
                      (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
          break;

      self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. True' % \
                                                   (q_name, tqueue.flowStopped),
                     in_result = (tqueue.flowStopped == True) );
      self.report();
      m, M = get_fc_stop_range(in_stop_cnt_thr = stop_cnt,
                               in_capacity = sum(capacities),
                               in_max_cnt = msg_cnt * cli_cnt);
      self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d ... %d' % \
                                     (q_name, tqueue.msgDepth, m, M),
                     in_result = ( (tqueue.msgDepth >= m) and \
                                   (tqueue.msgDepth <= M) ) );
      self.report();
      
      # phase 7 - doublecheck that sender[s] has not finished yet
      #           at least one sender has to be blocked
      # ---------------------------------------------------------------------
      blocked_cnt = 0;
      for j in range(cli_cnt):
        # browse clients / addresses
        if (i_exec_txs[j].finished() == False):
          blocked_cnt += 1;
      
      self.add_check(in_desc = 'A sender has to be blocked atm (blocked_cnt:%d)'\
                      % blocked_cnt,
                     in_result = (blocked_cnt > 0) );
      self.report();

      
      # phase 8 - receive part of the messages on node B (to get f/c stopped)
      #           serial/sequence reception by clients from the same address
      # ---------------------------------------------------------------------
      i_exec_rxs1 = [ ];
      for j in range(cli_cnt):
        # browse clients / addresses
        i_exec_rx = mrg_texec(in_threading_ena = False);
        cmd_rx = '~/%s/qpid-receive -b "%s:5672" --failover-updates  --connection-options="{ username: guest, password:guest }" --address "%s ; {create: sender, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %dL, qpid.flow_resume_count: %dL}}}}" --ignore-duplicates -m %d'
        i_exec_rx.run_and_wait(cmd_rx % (self.d_rmachine_test_datadir,
                                         self.d_cluster.get_node('B').get_host(),
                                         q_name, (durability == 'yes'),
                                         stop_cnt, resume_cnt,
                                         (msg_cnt - msg_cnt_check_point)),
                        in_to = 120,
                        host = self.d_cluster.get_node('B').get_host(),
                        user = self.d_opts['ssh_user'],
                        password = self.d_opts['ssh_pass'],
                        in_killena = True);
        i_exec_rxs1.append(i_exec_rx);
        self.add_run_and_report(i_exec_rx, 0);
        
        # check number of received messages per receiver
        i_tp = mrg_tp( i_exec_rx.get_stdout() );
        i_tp.read_input();
        i_tp.do_filter(msg_layout_check);
        self.add_check(in_desc = 'Receiver 1-%d data check (%d exp. %d)' % \
                  (j, len(i_tp.get_strlist()), (msg_cnt - msg_cnt_check_point)),
                       in_result = (len(i_tp.get_strlist()) == \
                                               (msg_cnt - msg_cnt_check_point)),
                       in_stdout = i_exec_rx.get_stdout(),
                       in_stderr = i_exec_rx.get_stderr());
        self.report();


      # phase 9 - wait for particular f/c flag T->F on broker B
      # ---------------------------------------------------------------------
      self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);

      # reusing i_qmf from step 4
      tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
      ts = time.time(); # get current time snap
      while True:
        # update the object
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);

        if ( (tqueue != None) and (tqueue.flowStopped == False)):
          # flag detected stop
          break;

        if ((time.time() - ts) > (self.d_opts['qmf_data_timeout']/2)):
          for k in i_exec_rxs1:
            print k, k.finished(),
            if(k.finished()):
              print self.format_str(k.get_stdout());
              
          print tqueue.msgDepth, tqueue.flowStopped;
        
        if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):
          # wait max timeout
          self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" % \
                      (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
          break;

      self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' % \
                                                   (q_name, tqueue.flowStopped),
                     in_result = (tqueue.flowStopped == False) );
      self.report();
      self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' % \
                         (q_name, tqueue.msgDepth, msg_cnt_check_point*cli_cnt),
                 in_result = (tqueue.msgDepth == msg_cnt_check_point*cli_cnt) );
      self.report();

      
      # phase 10 - doublecheck that sender[s] is not blocked anymore (finished)
      #            join sender[s] & report
      # ---------------------------------------------------------------------
      for j in range(cli_cnt):
        # browse clients / addresses
        self.add_check(in_desc = 'Sender has to be finished atm (finished:%s)' %\
                                                       i_exec_txs[j].finished(),
                       in_result = (i_exec_txs[j].finished() == True) );
        self.report();
        if (i_exec_txs[j].finished() == True):
          i_exec_txs[j].join();
        
        # report the (finished & joined) sender
        self.add_run_and_report(i_exec_txs[j], 0);

      # phase 12 - finish reception on node A
      #            receivers in sequence from the same queue
      # ---------------------------------------------------------------------
      i_exec_rxs2 = [ ];
      for j in range(cli_cnt):
        # browse clients / addresses
        i_exec_rx = mrg_texec(in_threading_ena = False);
        cmd_rx = '~/%s/qpid-receive -b "%s:5672" --failover-updates  --connection-options="{ username: guest, password:guest }" --address "%s ; {create: sender, node:{ durable: %s, x-declare :{ arguments :{ qpid.flow_stop_count: %dL, qpid.flow_resume_count: %dL}}}}" --ignore-duplicates -m %d'
        i_exec_rx.run_and_wait(cmd_rx % (self.d_rmachine_test_datadir,
                                         self.d_cluster.get_node('A').get_host(),
                                         q_name, (durability == 'yes'),
                                         stop_cnt, resume_cnt,
                                         msg_cnt_check_point),
                        in_to = 120,
                        host = self.d_cluster.get_node('A').get_host(),
                        user = self.d_opts['ssh_user'],
                        password = self.d_opts['ssh_pass'],
                        in_killena = True);
        self.add_run_and_report(i_exec_rx, 0);
        i_exec_rxs2.append(i_exec_rx);

        # check number of received messages per receiver
        i_tp = mrg_tp( i_exec_rx.get_stdout() );
        i_tp.read_input();
        i_tp.do_filter(msg_layout_check);
        self.add_check(in_desc = 'Receiver 2-%d data check (%d exp. %d)' % \
                              (j, len(i_tp.get_strlist()), msg_cnt_check_point),
                       in_result = (len(i_tp.get_strlist()) == \
                                                           msg_cnt_check_point),
                       in_stdout = i_exec_rx.get_stdout(),
                       in_stderr = i_exec_rx.get_stderr());
        self.report();
      

      # phase 13 - check of messages on receiver side
      # ---------------------------------------------------------------------
      lstdout = [ ];
      lstderr = [ ];
      for j in range(cli_cnt):
        # browse clients / addresses
        lstdout = lstdout + i_exec_rxs1[j].get_stdout();
        lstdout = lstdout + i_exec_rxs2[j].get_stdout();
        
        lstderr = lstderr + i_exec_rxs1[j].get_stderr();
        lstderr = lstderr + i_exec_rxs2[j].get_stderr();


      i_tp = mrg_tp( lstdout );
      i_tp.read_input();
      i_tp.do_filter(msg_layout_check);
      self.add_check(in_desc = 'Receiver data check (%d exp. %d)' % \
                                     (len(i_tp.get_strlist()), msg_cnt*cli_cnt),
                     in_result = (len(i_tp.get_strlist()) == msg_cnt*cli_cnt),
                     in_stdout = lstdout,
                     in_stderr = lstderr);
      self.report();

      # phase 14 - check existence of the empty address / queue (QMF)
      # ---------------------------------------------------------------------
      self.log_msg("QMF obj. query corresponding to queue %s started" % q_name);

      # reusing i_qmf from step 4
      tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);
      ts = time.time(); # get current time snap
      while True:
        # update the object
        tqueue = i_qmf.get_object(in_class = 'queue', in_name = q_name);

        if ( (tqueue != None) and (tqueue.flowStopped == False) and \
             (tqueue.msgDepth == 0) ):
          # flag detected stop
          break;

        if ((time.time() - ts) > self.d_opts['qmf_data_timeout']):
          # wait max timeout
          self.log_msg("%s(): waiting for queue QMF data timeouted (to:%d)" % \
                      (inspect.stack()[0][3], self.d_opts['qmf_data_timeout']));
          break;

      self.add_check(in_desc = 'Queue %s flag flowStopped == %s exp. False' % \
                                                   (q_name, tqueue.flowStopped),
                     in_result = (tqueue.flowStopped == False) );
      self.report();
      self.add_check(in_desc = 'Queue %s has msg-depth %d exp. %d' % \
                                                   (q_name, tqueue.msgDepth, 0),
                     in_result = (tqueue.msgDepth == 0) );
      self.report();
      
      # phase 15 - empty address / queue clean-up
      # ---------------------------------------------------------------------
      i_exec = mrg_texec(in_threading_ena = False);
      cmd = "qpid-config del queue %s --force";
      i_exec.run_and_wait(cmd % (q_name),
                          in_to = 10,
                          host = self.d_cluster.get_node('B').get_host(),
                          user = self.d_opts['ssh_user'],
                          password = self.d_opts['ssh_pass'],
                          in_killena = False);
      self.add_run_and_report(i_exec, 0);
      
      # phase Z - check cluster topology
      # ---------------------------------------------------------------------
      self.add_check(in_desc = 'Cluster should be fully up (%d of %d up)' % \
                    (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()),
                     in_result = \
                (self.d_cluster.get_width() == self.d_cluster.get_node_cnt()) );
      self.report();

      # stop the cluster (conditioned)
      if (self.d_opts['maximize_broker_uptime'] == False):
        self.log_msg("Shutdown cluster from back");
        self.d_cluster.goto_state(in_cluster_config = 0,
                                  in_order_from_back = True,  # stop from back
                                  in_step_by_step_ena = True);

      # final state
      self.log_msg("Cluster state: %d of %d up and running" % \
                   (self.d_cluster.get_width(), self.d_cluster.get_node_cnt()));
      self.log_msg("TEST '%s' loop %d/%d finished. [tst_cnt:%d, err_cnt:%d]" % \
      (inspect.stack()[0][3], i_loop+1, self.d_opts['test_loop_cnt'],
       self.get_tst_cnt(), self.get_err_cnt()));

Comment 2 Gordon Sim 2011-06-03 10:42:29 UTC
My guess is this is related to prefetch and can be avoided by setting --capacity to 0 on qpid-receive. 

By default the capacity is 1000, meaning that more messages can be prefetched by one receiver than it intends to consume. Those messages *will* be returned to the queue when that receiver completes, but that may happen after some other receiver has been unable to get any more.
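
(In API terms, a hedged sketch of that suggestion: disable prefetch by setting the receiver's capacity to 0 before fetching, which is what --capacity 0 does for qpid-receive. With no prefetch a receiver never holds messages it does not actually fetch(), so competing receivers cannot be starved by messages parked in another receiver's local buffer. Broker URL and address below are placeholders.)

  #include <qpid/messaging/Connection.h>
  #include <qpid/messaging/Session.h>
  #include <qpid/messaging/Receiver.h>
  #include <qpid/messaging/Message.h>
  #include <qpid/messaging/Duration.h>

  using namespace qpid::messaging;

  int main() {
      Connection connection("nodeB:5672", "{username: guest, password: guest}");  // placeholder broker
      connection.open();
      Session session = connection.createSession();
      Receiver receiver = session.createReceiver("test_queue");                   // placeholder address
      receiver.setCapacity(0);   // no local prefetch; qpid-receive's --capacity defaults to 1000
      Message msg;
      while (receiver.fetch(msg, Duration::IMMEDIATE)) {
          session.acknowledge(msg);
      }
      connection.close();
      return 0;
  }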

