Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 617570 Details for
Bug 860701
QMF queries for HA replication take too long to process
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly; please enable it.
[patch]
Quick patch to greatly reduce lock contention within QMF
qpid-4286.patch (text/plain), 120.62 KB, created by
Jason Dillaman
on 2012-09-26 14:31:57 UTC
(
hide
)
Description:
Quick patch to greatly reduce lock contention within QMF
Filename:
MIME Type:
Creator:
Jason Dillaman
Created:
2012-09-26 14:31:57 UTC
Size:
120.62 KB
patch
obsolete
>diff --git a/qpid/cpp/include/qpid/management/Manageable.h b/qpid/cpp/include/qpid/management/Manageable.h >index 1e5cd8b..fd1e604 100644 >--- a/qpid/cpp/include/qpid/management/Manageable.h >+++ b/qpid/cpp/include/qpid/management/Manageable.h >@@ -55,7 +55,7 @@ class QPID_COMMON_EXTERN Manageable > // > // This accessor function returns a pointer to the management object. > // >- virtual ManagementObject* GetManagementObject(void) const = 0; >+ virtual ManagementObject::shared_ptr GetManagementObject(void) const = 0; > > // Every "Manageable" object must implement ManagementMethod. This > // function is called when a remote management client invokes a method >diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h >index 16bf210..2aca6fb 100644 >--- a/qpid/cpp/include/qpid/management/ManagementObject.h >+++ b/qpid/cpp/include/qpid/management/ManagementObject.h >@@ -25,7 +25,7 @@ > > #include "qpid/management/Mutex.h" > #include "qpid/types/Variant.h" >- >+#include <boost/shared_ptr.hpp> > #include <map> > #include <vector> > >@@ -155,6 +155,8 @@ protected: > QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; > > public: >+ typedef boost::shared_ptr<ManagementObject> shared_ptr; >+ > QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; > QPID_COMMON_EXTERN static int maxThreads; > //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); >@@ -227,8 +229,8 @@ protected: > //QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); > }; > >-typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; >-typedef std::vector<ManagementObject*> ManagementObjectVector; >+typedef std::map<ObjectId, ManagementObject::shared_ptr> ManagementObjectMap; >+typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; > > }} > >diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h >index 95939f3..c6bfb0e 100644 >--- 
a/qpid/cpp/managementgen/qmfgen/templates/Class.h >+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h >@@ -76,6 +76,8 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen > void aggregatePerThreadStats(struct PerThreadStats*) const; > /*MGEN:ENDIF*/ > public: >+ typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; >+ > QPID_BROKER_EXTERN static void writeSchema(std::string& schema); > QPID_BROKER_EXTERN void mapEncodeValues(::qpid::types::Variant::Map& map, > bool includeProperties=true, >diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp >index 76e3bb6..036d06c 100644 >--- a/qpid/cpp/src/posix/QpiddBroker.cpp >+++ b/qpid/cpp/src/posix/QpiddBroker.cpp >@@ -140,7 +140,7 @@ struct QpiddDaemon : public Daemon { > uint16_t port=brokerPtr->getPort(options->daemon.transport); > ready(port); // Notify parent. > if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) { >- dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); >+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); > } > brokerPtr->run(); > } >@@ -196,7 +196,7 @@ int QpiddBroker::execute (QpiddOptions *options) { > uint16_t port = brokerPtr->getPort(myOptions->daemon.transport); > cout << port << endl; > if (options->broker.enableMgmt) { >- dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); >+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); > } > } > brokerPtr->run(); >diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp >index 6eeda9f..7bbea66 100644 >--- a/qpid/cpp/src/qpid/acl/Acl.cpp >+++ b/qpid/cpp/src/qpid/acl/Acl.cpp >@@ -52,7 +52,7 @@ using qpid::management::Manageable; > using qpid::management::Args; > 
namespace _qmf = qmf::org::apache::qpid::acl; > >-Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), >+Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), > connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)), > resourceCounter(new ResourceCounter(*this, aclValues.aclMaxQueuesPerUser)){ > >@@ -60,7 +60,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals > > if (agent != 0){ > _qmf::Package packageInit(agent); >- mgmtObject = new _qmf::Acl (agent, this, broker); >+ mgmtObject = _qmf::Acl::shared_ptr(new _qmf::Acl (agent, this, broker)); > agent->addObject (mgmtObject); > mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal); > mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp); >@@ -324,9 +324,9 @@ Acl::~Acl(){ > broker->getConnectionObservers().remove(connectionCounter); > } > >-ManagementObject* Acl::GetManagementObject(void) const >+ManagementObject::shared_ptr Acl::GetManagementObject(void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& args, string& text) >diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h >index e0513d5..25d3ad1 100644 >--- a/qpid/cpp/src/qpid/acl/Acl.h >+++ b/qpid/cpp/src/qpid/acl/Acl.h >@@ -62,7 +62,7 @@ private: > broker::Broker* broker; > bool transferAcl; > boost::shared_ptr<AclData> data; >- qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle >+ qmf::org::apache::qpid::acl::Acl::shared_ptr mgmtObject; > qpid::management::ManagementAgent* agent; > mutable qpid::sys::Mutex dataLock; > boost::shared_ptr<ConnectionCounter> connectionCounter; >@@ -114,7 +114,7 @@ private: > bool readAclFile(std::string& aclFile, std::string& errorText); > Manageable::status_t lookup (management::Args& 
args, std::string& text); > Manageable::status_t lookupPublish(management::Args& args, std::string& text); >- virtual qpid::management::ManagementObject* GetManagementObject(void) const; >+ virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const; > virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); > > }; >diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp >index d1706b5..dfc99bb 100644 >--- a/qpid/cpp/src/qpid/broker/Bridge.cpp >+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp >@@ -60,7 +60,7 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) > Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, > CancellationListener l, const _qmf::ArgsLinkBridge& _args, > InitializeCallback init, const std::string& _queueName, const string& ae) : >- link(_link), channel(_id), args(_args), mgmtObject(0), >+ link(_link), channel(_id), args(_args), > listener(l), name(_name), > queueName(_queueName.empty() ? 
"qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() > : _queueName), >@@ -71,10 +71,10 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, > { > ManagementAgent* agent = link->getBroker()->getManagementAgent(); > if (agent != 0) { >- mgmtObject = new _qmf::Bridge >+ mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge > (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, > args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, >- args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); >+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); > mgmtObject->set_channelId(channel); > agent->addObject(mgmtObject); > } >@@ -296,9 +296,9 @@ uint32_t Bridge::encodedSize() const > + 2; // sync > } > >-management::ManagementObject* Bridge::GetManagementObject (void) const >+management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const > { >- return (management::ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, >diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h >index ee298af..2b4d019 100644 >--- a/qpid/cpp/src/qpid/broker/Bridge.h >+++ b/qpid/cpp/src/qpid/broker/Bridge.h >@@ -72,7 +72,7 @@ class Bridge : public PersistableConfig, > > bool isDetached() const { return detached; } > >- management::ManagementObject* GetManagementObject() const; >+ management::ManagementObject::shared_ptr GetManagementObject() const; > management::Manageable::status_t ManagementMethod(uint32_t methodId, > management::Args& args, > std::string& text); >@@ -128,7 +128,7 @@ class Bridge : public PersistableConfig, > Link* const link; > const framing::ChannelId channel; > qmf::org::apache::qpid::broker::ArgsLinkBridge args; >- qmf::org::apache::qpid::broker::Bridge* mgmtObject; >+ qmf::org::apache::qpid::broker::Bridge::shared_ptr mgmtObject; > CancellationListener listener; > std::string 
name; > std::string queueName; >diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp >index bfca39d..89b5f29 100644 >--- a/qpid/cpp/src/qpid/broker/Broker.cpp >+++ b/qpid/cpp/src/qpid/broker/Broker.cpp >@@ -204,7 +204,6 @@ Broker::Broker(const Broker::Options& conf) : > conf.replayFlushLimit*1024, // convert kb to bytes. > conf.replayHardLimit*1024), > *this), >- mgmtObject(0), > queueCleaner(queues, &timer), > queueEvents(poller,!conf.asyncQueueEvents), > recovery(true), >@@ -225,7 +224,7 @@ Broker::Broker(const Broker::Options& conf) : > System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); > systemObject = System::shared_ptr(system); > >- mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); >+ mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); > mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); > mgmtObject->set_port(conf.port); > mgmtObject->set_workerThreads(conf.workerThreads); >@@ -433,9 +432,9 @@ Broker::~Broker() { > QPID_LOG(notice, "Shut down"); > } > >-ManagementObject* Broker::GetManagementObject(void) const >+ManagementObject::shared_ptr Broker::GetManagementObject(void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable* Broker::GetVhostObject(void) const >diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h >index c385a3e..ef67ad8 100644 >--- a/qpid/cpp/src/qpid/broker/Broker.h >+++ b/qpid/cpp/src/qpid/broker/Broker.h >@@ -173,7 +173,7 @@ class Broker : public sys::Runnable, public Plugin::Target, > boost::shared_ptr<sys::ConnectionCodec::Factory> factory; > DtxManager dtxManager; > SessionManager sessionManager; >- qmf::org::apache::qpid::broker::Broker* mgmtObject; >+ qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject; > Vhost::shared_ptr vhostObject; > System::shared_ptr systemObject; > 
QueueCleaner queueCleaner; >@@ -233,7 +233,7 @@ class Broker : public sys::Runnable, public Plugin::Target, > SessionManager& getSessionManager() { return sessionManager; } > const std::string& getFederationTag() const { return federationTag; } > >- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const; >+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; > QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; > QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( > uint32_t methodId, management::Args& args, std::string& text); >diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp >index a9302fb..ce24b7f 100644 >--- a/qpid/cpp/src/qpid/broker/Connection.cpp >+++ b/qpid/cpp/src/qpid/broker/Connection.cpp >@@ -99,7 +99,6 @@ Connection::Connection(ConnectionOutputHandler* out_, > link(link_), > mgmtClosing(false), > mgmtId(mgmtId_), >- mgmtObject(0), > links(broker_.getLinks()), > agent(0), > timer(broker_.getTimer()), >@@ -121,7 +120,7 @@ void Connection::addManagementObject() { > agent = broker.getManagementAgent(); > if (agent != 0) { > // TODO set last bool true if system connection >- mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false); >+ mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false)); > mgmtObject->set_shadow(shadow); > agent->addObject(mgmtObject, objectId); > } >@@ -413,9 +412,9 @@ SessionHandler& Connection::getChannel(ChannelId id) { > return *ptr_map_ptr(i); > } > >-ManagementObject* Connection::GetManagementObject(void) const >+ManagementObject::shared_ptr Connection::GetManagementObject(void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&) >diff --git a/qpid/cpp/src/qpid/broker/Connection.h 
b/qpid/cpp/src/qpid/broker/Connection.h >index d01599c..3ef9877 100644 >--- a/qpid/cpp/src/qpid/broker/Connection.h >+++ b/qpid/cpp/src/qpid/broker/Connection.h >@@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler, > void closeChannel(framing::ChannelId channel); > > // Manageable entry points >- management::ManagementObject* GetManagementObject (void) const; >+ management::ManagementObject::shared_ptr GetManagementObject (void) const; > management::Manageable::status_t > ManagementMethod (uint32_t methodId, management::Args& args, std::string&); > >@@ -196,7 +196,7 @@ class Connection : public sys::ConnectionInputHandler, > const std::string mgmtId; > sys::Mutex ioCallbackLock; > std::queue<boost::function0<void> > ioCallbacks; >- qmf::org::apache::qpid::broker::Connection* mgmtObject; >+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; > LinkRegistry& links; > management::ManagementAgent* agent; > sys::Timer& timer; >@@ -231,7 +231,7 @@ class Connection : public sys::ConnectionInputHandler, > > public: > >- qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } >+ qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } > }; > > }} >diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp >index a484cc0..d1dd1fa 100644 >--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp >+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp >@@ -150,7 +150,7 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProp > void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) > { > const framing::FieldTable& clientProperties = body.getClientProperties(); >- qmf::org::apache::qpid::broker::Connection* mgmtObject = connection.getMgmtObject(); >+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject(); > > if (mgmtObject != 0) { > string procName = 
clientProperties.getAsString(CLIENT_PROCESS_NAME); >diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp >index 82d4b4d..efb5afe 100644 >--- a/qpid/cpp/src/qpid/broker/Exchange.cpp >+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp >@@ -170,19 +170,19 @@ void Exchange::routeIVE(){ > > Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : > name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false), >- sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) >+ sequenceNo(0), ive(false), broker(b), destroyed(false) > { > if (parent != 0 && broker != 0) > { > ManagementAgent* agent = broker->getManagementAgent(); > if (agent != 0) > { >- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); >+ mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); > mgmtExchange->set_durable(durable); > mgmtExchange->set_autoDelete(false); > agent->addObject(mgmtExchange, 0, durable); > if (broker) >- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); >+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); > } > } > } >@@ -190,20 +190,20 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : > Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, > Manageable* parent, Broker* b) > : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), >- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) >+ args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false) > { > if (parent != 0 && broker != 0) > { > ManagementAgent* agent = broker->getManagementAgent(); > if (agent != 0) > { >- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); >+ mgmtExchange = 
_qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); > mgmtExchange->set_durable(durable); > mgmtExchange->set_autoDelete(false); > mgmtExchange->set_arguments(ManagementAgent::toMap(args)); > agent->addObject(mgmtExchange, 0, durable); > if (broker) >- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); >+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); > } > } > >@@ -299,9 +299,9 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges) > } > } > >-ManagementObject* Exchange::GetManagementObject (void) const >+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const > { >- return (ManagementObject*) mgmtExchange; >+ return mgmtExchange; > } > > void Exchange::registerDynamicBridge(DynamicBridge* db) >@@ -350,16 +350,16 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons > > Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, > FieldTable _args, const string& _origin) >- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) >+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin) > { > } > > Exchange::Binding::~Binding () > { > if (mgmtBinding != 0) { >- ManagementObject* mo = queue->GetManagementObject(); >+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); > if (mo != 0) >- static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); >+ mo->dec_bindingCount(); > mgmtBinding->resourceDestroy (); > } > } >@@ -372,25 +372,25 @@ void Exchange::Binding::startManagement() > if (broker != 0) { > ManagementAgent* agent = broker->getManagementAgent(); > if (agent != 0) { >- ManagementObject* mo = queue->GetManagementObject(); >+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); > if (mo != 0) { > 
management::ObjectId queueId = mo->getObjectId(); > >- mgmtBinding = new _qmf::Binding >- (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); >+ mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding >+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args))); > if (!origin.empty()) > mgmtBinding->set_origin(origin); > agent->addObject(mgmtBinding); >- static_cast<_qmf::Queue*>(mo)->inc_bindingCount(); >+ mo->inc_bindingCount(); > } > } > } > } > } > >-ManagementObject* Exchange::Binding::GetManagementObject () const >+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const > { >- return (ManagementObject*) mgmtBinding; >+ return mgmtBinding; > } > > Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} >diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h >index fba7522..d91c45c 100644 >--- a/qpid/cpp/src/qpid/broker/Exchange.h >+++ b/qpid/cpp/src/qpid/broker/Exchange.h >@@ -51,13 +51,13 @@ public: > const std::string key; > const framing::FieldTable args; > std::string origin; >- qmf::org::apache::qpid::broker::Binding* mgmtBinding; >+ qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding; > > Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0, > framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); > ~Binding(); > void startManagement(); >- management::ManagementObject* GetManagementObject() const; >+ management::ManagementObject::shared_ptr GetManagementObject() const; > }; > > private: >@@ -158,8 +158,8 @@ protected: > } > }; > >- qmf::org::apache::qpid::broker::Exchange* mgmtExchange; >- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; >+ qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange; >+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; > > public: > typedef boost::shared_ptr<Exchange> shared_ptr; >@@ 
-208,7 +208,7 @@ public: > static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); > > // Manageable entry points >- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const; >+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; > > // Federation hooks > class DynamicBridge { >diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp >index 157b75c..32289c0 100644 >--- a/qpid/cpp/src/qpid/broker/Link.cpp >+++ b/qpid/cpp/src/qpid/broker/Link.cpp >@@ -144,7 +144,7 @@ Link::Link(const string& _name, > host(_host), port(_port), transport(_transport), > durable(_durable), > authMechanism(_authMechanism), username(_username), password(_password), >- persistenceId(0), mgmtObject(0), broker(_broker), state(0), >+ persistenceId(0), broker(_broker), state(0), > visitCount(0), > currentInterval(1), > closing(false), >@@ -162,7 +162,7 @@ Link::Link(const string& _name, > agent = broker->getManagementAgent(); > if (agent != 0) > { >- mgmtObject = new _qmf::Link(agent, this, parent, name, durable); >+ mgmtObject = _qmf::Link::shared_ptr(new _qmf::Link(agent, this, parent, name, durable)); > mgmtObject->set_host(host); > mgmtObject->set_port(port); > mgmtObject->set_transport(transport); >@@ -639,9 +639,9 @@ uint32_t Link::encodedSize() const > + password.size() + 1; > } > >-ManagementObject* Link::GetManagementObject (void) const >+ManagementObject::shared_ptr Link::GetManagementObject (void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > void Link::close() { >diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h >index f0cb90e..49ee3ef 100644 >--- a/qpid/cpp/src/qpid/broker/Link.h >+++ b/qpid/cpp/src/qpid/broker/Link.h >@@ -69,7 +69,7 @@ class Link : public PersistableConfig, public management::Manageable { > std::string username; > std::string password; > mutable uint64_t 
persistenceId; >- qmf::org::apache::qpid::broker::Link* mgmtObject; >+ qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject; > Broker* broker; > int state; > uint32_t visitCount; >@@ -181,7 +181,7 @@ class Link : public PersistableConfig, public management::Manageable { > static bool isEncodedLink(const std::string& key); > > // Manageable entry points >- management::ManagementObject* GetManagementObject(void) const; >+ management::ManagementObject::shared_ptr GetManagementObject(void) const; > management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); > > // manage the exchange owned by this link >diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp >index 7ce162e..f5f91fd 100644 >--- a/qpid/cpp/src/qpid/broker/Queue.cpp >+++ b/qpid/cpp/src/qpid/broker/Queue.cpp >@@ -97,8 +97,8 @@ const int ENQUEUE_ONLY=1; > const int ENQUEUE_AND_DEQUEUE=2; > > inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, >- _qmf::Queue* mgmtObject, >- _qmf::Broker* brokerMgmtObject) >+ _qmf::Queue::shared_ptr mgmtObject, >+ _qmf::Broker::shared_ptr brokerMgmtObject) > { > if (mgmtObject != 0) { > _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); >@@ -121,8 +121,8 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, > } > > inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg, >- _qmf::Queue* mgmtObject, >- _qmf::Broker* brokerMgmtObject) >+ _qmf::Queue::shared_ptr mgmtObject, >+ _qmf::Broker::shared_ptr brokerMgmtObject) > { > if (mgmtObject != 0){ > _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); >@@ -166,8 +166,6 @@ Queue::Queue(const string& _name, bool _autodelete, > messages(new MessageDeque()), > persistenceId(0), > policyExceeded(false), >- mgmtObject(0), >- brokerMgmtObject(0), > eventMode(0), > insertSeqNo(0), > broker(b), >@@ -180,9 +178,9 @@ Queue::Queue(const string& _name, bool _autodelete, > ManagementAgent* agent = 
broker->getManagementAgent(); > > if (agent != 0) { >- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); >+ mgmtObject = _qmf::Queue::shared_ptr(new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0)); > agent->addObject(mgmtObject, 0, store != 0); >- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); >+ brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); > if (brokerMgmtObject) > brokerMgmtObject->inc_queueCount(); > } >@@ -191,11 +189,6 @@ Queue::Queue(const string& _name, bool _autodelete, > > Queue::~Queue() > { >- if (mgmtObject != 0) { >- mgmtObject->resourceDestroy(); >- if (brokerMgmtObject) >- brokerMgmtObject->dec_queueCount(); >- } > } > > bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) >@@ -1389,6 +1382,12 @@ void Queue::destroyed() > Mutex::ScopedLock lock(messageLock); > observers.clear(); > } >+ >+ if (mgmtObject != 0) { >+ mgmtObject->resourceDestroy(); >+ if (brokerMgmtObject) >+ brokerMgmtObject->dec_queueCount(); >+ } > } > > void Queue::notifyDeleted() >@@ -1436,7 +1435,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const > { > if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) > { >- ManagementObject* childObj = externalQueueStore->GetManagementObject(); >+ ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); > if (childObj != 0) > childObj->setReference(mgmtObject->getObjectId()); > } >@@ -1589,7 +1588,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { > externalQueueStore = inst; > > if (inst) { >- ManagementObject* childObj = inst->GetManagementObject(); >+ ManagementObject::shared_ptr childObj = inst->GetManagementObject(); > if (childObj != 0 && mgmtObject != 0) > childObj->setReference(mgmtObject->getObjectId()); > } >@@ -1637,9 +1636,9 @@ void Queue::countLoadedFromDisk(uint64_t 
size) const > } > > >-ManagementObject* Queue::GetManagementObject (void) const >+ManagementObject::shared_ptr Queue::GetManagementObject (void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) >diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h >index b6c9b97..ef37348 100644 >--- a/qpid/cpp/src/qpid/broker/Queue.h >+++ b/qpid/cpp/src/qpid/broker/Queue.h >@@ -135,8 +135,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, > std::string alternateExchangeName; > boost::shared_ptr<Exchange> alternateExchange; > framing::SequenceNumber sequence; >- qmf::org::apache::qpid::broker::Queue* mgmtObject; >- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; >+ qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject; >+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; > sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. 
> int eventMode; > Observers observers; >@@ -356,7 +356,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, > QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; > > // Manageable entry points >- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const; >+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const; > management::Manageable::status_t > QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); > QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; >diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp >index 14fe5f4..579574d 100644 >--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp >+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp >@@ -95,7 +95,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, > : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), > flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), > flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), >- flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) >+ flowStopped(false), count(0), size(0), broker(0) > { > uint32_t maxCount(0); > uint64_t maxSize(0); >@@ -107,7 +107,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, > maxCount = _queue->getPolicy()->getMaxCount(); > } > broker = queue->getBroker(); >- queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); >+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); > if (queueMgmtObj) { > queueMgmtObj->set_flowStopped(isFlowControlActive()); > } >diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h >index ad8a272..2189dc5 100644 >--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h >+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h >@@ -31,14 +31,8 @@ > #include 
"qpid/framing/FieldTable.h" > #include "qpid/sys/AtomicValue.h" > #include "qpid/sys/Mutex.h" >+#include "qmf/org/apache/qpid/broker/Queue.h" > >-namespace qmf { >-namespace org { >-namespace apache { >-namespace qpid { >-namespace broker { >- class Queue; >-}}}}} > namespace _qmfBroker = qmf::org::apache::qpid::broker; > > namespace qpid { >@@ -116,7 +110,7 @@ class Broker; > std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index; > mutable qpid::sys::Mutex indexLock; > >- _qmfBroker::Queue *queueMgmtObj; >+ _qmfBroker::Queue::shared_ptr queueMgmtObj; > > const Broker *broker; > >diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp >index 2d7c820..bc7c96f 100644 >--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp >+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp >@@ -424,7 +424,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) > &challenge, &challenge_len); > > processAuthenticationStep(code, challenge, challenge_len); >- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); >+ qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); > if ( cnxMgmt ) > cnxMgmt->set_saslMechanism(mechanism); > } >@@ -507,7 +507,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr > if (ssf) { > securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); > } >- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); >+ qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); > if ( cnxMgmt ) > cnxMgmt->set_saslSsf(ssf); > return securityLayer; >diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp >index 9a84db5..b91ae0d 100644 >--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp >+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp >@@ 
-302,8 +302,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, > arguments(_arguments), > notifyEnabled(true), > syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), >- deliveryCount(0), >- mgmtObject(0) >+ deliveryCount(0) > { > if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) > { >@@ -312,17 +311,17 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, > > if (agent != 0) > { >- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), >- !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); >+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), >+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); > agent->addObject (mgmtObject); > mgmtObject->set_creditMode("WINDOW"); > } > } > } > >-ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const >+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) >diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h >index 15928ce..61b38b0 100644 >--- a/qpid/cpp/src/qpid/broker/SemanticState.h >+++ b/qpid/cpp/src/qpid/broker/SemanticState.h >@@ -92,7 +92,7 @@ class SemanticState : private boost::noncopyable { > bool notifyEnabled; > const int syncFrequency; > int deliveryCount; >- qmf::org::apache::qpid::broker::Subscription* mgmtObject; >+ qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; > > bool checkCredit(boost::intrusive_ptr<Message>& msg); > void allocateCredit(boost::intrusive_ptr<Message>& msg); >@@ -156,7 +156,7 @@ class SemanticState : private boost::noncopyable { > void acknowledged(const 
broker::QueuedMessage&) {} > > // manageable entry points >- QPID_BROKER_EXTERN management::ManagementObject* >+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr > GetManagementObject(void) const; > > QPID_BROKER_EXTERN management::Manageable::status_t >diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp >index cc02d9e..1385019 100644 >--- a/qpid/cpp/src/qpid/broker/SessionState.cpp >+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp >@@ -58,7 +58,6 @@ SessionState::SessionState( > semanticState(*this, *this), > adapter(semanticState), > msgBuilder(&broker.getStore()), >- mgmtObject(0), > asyncCommandCompleter(new AsyncCommandCompleter(this)) > { > if (!delayManagement) addManagementObject(); >@@ -71,8 +70,8 @@ void SessionState::addManagementObject() { > if (parent != 0) { > ManagementAgent* agent = getBroker().getManagementAgent(); > if (agent != 0) { >- mgmtObject = new _qmf::Session >- (agent, this, parent, getId().getName()); >+ mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session >+ (agent, this, parent, getId().getName())); > mgmtObject->set_attached (0); > mgmtObject->set_detachedLifespan (0); > mgmtObject->clr_expireTime(); >@@ -149,9 +148,9 @@ void SessionState::giveReadCredit(int32_t credit) { > getConnection().outputTasks.giveReadCredit(credit); > } > >-ManagementObject* SessionState::GetManagementObject (void) const >+ManagementObject::shared_ptr SessionState::GetManagementObject (void) const > { >- return (ManagementObject*) mgmtObject; >+ return mgmtObject; > } > > Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, >diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h >index a8ff7fe..e9c6587 100644 >--- a/qpid/cpp/src/qpid/broker/SessionState.h >+++ b/qpid/cpp/src/qpid/broker/SessionState.h >@@ -109,7 +109,7 @@ class SessionState : public qpid::SessionState, > void deliver(DeliveryRecord&, bool sync); > > // Manageable entry 
points >- management::ManagementObject* GetManagementObject (void) const; >+ management::ManagementObject::shared_ptr GetManagementObject (void) const; > management::Manageable::status_t > ManagementMethod (uint32_t methodId, management::Args& args, std::string&); > >@@ -167,7 +167,7 @@ class SessionState : public qpid::SessionState, > SemanticState semanticState; > SessionAdapter adapter; > MessageBuilder msgBuilder; >- qmf::org::apache::qpid::broker::Session* mgmtObject; >+ qmf::org::apache::qpid::broker::Session::shared_ptr mgmtObject; > qpid::framing::SequenceSet accepted; > > // sequence numbers for pending received Execution.Sync commands >diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp >index fa8df64..8d54427 100644 >--- a/qpid/cpp/src/qpid/broker/System.cpp >+++ b/qpid/cpp/src/qpid/broker/System.cpp >@@ -31,7 +31,7 @@ using namespace qpid::broker; > using namespace std; > namespace _qmf = qmf::org::apache::qpid::broker; > >-System::System (string _dataDir, Broker* broker) : mgmtObject(0) >+System::System (string _dataDir, Broker* broker) > { > ManagementAgent* agent = broker ? 
broker->getManagementAgent() : 0; > >@@ -64,7 +64,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) > } > } > >- mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); >+ mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array()))); > qpid::sys::SystemInfo::getSystemId (osName, > nodeName, > release, >diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h >index 6847c66..591d2a1 100644 >--- a/qpid/cpp/src/qpid/broker/System.h >+++ b/qpid/cpp/src/qpid/broker/System.h >@@ -35,7 +35,7 @@ class System : public management::Manageable > { > private: > >- qmf::org::apache::qpid::broker::System* mgmtObject; >+ qmf::org::apache::qpid::broker::System::shared_ptr mgmtObject; > framing::Uuid systemId; > std::string osName, nodeName, release, version, machine; > >@@ -45,7 +45,7 @@ class System : public management::Manageable > > System (std::string _dataDir, Broker* broker = 0); > >- management::ManagementObject* GetManagementObject (void) const >+ management::ManagementObject::shared_ptr GetManagementObject (void) const > { return mgmtObject; } > > >diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp >index a9ca3b4..e72118b 100644 >--- a/qpid/cpp/src/qpid/broker/Vhost.cpp >+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp >@@ -29,7 +29,7 @@ namespace qpid { namespace management { > class Manageable; > }} > >-Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0) >+Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) > { > if (parentBroker != 0 && broker != 0) > { >@@ -37,7 +37,7 @@ Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmt > > if (agent != 0) > { >- mgmtObject = new _qmf::Vhost(agent, this, parentBroker, "/"); >+ mgmtObject = _qmf::Vhost::shared_ptr(new _qmf::Vhost(agent, this, parentBroker, "/")); > agent->addObject(mgmtObject, 0, true); > } > } 
>diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h >index 9554d64..599b821 100644 >--- a/qpid/cpp/src/qpid/broker/Vhost.h >+++ b/qpid/cpp/src/qpid/broker/Vhost.h >@@ -32,7 +32,7 @@ class Vhost : public management::Manageable > { > private: > >- qmf::org::apache::qpid::broker::Vhost* mgmtObject; >+ qmf::org::apache::qpid::broker::Vhost::shared_ptr mgmtObject; > > public: > >@@ -40,7 +40,7 @@ class Vhost : public management::Manageable > > Vhost (management::Manageable* parentBroker, Broker* broker = 0); > >- management::ManagementObject* GetManagementObject (void) const >+ management::ManagementObject::shared_ptr GetManagementObject (void) const > { return mgmtObject; } > void setFederationTag(const std::string& tag); > }; >diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp >index 4aab338..e4c2a4c 100644 >--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp >+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp >@@ -251,7 +251,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { > Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : > settings(set), > broker(b), >- mgmtObject(0), > poller(b.getPoller()), > cpg(*this), > name(settings.name), >@@ -334,7 +333,7 @@ void Cluster::initialize() { > mAgent = broker.getManagementAgent(); > if (mAgent != 0){ > _qmf::Package packageInit(mAgent); >- mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); >+ mgmtObject = _qmf::Cluster::shared_ptr(new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str())); > mAgent->addObject (mgmtObject); > } > >@@ -1046,7 +1045,7 @@ void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { > leave(l); > } > >-ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } >+ManagementObject::shared_ptr Cluster::GetManagementObject() const { return mgmtObject; } > > Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { > 
Lock l(lock); >diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h >index 40f1445..832c8c3 100644 >--- a/qpid/cpp/src/qpid/cluster/Cluster.h >+++ b/qpid/cpp/src/qpid/cluster/Cluster.h >@@ -241,7 +241,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { > ); > > // == Called in management threads. >- virtual qpid::management::ManagementObject* GetManagementObject() const; >+ virtual qpid::management::ManagementObject::shared_ptr GetManagementObject() const; > virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); > > void stopClusterNode(Lock&); >@@ -258,7 +258,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { > // Immutable members set on construction, never changed. > const ClusterSettings settings; > broker::Broker& broker; >- qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle >+ qmf::org::apache::qpid::cluster::Cluster::shared_ptr mgmtObject; // mgnt owns lifecycle > boost::shared_ptr<sys::Poller> poller; > Cpg cpg; > const std::string name; >diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp >index ffaa701..39d9ee2 100644 >--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp >+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp >@@ -63,7 +63,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) > systemId(broker.getSystem()->getSystemId().data()), > settings(s), > observer(new ConnectionObserver(*this, systemId)), >- mgmtObject(0), > status(STANDALONE), > membership(systemId), > replicationTest(s.replicateDefault.get()) >@@ -95,7 +94,7 @@ void HaBroker::initialize() { > if (settings.cluster && !ma) > throw Exception("Cannot start HA: management is disabled"); > _qmf::Package packageInit(ma); >- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); >+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker")); > 
mgmtObject->set_replicateDefault(settings.replicateDefault.str()); > mgmtObject->set_systemId(systemId); > ma->addObject(mgmtObject); >diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h >index 3b39b9e..2f6c7b8 100644 >--- a/qpid/cpp/src/qpid/ha/HaBroker.h >+++ b/qpid/cpp/src/qpid/ha/HaBroker.h >@@ -70,7 +70,7 @@ class HaBroker : public management::Manageable > void initialize(); > > // Implement Manageable. >- qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } >+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } > management::Manageable::status_t ManagementMethod ( > uint32_t methodId, management::Args& args, std::string& text); > >@@ -123,7 +123,7 @@ class HaBroker : public management::Manageable > boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary > boost::shared_ptr<Backup> backup; > boost::shared_ptr<Primary> primary; >- qmf::org::apache::qpid::ha::HaBroker* mgmtObject; >+ qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; > Url clientUrl, brokerUrl; > std::vector<Url> knownBrokers; > BrokerStatus status; >diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp >index 7d90ed9..8cd75af 100644 >--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp >+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp >@@ -113,8 +113,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () > if (mgmtObject != 0) { > mgmtObject->resourceDestroy(); > agent.deleteObjectNowLH(mgmtObject->getObjectId()); >- delete mgmtObject; >- mgmtObject = 0; >+ mgmtObject.reset(); > } > } > >@@ -124,7 +123,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : > suppressed(false), disallowAllV1Methods(false), > vendorNameKey(defaultVendorName), productNameKey(defaultProductName), > qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), >- msgBuffer(MA_BUFFER_SIZE), 
memstat(0) >+ msgBuffer(MA_BUFFER_SIZE) > { > nextObjectId = 1; > brokerBank = 1; >@@ -135,7 +134,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : > attrMap["_vendor"] = defaultVendorName; > attrMap["_product"] = defaultProductName; > >- memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); >+ memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker")); > addObject(memstat, "amqp-broker"); > } > >@@ -154,15 +153,6 @@ ManagementAgent::~ManagementAgent () > v2Direct.reset(); > > remoteAgents.clear(); >- >- moveNewObjectsLH(); >- for (ManagementObjectMap::iterator iter = managementObjects.begin (); >- iter != managementObjects.end (); >- iter++) { >- ManagementObject* object = iter->second; >- delete object; >- } >- managementObjects.clear(); > } > } > >@@ -315,7 +305,7 @@ void ManagementAgent::registerEvent (const string& packageName, > } > > // Deprecated: V1 objects >-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent) >+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent) > { > uint16_t sequence; > uint64_t objectNum; >@@ -338,7 +328,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId > > > >-ObjectId ManagementAgent::addObject(ManagementObject* object, >+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, > const string& key, > bool persistent) > { >@@ -368,10 +358,10 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi > "emerg", "alert", "crit", "error", "warn", > "note", "info", "debug" > }; >- sys::Mutex::ScopedLock lock (userLock); > uint8_t sev = (severity == SEV_DEFAULT) ? 
event.getSeverity() : (uint8_t) severity; > > if (qmf1Support) { >+ sys::Mutex::ScopedLock lock (eventBufferLock); > Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); > uint32_t outLen; > >@@ -386,8 +376,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi > outBuffer.putRawData(sBuf); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, mExchange, >- "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); >+ sendBuffer(outBuffer, outLen, mExchange, >+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName(), &eventBufferLock); > QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); > } > >@@ -425,7 +415,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi > Variant::List list_; > list_.push_back(map_); > ListCodec::encode(list_, content); >- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); >+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str()); > QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); > } > } >@@ -484,7 +474,7 @@ void ManagementAgent::clientAdded (const string& routingKey) > encodeHeader(outBuffer, 'x'); > outLen = outBuffer.getPosition(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); >+ sendBuffer(outBuffer, outLen, dExchange, rkeys.front(), &userLock); > QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); > rkeys.pop_front(); > } >@@ -495,8 +485,8 @@ void ManagementAgent::clusterUpdate() { > // Set clientWasAdded so that on the next periodicProcessing we will do > // a full update on all cluster members. > sys::Mutex::ScopedLock l(userLock); >- moveNewObjectsLH(); // keep lists consistent with updater/updatee. >- moveDeletedObjectsLH(); >+ moveNewObjects(); // keep lists consistent with updater/updatee. 
>+ moveDeletedObjects(); > clientWasAdded = true; > debugSnapshot("Cluster member joined"); > } >@@ -522,12 +512,13 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) > return h1 == 'A' && h2 == 'M' && h3 == '2'; > } > >-// NOTE WELL: assumes userLock is held by caller (LH) >+// NOTE WELL: assumes unlockMutex is held by caller if provided > // NOTE EVEN WELLER: drops this lock when delivering the message!!! >-void ManagementAgent::sendBufferLH(Buffer& buf, >- uint32_t length, >- qpid::broker::Exchange::shared_ptr exchange, >- const string& routingKey) >+void ManagementAgent::sendBuffer(Buffer& buf, >+ uint32_t length, >+ qpid::broker::Exchange::shared_ptr exchange, >+ const string& routingKey, >+ sys::Mutex* unlockMutex) > { > if (suppressed) { > QPID_LOG(debug, "Suppressing management message to " << routingKey); >@@ -561,38 +552,42 @@ void ManagementAgent::sendBufferLH(Buffer& buf, > msg->getFrames().append(content); > msg->setIsManagementMessage(true); > >- { >- sys::Mutex::ScopedUnlock u(userLock); >- >- DeliverableMessage deliverable (msg); >- try { >+ DeliverableMessage deliverable (msg); >+ try { >+ if (unlockMutex) { >+ sys::Mutex::ScopedUnlock u(*unlockMutex); > exchange->route(deliverable); >- } catch(exception&) {} >- } >+ } >+ else { >+ exchange->route(deliverable); >+ } >+ } catch(exception&) {} > buf.reset(); > } > > >-void ManagementAgent::sendBufferLH(Buffer& buf, >- uint32_t length, >- const string& exchange, >- const string& routingKey) >+void ManagementAgent::sendBuffer(Buffer& buf, >+ uint32_t length, >+ const string& exchange, >+ const string& routingKey, >+ sys::Mutex* unlockMutex) > { > qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); > if (ex.get() != 0) >- sendBufferLH(buf, length, ex, routingKey); >+ sendBuffer(buf, length, ex, routingKey, unlockMutex); > } > > >-// NOTE WELL: assumes userLock is held by caller (LH) >+// NOTE WELL: assumes unlockMutex is held by caller if provided 
> // NOTE EVEN WELLER: drops this lock when delivering the message!!! >-void ManagementAgent::sendBufferLH(const string& data, >- const string& cid, >- const Variant::Map& headers, >- const string& content_type, >- qpid::broker::Exchange::shared_ptr exchange, >- const string& routingKey, >- uint64_t ttl_msec) >+void ManagementAgent::sendBuffer(const string& data, >+ const string& cid, >+ const Variant::Map& headers, >+ const string& content_type, >+ qpid::broker::Exchange::shared_ptr exchange, >+ const string& routingKey, >+ uint64_t ttl_msec, >+ sys::Mutex* unlockMutex) > { > Variant::Map::const_iterator i; > >@@ -638,34 +633,37 @@ void ManagementAgent::sendBufferLH(const string& data, > msg->getFrames().append(content); > msg->setIsManagementMessage(true); > >- { >- sys::Mutex::ScopedUnlock u(userLock); >- >- DeliverableMessage deliverable (msg); >- try { >+ DeliverableMessage deliverable (msg); >+ try { >+ if (unlockMutex) { >+ sys::Mutex::ScopedUnlock u(*unlockMutex); > exchange->route(deliverable); >- } catch(exception&) {} >- } >+ } >+ else { >+ exchange->route(deliverable); >+ } >+ } catch(exception&) {} > } > > >-void ManagementAgent::sendBufferLH(const string& data, >- const string& cid, >- const Variant::Map& headers, >- const string& content_type, >- const string& exchange, >- const string& routingKey, >- uint64_t ttl_msec) >+void ManagementAgent::sendBuffer(const string& data, >+ const string& cid, >+ const Variant::Map& headers, >+ const string& content_type, >+ const string& exchange, >+ const string& routingKey, >+ uint64_t ttl_msec, >+ sys::Mutex* unlockMutex) > { > qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); > if (ex.get() != 0) >- sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); >+ sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec, unlockMutex); > } > > > /** Objects that have been added since the last periodic poll are temporarily > * saved in the newManagementObjects 
list. This allows objects to be >- * added without needing to block on the userLock (addLock is used instead). >+ * added without needing to block on the userLock (objectLock is used instead). > * These new objects need to be integrated into the object database > * (managementObjects) *before* they can be properly managed. This routine > * performs the integration. >@@ -675,34 +673,33 @@ void ManagementAgent::sendBufferLH(const string& data, > * duplicate object ids. To avoid clashes, don't put deleted objects > * into the active object database. > */ >-void ManagementAgent::moveNewObjectsLH() >+void ManagementAgent::moveNewObjects() > { >- sys::Mutex::ScopedLock lock (addLock); >+ sys::Mutex::ScopedLock lock(addLock); >+ sys::Mutex::ScopedLock objLock (objectLock); > while (!newManagementObjects.empty()) { >- ManagementObject *object = newManagementObjects.back(); >+ ManagementObject::shared_ptr object = newManagementObjects.back(); > newManagementObjects.pop_back(); > > if (object->isDeleted()) { > DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); > pendingDeletedObjs[dptr->getKey()].push_back(dptr); >- delete object; > } else { // add to active object list, check for duplicates. > ObjectId oid = object->getObjectId(); > ManagementObjectMap::iterator destIter = managementObjects.find(oid); > if (destIter != managementObjects.end()) { > // duplicate found. It is OK if the old object has been marked > // deleted, just replace the old with the new. >- ManagementObject *oldObj = destIter->second; >- if (oldObj->isDeleted()) { >- DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); >- pendingDeletedObjs[dptr->getKey()].push_back(dptr); >- delete oldObj; >- } else { >+ ManagementObject::shared_ptr oldObj = destIter->second; >+ if (!oldObj->isDeleted()) { > // Duplicate non-deleted objects? This is a user error - oids must be unique. 
> // for now, leak the old object (safer than deleting - may still be referenced) > // and complain loudly... > QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); >+ oldObj->resourceDestroy(); > } >+ DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); >+ pendingDeletedObjs[dptr->getKey()].push_back(dptr); > // QPID-3666: be sure to replace the -index- also, as non-key members of > // the index object may be different for the new object! So erase the > // entry, rather than []= assign here: >@@ -723,24 +720,35 @@ void ManagementAgent::periodicProcessing (void) > string routingKey; > string sBuf; > >- moveNewObjectsLH(); >+ moveNewObjects(); > > // > // If we're publishing updates, get the latest memory statistics and uptime now > // > if (publish) { > uint64_t uptime = sys::Duration(startTime, sys::now()); >- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); >- qpid::sys::MemStat::loadMemInfo(memstat); >+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); >+ qpid::sys::MemStat::loadMemInfo(memstat.get()); >+ } >+ >+ // >+ // Use a copy of the management object map to avoid holding the objectLock >+ // >+ ManagementObjectVector localManagementObjects; >+ { >+ sys::Mutex::ScopedLock objLock(objectLock); >+ std::transform(managementObjects.begin(), managementObjects.end(), >+ std::back_inserter(localManagementObjects), >+ boost::bind(&ManagementObjectMap::value_type::second, _1)); > } > > // > // Clear the been-here flag on all objects in the map. 
> // >- for (ManagementObjectMap::iterator iter = managementObjects.begin(); >- iter != managementObjects.end(); >+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); >+ iter != localManagementObjects.end(); > iter++) { >- ManagementObject* object = iter->second; >+ ManagementObject::shared_ptr object = *iter; > object->setFlags(0); > if (clientWasAdded) { > object->setForcePublish(true); >@@ -755,22 +763,24 @@ void ManagementAgent::periodicProcessing (void) > // if we sent the active update first, _then_ the delete update, clients > // would incorrectly think the object was deleted. See QPID-2997 > // >- bool objectsDeleted = moveDeletedObjectsLH(); >+ bool objectsDeleted = moveDeletedObjects(); >+ PendingDeletedObjsMap localPendingDeletedObjs; >+ { >+ sys::Mutex::ScopedLock objLock(objectLock); >+ localPendingDeletedObjs.swap(pendingDeletedObjs); >+ } > > // > // If we are not publishing updates, just clear the pending deletes. There's no > // need to tell anybody. > // > if (!publish) >- pendingDeletedObjs.clear(); >+ localPendingDeletedObjs.clear(); > >- if (!pendingDeletedObjs.empty()) { >- // use a temporary copy of the pending deletes so dropping the lock when >- // the buffer is sent is safe. >- PendingDeletedObjsMap tmp(pendingDeletedObjs); >- pendingDeletedObjs.clear(); >- >- for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { >+ if (!localPendingDeletedObjs.empty()) { >+ for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin(); >+ mIter != localPendingDeletedObjs.end(); >+ mIter++) { > std::string packageName; > std::string className; > msgBuffer.reset(); >@@ -806,7 +816,7 @@ void ManagementAgent::periodicProcessing (void) > stringstream key; > key << "console.obj.1.0." << packageName << "." 
<< className; > msgBuffer.reset(); >- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(msgBuffer, contentSize, mExchange, key.str(), &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" > << key.str() << " len=" << contentSize); > } >@@ -835,7 +845,7 @@ void ManagementAgent::periodicProcessing (void) > headers["qmf.content"] = "_data"; > headers["qmf.agent"] = name_address; > >- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0, &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); > } > } >@@ -849,7 +859,7 @@ void ManagementAgent::periodicProcessing (void) > stringstream key; > key << "console.obj.1.0." << packageName << "." << className; > msgBuffer.reset(); >- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(msgBuffer, contentSize, mExchange, key.str(), &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); > } > >@@ -872,7 +882,7 @@ void ManagementAgent::periodicProcessing (void) > headers["qmf.content"] = "_data"; > headers["qmf.agent"] = name_address; > >- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0, &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); > } > } >@@ -880,9 +890,7 @@ void ManagementAgent::periodicProcessing (void) > } > > // >- // Process the entire object map. Remember: we drop the userLock each time we call >- // sendBuffer(). 
This allows the managementObjects map to be altered during the >- // sendBuffer() call, so always restart the search after a sendBuffer() call >+ // Process the entire object map. > // > // If publish is disabled, don't send any updates. > // >@@ -892,14 +900,14 @@ void ManagementAgent::periodicProcessing (void) > uint32_t pcount; > uint32_t scount; > uint32_t v1Objs, v2Objs; >- ManagementObjectMap::iterator baseIter; >+ ManagementObjectVector::iterator baseIter; > std::string packageName; > std::string className; > >- for (baseIter = managementObjects.begin(); >- baseIter != managementObjects.end(); >+ for (baseIter = localManagementObjects.begin(); >+ baseIter != localManagementObjects.end(); > baseIter++) { >- ManagementObject* baseObject = baseIter->second; >+ ManagementObject::shared_ptr baseObject = *baseIter; > // > // Skip until we find a base object requiring processing... > // >@@ -910,7 +918,7 @@ void ManagementAgent::periodicProcessing (void) > } > } > >- if (baseIter == managementObjects.end()) >+ if (baseIter == localManagementObjects.end()) > break; // done - all objects processed > > pcount = scount = 0; >@@ -919,12 +927,12 @@ void ManagementAgent::periodicProcessing (void) > list_.clear(); > msgBuffer.reset(); > >- for (ManagementObjectMap::iterator iter = baseIter; >- iter != managementObjects.end(); >+ for (ManagementObjectVector::iterator iter = baseIter; >+ iter != localManagementObjects.end(); > iter++) { > msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space >- ManagementObject* baseObject = baseIter->second; >- ManagementObject* object = iter->second; >+ ManagementObject::shared_ptr baseObject = *baseIter; >+ ManagementObject::shared_ptr object = *iter; > bool send_stats, send_props; > if (baseObject->isSameClass(*object) && object->getFlags() == 0) { > object->setFlags(1); >@@ -1004,7 +1012,7 @@ void ManagementAgent::periodicProcessing (void) > stringstream key; > key << "console.obj.1.0." << packageName << "." 
<< className; > msgBuffer.reset(); >- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(msgBuffer, contentSize, mExchange, key.str(), &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() > << " props=" << pcount > << " stats=" << scount >@@ -1030,7 +1038,7 @@ void ManagementAgent::periodicProcessing (void) > headers["qmf.content"] = "_data"; > headers["qmf.agent"] = name_address; > >- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK >+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0, &userLock); // UNLOCKS USERLOCK > QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() > << " props=" << pcount > << " stats=" << scount >@@ -1054,7 +1062,7 @@ void ManagementAgent::periodicProcessing (void) > contentSize = BUFSIZE - msgBuffer.available (); > msgBuffer.reset (); > routingKey = "console.heartbeat.1.0"; >- sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); >+ sendBuffer(msgBuffer, contentSize, mExchange, routingKey, &userLock); > QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); > } > >@@ -1082,7 +1090,7 @@ void ManagementAgent::periodicProcessing (void) > > // Set TTL (in msecs) on outgoing heartbeat indications based on the interval > // time to prevent stale heartbeats from getting to the consoles. 
>- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); >+ sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000, &userLock); > > QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); > } >@@ -1090,14 +1098,19 @@ void ManagementAgent::periodicProcessing (void) > > void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) > { >- ManagementObjectMap::iterator iter = managementObjects.find(oid); >- if (iter == managementObjects.end()) >- return; >- ManagementObject* object = iter->second; >- if (!object->isDeleted()) >- return; >+ ManagementObject::shared_ptr object; >+ { >+ sys::Mutex::ScopedLock lock(objectLock); >+ ManagementObjectMap::iterator iter = managementObjects.find(oid); >+ if (iter == managementObjects.end()) >+ return; >+ object = iter->second; >+ if (!object->isDeleted()) >+ return; >+ managementObjects.erase(oid); >+ } > >- // since sendBufferLH drops the userLock, don't call it until we >+ // since sendBuffer drops the userLock, don't call it until we > // are done manipulating the object. > #define DNOW_BUFSIZE 2048 > char msgChars[DNOW_BUFSIZE]; >@@ -1134,15 +1147,14 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) > v2key << "." << instanceNameKey; > } > >- object = 0; >- managementObjects.erase(oid); >+ object.reset(); > > // object deleted, ok to drop lock now. 
> > if (publish && qmf1Support) { > uint32_t contentSize = msgBuffer.getPosition(); > msgBuffer.reset(); >- sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); >+ sendBuffer(msgBuffer, contentSize, mExchange, v1key.str(), &userLock); > QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); > } > >@@ -1155,7 +1167,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) > > string content; > ListCodec::encode(list_, content); >- sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); >+ sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0, &userLock); > QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); > } > } >@@ -1171,13 +1183,13 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s > outBuffer.putShortString (text); > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << > replyToKey << " seq=" << sequence); > } > >-void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, >- const string& text, uint32_t code, bool viaLocal) >+void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid, >+ const string& text, uint32_t code, bool viaLocal) > { > static const string addr_exchange("qmf.default.direct"); > >@@ -1195,7 +1207,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons > map["_values"] = values; > > MapCodec::encode(map, content); >- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk); > > QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); > } >@@ -1206,7 +1218,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& 
deliverable, > const bool topic, > int qmfVersion) > { >- sys::Mutex::ScopedLock lock (userLock); > Message& msg = ((DeliverableMessage&) deliverable).getMessage (); > > if (topic && qmfVersion == 1) { >@@ -1220,6 +1231,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, > // schema.# > > if (routingKey == "broker") { >+ sys::Mutex::ScopedLock lock (userLock); > dispatchAgentCommandLH(msg); > return false; > } >@@ -1227,15 +1239,18 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, > if (routingKey.length() > 6) { > > if (routingKey.compare(0, 9, "agent.1.0") == 0) { >+ sys::Mutex::ScopedLock lock (userLock); > dispatchAgentCommandLH(msg); > return false; > } > > if (routingKey.compare(0, 8, "agent.1.") == 0) { >+ sys::Mutex::ScopedLock lock (userLock); > return authorizeAgentMessageLH(msg); > } > > if (routingKey.compare(0, 7, "schema.") == 0) { >+ sys::Mutex::ScopedLock lock (userLock); > dispatchAgentCommandLH(msg); > return true; > } >@@ -1248,6 +1263,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, > // Intercept messages bound to: > // "console.ind.locate.# - process these messages, and also allow them to be forwarded. 
> if (routingKey == "console.request.agent_locate") { >+ sys::Mutex::ScopedLock lock (userLock); > dispatchAgentCommandLH(msg); > return true; > } >@@ -1259,6 +1275,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, > // "<name_address>" - the broker agent's proper name > // and do not forward them futher > if (routingKey == "broker" || routingKey == name_address) { >+ sys::Mutex::ScopedLock lock (userLock); > dispatchAgentCommandLH(msg, routingKey == "broker"); > return false; > } >@@ -1270,7 +1287,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, > > void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) > { >- moveNewObjectsLH(); >+ moveNewObjects(); > > string methodName; > string packageName; >@@ -1301,7 +1318,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl > outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); > return; > } >@@ -1312,7 +1329,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl > outBuffer.putMediumString(i->second); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); > return; > } >@@ -1328,19 +1345,26 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl > outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); > outLen 
= MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); > return; > } > } > >- ManagementObjectMap::iterator iter = numericFind(objId); >- if (iter == managementObjects.end() || iter->second->isDeleted()) { >+ ManagementObject::shared_ptr object; >+ { >+ sys::Mutex::ScopedLock lock(objectLock); >+ ManagementObjectMap::iterator iter = numericFind(objId); >+ if (iter != managementObjects.end()) >+ object = iter->second; >+ } >+ >+ if (!object || object->isDeleted()) { > outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); > outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); > } else { >- if ((iter->second->getPackageName() != packageName) || >- (iter->second->getClassName() != className)) { >+ if ((object->getPackageName() != packageName) || >+ (object->getClassName() != className)) { > outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); > outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); > } >@@ -1349,7 +1373,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl > try { > sys::Mutex::ScopedUnlock u(userLock); > string outBuf; >- iter->second->doMethod(methodName, inArgs, outBuf, userId); >+ object->doMethod(methodName, inArgs, outBuf, userId); > outBuffer.putRawData(outBuf); > } catch(exception& e) { > outBuffer.setPosition(pos);; >@@ -1361,15 +1385,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl > > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); > } > > >-void 
ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, >- const string& cid, const ConnectionToken* connToken, bool viaLocal) >+void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk, >+ const string& cid, const ConnectionToken* connToken, bool viaLocal) > { >- moveNewObjectsLH(); >+ moveNewObjects(); > > string methodName; > Variant::Map inMap; >@@ -1388,8 +1412,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r > > if ((oid = inMap.find("_object_id")) == inMap.end() || > (mid = inMap.find("_method_name")) == inMap.end()) { >- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), >- Manageable::STATUS_PARAMETER_INVALID, viaLocal); >+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), >+ Manageable::STATUS_PARAMETER_INVALID, viaLocal); > return; > } > >@@ -1407,16 +1431,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r > inArgs = (mid->second).asMap(); > } > } catch(exception& e) { >- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); >+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); > return; > } > >- ManagementObjectMap::iterator iter = managementObjects.find(objId); >+ ManagementObject::shared_ptr object; >+ { >+ sys::Mutex::ScopedLock lock(objectLock); >+ ManagementObjectMap::iterator iter = managementObjects.find(objId); >+ if (iter != managementObjects.end()) >+ object = iter->second; >+ } > >- if (iter == managementObjects.end() || iter->second->isDeleted()) { >+ if (!object || object->isDeleted()) { > stringstream estr; > estr << "No object found with ID=" << objId; >- sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); >+ sendException(rte, rtk, cid, estr.str(), 1, viaLocal); > return; > } > >@@ -1424,20 +1454,20 @@ void 
ManagementAgent::handleMethodRequestLH (const string& body, const string& r > AclModule* acl = broker->getAcl(); > DisallowedMethods::const_iterator i; > >- i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); >+ i = disallowed.find(make_pair(object->getClassName(), methodName)); > if (i != disallowed.end()) { >- sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); >+ sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); > return; > } > > string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); > if (acl != 0) { > map<acl::Property, string> params; >- params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); >- params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); >+ params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); >+ params[acl::PROP_SCHEMACLASS] = object->getClassName(); > > if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) { >- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), >+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), > Manageable::STATUS_FORBIDDEN, viaLocal); > return; > } >@@ -1445,13 +1475,12 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r > > // invoke the method > >- QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() >- << ":" << iter->second->getClassName() << " method=" << >+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName() >+ << ":" << object->getClassName() << " method=" << > methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); > > try { >- sys::Mutex::ScopedUnlock u(userLock); >- iter->second->doMethod(methodName, inArgs, callMap, userId); >+ object->doMethod(methodName, inArgs, callMap, userId); > errorCode = callMap["_status_code"].asUint32(); > if (errorCode == 0) { > 
outMap["_arguments"] = Variant::Map(); >@@ -1462,17 +1491,17 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r > } else > error = callMap["_status_text"].asString(); > } catch(exception& e) { >- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); >+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); > return; > } > > if (errorCode != 0) { >- sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); >+ sendException(rte, rtk, cid, error, errorCode, viaLocal); > return; > } > > MapCodec::encode(outMap, content); >- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk); > QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); > } > >@@ -1489,7 +1518,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, > > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); > } > >@@ -1510,7 +1539,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u > outLen = MA_BUFFER_SIZE - outBuffer.available (); > if (outLen) { > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); > } > >@@ -1559,7 +1588,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo > > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << 
classes.front().first.name << > "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); > classes.pop_front(); >@@ -1595,7 +1624,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK > key.encode(outBuffer); > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << > "), to=" << replyToKey << " seq=" << sequence); > >@@ -1645,7 +1674,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, > classInfo.appendSchema(outBuffer); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, rte, rtk); >+ sendBuffer(outBuffer, outLen, rte, rtk, &userLock); > QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); > } > else >@@ -1692,7 +1721,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r > encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); >+ sendBuffer(outBuffer, outLen, mExchange, "schema.class", &userLock); > QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << > " to=schema.class"); > } >@@ -1759,7 +1788,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep > ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); > Uuid systemId; > >- moveNewObjectsLH(); >+ moveNewObjects(); > deleteOrphanedAgentsLH(); > RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); > if (aIter != remoteAgents.end()) { >@@ -1783,7 
+1812,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep > agent->agentBank = assignedBank; > agent->routingKey = replyToKey; > agent->connectionRef = connectionRef; >- agent->mgmtObject = new _qmf::Agent (this, agent.get()); >+ agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); > agent->mgmtObject->set_connectionRef(agent->connectionRef); > agent->mgmtObject->set_label (label); > agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); >@@ -1804,7 +1833,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep > outBuffer.putLong (assignedBank); > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << > " to=" << replyToKey << " seq=" << sequence); > } >@@ -1814,7 +1843,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > FieldTable ft; > FieldTable::ValuePtr value; > >- moveNewObjectsLH(); >+ moveNewObjects(); > > ft.decode(inBuffer); > >@@ -1827,9 +1856,16 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > return; > > ObjectId selector(value->get<string>()); >- ManagementObjectMap::iterator iter = numericFind(selector); >- if (iter != managementObjects.end()) { >- ManagementObject* object = iter->second; >+ >+ ManagementObject::shared_ptr object; >+ { >+ sys::Mutex::ScopedLock lock(objectLock); >+ ManagementObjectMap::iterator iter = numericFind(selector); >+ if (iter != managementObjects.end()) >+ object = iter->second; >+ } >+ >+ if (object) { > Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); > uint32_t outLen; > >@@ -1846,7 +1882,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > 
outBuffer.putRawData(sBuf); > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); > } > } >@@ -1855,59 +1891,57 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > } > > string className (value->get<string>()); >- std::list<ObjectId>matches; >+ std::list<ManagementObject::shared_ptr> matches; > > if (className == "memory") >- qpid::sys::MemStat::loadMemInfo(memstat); >+ qpid::sys::MemStat::loadMemInfo(memstat.get()); > > if (className == "broker") { > uint64_t uptime = sys::Duration(startTime, sys::now()); >- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); >+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); > } > > > // build up a set of all objects to be dumped >- for (ManagementObjectMap::iterator iter = managementObjects.begin(); >- iter != managementObjects.end(); >- iter++) { >- ManagementObject* object = iter->second; >- if (object->getClassName () == className) { >- matches.push_back(object->getObjectId()); >+ { >+ sys::Mutex::ScopedLock lock(objectLock); >+ for (ManagementObjectMap::iterator iter = managementObjects.begin(); >+ iter != managementObjects.end(); >+ iter++) { >+ ManagementObject::shared_ptr object = iter->second; >+ if (object->getClassName () == className) { >+ matches.push_back(object); >+ } > } > } > >- // send them (as sendBufferLH drops the userLock) >+ // send them (as sendBuffer drops the userLock) > Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); > uint32_t outLen; > while (matches.size()) { >- ObjectId objId = matches.front(); >- ManagementObjectMap::iterator oIter = managementObjects.find( objId ); >- if (oIter != managementObjects.end()) { >- ManagementObject* object = oIter->second; >- >- if 
(object->getConfigChanged() || object->getInstChanged()) >- object->setUpdateTime(); >- >- if (!object->isDeleted()) { >- string sProps, sStats; >- object->writeProperties(sProps); >- object->writeStatistics(sStats, true); >- >- size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. >- if (len > MA_BUFFER_SIZE) { >- QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); >- } else { >- if (outBuffer.available() < len) { // not enough room in current buffer, send it. >- outLen = MA_BUFFER_SIZE - outBuffer.available (); >- outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock >- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); >- continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. >- } >- encodeHeader(outBuffer, 'g', sequence); >- outBuffer.putRawData(sProps); >- outBuffer.putRawData(sStats); >+ ManagementObject::shared_ptr object = matches.front(); >+ if (object->getConfigChanged() || object->getInstChanged()) >+ object->setUpdateTime(); >+ >+ if (!object->isDeleted()) { >+ string sProps, sStats; >+ object->writeProperties(sProps); >+ object->writeStatistics(sStats, true); >+ >+ size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. >+ if (len > MA_BUFFER_SIZE) { >+ QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!"); >+ } else { >+ if (outBuffer.available() < len) { // not enough room in current buffer, send it. >+ outLen = MA_BUFFER_SIZE - outBuffer.available (); >+ outBuffer.reset (); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); // drops lock >+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); >+ continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. 
> } >+ encodeHeader(outBuffer, 'g', sequence); >+ outBuffer.putRawData(sProps); >+ outBuffer.putRawData(sStats); > } > } > matches.pop_front(); >@@ -1916,7 +1950,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > outLen = MA_BUFFER_SIZE - outBuffer.available (); > if (outLen) { > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); >+ sendBuffer(outBuffer, outLen, dExchange, replyToKey, &userLock); > QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); > } > >@@ -1924,9 +1958,9 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe > } > > >-void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) >+void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) > { >- moveNewObjectsLH(); >+ moveNewObjects(); > > Variant::Map inMap; > Variant::Map::const_iterator i; >@@ -1945,17 +1979,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > */ > i = inMap.find("_what"); > if (i == inMap.end()) { >- sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); >+ sendException(rte, rtk, cid, "_what element missing in Query"); > return; > } > > if (i->second.getType() != qpid::types::VAR_STRING) { >- sendExceptionLH(rte, rtk, cid, "_what element is not a string"); >+ sendException(rte, rtk, cid, "_what element is not a string"); > return; > } > > if (i->second.asString() != "OBJECT") { >- sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); >+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); > return; > } > >@@ -1979,11 +2013,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > } > > if (className == "memory") >- 
qpid::sys::MemStat::loadMemInfo(memstat); >+ qpid::sys::MemStat::loadMemInfo(memstat.get()); > > if (className == "broker") { > uint64_t uptime = sys::Duration(startTime, sys::now()); >- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); >+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); > } > > /* >@@ -1995,10 +2029,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > Variant::List list_; > ObjectId objId(i->second.asMap()); > >- ManagementObjectMap::iterator iter = managementObjects.find(objId); >- if (iter != managementObjects.end()) { >- ManagementObject* object = iter->second; >- >+ ManagementObject::shared_ptr object; >+ { >+ sys::Mutex::ScopedLock lock (objectLock); >+ ManagementObjectMap::iterator iter = managementObjects.find(objId); >+ if (iter != managementObjects.end()) >+ object = iter->second; >+ } >+ if (object) { > if (object->getConfigChanged() || object->getInstChanged()) > object->setUpdateTime(); > >@@ -2022,7 +2060,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > string content; > > ListCodec::encode(list_, content); >- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk); > QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); > return; > } >@@ -2032,10 +2070,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > Variant::List _subList; > unsigned int objCount = 0; > >- for (ManagementObjectMap::iterator iter = managementObjects.begin(); >- iter != managementObjects.end(); >+ ManagementObjectVector localManagementObjects; >+ { >+ sys::Mutex::ScopedLock objLock(objectLock); >+ std::transform(managementObjects.begin(), managementObjects.end(), >+ std::back_inserter(localManagementObjects), >+ boost::bind(&ManagementObjectMap::value_type::second, _1)); >+ } >+ >+ for 
(ManagementObjectVector::iterator iter = localManagementObjects.begin(); >+ iter != localManagementObjects.end(); > iter++) { >- ManagementObject* object = iter->second; >+ ManagementObject::shared_ptr object = *iter; > if (object->getClassName() == className && > (packageName.empty() || object->getPackageName() == packageName)) { > >@@ -2050,7 +2096,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > > object->writeTimestamps(map_); > object->mapEncodeValues(values, true, true); // write both stats and properties >- iter->first.mapEncode(oidMap); >+ object->getObjectId().mapEncode(oidMap); > > map_["_values"] = values; > map_["_object_id"] = oidMap; >@@ -2075,13 +2121,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > string content; > while (_list.size() > 1) { > ListCodec::encode(_list.front().asList(), content); >- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk); > _list.pop_front(); > QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); > } > headers.erase("partial"); > ListCodec::encode(_list.size() ? 
_list.front().asList() : Variant::List(), content); >- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk); > QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); > return; > } >@@ -2089,12 +2135,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co > // Unrecognized query - Send empty message to indicate CommandComplete > string content; > ListCodec::encode(Variant::List(), content); >- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk); > QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); > } > > >-void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) >+void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid) > { > QPID_LOG(debug, "RCVD AgentLocateRequest"); > >@@ -2112,7 +2158,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co > > string content; > MapCodec::encode(map, content); >- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); >+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk); > clientWasAdded = true; > > QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); >@@ -2187,7 +2233,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) > } > > // look up schema for object to get package and class name >- >+ sys::Mutex::ScopedLock lock(objectLock); > ManagementObjectMap::iterator iter = managementObjects.find(objId); > > if (iter == managementObjects.end() || iter->second->isDeleted()) { >@@ -2195,7 +2241,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) > objId); > return false; > } >- >+ > packageName = iter->second->getPackageName(); > className = iter->second->getClassName(); > } >@@ -2249,8 
+2295,8 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) > cid = p->getCorrelationId(); > > if (mapMsg) { >- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), >- Manageable::STATUS_FORBIDDEN, false); >+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), >+ Manageable::STATUS_FORBIDDEN, false); > } else { > > Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); >@@ -2261,7 +2307,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) > outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); > outLen = MA_BUFFER_SIZE - outBuffer.available(); > outBuffer.reset(); >- sendBufferLH(outBuffer, outLen, rte, rtk); >+ sendBuffer(outBuffer, outLen, rte, rtk, &userLock); > } > > QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); >@@ -2309,24 +2355,26 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) > string body; > string cid; > inBuffer.getRawData(body, bufferLen); >+ { >+ // Drop userLock now that the shared buffer is no longer needed >+ sys::Mutex::ScopedUnlock u(userLock); > >- if (p && p->hasCorrelationId()) { >- cid = p->getCorrelationId(); >- } >- >- if (opcode == "_method_request") >- return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); >- else if (opcode == "_query_request") >- return handleGetQueryLH(body, rte, rtk, cid, viaLocal); >- else if (opcode == "_agent_locate_request") >- return handleLocateRequestLH(body, rte, rtk, cid); >+ if (p && p->hasCorrelationId()) { >+ cid = p->getCorrelationId(); >+ } > >+ if (opcode == "_method_request") >+ return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal); >+ else if (opcode == "_query_request") >+ return handleGetQuery(body, rte, rtk, cid, viaLocal); >+ else if (opcode == "_agent_locate_request") >+ return handleLocateRequest(body, rte, rtk, cid); >+ } > QPID_LOG(warning, "Support for QMF Opcode [" << opcode 
<< "] TBD!!!"); > return; > } > > // old preV2 binary messages >- > while (inBuffer.getPosition() < bufferLen) { > uint32_t sequence; > if (!checkHeader(inBuffer, &opcode, &sequence)) >@@ -2364,7 +2412,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string > encodePackageIndication (outBuffer, result.first); > outLen = MA_BUFFER_SIZE - outBuffer.available (); > outBuffer.reset (); >- sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); >+ sendBuffer(outBuffer, outLen, mExchange, "schema.package", &userLock); > QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); > > return result.first; >@@ -2676,7 +2724,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { > connectionRef.mapDecode(i->second.asMap()); > } > >- mgmtObject = new _qmf::Agent(&agent, this); >+ mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this)); > > if ((i = map_.find("_values")) != map_.end()) { > mgmtObject->mapDecodeValues(i->second.asMap()); >@@ -2819,8 +2867,8 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) > > sys::Mutex::ScopedLock lock (userLock); > >- moveNewObjectsLH(); >- moveDeletedObjectsLH(); >+ moveNewObjects(); >+ moveDeletedObjects(); > > // now copy the pending deletes into the outList > for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); >@@ -2837,15 +2885,15 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) > void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) > { > sys::Mutex::ScopedLock lock (userLock); >+ sys::Mutex::ScopedLock objLock(objectLock); > // Clear out any existing deleted objects >- moveNewObjectsLH(); >+ moveNewObjects(); > pendingDeletedObjs.clear(); > ManagementObjectMap::iterator i = managementObjects.begin(); > // Silently drop any deleted objects left over from receiving the update. 
> while (i != managementObjects.end()) { >- ManagementObject* object = i->second; >+ ManagementObject::shared_ptr object = i->second; > if (object->isDeleted()) { >- delete object; > managementObjects.erase(i++); > } > else ++i; >@@ -2859,7 +2907,7 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) > > > // construct a DeletedObject from a management object. >-ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) >+ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2) > : packageName(src->getPackageName()), > className(src->getClassName()) > { >@@ -2935,14 +2983,17 @@ void ManagementAgent::DeletedObject::encode(std::string& toBuffer) > } > > // Remove Deleted objects, and save for later publishing... >-bool ManagementAgent::moveDeletedObjectsLH() { >- typedef vector<pair<ObjectId, ManagementObject*> > DeleteList; >+bool ManagementAgent::moveDeletedObjects() { >+ typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList; >+ >+ sys::Mutex::ScopedLock lock (objectLock); >+ > DeleteList deleteList; > for (ManagementObjectMap::iterator iter = managementObjects.begin(); > iter != managementObjects.end(); > ++iter) > { >- ManagementObject* object = iter->second; >+ ManagementObject::shared_ptr object = iter->second; > if (object->isDeleted()) deleteList.push_back(*iter); > } > >@@ -2951,13 +3002,12 @@ bool ManagementAgent::moveDeletedObjectsLH() { > iter != deleteList.rend(); > iter++) > { >- ManagementObject* delObj = iter->second; >+ ManagementObject::shared_ptr delObj = iter->second; > assert(delObj->isDeleted()); > DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); > > pendingDeletedObjs[dptr->getKey()].push_back(dptr); > managementObjects.erase(iter->first); >- delete iter->second; > } > return !deleteList.empty(); > } >diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h 
b/qpid/cpp/src/qpid/management/ManagementAgent.h >index c7e830d..f8dfae3 100644 >--- a/qpid/cpp/src/qpid/management/ManagementAgent.h >+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h >@@ -37,6 +37,7 @@ > #include "qpid/types/Variant.h" > #include <qpid/framing/AMQFrame.h> > #include <qpid/framing/ResizableBuffer.h> >+#include <boost/shared_ptr.hpp> > #include <memory> > #include <string> > #include <map> >@@ -100,12 +101,12 @@ public: > const std::string& eventName, > uint8_t* md5Sum, > ManagementObject::writeSchemaCall_t schemaCall); >- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, >- uint64_t persistId = 0, >- bool persistent = false); >- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, >- const std::string& key, >- bool persistent = false); >+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, >+ uint64_t persistId = 0, >+ bool persistent = false); >+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, >+ const std::string& key, >+ bool persistent = false); > QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, > severity_t severity = SEV_DEFAULT); > QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); >@@ -158,7 +159,7 @@ public: > class DeletedObject { > public: > typedef boost::shared_ptr<DeletedObject> shared_ptr; >- DeletedObject(ManagementObject *, bool v1, bool v2); >+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); > DeletedObject( const std::string &encoded ); > ~DeletedObject() {}; > void encode( std::string& toBuffer ); >@@ -207,9 +208,9 @@ private: > uint32_t agentBank; > std::string routingKey; > ObjectId connectionRef; >- qmf::org::apache::qpid::broker::Agent* mgmtObject; >- RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} >- ManagementObject* GetManagementObject (void) const { return mgmtObject; } >+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; >+ RemoteAgent(ManagementAgent& 
_agent) : agent(_agent) {} >+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } > > virtual ~RemoteAgent (); > void mapEncode(qpid::types::Variant::Map& _map) const; >@@ -276,7 +277,7 @@ private: > PackageMap packages; > > // >- // Protected by userLock >+ // Protected by objectLock > // > ManagementObjectMap managementObjects; > >@@ -288,11 +289,12 @@ private: > framing::Uuid uuid; > > // >- // Lock hierarchy: If a thread needs to take both addLock and userLock, >- // it MUST take userLock first, then addLock. >+ // Lock ordering: userLock -> addLock -> objectLock > // > sys::Mutex userLock; > sys::Mutex addLock; >+ sys::Mutex objectLock; >+ sys::Mutex eventBufferLock; > > qpid::broker::Exchange::shared_ptr mExchange; > qpid::broker::Exchange::shared_ptr dExchange; >@@ -335,7 +337,7 @@ private: > // list of objects that have been deleted, but have yet to be published > // one final time. > // Indexed by a string composed of the object's package and class name. >- // Protected by userLock. >+ // Protected by objectLock. 
> typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; > PendingDeletedObjsMap pendingDeletedObjs; > >@@ -348,37 +350,41 @@ private: > // > // Memory statistics object > // >- qmf::org::apache::qpid::broker::Memory *memstat; >+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; > > void writeData (); > void periodicProcessing (void); > void deleteObjectNowLH(const ObjectId& oid); > void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); > bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); >- void sendBufferLH(framing::Buffer& buf, >- uint32_t length, >- qpid::broker::Exchange::shared_ptr exchange, >- const std::string& routingKey); >- void sendBufferLH(framing::Buffer& buf, >- uint32_t length, >- const std::string& exchange, >- const std::string& routingKey); >- void sendBufferLH(const std::string& data, >- const std::string& cid, >- const qpid::types::Variant::Map& headers, >- const std::string& content_type, >- qpid::broker::Exchange::shared_ptr exchange, >- const std::string& routingKey, >- uint64_t ttl_msec = 0); >- void sendBufferLH(const std::string& data, >- const std::string& cid, >- const qpid::types::Variant::Map& headers, >- const std::string& content_type, >- const std::string& exchange, >- const std::string& routingKey, >- uint64_t ttl_msec = 0); >- void moveNewObjectsLH(); >- bool moveDeletedObjectsLH(); >+ void sendBuffer(framing::Buffer& buf, >+ uint32_t length, >+ qpid::broker::Exchange::shared_ptr exchange, >+ const std::string& routingKey, >+ sys::Mutex* unlockMutex = NULL); >+ void sendBuffer(framing::Buffer& buf, >+ uint32_t length, >+ const std::string& exchange, >+ const std::string& routingKey, >+ sys::Mutex* unlockMutex = NULL); >+ void sendBuffer(const std::string& data, >+ const std::string& cid, >+ const qpid::types::Variant::Map& headers, >+ const std::string& content_type, >+ qpid::broker::Exchange::shared_ptr exchange, >+ const std::string& routingKey, >+ uint64_t 
ttl_msec = 0, >+ sys::Mutex* unlockMutex = NULL); >+ void sendBuffer(const std::string& data, >+ const std::string& cid, >+ const qpid::types::Variant::Map& headers, >+ const std::string& content_type, >+ const std::string& exchange, >+ const std::string& routingKey, >+ uint64_t ttl_msec = 0, >+ sys::Mutex* unlockMutex = NULL); >+ void moveNewObjects(); >+ bool moveDeletedObjects(); > > bool authorizeAgentMessageLH(qpid::broker::Message& msg); > void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); >@@ -401,7 +407,7 @@ private: > void deleteOrphanedAgentsLH(); > void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, > uint32_t code = 0, const std::string& text = "OK"); >- void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); >+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); > void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); > void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); > void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); >@@ -412,9 +418,9 @@ private: > void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); > void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); > void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); >- void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); >- void handleMethodRequestLH (const 
std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); >- void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); >+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); >+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); >+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); > > > size_t validateSchema(framing::Buffer&, uint8_t kind); >diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp >index e6010a8..d538a81 100644 >--- a/qpid/cpp/src/tests/testagent.cpp >+++ b/qpid/cpp/src/tests/testagent.cpp >@@ -59,7 +59,7 @@ class CoreClass : public Manageable > { > string name; > ManagementAgent* agent; >- _qmf::Parent* mgmtObject; >+ _qmf::Parent::shared_ptr mgmtObject; > std::vector<ChildClass*> children; > Mutex vectorLock; > >@@ -68,7 +68,7 @@ public: > CoreClass(ManagementAgent* agent, string _name); > ~CoreClass() { mgmtObject->resourceDestroy(); } > >- ManagementObject* GetManagementObject(void) const >+ ManagementObject::shared_ptr GetManagementObject(void) const > { return mgmtObject; } > > void doLoop(); >@@ -78,14 +78,14 @@ public: > class ChildClass : public Manageable > { > string name; >- _qmf::Child* mgmtObject; >+ _qmf::Child::shared_ptr mgmtObject; > > public: > > ChildClass(ManagementAgent* agent, CoreClass* parent, string name); > ~ChildClass() { mgmtObject->resourceDestroy(); } > >- ManagementObject* GetManagementObject(void) const >+ ManagementObject::shared_ptr GetManagementObject(void) const > { return 
mgmtObject; } > > void doWork() >@@ -97,9 +97,9 @@ public: > CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) > { > static uint64_t persistId = 0x111222333444555LL; >- mgmtObject = new _qmf::Parent(agent, this, name); >+ mgmtObject = _qmf::Parent::shared_ptr(new _qmf::Parent(agent, this, name)); > >- agent->addObject(mgmtObject, persistId++); >+ agent->addObject(mgmtObject.get(), persistId++); > mgmtObject->set_state("IDLE"); > } > >@@ -146,9 +146,9 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, > > ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) > { >- mgmtObject = new _qmf::Child(agent, this, parent, name); >+ mgmtObject = _qmf::Child::shared_ptr(new _qmf::Child(agent, this, parent, name)); > >- agent->addObject(mgmtObject); >+ agent->addObject(mgmtObject.get()); > } > >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page.
View Attachment As Diff
View Attachment As Raw
Actions: View | Diff
Attachments on bug 860701: 617570 | 619735