Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 616619 Details for
Bug 854666
cluster initial update stall when a queue has >10k messages with message groups set
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly; please enable it.
[patch]
Patch proposal
bz854666.patch (text/plain), 11.65 KB, created by
Pavel Moravec
on 2012-09-24 15:26:30 UTC
(
hide
)
Description:
Patch proposal
Filename:
MIME Type:
Creator:
Pavel Moravec
Created:
2012-09-24 15:26:30 UTC
Size:
11.65 KB
patch
obsolete
>diff -rup a/src/qpid/broker/MessageGroupManager.cpp b/src/qpid/broker/MessageGroupManager.cpp >--- a/src/qpid/broker/MessageGroupManager.cpp 2012-03-28 23:00:40.000000000 +0200 >+++ b/src/qpid/broker/MessageGroupManager.cpp 2012-09-24 15:54:09.949774659 +0200 >@@ -367,103 +367,90 @@ namespace { > const std::string GROUP_ACQUIRED_CT("acquired-ct"); > const std::string GROUP_POSITIONS("positions"); > const std::string GROUP_ACQUIRED_MSGS("acquired-msgs"); >- const std::string GROUP_STATE("group-state"); >+ const std::string GROUP_NR("number"); > } > > > /** Runs on UPDATER to snapshot current state */ >-void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const >+bool MessageGroupManager::getState(qpid::framing::FieldTable& state ) const > { > using namespace qpid::framing; >- state.clear(); >- framing::Array groupState(TYPE_CODE_MAP); >- for (GroupMap::const_iterator g = messageGroups.begin(); >- g != messageGroups.end(); ++g) { >- >- framing::FieldTable group; >- group.setString(GROUP_NAME, g->first); >- group.setString(GROUP_OWNER, g->second.owner); >- group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); >- framing::Array positions(TYPE_CODE_UINT32); >- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); >- for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); >- p != g->second.members.end(); ++p) { >- positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); >- acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); >- } >- group.setArray(GROUP_POSITIONS, positions); >- group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); >- groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); >+ GroupMap::const_iterator g; >+ if (state.count() == 0) { >+ g = messageGroups.begin(); >+ state.setInt(GROUP_NR, 0); >+ } >+ else { >+ g = messageGroups.find(state.getAsString(GROUP_NAME)); >+ g++; // we need to set state of the _next_ group.. 
>+ int group_nr = state.getAsInt(GROUP_NR); >+ state.clear(); >+ state.setInt(GROUP_NR, group_nr+1); > } >- state.setArray(GROUP_STATE, groupState); > >- QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); >-} >+ state.setString(GROUP_NAME, g->first); >+ state.setString(GROUP_OWNER, g->second.owner); >+ state.setInt(GROUP_ACQUIRED_CT, g->second.acquired); >+ framing::Array positions(TYPE_CODE_UINT32); >+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); >+ for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); >+ p != g->second.members.end(); ++p) { >+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); >+ acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); >+ } >+ state.setArray(GROUP_POSITIONS, positions); >+ state.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); > >+ QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader << ", group " << g->first); >+ return (++g != messageGroups.end()); // true iff some more groups exist >+} > > /** called on UPDATEE to set state from snapshot */ > void MessageGroupManager::setState(const qpid::framing::FieldTable& state) > { > using namespace qpid::framing; >- messageGroups.clear(); >- freeGroups.clear(); >- cachedGroup = 0; >- >- framing::Array groupState(TYPE_CODE_MAP); >+ if (state.getAsInt(GROUP_NR) == 0) { >+ messageGroups.clear(); >+ freeGroups.clear(); >+ cachedGroup = 0; >+ } > >- bool ok = state.getArray(GROUP_STATE, groupState); >+ MessageGroupManager::GroupState groupstate; >+ if (!state.isSet(GROUP_NAME) || !state.isSet(GROUP_OWNER) || !state.isSet(GROUP_ACQUIRED_CT)) { >+ QPID_LOG(error, "Invalid message group state information for queue \"" << >+ qName << "\": fields missing error!"); >+ return; >+ } >+ groupstate.group = state.getAsString(GROUP_NAME); >+ groupstate.owner = state.getAsString(GROUP_OWNER); >+ groupstate.acquired = 
state.getAsInt(GROUP_ACQUIRED_CT); >+ framing::Array positions(TYPE_CODE_UINT32); >+ bool ok = state.getArray(GROUP_POSITIONS, positions); > if (!ok) { >- QPID_LOG(error, "Unable to find message group state information for queue \"" << >- qName << "\": cluster inconsistency error!"); >+ QPID_LOG(error, "Invalid message group state information for queue \"" << >+ qName << "\": position encoding error!"); >+ return; >+ } >+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); >+ ok = state.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); >+ if (!ok || positions.count() != acquiredMsgs.count()) { >+ QPID_LOG(error, "Invalid message group state information for queue \"" << >+ qName << "\": acquired flag encoding error!"); > return; > } > >- for (framing::Array::const_iterator g = groupState.begin(); >- g != groupState.end(); ++g) { >- framing::FieldTable group; >- ok = framing::getEncodedValue<FieldTable>(*g, group); >- if (!ok) { >- QPID_LOG(error, "Invalid message group state information for queue \"" << >- qName << "\": table encoding error!"); >- return; >- } >- MessageGroupManager::GroupState state; >- if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { >- QPID_LOG(error, "Invalid message group state information for queue \"" << >- qName << "\": fields missing error!"); >- return; >- } >- state.group = group.getAsString(GROUP_NAME); >- state.owner = group.getAsString(GROUP_OWNER); >- state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); >- framing::Array positions(TYPE_CODE_UINT32); >- ok = group.getArray(GROUP_POSITIONS, positions); >- if (!ok) { >- QPID_LOG(error, "Invalid message group state information for queue \"" << >- qName << "\": position encoding error!"); >- return; >- } >- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); >- ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); >- if (!ok || positions.count() != acquiredMsgs.count()) { >- QPID_LOG(error, "Invalid message group state information for queue \"" << >- 
qName << "\": acquired flag encoding error!"); >- return; >- } >- >- Array::const_iterator a = acquiredMsgs.begin(); >- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { >- GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>()); >- mState.acquired = (*a++)->getIntegerValue<bool>(); >- state.members.push_back(mState); >- } >- >- messageGroups[state.group] = state; >- if (!state.owned()) { >- assert(state.members.size()); >- freeGroups[state.members.front().position] = &messageGroups[state.group]; >- } >+ Array::const_iterator a = acquiredMsgs.begin(); >+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { >+ GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>()); >+ mState.acquired = (*a++)->getIntegerValue<bool>(); >+ groupstate.members.push_back(mState); >+ } >+ >+ messageGroups[groupstate.group] = groupstate; >+ if (!groupstate.owned()) { >+ assert(groupstate.members.size()); >+ freeGroups[groupstate.members.front().position] = &messageGroups[groupstate.group]; > } > >- QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) >+ QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key=" << groupIdHeader << ", group " << groupstate.group); > } >diff -rup a/src/qpid/broker/MessageGroupManager.h b/src/qpid/broker/MessageGroupManager.h >--- a/src/qpid/broker/MessageGroupManager.h 2012-03-28 23:00:40.000000000 +0200 >+++ b/src/qpid/broker/MessageGroupManager.h 2012-09-24 15:55:52.436896512 +0200 >@@ -112,7 +112,7 @@ class MessageGroupManager : public State > void dequeued( const QueuedMessage& qm ); > void consumerAdded( const Consumer& ) {}; > void consumerRemoved( const Consumer& ) {}; >- void getState(qpid::framing::FieldTable& state ) const; >+ bool getState(qpid::framing::FieldTable& state ) const; > void setState(const qpid::framing::FieldTable&); > > // MessageDistributor iface >diff -rup 
a/src/qpid/broker/QueueFlowLimit.cpp b/src/qpid/broker/QueueFlowLimit.cpp >--- a/src/qpid/broker/QueueFlowLimit.cpp 2012-05-17 00:28:22.000000000 +0200 >+++ b/src/qpid/broker/QueueFlowLimit.cpp 2012-09-24 15:52:31.012811585 +0200 >@@ -338,7 +338,7 @@ namespace { > } > > /** Runs on UPDATER to snapshot current state */ >-void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const >+bool QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const > { > sys::Mutex::ScopedLock l(indexLock); > state.clear(); >@@ -356,6 +356,7 @@ void QueueFlowLimit::getState(qpid::fram > state.setArray("pendingMsgSeqs", seqs); > } > QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); >+ return false; > } > > >diff -rup a/src/qpid/broker/QueueFlowLimit.h b/src/qpid/broker/QueueFlowLimit.h >--- a/src/qpid/broker/QueueFlowLimit.h 2011-10-07 16:21:48.000000000 +0200 >+++ b/src/qpid/broker/QueueFlowLimit.h 2012-09-24 15:56:01.887812442 +0200 >@@ -89,7 +89,7 @@ class Broker; > QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {}; > > /** for clustering: */ >- QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; >+ QPID_BROKER_EXTERN bool getState(qpid::framing::FieldTable&) const; > QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&); > > uint32_t getFlowStopCount() const { return flowStopCount; } >diff -rup a/src/qpid/broker/StatefulQueueObserver.h b/src/qpid/broker/StatefulQueueObserver.h >--- a/src/qpid/broker/StatefulQueueObserver.h 2011-04-28 14:25:59.000000000 +0200 >+++ b/src/qpid/broker/StatefulQueueObserver.h 2012-09-24 15:55:41.445797808 +0200 >@@ -47,7 +47,7 @@ class StatefulQueueObserver : public Que > /** This method should return the observer's internal state as an opaque > * map. 
> */ >- virtual void getState(qpid::framing::FieldTable& state ) const = 0; >+ virtual bool getState(qpid::framing::FieldTable& state ) const = 0; > > /** The input map represents the internal state of the peer observer that > * this observer should synchonize to. >diff -rup a/src/qpid/cluster/UpdateClient.cpp b/src/qpid/cluster/UpdateClient.cpp >--- a/src/qpid/cluster/UpdateClient.cpp 2012-07-19 17:54:08.000000000 +0200 >+++ b/src/qpid/cluster/UpdateClient.cpp 2012-09-24 15:52:44.392909635 +0200 >@@ -708,12 +708,16 @@ void UpdateClient::updateObserver(const > boost::shared_ptr<broker::QueueObserver> o) > { > qpid::framing::FieldTable state; >+ state.clear(); > broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); > if (so) { >- so->getState( state ); >- std::string id(so->getId()); >- QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); >- ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); >+ bool more_to_update = true; >+ while (more_to_update) { >+ more_to_update = so->getState( state ); >+ std::string id(so->getId()); >+ QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); >+ ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); >+ } > } > } >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 854666
: 616619