Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 628211 Details for
Bug 867030
HA throughput issues during longevity testing
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
Proposed patch
bz867030.patch (text/plain), 6.87 KB, created by
Jason Dillaman
on 2012-10-16 15:12:42 UTC
(
hide
)
Description:
Proposed patch
Filename:
MIME Type:
Creator:
Jason Dillaman
Created:
2012-10-16 15:12:42 UTC
Size:
6.87 KB
patch
obsolete
>diff --git a/qpid/cpp/include/qpid/framing/SequenceNumber.h b/qpid/cpp/include/qpid/framing/SequenceNumber.h >index 00fa246..baa4b97 100644 >--- a/qpid/cpp/include/qpid/framing/SequenceNumber.h >+++ b/qpid/cpp/include/qpid/framing/SequenceNumber.h >@@ -23,6 +23,7 @@ > > #include "qpid/framing/amqp_types.h" > #include <boost/operators.hpp> >+#include <boost/functional/hash/hash.hpp> > #include <iosfwd> > #include "qpid/CommonImportExport.h" > >@@ -71,6 +72,11 @@ inline SequenceNumber operator-(const SequenceNumber& a, int32_t n) { > return SequenceNumber(a.getValue() - n); > } > >+inline std::size_t hash_value(const SequenceNumber& sequenceNumber) { >+ boost::hash<uint32_t> hasher; >+ return hasher(sequenceNumber.getValue()); >+} >+ > struct Window > { > SequenceNumber hwm; >diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp >index e04803c..aa90db1 100644 >--- a/qpid/cpp/src/qpid/broker/Queue.cpp >+++ b/qpid/cpp/src/qpid/broker/Queue.cpp >@@ -1909,7 +1909,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} > > bool Queue::UsageBarrier::acquire() > { >- Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */ >+ Monitor::ScopedLock l(usageLock); > if (parent.deleted) { > return false; > } else { >@@ -1920,15 +1920,15 @@ bool Queue::UsageBarrier::acquire() > > void Queue::UsageBarrier::release() > { >- Monitor::ScopedLock l(parent.messageLock); >- if (--count == 0) parent.messageLock.notifyAll(); >+ Monitor::ScopedLock l(usageLock); >+ if (--count == 0) usageLock.notifyAll(); > } > > void Queue::UsageBarrier::destroy() > { >- Monitor::ScopedLock l(parent.messageLock); >+ Monitor::ScopedLock l(usageLock); > parent.deleted = true; >- while (count) parent.messageLock.wait(); >+ while (count) usageLock.wait(); > } > > }} >diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h >index d4beed9..f4b22b2 100644 >--- a/qpid/cpp/src/qpid/broker/Queue.h >+++ b/qpid/cpp/src/qpid/broker/Queue.h >@@ -75,6 +75,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, > { > Queue& parent; > uint count; >+ qpid::sys::Monitor usageLock; > > UsageBarrier(Queue&); > bool acquire(); >@@ -125,7 +126,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, > * o consumerCount (TBD: move under separate lock) > * o Queue::UsageBarrier (TBD: move under separate lock) > */ >- mutable qpid::sys::Monitor messageLock; >+ mutable qpid::sys::Mutex messageLock; > mutable qpid::sys::Mutex ownershipLock; > mutable uint64_t persistenceId; > framing::FieldTable settings; >diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp >index fe8ca9f..37c172b 100644 >--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp >+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp >@@ -219,7 +219,7 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) > > RingQueuePolicy::RingQueuePolicy(const std::string& _name, > uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : >- QueuePolicy(_name, _maxCount, _maxSize, _type), queue(MAX_PRIORITY_LEVELS, Messages()), strict(_type == RING_STRICT) {} >+ QueuePolicy(_name, _maxCount, _maxSize, _type), queue(MAX_PRIORITY_LEVELS, OrderedMessages()), strict(_type == RING_STRICT) {} > > void RingQueuePolicy::enqueued(const QueuedMessage& m) > { >@@ -279,7 +279,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) > queueIter != queue.end(); > ++queueIter) > { >- Messages& messages = *queueIter; >+ OrderedMessages& messages = *queueIter; > if (messages.empty()) continue; > > QueuedMessage oldest = messages.front(); >@@ -308,16 +308,18 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) > > void RingQueuePolicy::getPendingDequeues(Messages& result) > { >- result = pendingDequeues; >+ result = Messages(pendingDequeues.begin(), pendingDequeues.end()); > } > >-bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) >+bool RingQueuePolicy::find(const QueuedMessage& m, OrderedMessages& q, bool remove) > { >- for (Messages::iterator i = q.begin(); i != q.end(); i++) { >- if (i->payload == m.payload) { >- if (remove) q.erase(i); >- return true; >- } >+ typedef OrderedMessages::nth_index<1>::type HashedMessages; >+ >+ HashedMessages& hashedMessages = q.get<1>(); >+ HashedMessages::iterator i = hashedMessages.find(m.position); >+ if (i != hashedMessages.end()) { >+ if (remove) hashedMessages.erase(i); >+ return true; > } > return false; > } >diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h >index 659dcb0..6ad3a4b 100644 >--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h >+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h >@@ -24,6 +24,10 @@ > #include <deque> > #include <iostream> > #include <memory> >+#include <boost/multi_index_container.hpp> >+#include <boost/multi_index/hashed_index.hpp> >+#include <boost/multi_index/member.hpp> >+#include <boost/multi_index/sequenced_index.hpp> > #include "qpid/broker/BrokerImportExport.h" > #include "qpid/broker/QueuedMessage.h" > #include "qpid/framing/FieldTable.h" >@@ -113,13 +117,23 @@ class RingQueuePolicy : public QueuePolicy > bool checkLimit(boost::intrusive_ptr<Message> msg); > void getPendingDequeues(Messages& result); > private: >- typedef std::vector<Messages> PriorityLevels; >+ typedef boost::multi_index_container < >+ QueuedMessage, >+ boost::multi_index::indexed_by< >+ boost::multi_index::sequenced<>, >+ boost::multi_index::hashed_unique< >+ boost::multi_index::member<QueuedMessage, framing::SequenceNumber, &QueuedMessage::position> >+ > >+ > >+ > OrderedMessages; >+ >+ typedef std::vector<OrderedMessages> PriorityLevels; > > PriorityLevels queue; >- Messages pendingDequeues; >+ OrderedMessages pendingDequeues; > const bool strict; > >- bool find(const QueuedMessage&, Messages&, bool remove); >+ bool find(const QueuedMessage&, OrderedMessages&, bool remove); > }; > > }} >diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp >index 47baa6f..4a6fef3 100644 >--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp >+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp >@@ -149,7 +149,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa > args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, > false/*exclusive*/, "", 0, settings); > // FIXME aconway 2012-05-22: use a finite credit window? >- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); >+ peer.getMessage().flow(getName(), 0, 100); > peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); > } > catch(const exception& e) {
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 867030
: 628211