Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 606198 Details for
Bug 836141
destination cluster de-sync when federation link used for a longer time
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
patch - current (my final) status
bz826989-844618-836141.patch (text/plain), 101.83 KB, created by
Pavel Moravec
on 2012-08-22 11:20:04 UTC
(
hide
)
Description:
patch - current (my final) status
Filename:
MIME Type:
Creator:
Pavel Moravec
Created:
2012-08-22 11:20:04 UTC
Size:
101.83 KB
patch
obsolete
>diff -rup a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp >--- a/qpid/cpp/src/qpid/broker/Bridge.cpp 2011-01-18 21:43:41.000000000 +0100 >+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -46,20 +46,21 @@ void Bridge::PushHandler::handle(framing > conn->received(frame); > } > >-Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, >- const _qmf::ArgsLinkBridge& _args) : >- link(_link), id(_id), args(_args), mgmtObject(0), >- listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) >+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, >+ CancellationListener l, const _qmf::ArgsLinkBridge& _args) : >+ link(_link), channel(_id), args(_args), mgmtObject(0), >+ listener(l), name(_name), queueName("qpid.bridge_queue_"), persistenceId(0) > { > std::stringstream title; >- title << id << "_" << link->getBroker()->getFederationTag(); >+ title << "qpid.bridge_queue_" << name << "_" << link->getBroker()->getFederationTag(); > queueName += title.str(); > ManagementAgent* agent = link->getBroker()->getManagementAgent(); > if (agent != 0) { > mgmtObject = new _qmf::Bridge >- (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, >+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, > args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, > args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); >+ mgmtObject->set_channelId(channel); > agent->addObject(mgmtObject); > } > QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); >@@ -76,21 +77,21 @@ void Bridge::create(Connection& c) > conn = &c; > FieldTable options; > if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); >- SessionHandler& sessionHandler = c.getChannel(id); >+ SessionHandler& sessionHandler = c.getChannel(channel); > if (args.i_srcIsLocal) { > if (args.i_dynamic) > throw Exception("Dynamic routing not supported for push routes"); > // Point the bridging commands at the local connection handler > pushHandler.reset(new PushHandler(&c)); >- channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); >+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); > > session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); > peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); >- >- session->attach(name, false); >+ >+ session->attach(queueName, false); > session->commandPoint(0,0); > } else { >- sessionHandler.attachAs(name); >+ sessionHandler.attachAs(queueName); > // Point the bridging commands at the remote peer broker > peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); > } >@@ -150,6 +151,7 @@ void Bridge::cancel(Connection&) > } > } > >+/** Notify the bridge that the connection has closed */ > void Bridge::closed() > { > if (args.i_dynamic) { >@@ -159,9 +161,10 @@ void Bridge::closed() > } > } > >-void Bridge::destroy() >+/** Shut down the bridge */ >+void Bridge::close() > { >- listener(this); >+ listener(this); // ask the LinkRegistry to destroy us > } > > void Bridge::setPersistenceId(uint64_t pId) const >@@ -169,13 +172,20 @@ void Bridge::setPersistenceId(uint64_t p > persistenceId = pId; > } > >-const string& Bridge::getName() const >+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); >+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); >+ >+bool Bridge::isEncodedBridge(const std::string& key) > { >- return name; >+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; > } > >+ > Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) > { >+ string kind; >+ buffer.getShortString(kind); >+ > string host; > uint16_t port; > string src; >@@ -183,9 +193,33 @@ Bridge::shared_ptr Bridge::decode(LinkRe > string key; > string id; > string excludes; >+ string name; >+ >+ Link::shared_ptr link; >+ if (kind == ENCODED_IDENTIFIER_V1) { >+ /** previous versions identified the bridge by host:port, not by name, and >+ * transport wasn't provided. Try to find a link using those paramters. >+ */ >+ buffer.getShortString(host); >+ port = buffer.getShort(); >+ >+ link = links.getLink(host, port); >+ if (!link) { >+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); >+ return Bridge::shared_ptr(); >+ } >+ } else { >+ string linkName; >+ >+ buffer.getShortString(name); >+ buffer.getShortString(linkName); >+ link = links.getLink(linkName); >+ if (!link) { >+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); >+ return Bridge::shared_ptr(); >+ } >+ } > >- buffer.getShortString(host); >- port = buffer.getShort(); > bool durable(buffer.getOctet()); > buffer.getShortString(src); > buffer.getShortString(dest); >@@ -197,15 +231,21 @@ Bridge::shared_ptr Bridge::decode(LinkRe > bool dynamic(buffer.getOctet()); > uint16_t sync = buffer.getShort(); > >- return links.declare(host, port, durable, src, dest, key, >- is_queue, is_local, id, excludes, dynamic, sync).first; >+ if (kind == ENCODED_IDENTIFIER_V1) { >+ /** previous versions did not provide a name for the bridge, so create one >+ */ >+ name = createName(link->getName(), src, dest, key); >+ } >+ >+ return links.declare(name, *link, durable, src, dest, key, is_queue, >+ is_local, id, excludes, dynamic, sync).first; > } > > void Bridge::encode(Buffer& buffer) const > { >- buffer.putShortString(string("bridge")); >- buffer.putShortString(link->getHost()); >- buffer.putShort(link->getPort()); >+ buffer.putShortString(ENCODED_IDENTIFIER); >+ buffer.putShortString(name); >+ buffer.putShortString(link->getName()); > buffer.putOctet(args.i_durable ? 1 : 0); > buffer.putShortString(args.i_src); > buffer.putShortString(args.i_dest); >@@ -220,9 +260,9 @@ void Bridge::encode(Buffer& buffer) cons > > uint32_t Bridge::encodedSize() const > { >- return link->getHost().size() + 1 // short-string (host) >- + 7 // short-string ("bridge") >- + 2 // port >+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length >+ + name.size() + 1 >+ + link->getName().size() + 1 > + 1 // durable > + args.i_src.size() + 1 > + args.i_dest.size() + 1 >@@ -246,7 +286,8 @@ management::Manageable::status_t Bridge: > { > if (methodId == _qmf::Bridge::METHOD_CLOSE) { > //notify that we are closed >- destroy(); >+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); >+ close(); > return management::Manageable::STATUS_OK; > } else { > return management::Manageable::STATUS_UNKNOWN_METHOD; >@@ -293,7 +334,7 @@ void Bridge::sendReorigin() > } > bool Bridge::resetProxy() > { >- SessionHandler& sessionHandler = conn->getChannel(id); >+ SessionHandler& sessionHandler = conn->getChannel(channel); > if (!sessionHandler.getSession()) peer.reset(); > else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); > return peer.get(); >@@ -305,7 +346,7 @@ void Bridge::ioThreadPropagateBinding(co > peer->getExchange().bind(queue, exchange, key, args); > } else { > QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); >- destroy(); >+ close(); > } > } > >@@ -320,4 +361,14 @@ const string& Bridge::getLocalTag() cons > return link->getBroker()->getFederationTag(); > } > >+std::string Bridge::createName(const std::string& linkName, >+ const std::string& src, >+ const std::string& dest, >+ const std::string& key) >+{ >+ std::stringstream keystream; >+ keystream << linkName << "!" << src << "!" << dest << "!" << key; >+ return keystream.str(); >+} >+ > }} >diff -rup a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h >--- a/qpid/cpp/src/qpid/broker/Bridge.h 2010-10-14 21:38:40.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/Bridge.h 2012-08-20 08:55:02.000000000 +0200 >@@ -49,15 +49,16 @@ public: > typedef boost::shared_ptr<Bridge> shared_ptr; > typedef boost::function<void(Bridge*)> CancellationListener; > >- Bridge(Link* link, framing::ChannelId id, CancellationListener l, >+ Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l, > const qmf::org::apache::qpid::broker::ArgsLinkBridge& args); > ~Bridge(); > >- void create(Connection& c); >- void cancel(Connection& c); >- void closed(); >- void destroy(); >+ QPID_BROKER_EXTERN void close(); > bool isDurable() { return args.i_durable; } >+ Link *getLink() const { return link; } >+ const std::string getSrc() const { return args.i_src; } >+ const std::string getDest() const { return args.i_dest; } >+ const std::string getKey() const { return args.i_key; } > > management::ManagementObject* GetManagementObject() const; > management::Manageable::status_t ManagementMethod(uint32_t methodId, >@@ -68,9 +69,13 @@ public: > void setPersistenceId(uint64_t id) const; > uint64_t getPersistenceId() const { return persistenceId; } > uint32_t encodedSize() const; >- void encode(framing::Buffer& buffer) const; >- const std::string& getName() const; >+ void encode(framing::Buffer& buffer) const; >+ const std::string& getName() const { return name; } >+ >+ static const std::string ENCODED_IDENTIFIER; >+ static const std::string ENCODED_IDENTIFIER_V1; > static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); >+ static bool isEncodedBridge(const std::string& key); > > // Exchange::DynamicBridge methods > void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); >@@ -79,6 +84,12 @@ public: > bool containsLocalTag(const std::string& tagList) const; > const std::string& getLocalTag() const; > >+ /** create a name for a bridge (if none supplied by user config) */ >+ static std::string createName(const std::string& linkName, >+ const std::string& src, >+ const std::string& dest, >+ const std::string& key); >+ > private: > struct PushHandler : framing::FrameHandler { > PushHandler(Connection* c) { conn = c; } >@@ -91,8 +102,8 @@ private: > std::auto_ptr<framing::AMQP_ServerProxy::Session> session; > std::auto_ptr<framing::AMQP_ServerProxy> peer; > >- Link* link; >- framing::ChannelId id; >+ Link* const link; >+ const framing::ChannelId channel; > qmf::org::apache::qpid::broker::ArgsLinkBridge args; > qmf::org::apache::qpid::broker::Bridge* mgmtObject; > CancellationListener listener; >@@ -103,6 +114,12 @@ private: > Connection* conn; > > bool resetProxy(); >+ >+ // connection Management (called by owning Link) >+ void create(Connection& c); >+ void cancel(Connection& c); >+ void closed(); >+ friend class Link; // to call create, cancel, closed() > }; > > >diff -rup a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp >--- a/qpid/cpp/src/qpid/broker/Broker.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/Broker.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -33,6 +33,7 @@ > #include "qpid/broker/Link.h" > #include "qpid/broker/ExpiryPolicy.h" > #include "qpid/broker/QueueFlowLimit.h" >+#include "qpid/broker/NameGenerator.h" > > #include "qmf/org/apache/qpid/broker/Package.h" > #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" >@@ -416,7 +417,7 @@ Manageable* Broker::GetVhostObject(void) > > Manageable::status_t Broker::ManagementMethod (uint32_t methodId, > Args& args, >- string&) >+ string& text) > { > Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; > >@@ -431,6 +432,14 @@ Manageable::status_t Broker::ManagementM > status = Manageable::STATUS_OK; > break; > case _qmf::Broker::METHOD_CONNECT : { >+ /** Management is creating a Link to a remote broker using the host and port of >+ * the remote. This (old) interface does not allow management to specify a name >+ * for the link, nor does it allow multiple Links to the same remote. Use the >+ * "create()" broker method if these features are needed. >+ * TBD: deprecate this interface. >+ */ >+ QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID." >+ " Please use the Broker::create() method with type='link' instead."); > _qmf::ArgsBrokerConnect& hp= > dynamic_cast<_qmf::ArgsBrokerConnect&>(args); > >@@ -438,13 +447,24 @@ Manageable::status_t Broker::ManagementM > string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; > if (!getProtocolFactory(transport)) { > QPID_LOG(error, "Transport '" << transport << "' not supported"); >+ text = "transport type not supported"; > return Manageable::STATUS_NOT_IMPLEMENTED; > } >- std::pair<Link::shared_ptr, bool> response = >- links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, >- hp.i_authMechanism, hp.i_username, hp.i_password); >- if (hp.i_durable && response.second) >- store->create(*response.first); >+ >+ // Does a link to the remote already exist? If so, re-use the existing link >+ // - this behavior is backward compatible with previous releases. >+ if (!links.getLink(hp.i_host, hp.i_port, transport)) { >+ // new link, need to generate a unique name for it >+ std::pair<Link::shared_ptr, bool> response = >+ links.declare(Link::createName(transport, hp.i_host, hp.i_port), >+ hp.i_host, hp.i_port, transport, >+ hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password); >+ if (!response.first) { >+ text = "Unable to create Link"; >+ status = Manageable::STATUS_PARAMETER_INVALID; >+ break; >+ } >+ } > status = Manageable::STATUS_OK; > break; > } >@@ -497,6 +517,8 @@ const std::string TYPE_QUEUE("queue"); > const std::string TYPE_EXCHANGE("exchange"); > const std::string TYPE_TOPIC("topic"); > const std::string TYPE_BINDING("binding"); >+const std::string TYPE_LINK("link"); >+const std::string TYPE_BRIDGE("bridge"); > const std::string DURABLE("durable"); > const std::string AUTO_DELETE("auto-delete"); > const std::string ALTERNATE_EXCHANGE("alternate-exchange"); >@@ -506,6 +528,26 @@ const std::string EXCHANGE_NAME("exchang > > const std::string _TRUE("true"); > const std::string _FALSE("false"); >+ >+// parameters for creating a Link object, see mgmt schema >+const std::string HOST("host"); >+const std::string PORT("port"); >+const std::string TRANSPORT("transport"); >+const std::string AUTH_MECHANISM("authMechanism"); >+const std::string USERNAME("username"); >+const std::string PASSWORD("password"); >+ >+// parameters for creating a Bridge object, see mgmt schema >+const std::string LINK("link"); >+const std::string SRC("src"); >+const std::string DEST("dest"); >+const std::string KEY("key"); >+const std::string TAG("tag"); >+const std::string EXCLUDES("excludes"); >+const std::string SRC_IS_QUEUE("srcIsQueue"); >+const std::string SRC_IS_LOCAL("srcIsLocal"); >+const std::string DYNAMIC("dynamic"); >+const std::string SYNC("sync"); > } > > struct InvalidBindingIdentifier : public qpid::Exception >@@ -555,6 +597,25 @@ struct UnknownObjectType : public qpid:: > std::string getPrefix() const { return "unknown object type"; } > }; > >+struct ReservedObjectName : public qpid::Exception >+{ >+ ReservedObjectName(const std::string& type) : qpid::Exception(type) {} >+ std::string getPrefix() const { return std::string("names prefixed with '") >+ + QPID_NAME_PREFIX + std::string("' are reserved"); } >+}; >+ >+struct UnsupportedTransport : public qpid::Exception >+{ >+ UnsupportedTransport(const std::string& type) : qpid::Exception(type) {} >+ std::string getPrefix() const { return "transport is not supported"; } >+}; >+ >+struct InvalidParameter : public qpid::Exception >+{ >+ InvalidParameter(const std::string& type) : qpid::Exception(type) {} >+ std::string getPrefix() const { return "invalid parameter to method call"; } >+}; >+ > void Broker::createObject(const std::string& type, const std::string& name, > const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) > { >@@ -626,6 +687,109 @@ void Broker::createObject(const std::str > amqp_0_10::translate(extensions, arguments); > > bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); >+ >+ } else if (type == TYPE_LINK) { >+ >+ QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties ); >+ >+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { >+ QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); >+ throw ReservedObjectName(name); >+ } >+ >+ std::string host; >+ uint16_t port = 0; >+ std::string transport = TCP_TRANSPORT; >+ bool durable = false; >+ std::string authMech, username, password; >+ >+ if (!getProtocolFactory(transport)) { >+ QPID_LOG(error, "Transport '" << transport << "' not supported."); >+ throw UnsupportedTransport(transport); >+ } >+ >+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { >+ if (i->first == HOST) host = i->second.asString(); >+ else if (i->first == PORT) port = i->second.asUint16(); >+ else if (i->first == TRANSPORT) transport = i->second.asString(); >+ else if (i->first == DURABLE) durable = bool(i->second); >+ else if (i->first == AUTH_MECHANISM) authMech = i->second.asString(); >+ else if (i->first == USERNAME) username = i->second.asString(); >+ else if (i->first == PASSWORD) password = i->second.asString(); >+ else { >+ // TODO: strict checking here >+ } >+ } >+ >+ std::pair<boost::shared_ptr<Link>, bool> rc; >+ rc = links.declare(name, host, port, transport, durable, authMech, username, password); >+ if (!rc.first) { >+ QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port << >+ "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\""); >+ throw InvalidParameter(name); >+ } >+ if (!rc.second) { >+ QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists."); >+ throw ObjectAlreadyExists(name); >+ } >+ >+ } else if (type == TYPE_BRIDGE) { >+ >+ QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties ); >+ >+ if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) { >+ QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'"); >+ throw ReservedObjectName(name); >+ } >+ >+ std::string linkName; >+ std::string src; >+ std::string dest; >+ std::string key; >+ std::string id; >+ std::string excludes; >+ bool durable = false; >+ bool srcIsQueue = false; >+ bool srcIsLocal = false; >+ bool dynamic = false; >+ uint16_t sync = 0; >+ >+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { >+ >+ if (i->first == LINK) linkName = i->second.asString(); >+ else if (i->first == SRC) src = i->second.asString(); >+ else if (i->first == DEST) dest = i->second.asString(); >+ else if (i->first == KEY) key = i->second.asString(); >+ else if (i->first == TAG) id = i->second.asString(); >+ else if (i->first == EXCLUDES) excludes = i->second.asString(); >+ else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second); >+ else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); >+ else if (i->first == DYNAMIC) dynamic = bool(i->second); >+ else if (i->first == SYNC) sync = i->second.asUint16(); >+ else if (i->first == DURABLE) durable = bool(i->second); >+ else { >+ // TODO: strict checking here >+ } >+ } >+ >+ boost::shared_ptr<Link> link; >+ if (linkName.empty() || !(link = links.getLink(linkName))) { >+ QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed."); >+ throw InvalidParameter(name); >+ } >+ std::pair<Bridge::shared_ptr, bool> rc = >+ links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, >+ dynamic, sync); >+ >+ if (!rc.first) { >+ QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << >+ "; src=" << src << "; dest=" << dest << "; key=" << key); >+ throw InvalidParameter(name); >+ } >+ if (!rc.second) { >+ QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists."); >+ throw ObjectAlreadyExists(name); >+ } > } else { > throw UnknownObjectType(type); > } >@@ -648,6 +812,16 @@ void Broker::deleteObject(const std::str > } else if (type == TYPE_BINDING) { > BindingIdentifier binding(name); > unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); >+ } else if (type == TYPE_LINK) { >+ boost::shared_ptr<Link> link = links.getLink(name); >+ if (link) { >+ link->close(); >+ } >+ } else if (type == TYPE_BRIDGE) { >+ boost::shared_ptr<Bridge> bridge = links.getBridge(name); >+ if (bridge) { >+ bridge->close(); >+ } > } else { > throw UnknownObjectType(type); > } >diff -rup a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h >--- a/qpid/cpp/src/qpid/broker/Broker.h 2011-03-14 12:44:14.000000000 +0100 >+++ b/qpid/cpp/src/qpid/broker/Broker.h 2012-08-20 09:01:55.000000000 +0200 >@@ -230,6 +230,7 @@ public: > > SessionManager& getSessionManager() { return sessionManager; } > const std::string& getFederationTag() const { return federationTag; } >+ void setFederationTag(const std::string& tag) { federationTag = tag; } > > management::ManagementObject* GetManagementObject (void) const; > management::Manageable* GetVhostObject (void) const; >diff -rup a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp >--- a/qpid/cpp/src/qpid/broker/Connection.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/Connection.cpp 2012-08-22 09:43:58.494981102 +0200 >@@ -103,9 +103,8 @@ Connection::Connection(ConnectionOutputH > outboundTracker(*this) > { > outboundTracker.wrap(out); >- if (isLink) >- links.notifyConnection(mgmtId, this); > // In a cluster, allow adding the management object to be delayed. >+ links.notifyConnection(mgmtId, this); > if (!delayManagement) addManagementObject(); > if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); > } >diff -rup a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp >--- a/qpid/cpp/src/qpid/broker/Link.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/Link.cpp 2012-08-22 08:18:20.122048019 +0200 >@@ -32,6 +32,7 @@ > #include "qpid/broker/AclModule.h" > #include "qpid/broker/Exchange.h" > #include "qpid/framing/amqp_types.h" >+#include "qpid/broker/NameGenerator.h" > > namespace qpid { > namespace broker { >@@ -107,18 +108,24 @@ boost::shared_ptr<Exchange> Link::linkEx > return Exchange::shared_ptr(new LinkExchange(_name)); > } > >-Link::Link(LinkRegistry* _links, >- MessageStore* _store, >- string& _host, >+namespace { >+const string FAILOVER_EX_PREFIX("qpid.link.fx."); >+const string FAILOVER_Q_PREFIX("qpid.link.fq."); >+} >+ >+Link::Link(const string& _name, >+ LinkRegistry* _links, >+ const string& _host, > uint16_t _port, > string& _transport, >+ DestroyedListener l, > bool _durable, > string& _authMechanism, > string& _username, > string& _password, > Broker* _broker, > Manageable* parent) >- : links(_links), store(_store), >+ : name(_name), links(_links), > configuredTransport(_transport), configuredHost(_host), configuredPort(_port), > host(_host), port(_port), transport(_transport), > durable(_durable), >@@ -132,22 +139,28 @@ Link::Link(LinkRegistry* _links, > channelCounter(1), > connection(0), > agent(0), >- failoverChannel(0) >+ listener(l), >+ failoverChannel(0), >+ passive(links->isPassive()) > { > if (parent != 0 && broker != 0) > { > agent = broker->getManagementAgent(); > if (agent != 0) > { >- mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); >+ mgmtObject = new _qmf::Link(agent, this, parent, name, durable); >+ mgmtObject->set_host(host); >+ mgmtObject->set_port(port); >+ mgmtObject->set_transport(transport); > agent->addObject(mgmtObject, 0, durable); > } > } > setStateLH(STATE_WAITING); >+ startConnectionLH(); > >- stringstream _name; >- _name << "qpid.link." << transport << ":" << host << ":" << port; >- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(), >+ stringstream exchangeName; >+ exchangeName << FAILOVER_EX_PREFIX << name; >+ std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(), > exchangeTypeName); > failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); > assert(failoverExchange); >@@ -183,7 +196,6 @@ void Link::setStateLH (int newState) > case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; > case STATE_FAILED : mgmtObject->set_state("Failed"); break; > case STATE_CLOSED : mgmtObject->set_state("Closed"); break; >- case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; > } > } > >@@ -193,9 +205,14 @@ void Link::startConnectionLH () > // Set the state before calling connect. It is possible that connect > // will fail synchronously and call Link::closed before returning. > setStateLH(STATE_CONNECTING); >- broker->connect (host, port, transport, >- boost::bind (&Link::closed, this, _1, _2)); >- QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); >+ if (!passive) { >+ // Passive links don't create their own connection. >+ // In a cluster the elder creates the connection and the non-elders >+ // pick up the shadow of this connection in established(). >+ broker->connect (host, port, transport, >+ boost::bind (&Link::closed, this, _1, _2)); >+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); >+ } > } catch(std::exception& e) { > setStateLH(STATE_WAITING); > if (!hideManagement()) >@@ -214,6 +231,11 @@ void Link::established () > > { > Mutex::ScopedLock mutex(lock); >+ >+ if (!hideManagement() && connection->GetManagementObject()) { >+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); >+ } >+ > setStateLH(STATE_OPERATIONAL); > currentInterval = 1; > visitCount = 0; >@@ -238,12 +260,13 @@ void Link::closed (int, std::string text > > connection = 0; > >- if (state == STATE_OPERATIONAL) { >- stringstream addr; >- addr << host << ":" << port; >- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); >- if (!hideManagement() && agent) >+ if (!hideManagement()) { >+ mgmtObject->set_connectionRef(qpid::management::ObjectId()); >+ if (state == STATE_OPERATIONAL && agent) { >+ stringstream addr; >+ addr << host << ":" << port; > agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); >+ } > } > > for (Bridges::iterator i = active.begin(); i != active.end(); i++) { >@@ -287,15 +310,18 @@ void Link::destroy () > } > // Now delete all bridges on this link (don't hold the lock for this). > for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) >- (*i)->destroy(); >+ (*i)->close(); > toDelete.clear(); >- links->destroy (host, port); >+ listener(this); // notify LinkRegistry that this Link has been destroyed > } > > void Link::add(Bridge::shared_ptr bridge) > { > Mutex::ScopedLock mutex(lock); > created.push_back (bridge); >+ if (connection) { >+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); >+ } > } > > void Link::cancel(Bridge::shared_ptr bridge) >@@ -375,7 +401,7 @@ void Link::maintenanceVisit () > // remote may not have a failover exchange, allow the subscription to fail. > // > >- const std::string queueName = "qpid.link." + framing::Uuid(true).str(); >+ const std::string queueName = FAILOVER_Q_PREFIX + name + ':' + connection->getMgmtId(); > failoverChannel = nextChannel(); > > SessionHandler& sessionHandler = connection->getChannel(failoverChannel); >@@ -418,7 +444,7 @@ void Link::maintenanceVisit () > { > visitCount = 0; > //switch host and port to next in url list if possible >- if (!tryFailover()) { >+ if (!tryFailoverLH()) { > currentInterval *= 2; > if (currentInterval > MAX_INTERVAL) > currentInterval = MAX_INTERVAL; >@@ -430,35 +456,37 @@ void Link::maintenanceVisit () > connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); > } > >-void Link::reconnect(const qpid::Address& a) >+void Link::reconnectLH(const qpid::Address& a) > { >- Mutex::ScopedLock mutex(lock); > host = a.host; > port = a.port; > transport = a.protocol; >- startConnectionLH(); >+ > if (!hideManagement()) { > stringstream errorString; >- errorString << "Failed over to " << a; >+ errorString << "Failing over to " << a; > mgmtObject->set_lastError(errorString.str()); >+ mgmtObject->set_host(host); >+ mgmtObject->set_port(port); >+ mgmtObject->set_transport(transport); > } > } > >-bool Link::tryFailover() >+bool Link::tryFailoverLH() > { > if (reconnectNext >= url.size()) reconnectNext = 0; > if (url.empty()) return false; > Address next = url[reconnectNext++]; > if (next.host != host || next.port != port || next.protocol != transport) { >- links->changeAddress(Address(transport, host, port), next); >- QPID_LOG(debug, "Link failing over to " << host << ":" << port); >+ QPID_LOG(notice, "Inter-broker link '" << name << "' failing over to " << next); >+ reconnectLH(next); > return true; > } else { > return false; > } > } > >-// Management updates for a linke are inconsistent in a cluster, so they are >+// Management updates for a link are inconsistent in a cluster, so they are > // suppressed. > bool Link::hideManagement() const { > return !mgmtObject || ( broker && broker->isInCluster()); >@@ -488,18 +516,34 @@ void Link::setPersistenceId(uint64_t id) > > const string& Link::getName() const > { >- return configuredHost; >+ return name; >+} >+ >+const std::string Link::ENCODED_IDENTIFIER("link.v2"); >+const std::string Link::ENCODED_IDENTIFIER_V1("link"); >+ >+bool Link::isEncodedLink(const std::string& key) >+{ >+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; > } > > Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) > { >+ string kind; >+ buffer.getShortString(kind); >+ > string host; > uint16_t port; > string transport; > string authMechanism; > string username; > string password; >+ string name; > >+ if (kind == ENCODED_IDENTIFIER) { >+ // newer version provides a link name. >+ buffer.getShortString(name); >+ } > buffer.getShortString(host); > port = buffer.getShort(); > buffer.getShortString(transport); >@@ -508,12 +552,21 @@ Link::shared_ptr Link::decode(LinkRegist > buffer.getShortString(username); > buffer.getShortString(password); > >- return links.declare(host, port, transport, durable, authMechanism, username, password).first; >+ if (kind == ENCODED_IDENTIFIER_V1) { >+ /** previous versions identified the Link by host:port, there was no name >+ * assigned. So create a name for the new Link. >+ */ >+ name = createName(transport, host, port); >+ } >+ >+ return links.declare(name, host, port, transport, durable, authMechanism, >+ username, password).first; > } > > void Link::encode(Buffer& buffer) const > { >- buffer.putShortString(string("link")); >+ buffer.putShortString(ENCODED_IDENTIFIER); >+ buffer.putShortString(name); > buffer.putShortString(configuredHost); > buffer.putShort(configuredPort); > buffer.putShortString(configuredTransport); >@@ -525,8 +578,9 @@ void Link::encode(Buffer& buffer) const > > uint32_t Link::encodedSize() const > { >- return configuredHost.size() + 1 // short-string (host) >- + 5 // short-string ("link") >+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length >+ + name.size() + 1 >+ + configuredHost.size() + 1 // short-string (host) > + 2 // port > + configuredTransport.size() + 1 // short-string(transport) > + 1 // durable >@@ -540,70 +594,64 @@ ManagementObject* Link::GetManagementObj > return (ManagementObject*) mgmtObject; > } > >+void Link::close() { >+ QPID_LOG(debug, "Link::close(), link=" << name ); >+ Mutex::ScopedLock mutex(lock); >+ if (!closing) { >+ closing = true; >+ if (state != STATE_CONNECTING && connection) { >+ //connection can only be closed on the connections own IO processing thread >+ connection->requestIOProcessing(boost::bind(&Link::destroy, this)); >+ } >+ } >+} >+ > Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text) > { > switch (op) > { > case _qmf::Link::METHOD_CLOSE : >- if (!closing) { >- closing = true; >- if (state != STATE_CONNECTING && connection) { >- //connection can only be closed on the connections own IO processing thread >- connection->requestIOProcessing(boost::bind(&Link::destroy, this)); >- } >- } >+ close(); > return Manageable::STATUS_OK; > > case _qmf::Link::METHOD_BRIDGE : >+ /* TBD: deprecate this interface in favor of the Broker::create() method. The >+ * Broker::create() method allows the user to assign a name to the bridge. >+ */ >+ QPID_LOG(info, "The Link::bridge() method will be removed in a future release of QPID." >+ " Please use the Broker::create() method with type='bridge' instead."); > _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; >- QPID_LOG(debug, "Link::bridge() request received"); >- >- // Durable bridges are only valid on durable links >- if (iargs.i_durable && !durable) { >- text = "Can't create a durable route on a non-durable link"; >- return Manageable::STATUS_USER; >- } >+ QPID_LOG(debug, "Link::bridge() request received; src=" << iargs.i_src << >+ "; dest=" << iargs.i_dest << "; key=" << iargs.i_key); > >- if (iargs.i_dynamic) { >- Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src); >- if (exchange.get() == 0) { >- text = "Exchange not found"; >- return Manageable::STATUS_USER; >- } >- if (!exchange->supportsDynamicBinding()) { >- text = "Exchange type does not support dynamic routing"; >- return Manageable::STATUS_USER; >+ // Does a bridge already exist that has the src/dest/key? If so, re-use the >+ // existing bridge - this behavior is backward compatible with previous releases. >+ Bridge::shared_ptr bridge = links->getBridge(*this, iargs.i_src, iargs.i_dest, iargs.i_key); >+ if (!bridge) { >+ // need to create a new bridge on this link. >+ std::pair<Bridge::shared_ptr, bool> rc = >+ links->declare( Bridge::createName(name, iargs.i_src, iargs.i_dest, iargs.i_key), >+ *this, iargs.i_durable, >+ iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, >+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, >+ iargs.i_dynamic, iargs.i_sync); >+ if (!rc.first) { >+ text = "invalid parameters"; >+ return Manageable::STATUS_PARAMETER_INVALID; > } > } >- >- std::string _host(configuredHost); // convert to nonconst :^( >- std::pair<Bridge::shared_ptr, bool> result = >- links->declare (_host, configuredPort, iargs.i_durable, iargs.i_src, >- iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, >- iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, >- iargs.i_dynamic, iargs.i_sync); >- >- if (result.second && iargs.i_durable) >- store->create(*result.first); >- > return Manageable::STATUS_OK; > } > > return Manageable::STATUS_UNKNOWN_METHOD; > } > >-void Link::setPassive(bool passive) >+void Link::setPassive(bool p) > { > Mutex::ScopedLock mutex(lock); >- if (passive) { >- setStateLH(STATE_PASSIVE); >- } else { >- if (state == STATE_PASSIVE) { >- setStateLH(STATE_WAITING); >- } else { >- QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); >- } >- } >+ if (passive && !p) // transition passive->active >+ setStateLH(STATE_WAITING); >+ passive = p; > } > > /** utility to clean up connection resources correctly */ >@@ -663,6 +711,23 @@ void Link::setState(const framing::Field > } > } > >+std::string Link::createName(const std::string& transport, >+ const std::string& host, >+ uint16_t port) >+{ >+ stringstream linkName; >+ linkName << QPID_NAME_PREFIX << transport << std::string(":") >+ << host << std::string(":") << port; >+ return linkName.str(); >+} >+ >+ >+bool Link::pendingConnection(const std::string& _host, uint16_t _port) const >+{ >+ Mutex::ScopedLock mutex(lock); >+ return (isConnecting() && _port == port && _host == host); >+} >+ > > const std::string Link::exchangeTypeName("qpid.LinkExchange"); > >diff -rup a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h >--- a/qpid/cpp/src/qpid/broker/Link.h 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/Link.h 2012-08-20 09:05:12.000000000 +0200 >@@ -24,7 +24,7 @@ > > #include <boost/shared_ptr.hpp> > #include "qpid/Url.h" >-#include "qpid/broker/MessageStore.h" >+#include "qpid/broker/BrokerImportExport.h" > #include "qpid/broker/PersistableConfig.h" > #include "qpid/broker/Bridge.h" > #include "qpid/broker/RetryList.h" >@@ -46,8 +46,8 @@ namespace qpid { > class Link : public PersistableConfig, public management::Manageable { > private: > mutable sys::Mutex lock; >+ const std::string name; > LinkRegistry* links; >- MessageStore* store; > > // these remain constant across failover - used to identify this link > const std::string configuredTransport; >@@ -80,6 +80,7 @@ namespace qpid { > uint channelCounter; > Connection* connection; > management::ManagementAgent* agent; >+ boost::function<void(Link*)> listener; > boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange > uint failoverChannel; > std::string failoverSession; >@@ -89,28 +90,34 @@ namespace qpid { > static const int STATE_OPERATIONAL = 3; > static const int STATE_FAILED = 4; > static const int STATE_CLOSED = 5; >- static const int STATE_PASSIVE = 6; > > static const uint32_t MAX_INTERVAL = 32; > >+ bool passive; >+ > void setStateLH (int newState); > void startConnectionLH(); // Start the IO Connection >- void destroy(); // Called when mgmt deletes this link >+ void destroy(); // Cleanup connection before link goes away > void ioThreadProcessing(); // Called on connection's IO thread by request >- bool tryFailover(); // Called during maintenance visit >+ bool tryFailoverLH(); // Called during maintenance visit > bool hideManagement() const; >- void closeConnection(const std::string& reason); >+ void reconnectLH(const Address&); //called by LinkRegistry >+ void notifyConnectionForced(const std::string text); >+ void closeConnection(const std::string& reason); >+ bool pendingConnection(const std::string& host, uint16_t port) const; // is Link trying to connect to this remote? > > friend class LinkRegistry; // to call established, opened, closed > > public: > typedef boost::shared_ptr<Link> shared_ptr; >+ typedef boost::function<void(Link*)> DestroyedListener; > >- Link(LinkRegistry* links, >- MessageStore* store, >- std::string& host, >+ Link(const std::string& name, >+ LinkRegistry* links, >+ const std::string& host, > uint16_t port, > std::string& transport, >+ DestroyedListener l, > bool durable, > std::string& authMechanism, > std::string& username, >@@ -138,6 +145,10 @@ namespace qpid { > > QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection. > >+ // Close the link. >+ QPID_BROKER_EXTERN void close(); >+ >+ // connection management (called by LinkRegistry) > void established(); // Called when connection is created > void closed(int, std::string); // Called when connection goes away > void setConnection(Connection*); // Set pointer to the AMQP Connection >@@ -148,8 +159,8 @@ namespace qpid { > std::string getPassword() { return password; } > Broker* getBroker() { return broker; } > >- void notifyConnectionForced(const std::string text); > void setPassive(bool p); >+ bool isConnecting() const { return state == STATE_CONNECTING; } > > // PersistableConfig: > void setPersistenceId(uint64_t id) const; >@@ -158,7 +169,10 @@ namespace qpid { > void encode(framing::Buffer& buffer) const; > const std::string& getName() const; > >+ static const std::string ENCODED_IDENTIFIER; >+ static const std::string ENCODED_IDENTIFIER_V1; > static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); >+ static bool isEncodedLink(const std::string& key); > > // Manageable entry points > management::ManagementObject* GetManagementObject(void) const; >@@ -171,6 +185,11 @@ namespace qpid { > // replicate internal state of this Link for clustering > void getState(framing::FieldTable& state) const; > void setState(const framing::FieldTable& state); >+ >+ /** create a name for a link (if none supplied by user config) */ >+ static std::string createName(const std::string& transport, >+ const std::string& host, >+ uint16_t port); > }; > } > } >diff -rup a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp >--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -78,8 +78,6 @@ void LinkRegistry::periodicMaintenance ( > { > Mutex::ScopedLock locker(lock); > >- linksToDestroy.clear(); >- bridgesToDestroy.clear(); > if (passiveChanged) { > if (passive) { QPID_LOG(info, "Passivating links"); } > else { QPID_LOG(info, "Activating links"); } >@@ -90,41 +88,37 @@ void LinkRegistry::periodicMaintenance ( > } > for (LinkMap::iterator i = links.begin(); i != links.end(); i++) > i->second->maintenanceVisit(); >- //now process any requests for re-addressing >- for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++) >- updateAddress(i->first, i->second); >- reMappings.clear(); >-} >- >-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) >-{ >- //done on periodic maintenance thread; hold changes in separate >- //map to avoid modifying the link map that is iterated over >- reMappings[createKey(oldAddress)] = newAddress; >-} >- >-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress) >-{ >- std::string newKey = createKey(newAddress); >- if (links.find(newKey) != links.end()) { >- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use"); >- return false; >- } else { >- LinkMap::iterator i = links.find(oldKey); >- if (i == links.end()) { >- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey); >- return false; >- } else { >- links[newKey] = i->second; >- i->second->reconnect(newAddress); >- links.erase(oldKey); >- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey); >- return true; >- } >- } > } > >-pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, >+/** find link by the *configured* remote address */ >+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host, >+ uint16_t port, >+ const std::string& transport) >+{ >+ Mutex::ScopedLock locker(lock); >+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) { >+ Link::shared_ptr& link = i->second; >+ if (link->getHost() == host && >+ link->getPort() == port && >+ (transport.empty() || link->getTransport() == transport)) >+ return link; >+ } >+ return boost::shared_ptr<Link>(); >+} >+ >+/** find link by name */ >+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name) >+{ >+ Mutex::ScopedLock locker(lock); >+ LinkMap::iterator l = links.find(name); >+ if (l != links.end()) >+ return l->second; >+ return boost::shared_ptr<Link>(); >+} >+ >+ >+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, >+ const string& host, > uint16_t port, > string& transport, > bool durable, >@@ -134,25 +128,55 @@ pair<Link::shared_ptr, bool> LinkRegistr > > { > Mutex::ScopedLock locker(lock); >- string key = createKey(host, port); > >- LinkMap::iterator i = links.find(key); >+ LinkMap::iterator i = links.find(name); > if (i == links.end()) > { > Link::shared_ptr link; > >- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, >- authMechanism, username, password, >- broker, parent)); >+ link = Link::shared_ptr (new Link (name, this, host, port, transport, >+ boost::bind(&LinkRegistry::linkDestroyed, this, _1), >+ durable, authMechanism, username, password, broker, >+ parent)); >+ if (durable && store) store->create(*link); > if (passive) link->setPassive(true); >- links[key] = link; >+ links[name] = link; >+ pendingLinks[name] = link; >+ QPID_LOG(debug, "Creating new link; name=" << name ); > return std::pair<Link::shared_ptr, bool>(link, true); > } > return std::pair<Link::shared_ptr, bool>(i->second, false); > } > >-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, >- uint16_t port, >+/** find bridge by link & route info */ >+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link, >+ const std::string& src, >+ const std::string& dest, >+ const std::string& key) >+{ >+ Mutex::ScopedLock locker(lock); >+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) { >+ if (i->second->getSrc() == src && i->second->getDest() == dest && >+ i->second->getKey() == key && i->second->getLink() && >+ i->second->getLink()->getName() == link.getName()) { >+ return i->second; >+ } >+ } >+ return Bridge::shared_ptr(); >+} >+ >+/** find bridge by name */ >+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name) >+{ >+ Mutex::ScopedLock locker(lock); >+ BridgeMap::iterator b = bridges.find(name); >+ if (b != bridges.end()) >+ return b->second; >+ return Bridge::shared_ptr(); >+} >+ >+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, >+ Link& link, > bool durable, > std::string& src, > std::string& dest, >@@ -165,18 +189,26 @@ pair<Bridge::shared_ptr, bool> LinkRegis > uint16_t sync) > { > Mutex::ScopedLock locker(lock); >- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); > >- string linkKey = createKey(host, port); >- stringstream keystream; >- keystream << linkKey << "!" << src << "!" << dest << "!" << key; >- string bridgeKey = keystream.str(); >- >- LinkMap::iterator l = links.find(linkKey); >- if (l == links.end()) >- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); >+ // Durable bridges are only valid on durable links >+ if (durable && !link.isDurable()) { >+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName()); >+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); >+ } > >- BridgeMap::iterator b = bridges.find(bridgeKey); >+ if (dynamic) { >+ Exchange::shared_ptr exchange = broker->getExchanges().get(src); >+ if (exchange.get() == 0) { >+ QPID_LOG(error, "Exchange not found, name='" << src << "'" ); >+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); >+ } >+ if (!exchange->supportsDynamicBinding()) { >+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'"); >+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false); >+ } >+ } >+ >+ BridgeMap::iterator b = bridges.find(name); > if (b == bridges.end()) > { > _qmf::ArgsLinkBridge args; >@@ -194,55 +226,54 @@ pair<Bridge::shared_ptr, bool> LinkRegis > args.i_sync = sync; > > bridge = Bridge::shared_ptr >- (new Bridge (l->second.get(), l->second->nextChannel(), >- boost::bind(&LinkRegistry::destroy, this, >- host, port, src, dest, key), args)); >- bridges[bridgeKey] = bridge; >- l->second->add(bridge); >+ (new Bridge (name, &link, link.nextChannel(), >+ boost::bind(&LinkRegistry::destroyBridge, this, _1), >+ args)); >+ bridges[name] = bridge; >+ link.add(bridge); >+ if (durable && store) >+ store->create(*bridge); >+ >+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() << >+ "' from " << src << " to " << dest << " (" << key << ")"); >+ > return std::pair<Bridge::shared_ptr, bool>(bridge, true); > } > return std::pair<Bridge::shared_ptr, bool>(b->second, false); > } > >-void LinkRegistry::destroy(const string& host, const uint16_t port) >+/** called back by the link when it has completed its cleanup and can be removed. */ >+void LinkRegistry::linkDestroyed(Link *link) > { >+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); > Mutex::ScopedLock locker(lock); >- string key = createKey(host, port); > >- LinkMap::iterator i = links.find(key); >+ pendingLinks.erase(link->getName()); >+ LinkMap::iterator i = links.find(link->getName()); > if (i != links.end()) > { > if (i->second->isDurable() && store) > store->destroy(*(i->second)); >- linksToDestroy[key] = i->second; > links.erase(i); > } > } > >-void LinkRegistry::destroy(const std::string& host, >- const uint16_t port, >- const std::string& src, >- const std::string& dest, >- const std::string& key) >+/** called back by bridge when its destruction has been requested */ >+void LinkRegistry::destroyBridge(Bridge *bridge) > { >+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName()); > Mutex::ScopedLock locker(lock); >- string linkKey = createKey(host, port); >- stringstream keystream; >- keystream << linkKey << "!" << src << "!" << dest << "!" << key; >- string bridgeKey = keystream.str(); > >- LinkMap::iterator l = links.find(linkKey); >- if (l == links.end()) >- return; >- >- BridgeMap::iterator b = bridges.find(bridgeKey); >+ BridgeMap::iterator b = bridges.find(bridge->getName()); > if (b == bridges.end()) > return; > >- l->second->cancel(b->second); >+ Link *link = b->second->getLink(); >+ if (link) { >+ link->cancel(b->second); >+ } > if (b->second->isDurable()) > store->destroy(*(b->second)); >- bridgesToDestroy[bridgeKey] = b->second; > bridges.erase(b); > } > >@@ -255,29 +286,76 @@ MessageStore* LinkRegistry::getStore() c > return store; > } > >-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId) >-{ >- // Convert keyOrMgmtId to a host:port key. >- // >- // TODO aconway 2011-02-01: centralize code that constructs/parses >- // connection management IDs. Currently sys:: protocol factories >- // and IO plugins construct the IDs and LinkRegistry parses them. >- size_t separator = keyOrMgmtId.find('-'); >- if (separator == std::string::npos) separator = 0; >- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos); >+namespace { >+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port) >+ { >+ // Extract host and port of remote broker from connection id string. >+ // >+ // TODO aconway 2011-02-01: centralize code that constructs/parses connection >+ // management IDs. Currently sys:: protocol factories and IO plugins construct the >+ // IDs and LinkRegistry parses them. >+ // KAG: current connection id format assumed: >+ // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are >+ // contained within brackets "[...]", example: >+ // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us >+ // if this assumption changes! >+ size_t separator = connId.find('-'); >+ assert(separator != std::string::npos); >+ std::string remote = connId.substr(separator+1, std::string::npos); >+ separator = remote.rfind(":"); >+ assert(separator != std::string::npos); >+ *host = remote.substr(0, separator); >+ // IPv6 - host is bracketed by "[]", strip them >+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') { >+ *host = host->substr(1, host->length() - 2); >+ } >+ try { >+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos)); >+ } catch (const boost::bad_lexical_cast&) { >+ QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'"); >+ assert(false); >+ } >+ } >+} > >+/** find the Link that corresponds to the given connection */ >+Link::shared_ptr LinkRegistry::findLink(const std::string& connId) >+{ > Mutex::ScopedLock locker(lock); >- LinkMap::iterator l = links.find(key); >- if (l != links.end()) return l->second; >- else return Link::shared_ptr(); >+ ConnectionMap::iterator c = connections.find(connId); >+ if (c != connections.end()) { >+ LinkMap::iterator l = links.find(c->second); >+ if (l != links.end()) >+ return l->second; >+ } >+ return Link::shared_ptr(); > } > > void LinkRegistry::notifyConnection(const std::string& key, Connection* c) > { >- Link::shared_ptr link = findLink(key); >+ // find a link that is attempting to connect to the remote, and >+ // create a mapping from connection id to link >+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key ); >+ std::string host; >+ uint16_t port; >+ extractHostPort( key, &host, &port ); >+ Link::shared_ptr link; >+ { >+ Mutex::ScopedLock locker(lock); >+ for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) { >+ if (l->second->pendingConnection(host, port)) { >+ link = l->second; >+ pendingLinks.erase(l); >+ connections[key] = link->getName(); >+ QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); >+ break; >+ } >+ } >+ } >+ > if (link) { >- link->established(); > link->setConnection(c); >+ link->established(); > c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); > } > } >@@ -286,6 +364,10 @@ void LinkRegistry::notifyClosed(const st > { > Link::shared_ptr link = findLink(key); > if (link) { >+ { >+ Mutex::ScopedLock locker(lock); >+ pendingLinks[link->getName()] = link; >+ } > link->closed(0, "Closed by peer"); > } > } >@@ -294,6 +376,10 @@ void LinkRegistry::notifyConnectionForce > { > Link::shared_ptr link = findLink(key); > if (link) { >+ { >+ Mutex::ScopedLock locker(lock); >+ pendingLinks[link->getName()] = link; >+ } > link->notifyConnectionForced(text); > } > } >@@ -374,20 +460,6 @@ std::string LinkRegistry::getAuthIdentit > } > > >-std::string LinkRegistry::createKey(const qpid::Address& a) { >- // TODO aconway 2010-05-11: key should also include protocol/transport to >- // be unique. Requires refactor of LinkRegistry interface. >- return createKey(a.host, a.port); >-} >- >-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { >- // TODO aconway 2010-05-11: key should also include protocol/transport to >- // be unique. Requires refactor of LinkRegistry interface. >- stringstream keystream; >- keystream << host << ":" << port; >- return keystream.str(); >-} >- > void LinkRegistry::setPassive(bool p) > { > Mutex::ScopedLock locker(lock); >@@ -397,10 +469,12 @@ void LinkRegistry::setPassive(bool p) > } > > void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) { >+ Mutex::ScopedLock locker(lock); > for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second); > } > > void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) { >+ Mutex::ScopedLock locker(lock); > for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second); > } > >diff -rup a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h >--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h 2011-01-18 21:43:41.000000000 +0100 >+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h 2012-08-20 09:05:48.000000000 +0200 >@@ -54,13 +54,12 @@ namespace broker { > > typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; > typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; >- typedef std::map<std::string, Address> AddressMap; >+ typedef std::map<std::string, std::string> ConnectionMap; > >- LinkMap links; >- LinkMap linksToDestroy; >- BridgeMap bridges; >- BridgeMap bridgesToDestroy; >- AddressMap reMappings; >+ LinkMap links; /** indexed by name of Link */ >+ BridgeMap bridges; /** indexed by name of Bridge */ >+ ConnectionMap connections; /** indexed by connection identifier, gives link name */ >+ LinkMap pendingLinks; /** pending connection, indexed by name of Link */ > > qpid::sys::Mutex lock; > Broker* broker; >@@ -73,10 +72,12 @@ namespace broker { > std::string realm; > > void periodicMaintenance (); >- bool updateAddress(const std::string& oldKey, const Address& newAddress); > boost::shared_ptr<Link> findLink(const std::string& key); >- static std::string createKey(const Address& address); >- static std::string createKey(const std::string& host, uint16_t port); >+ >+ /** Notify the registry that a Link has been destroyed */ >+ void linkDestroyed(Link*); >+ /** Request to destroy a Bridge */ >+ void destroyBridge(Bridge*); > > public: > LinkRegistry (); // Only used in store tests >@@ -84,16 +85,27 @@ namespace broker { > ~LinkRegistry(); > > std::pair<boost::shared_ptr<Link>, bool> >- declare(std::string& host, >+ declare(const std::string& name, >+ const std::string& host, > uint16_t port, > std::string& transport, > bool durable, > std::string& authMechanism, > std::string& username, > std::string& password); >+ /** determine if Link exists */ >+ QPID_BROKER_EXTERN boost::shared_ptr<Link> >+ getLink(const std::string& name); >+ /** host,port,transport will be matched against the configured values, which may >+ be different from the current values due to failover */ >+ QPID_BROKER_EXTERN boost::shared_ptr<Link> >+ getLink(const std::string& configHost, >+ uint16_t configPort, >+ const std::string& configTransport = std::string()); >+ > std::pair<Bridge::shared_ptr, bool> >- declare(std::string& host, >- uint16_t port, >+ declare(const std::string& name, >+ Link& link, > bool durable, > std::string& src, > std::string& dest, >@@ -104,13 +116,15 @@ namespace broker { > std::string& excludes, > bool dynamic, > uint16_t sync); >+ /** determine if Bridge exists */ >+ QPID_BROKER_EXTERN Bridge::shared_ptr >+ getBridge(const std::string& name); >+ QPID_BROKER_EXTERN Bridge::shared_ptr >+ getBridge(const Link& link, >+ const std::string& src, >+ const std::string& dest, >+ const std::string& key); > >- void destroy(const std::string& host, const uint16_t port); >- void destroy(const std::string& host, >- const uint16_t port, >- const std::string& src, >- const std::string& dest, >- const std::string& key); > > /** > * Register the manageable parent for declared queues >@@ -139,16 +153,13 @@ namespace broker { > uint16_t getPort (const std::string& key); > > /** >- * Called by links failing over to new address >- */ >- void changeAddress(const Address& oldAddress, const Address& newAddress); >- /** > * Called to alter passive state. In passive state the links > * and bridges managed by a link registry will be recorded and > * updated but links won't actually establish connections and > * bridges won't therefore pull or push any messages. > */ > void setPassive(bool); >+ bool isPassive() const { return passive; } > > > /** Iterate over each link in the registry. Used for cluster updates. */ >diff -rup a/qpid/cpp/src/qpid/broker/NameGenerator.h b/qpid/cpp/src/qpid/broker/NameGenerator.h >--- a/qpid/cpp/src/qpid/broker/NameGenerator.h 2007-08-28 21:38:17.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/NameGenerator.h 2012-08-20 08:55:02.000000000 +0200 >@@ -32,6 +32,7 @@ namespace qpid { > NameGenerator(const std::string& base); > std::string generate(); > }; >+ const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names > } > } > >diff -rup a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp >--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr Recov > RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) > { > string kind; >- >+ uint32_t p = buffer.getPosition(); > buffer.getShortString (kind); >- if (kind == "link") >+ buffer.setPosition(p); >+ >+ if (Link::isEncodedLink(kind)) > return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); >- else if (kind == "bridge") >+ else if (Bridge::isEncodedBridge(kind)) > return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); > > return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead >diff -rup a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h >--- a/qpid/cpp/src/qpid/broker/SemanticState.h 2011-02-25 18:30:59.000000000 +0100 >+++ b/qpid/cpp/src/qpid/broker/SemanticState.h 2012-08-20 08:55:02.000000000 +0200 >@@ -139,6 +139,8 @@ class SemanticState : private boost::non > uint32_t getByteCredit() const { return byteCredit; } > std::string getResumeId() const { return resumeId; }; > uint64_t getResumeTtl() const { return resumeTtl; } >+ uint32_t getDeliveryCount() const { return deliveryCount; } >+ void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } > const framing::FieldTable& getArguments() const { return arguments; } > > SemanticState& getParent() { return *parent; } >diff -rup a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp >--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -604,7 +604,6 @@ void Cluster::configChange ( > void Cluster::setReady(Lock&) { > state = READY; > mcast.setReady(); >- broker.getQueueEvents().enable(); > enableClusterSafe(); // Enable cluster-safe assertions. > } > >@@ -930,6 +929,13 @@ void Cluster::checkUpdateIn(Lock& l) { > failoverExchange->setUrls(getUrls(l)); > mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); > state = CATCHUP; >+ /* In CATCHUP mode the update has finished, and we are consuming >+ ** whatever backlog of messages has built up during the update. >+ ** We should enable queue events here, or messages that are received >+ ** during this phase will not be replicated properly. ( If there are >+ ** relevant replication queues. ) >+ */ >+ broker.getQueueEvents().enable(); > memberUpdate(l); > // NB: don't updateMgmtMembership() here as we are not in the deliver > // thread. It will be updated on delivery of the "ready" we just mcast. >diff -rup a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp >--- a/qpid/cpp/src/qpid/cluster/Connection.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp 2012-08-20 09:06:17.000000000 +0200 >@@ -407,11 +407,12 @@ void Connection::shadowSetUser(const std > connection->setUserId(userId); > } > >-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) >+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, const uint32_t deliveryCount) > { > broker::SemanticState::ConsumerImpl& c = semanticState().find(name); > c.position = position; > c.setBlocked(blocked); >+ c.setDeliveryCount(deliveryCount); > if (notifyEnabled) c.enableNotify(); else c.disableNotify(); > updateIn.consumerNumbering.add(c.shared_from_this()); > } >@@ -697,68 +698,42 @@ void Connection::managementSetupState( > agent->setBootSequence(bootSequence); > agent->setUuid(id); > agent->setName(vendor, product, instance); >+ cluster.getBroker().setFederationTag(id.str()); > } > > void Connection::config(const std::string& encoded) { > Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); > string kind; >+ uint32_t p = buf.getPosition(); > buf.getShortString (kind); >- if (kind == "link") { >+ buf.setPosition(p); >+ if (broker::Link::isEncodedLink(kind)) { > broker::Link::shared_ptr link = >- broker::Link::decode(cluster.getBroker().getLinks(), buf); >+ broker::Link::decode(cluster.getBroker().getLinks(), buf); > QPID_LOG(debug, cluster << " updated link " > << link->getHost() << ":" << link->getPort()); > } >- else if (kind == "bridge") { >+ else if (broker::Bridge::isEncodedBridge(kind)) { > broker::Bridge::shared_ptr bridge = >- broker::Bridge::decode(cluster.getBroker().getLinks(), buf); >+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf); > QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); > } > else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); > } > >-namespace { >- // find a Link that matches the given Address >- class LinkFinder { >- qpid::Address id; >- boost::shared_ptr<broker::Link> link; >- public: >- LinkFinder(const qpid::Address& _id) : id(_id) {} >- boost::shared_ptr<broker::Link> getLink() { return link; } >- void operator() (boost::shared_ptr<broker::Link> l) >- { >- if (!link) { >- qpid::Address addr(l->getTransport(), l->getHost(), l->getPort()); >- if (id == addr) { >- link = l; >- } >- } >- } >- }; >-} >- > void Connection::internalState(const std::string& type, > const std::string& name, > const framing::FieldTable& state) > { > if (type == "link") { >- // name is the string representation of the Link's _configured_ destination address >- Url dest; >- try { >- dest = name; >- } catch(...) { >- throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name)); >- } >- assert(dest.size()); >- LinkFinder finder(dest[0]); >- cluster.getBroker().getLinks().eachLink(boost::ref(finder)); >- if (finder.getLink()) { >+ boost::shared_ptr<qpid::broker::Link> link(cluster.getBroker().getLinks().getLink(name)); >+ if (link.get()) { > try { >- finder.getLink()->setState(state); >+ link->setState(state); > } catch(...) { > throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state)); > } >- QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state); >+ QPID_LOG(debug, cluster << " updated link " << name << " with state: " << state); > } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name)); > } > else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type)); >diff -rup a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h >--- a/qpid/cpp/src/qpid/cluster/Connection.h 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/cluster/Connection.h 2012-08-20 08:55:02.000000000 +0200 >@@ -107,7 +107,7 @@ class Connection : > // Called for data delivered from the cluster. > void deliveredFrame(const EventFrame&); > >- void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); >+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position, const uint32_t deliveryCount); > > // ==== Used in catch-up mode to build initial state. > // >diff -rup a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp >--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp 2012-08-20 08:55:02.000000000 +0200 >@@ -505,7 +505,8 @@ void UpdateClient::updateConsumer( > ci->getName(), > ci->isBlocked(), > ci->isNotifyEnabled(), >- ci->position >+ ci->position, >+ ci->getDeliveryCount() > ); > consumerNumbering.add(ci.get()); > >@@ -616,10 +617,8 @@ void UpdateClient::updateLink(const boos > // now push the current state > framing::FieldTable state; > link->getState(state); >- std::ostringstream os; >- os << qpid::Address(link->getTransport(), link->getHost(), link->getPort()); > ClusterConnectionProxy(session).internalState(std::string("link"), >- os.str(), >+ link->getName(), > state); > } > >diff -rup a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py >--- a/qpid/cpp/src/tests/cluster_test_logs.py 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/tests/cluster_test_logs.py 2012-08-20 09:07:45.000000000 +0200 >@@ -65,6 +65,7 @@ def filter_log(log): > 'debug Sending keepalive signal to watchdog', # Watchdog timer thread > 'last broker standing joined by 1 replicas, updating queue policies.', > 'Connection .* timed out: closing', # heartbeat connection close >+ "info Connection is a federation link", # out of order with connection mgmt object. > "org.apache.qpid.broker:bridge:" # ignore bridge index > ]) > # Regex to match a UUID >@@ -97,14 +98,15 @@ def filter_log(log): > out.write(l) > out.close() > >-def verify_logs(): >+def verify_logs(pattern="*.log"): > """Compare log files from cluster brokers, verify that they correspond correctly.""" >- for l in glob.glob("*.log"): filter_log(l) >+ for l in glob.glob(pattern): filter_log(l) > checkpoints = set() >- for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) >+ for l in glob.glob("%s.filter"%(pattern)): >+ checkpoints = checkpoints.union(set(split_log(l))) > errors=[] > for c in checkpoints: >- fragments = glob.glob("*.filter.%s"%(c)) >+ fragments = glob.glob("%s.filter.%s"%(pattern, c)) > fragments.sort(reverse=True, key=os.path.getsize) > while len(fragments) >= 2: > a = fragments.pop(0) >diff -rup a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py >--- a/qpid/cpp/src/tests/cluster_tests.py 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/tests/cluster_tests.py 2012-08-20 09:09:35.000000000 +0200 >@@ -772,6 +772,35 @@ acl allow all all > > > >+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): >+ """ Prove that traffic can pass between two federated brokers. >+ """ >+ tot_time = 0 >+ active = False >+ send_session = src_broker.connect().session() >+ sender = send_session.sender(src) >+ receive_session = dst_broker.connect().session() >+ receiver = receive_session.receiver(dst) >+ while not active and tot_time < timeout: >+ sender.send(Message("Hello from Source!")) >+ try: >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ # Get this far without Empty exception, and the link is good! >+ active = True >+ while True: >+ # Keep receiving msgs, as several may have accumulated >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ except Empty: >+ if not active: >+ tot_time += 1 >+ receiver.close() >+ receive_session.close() >+ sender.close() >+ send_session.close() >+ return active >+ > def test_federation_failover(self): > """ > Verify that federation operates across failures occuring in a cluster. >@@ -782,38 +811,6 @@ acl allow all all > cluster to newly-added members > """ > >- TIMEOUT = 30 >- def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT): >- """ Prove that traffic can pass from source fed broker to >- destination fed broker >- """ >- tot_time = 0 >- active = False >- send_session = src_broker.connect().session() >- sender = send_session.sender(src) >- receive_session = dst_broker.connect().session() >- receiver = receive_session.receiver(dst) >- while not active and tot_time < timeout: >- sender.send(Message("Hello from Source!")) >- try: >- receiver.fetch(timeout = 1) >- receive_session.acknowledge() >- # Get this far without Empty exception, and the link is good! >- active = True >- while True: >- # Keep receiving msgs, as several may have accumulated >- receiver.fetch(timeout = 1) >- receive_session.acknowledge() >- except Empty: >- if not active: >- tot_time += 1 >- receiver.close() >- receive_session.close() >- sender.close() >- send_session.close() >- self.assertTrue(active, "Bridge failed to become active") >- >- > # 2 node cluster source, 2 node cluster destination > src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) > for _b in src_cluster: _b.ready() >@@ -852,42 +849,219 @@ acl allow all all > self.assertEqual(result.status, 0, result) > > # check that traffic passes >- verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ") >+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") > > # add src[2] broker to source cluster > src_cluster.start(expect=EXPECT_EXIT_FAIL); >- for _b in src_cluster: _b.ready(); >- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") >+ src_cluster.ready(); >+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") > > # Kill src[0]. dst[0] should fail over to src[1] > src_cluster[0].kill() > for b in src_cluster[1:]: b.ready() >- verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ") >+ assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") > > # Kill src[1], dst[0] should fail over to src[2] > src_cluster[1].kill() > for b in src_cluster[2:]: b.ready() >- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") >+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") > > # Kill dest[0], force failover to dest[1] > dst_cluster[0].kill() > for b in dst_cluster[1:]: b.ready() >- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") >+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") > > # Add dest[2] > # dest[1] syncs dest[2] to current remote state > dst_cluster.start(expect=EXPECT_EXIT_FAIL); > for b in dst_cluster[1:]: b.ready() >- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") >+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") > > # Kill dest[1], force failover to dest[2] > dst_cluster[1].kill() > for b in dst_cluster[2:]: b.ready() >- verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ") >+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") > > for i in range(2, len(src_cluster)): src_cluster[i].kill() > for i in range(2, len(dst_cluster)): dst_cluster[i].kill() > >+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): >+ """ Prove that traffic can pass between two federated brokers. >+ """ >+ tot_time = 0 >+ active = False >+ send_session = src_broker.connect().session() >+ sender = send_session.sender(src) >+ receive_session = dst_broker.connect().session() >+ receiver = receive_session.receiver(dst) >+ while not active and tot_time < timeout: >+ sender.send(Message("Hello from Source!")) >+ try: >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ # Get this far without Empty exception, and the link is good! >+ active = True >+ while True: >+ # Keep receiving msgs, as several may have accumulated >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ except Empty: >+ if not active: >+ tot_time += 1 >+ receiver.close() >+ receive_session.close() >+ sender.close() >+ send_session.close() >+ return active >+ >+ def test_federation_bridges_consistent(self): >+ """FIXME JIRA: verify that federation bridges are constructed consistently.""" >+ # 2 node cluster source, 2 node cluster destination >+ args=["--mgmt-pub-interval=1", >+ "--log-enable=trace+:management", >+ "--log-enable=trace+:Bridge" >+ ] >+ src_cluster = self.cluster(2, args=args) >+ for b in src_cluster: b.ready(); >+ dst_cluster = self.cluster(2, args=args) >+ for b in dst_cluster: b.ready(); >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", src_cluster[0].host_port(), >+ "add", "queue", "srcQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "queue", "destQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "bind", "destX", "destQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ # federate the srcQ to the destination exchange >+ dst_cluster[0].startQmf() >+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] >+ result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN", >+ "guest", "guest", "tcp") >+ self.assertEqual(result.status, 0, result); >+ >+ link = dst_cluster[0].qmf_session.getObjects(_class="link")[0] >+ result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10) >+ self.assertEqual(result.status, 0, result) >+ >+ # check that traffic passes >+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") >+ >+ cluster_test_logs.verify_logs("*%s*.log"%(dst_cluster.name)) >+ cluster_test_logs.verify_logs("*%s*.log"%(src_cluster.name)) >+ >+ def test_federation_multilink_failover(self): >+ """ >+ Verify that multi-link federation operates across failures occuring in >+ a cluster. >+ """ >+ >+ # 1 node cluster source, 1 node cluster destination >+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) >+ src_cluster.ready(); >+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) >+ dst_cluster.ready(); >+ >+ # federate a direct binding across two separate links >+ >+ # first, create a direct exchange bound to two queues using different >+ # bindings >+ cmd = self.popen(["qpid-config", >+ "--broker", src_cluster[0].host_port(), >+ "add", "exchange", "direct", "FedX"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "exchange", "direct", "FedX"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "queue", "destQ1"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "bind", "FedX", "destQ1", "one"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "queue", "destQ2"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "bind", "FedX", "destQ2", "two"], >+ EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ # Create two separate links between the dst and source brokers, bind >+ # each to different keys >+ dst_cluster[0].startQmf() >+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] >+ >+ for _l in [("link1", "bridge1", "one"), >+ ("link2", "bridge2", "two")]: >+ result = dst_broker.create("link", _l[0], >+ {"host":src_cluster[0].host(), >+ "port":src_cluster[0].port()}, >+ False) >+ self.assertEqual(result.status, 0, result); >+ result = dst_broker.create("bridge", _l[1], >+ {"link":_l[0], >+ "src":"FedX", >+ "dest":"FedX", >+ "key":_l[2]}, False) >+ self.assertEqual(result.status, 0); >+ >+ # check that traffic passes >+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") >+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") >+ >+ # add new member, verify traffic >+ src_cluster.start(expect=EXPECT_EXIT_FAIL); >+ src_cluster.ready(); >+ >+ dst_cluster.start(expect=EXPECT_EXIT_FAIL); >+ dst_cluster.ready(); >+ >+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") >+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") >+ >+ src_cluster[0].kill() >+ for b in src_cluster[1:]: b.ready() >+ >+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") >+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") >+ >+ dst_cluster[0].kill() >+ for b in dst_cluster[1:]: b.ready() >+ >+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") >+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") >+ >+ for i in range(1, len(src_cluster)): src_cluster[i].kill() >+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill() > > > class LongTests(BrokerTest): >diff -rup a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py >--- a/qpid/cpp/src/tests/federation.py 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/src/tests/federation.py 2012-08-20 08:55:02.000000000 +0200 >@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010 > from qpid.datatypes import Message > from qpid.queue import Empty > from qpid.util import URL >+import qpid.messaging > from time import sleep, time > > >@@ -94,6 +95,11 @@ class FederationTests(TestBase010): > break > self._brokers.append(_b) > >+ # add a new-style messaging connection to each broker >+ for _b in self._brokers: >+ _b.connection = qpid.messaging.Connection(_b.url) >+ _b.connection.open() >+ > def _teardown_brokers(self): > """ Un-does _setup_brokers() > """ >@@ -103,7 +109,7 @@ class FederationTests(TestBase010): > if not _b.client_session.error(): > _b.client_session.close(timeout=10) > _b.client_conn.close(timeout=10) >- >+ _b.connection.close() > > def test_bridge_create_and_close(self): > self.startQmf(); >@@ -127,18 +133,28 @@ class FederationTests(TestBase010): > self.verify_cleanup() > > def test_pull_from_exchange(self): >+ """ This test uses an alternative method to manage links and bridges >+ via the broker object. >+ """ > session = self.session >- >+ > self.startQmf() > qmf = self.qmf > broker = qmf.getObjects(_class="broker")[0] >- result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") >- self.assertEqual(result.status, 0) > >+ # create link >+ link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False, >+ "authMechanism":"PLAIN", "username":"guest", "password":"guest", >+ "transport":"tcp"} >+ result = broker.create("link", "test-link-1", link_args, False) >+ self.assertEqual(result.status, 0, result) > link = qmf.getObjects(_class="link")[0] >- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) >- self.assertEqual(result.status, 0) > >+ # create bridge >+ bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout", >+ "key":"my-key"} >+ result = broker.create("bridge", "test-bridge-1", bridge_args, False); >+ self.assertEqual(result.status, 0, result) > bridge = qmf.getObjects(_class="bridge")[0] > > #setup queue to receive messages from local broker >@@ -164,10 +180,11 @@ class FederationTests(TestBase010): > self.fail("Got unexpected message in queue: " + extra.body) > except Empty: None > >- result = bridge.close() >- self.assertEqual(result.status, 0) >- result = link.close() >- self.assertEqual(result.status, 0) >+ result = broker.delete("bridge", "test-bridge-1", {}) >+ self.assertEqual(result.status, 0, result) >+ >+ result = broker.delete("link", "test-link-1", {}) >+ self.assertEqual(result.status, 0, result) > > self.verify_cleanup() > >@@ -2089,3 +2106,167 @@ class FederationTests(TestBase010): > self.verify_cleanup() > > >+ def test_multilink_direct(self): >+ """ Verify that two distinct links can be created between federated >+ brokers. >+ """ >+ self.startQmf() >+ qmf = self.qmf >+ self._setup_brokers() >+ src_broker = self._brokers[0] >+ dst_broker = self._brokers[1] >+ >+ # create a direct exchange on each broker >+ for _b in [src_broker, dst_broker]: >+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") >+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, >+ "direct", "exchange_declare failed!") >+ >+ # create destination queues >+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: >+ dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True) >+ dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) >+ >+ # create two connections, one for high priority traffic >+ for _q in ["HiPri", "Traffic"]: >+ result = dst_broker.qmf_object.create("link", _q, >+ {"host":src_broker.host, >+ "port":src_broker.port}, >+ False) >+ self.assertEqual(result.status, 0); >+ >+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") >+ for _l in links: >+ if _l.name == "HiPri": >+ hi_link = _l >+ elif _l.name == "Traffic": >+ data_link = _l >+ else: >+ self.fail("Unexpected Link found: " + _l.name) >+ >+ # now create a route for messages sent with key "high" to use the >+ # hi_link >+ result = dst_broker.qmf_object.create("bridge", "HiPriBridge", >+ {"link":hi_link.name, >+ "src":"fedX.direct", >+ "dest":"fedX.direct", >+ "key":"high"}, False) >+ self.assertEqual(result.status, 0); >+ >+ >+ # create routes for the "medium" and "low" links to use the normal >+ # data_link >+ for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]: >+ result = dst_broker.qmf_object.create("bridge", _b[0], >+ {"link":data_link.name, >+ "src":"fedX.direct", >+ "dest":"fedX.direct", >+ "key":_b[1]}, False) >+ self.assertEqual(result.status, 0); >+ >+ # now wait for the links to become operational >+ for _l in [hi_link, data_link]: >+ expire_time = time() + 30 >+ while _l.state != "Operational" and time() < expire_time: >+ _l.update() >+ self.assertEqual(_l.state, "Operational", "Link failed to become operational") >+ >+ # verify each link uses a different connection >+ self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef, >+ "Different links using the same connection") >+ >+ hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, >+ _objectId=hi_link.connectionRef)[0] >+ data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, >+ _objectId=data_link.connectionRef)[0] >+ >+ # wait for the bindings to come up on the source broker >+ for _x in qmf.getObjects(_broker=src_broker.qmf_broker, >+ _class="exchange"): >+ if _x.name == "fedX.direct": >+ expire_time = time() + 30 >+ while _x.bindingCount != 3 and time() < expire_time: >+ _x.update() >+ self.assertEqual(_x.bindingCount, 3, "Bridges failed to become operational") >+ break >+ >+ # send hi data, verify only goes over hi link >+ >+ r_ssn = dst_broker.connection.session() >+ hi_receiver = r_ssn.receiver("HiQ"); >+ med_receiver = r_ssn.receiver("MedQ"); >+ low_receiver = r_ssn.receiver("LoQ"); >+ >+ for _c in [hi_conn, data_conn]: >+ _c.update() >+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") >+ >+ s_ssn = src_broker.connection.session() >+ hi_sender = s_ssn.sender("fedX.direct/high") >+ med_sender = s_ssn.sender("fedX.direct/medium") >+ low_sender = s_ssn.sender("fedX.direct/low") >+ >+ try: >+ hi_sender.send(qpid.messaging.Message(content="hi priority")) >+ msg = hi_receiver.fetch(timeout=10) >+ r_ssn.acknowledge() >+ self.assertEqual(msg.content, "hi priority"); >+ except: >+ self.fail("Hi Pri message failure") >+ >+ hi_conn.update() >+ data_conn.update() >+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") >+ self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages") >+ >+ # send low and medium, verify it does not go over hi link >+ >+ try: >+ med_sender.send(qpid.messaging.Message(content="medium priority")) >+ msg = med_receiver.fetch(timeout=10) >+ r_ssn.acknowledge() >+ self.assertEqual(msg.content, "medium priority"); >+ except: >+ self.fail("Medium Pri message failure") >+ >+ hi_conn.update() >+ data_conn.update() >+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") >+ self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message") >+ >+ try: >+ low_sender.send(qpid.messaging.Message(content="low priority")) >+ msg = low_receiver.fetch(timeout=10) >+ r_ssn.acknowledge() >+ self.assertEqual(msg.content, "low priority"); >+ except: >+ self.fail("Low Pri message failure") >+ >+ hi_conn.update() >+ data_conn.update() >+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") >+ self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message") >+ >+ # cleanup >+ >+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): >+ result = _b.close() >+ self.assertEqual(result.status, 0) >+ >+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): >+ result = _l.close() >+ self.assertEqual(result.status, 0) >+ >+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: >+ dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) >+ dst_broker.client_session.queue_delete(queue=_q[0]) >+ >+ for _b in [src_broker, dst_broker]: >+ _b.client_session.exchange_delete(exchange="fedX.direct") >+ >+ self._teardown_brokers() >+ >+ self.verify_cleanup() >+ >+ >+ >diff -rup a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml >--- a/qpid/cpp/xml/cluster.xml 2012-08-20 08:54:35.000000000 +0200 >+++ b/qpid/cpp/xml/cluster.xml 2012-08-20 08:55:02.000000000 +0200 >@@ -175,6 +175,7 @@ > <field name="blocked" type="bit"/> > <field name="notifyEnabled" type="bit"/> > <field name="position" type="sequence-no"/> >+ <field name="deliveryCount" type="uint32"/> > </control> > > <!-- Delivery-record for outgoing messages sent but not yet accepted. --> >diff -rup a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml >--- a/qpid/specs/management-schema.xml 2011-03-02 20:02:00.000000000 +0100 >+++ b/qpid/specs/management-schema.xml 2012-08-20 08:55:02.000000000 +0200 >@@ -282,10 +282,12 @@ > This class represents an inter-broker connection. > > <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> >- <property name="host" type="sstr" access="RC" index="y"/> >- <property name="port" type="uint16" access="RC" index="y"/> >- <property name="transport" type="sstr" access="RC"/> >+ <property name="name" type="sstr" access="RC" index="y"/> >+ <property name="host" type="sstr" access="RO"/> >+ <property name="port" type="uint16" access="RO"/> >+ <property name="transport" type="sstr" access="RO"/> > <property name="durable" type="bool" access="RC"/> >+ <property name="connectionRef" type="objId" references="Connection" access="RO"/> > > <statistic name="state" type="sstr" desc="Operational state of the link"/> > <statistic name="lastError" type="lstr" desc="Reason link is not operational"/> >@@ -314,7 +316,8 @@ > --> > <class name="Bridge"> > <property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/> >- <property name="channelId" type="uint16" access="RC" index="y"/> >+ <property name="name" type="sstr" access="RC" index="y"/> >+ <property name="channelId" type="uint16" access="RO"/> > <property name="durable" type="bool" access="RC"/> > <property name="src" type="sstr" access="RC"/> > <property name="dest" type="sstr" access="RC"/>
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 836141
:
603210
|
603869
|
604098
|
604397
| 606198 |
609884
|
614799