Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 642320 Details for
Bug 873883
Dynamic federation bindings can become out-of-sync
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly; please enable it.
[patch]
Updated patch to auto-unbind propagated bindings when destination broker disconnects
bz873883.patch (text/plain), 11.80 KB, created by
Jason Dillaman
on 2012-11-10 21:11:10 UTC
(
hide
)
Description:
Updated patch to auto-unbind propagated bindings when destination broker disconnects
Filename:
MIME Type:
Creator:
Jason Dillaman
Created:
2012-11-10 21:11:10 UTC
Size:
11.80 KB
patch
obsolete
>diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp >index 3f35758..b33d0ef 100644 >--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp >+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp >@@ -38,9 +38,11 @@ > #include "qpid/sys/ClusterSafe.h" > #include "qpid/ptr_map.h" > #include "qpid/broker/AclModule.h" >+#include "qpid/broker/FedOps.h" > > #include <boost/bind.hpp> > #include <boost/format.hpp> >+#include <boost/tuple/tuple_comparison.hpp> > > #include <iostream> > #include <sstream> >@@ -49,6 +51,11 @@ > > #include <assert.h> > >+namespace { >+const std::string X_SCOPE("x-scope"); >+const std::string SESSION("session"); >+} >+ > namespace qpid { > namespace broker { > >@@ -90,6 +97,7 @@ void SemanticState::closed() { > dtxBuffer->fail(); > } > recover(true); >+ unbindSessionBindings(); > > //now unsubscribe, which may trigger queue deletion and thus > //needs to occur after the requeueing of unacked messages >@@ -845,4 +853,51 @@ void SemanticState::detached() > } > } > >+void SemanticState::addBinding(const string& queueName, const string& exchangeName, >+ const string& routingKey, const framing::FieldTable& arguments) >+{ >+ std::string fedOp = arguments.getAsString(qpidFedOp); >+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin); >+ if (((fedOp.empty()) && (arguments.getAsString(X_SCOPE) == SESSION)) || (fedOp == fedOpBind)) { >+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); >+ } >+ else if (fedOp == fedOpUnbind) { >+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); >+ } >+} >+ >+void SemanticState::removeBinding(const string& queueName, const string& exchangeName, >+ const string& routingKey) >+{ >+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); >+} >+ >+void SemanticState::unbindSessionBindings() >+{ >+ //unbind session-scoped bindings >+ for (Bindings::iterator i = bindings.begin(); i != 
bindings.end(); i++) { >+ QPID_LOG (debug, "Unbinding session-scoped bindings [" >+ << "queue=" << i->get<0>() << ", " >+ << "exchange=" << i->get<1>()<< ", " >+ << "key=" << i->get<2>() << ", " >+ << "fedOrigin=" << i->get<3>() << "]"); >+ try { >+ std::string fedOrigin = i->get<3>(); >+ if (!fedOrigin.empty()) { >+ framing::FieldTable fedArguments; >+ fedArguments.setString(qpidFedOp, fedOpUnbind); >+ fedArguments.setString(qpidFedOrigin, fedOrigin); >+ session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, >+ userID, connectionId); >+ } else { >+ session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), >+ userID, connectionId); >+ } >+ } >+ catch (...) { >+ } >+ } >+ bindings.clear(); >+} >+ > }} // namespace qpid::broker >diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h >index 7b3a873..302f27e 100644 >--- a/qpid/cpp/src/qpid/broker/SemanticState.h >+++ b/qpid/cpp/src/qpid/broker/SemanticState.h >@@ -49,6 +49,7 @@ > #include <boost/enable_shared_from_this.hpp> > #include <boost/intrusive_ptr.hpp> > #include <boost/cast.hpp> >+#include <boost/tuple/tuple.hpp> > > namespace qpid { > namespace broker { >@@ -167,6 +168,8 @@ class SemanticState : private boost::noncopyable { > > private: > typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; >+ typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; >+ typedef std::set<Binding> Bindings; > > SessionContext& session; > DeliveryAdapter& deliveryAdapter; >@@ -185,6 +188,8 @@ class SemanticState : private boost::noncopyable { > //needed for queue delete events in auto-delete: > const std::string connectionId; > >+ Bindings bindings; >+ > void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); > void checkDtxTimeout(); > >@@ -193,6 +198,7 @@ class SemanticState : private boost::noncopyable { > void requestDispatch(); > void cancel(ConsumerImpl::shared_ptr); > void 
disable(ConsumerImpl::shared_ptr); >+ void unbindSessionBindings(); > > public: > >@@ -267,6 +273,11 @@ class SemanticState : private boost::noncopyable { > void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } > void record(const DeliveryRecord& delivery); > DtxBufferMap& getSuspendedXids() { return suspendedXids; } >+ >+ void addBinding(const std::string& queueName, const std::string& exchangeName, >+ const std::string& routingKey, const framing::FieldTable& arguments); >+ void removeBinding(const std::string& queueName, const std::string& exchangeName, >+ const std::string& routingKey); > }; > > }} // namespace qpid::broker >diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp >index ec4e02e..c69c319 100644 >--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp >+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp >@@ -165,12 +165,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, > { > getBroker().bind(queueName, exchangeName, routingKey, arguments, > getConnection().getUserId(), getConnection().getUrl()); >+ state.addBinding(queueName, exchangeName, routingKey, arguments); > } > > void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, > const string& exchangeName, > const string& routingKey) > { >+ state.removeBinding(queueName, exchangeName, routingKey); > getBroker().unbind(queueName, exchangeName, routingKey, > getConnection().getUserId(), getConnection().getUrl()); > } >diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp >index edb50fc..55cff04 100644 >--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp >+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp >@@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly) > fix.session.acknowledge(); > } > >+QPID_AUTO_TEST_CASE(testLinkBindingCleanup) >+{ >+ MessagingFixture fix; >+ >+ Sender sender = 
fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); >+ >+ Connection connection = fix.newConnection(); >+ connection.open(); >+ >+ Session session(connection.createSession()); >+ Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); >+ Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); >+ connection.close(); >+ >+ sender.send(Message("test-message"), true); >+ >+ // The session-scoped binding should be removed when receiver1's network connection is lost >+ Message in; >+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); >+} >+ > QPID_AUTO_TEST_SUITE_END() > > }} // namespace qpid::tests >diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py >index dcd074e..659a319 100755 >--- a/qpid/cpp/src/tests/federation.py >+++ b/qpid/cpp/src/tests/federation.py >@@ -2604,3 +2604,110 @@ class FederationTests(TestBase010): > > self.verify_cleanup() > >+ def test_dynamic_bounce_unbinds_named_queue(self): >+ """ Verify that a propagated binding is removed when the connection is >+ bounced >+ """ >+ session = self.session >+ >+ # create the federation >+ >+ self.startQmf() >+ qmf = self.qmf >+ >+ self._setup_brokers() >+ >+ # create exchange on each broker, and retrieve the corresponding >+ # management object for that exchange >+ >+ exchanges=[] >+ for _b in self._brokers[0:2]: >+ _b.client_session.exchange_declare(exchange="fedX", type="direct") >+ self.assertEqual(_b.client_session.exchange_query(name="fedX").type, >+ "direct", "exchange_declare failed!") >+ # pull the exchange out of qmf... 
>+ retries = 0 >+ my_exchange = None >+ timeout = time() + 10 >+ while my_exchange is None and time() <= timeout: >+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") >+ for ooo in objs: >+ if ooo.name == "fedX": >+ my_exchange = ooo >+ break >+ if my_exchange is None: >+ self.fail("QMF failed to find new exchange!") >+ exchanges.append(my_exchange) >+ >+ # on the destination broker, create a binding for propagation >+ self._brokers[0].client_session.queue_declare(queue="fedDstQ") >+ self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") >+ >+ # on the source broker, create a bridge queue >+ self._brokers[1].client_session.queue_declare(queue="fedSrcQ") >+ >+ # connect B1 --> B0 >+ result = self._brokers[0].qmf_object.create( "link", >+ "Link-dynamic", >+ {"host":self._brokers[1].host, >+ "port":self._brokers[1].port}, False) >+ self.assertEqual(result.status, 0) >+ >+ # bridge the "fedX" exchange: >+ result = self._brokers[0].qmf_object.create("bridge", >+ "Bridge-dynamic", >+ {"link":"Link-dynamic", >+ "src":"fedX", >+ "dest":"fedX", >+ "dynamic":True, >+ "queue":"fedSrcQ"}, False) >+ self.assertEqual(result.status, 0) >+ >+ # wait for the inter-broker links to become operational >+ operational = False >+ timeout = time() + 10 >+ while not operational and time() <= timeout: >+ operational = True >+ for _l in qmf.getObjects(_class="link"): >+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) >+ if _l.state != "Operational": >+ operational = False >+ self.failUnless(operational, "inter-broker links failed to become operational.") >+ >+ # wait until the binding key has propagated to the src broker >+ exchanges[1].update() >+ timeout = time() + 10 >+ while exchanges[1].bindingCount < 1 and time() <= timeout: >+ exchanges[1].update() >+ self.failUnless(exchanges[1].bindingCount == 1) >+ >+ # >+ # Tear down the bridges between the two exchanges, then wait >+ # for the bindings to be cleaned up >+ # >+ 
for _b in qmf.getObjects(_class="bridge"): >+ result = _b.close() >+ self.assertEqual(result.status, 0) >+ exchanges[1].update() >+ timeout = time() + 10 >+ while exchanges[1].bindingCount != 0 and time() <= timeout: >+ exchanges[1].update() >+ self.failUnless(exchanges[1].bindingCount == 0) >+ >+ self._brokers[1].client_session.queue_delete(queue="fedSrcQ") >+ >+ for _b in qmf.getObjects(_class="bridge"): >+ result = _b.close() >+ self.assertEqual(result.status, 0) >+ >+ for _l in qmf.getObjects(_class="link"): >+ result = _l.close() >+ self.assertEqual(result.status, 0) >+ >+ for _b in self._brokers[0:2]: >+ _b.client_session.exchange_delete(exchange="fedX") >+ >+ self._teardown_brokers() >+ >+ self.verify_cleanup() >+
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 873883
:
641618
| 642320