Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 645040 Details for
Bug 876688
HA replication of propagated bindings can lead to incorrect configuration
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
Quick patch to exlude propagated bindings from replication
bz876688.patch (text/plain), 13.84 KB, created by
Jason Dillaman
on 2012-11-14 18:13:54 UTC
(
hide
)
Description:
Quick patch to exlude propagated bindings from replication
Filename:
MIME Type:
Creator:
Jason Dillaman
Created:
2012-11-14 18:13:54 UTC
Size:
13.84 KB
patch
obsolete
>diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp >index 90cb1a7..8d55f3b 100644 >--- a/qpid/cpp/src/qpid/broker/Bridge.cpp >+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp >@@ -49,6 +49,11 @@ using qpid::management::ManagementAgent; > using std::string; > namespace _qmf = qmf::org::apache::qpid::broker; > >+namespace { >+const std::string QPID_REPLICATE("qpid.replicate"); >+const std::string NONE("none"); >+} >+ > namespace qpid { > namespace broker { > >@@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, > } > string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); > >+ bindArgs.setString(QPID_REPLICATE, NONE); > bindArgs.setString(qpidFedOp, op); > bindArgs.setString(qpidFedTags, newTagList); > if (origin.empty()) >diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp >index 533c5b6..c8c3764 100644 >--- a/qpid/cpp/src/qpid/broker/Broker.cpp >+++ b/qpid/cpp/src/qpid/broker/Broker.cpp >@@ -1251,6 +1251,7 @@ void Broker::bind(const std::string& queueName, > QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName > << " queue:" << queueName > << " key:" << key >+ << " arguments:" << arguments > << " user:" << userId > << " rhost:" << connectionId); > } >diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp >index 2fa7ce0..773a99d 100644 >--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp >+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp >@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con > > if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { > Mutex::ScopedLock l(lock); >- Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin)); >+ Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin)); > BoundKey& bk = bindings[routingKey]; > if (exclusiveBinding) bk.queues.clear(); > >diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp >index 56c894c..43c67af 100644 >--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp >+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp >@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const > bool propagate = false; > > if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { >- Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); >+ Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin)); > if (bindings.add_unless(binding, MatchQueue(queue))) { > binding->startManagement(); > propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); >diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp >index 9975d26..842f8e2 100644 >--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp >+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp >@@ -47,6 +47,7 @@ namespace { > const std::string empty; > > // federation related args and values >+ const std::string QPID_RESERVED("qpid."); > const std::string qpidFedOp("qpid.fed.op"); > const std::string qpidFedTags("qpid.fed.tags"); > const std::string qpidFedOrigin("qpid.fed.origin"); >@@ -120,8 +121,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co > //matching (they are internally added properties > //controlling binding propagation but not relevant to > //actual routing) >- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args)); >- BoundKey bk(binding); >+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable())); >+ BoundKey bk(binding, extra_args); > if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) { > binding->startManagement(); > propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); >@@ -216,7 +217,7 @@ void HeadersExchange::route(Deliverable& msg) > Bindings::ConstPtr p = bindings.snapshot(); > if (p.get()) { > for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { >- if (match((*i).binding->args, *args)) { >+ if (match((*i).args, *args)) { > b->push_back((*i).binding); > } > } >@@ -230,7 +231,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons > Bindings::ConstPtr p = bindings.snapshot(); > if (p.get()){ > for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { >- if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) { >+ if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) { > return true; > } > } >@@ -247,10 +248,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr > > for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) > { >- const string & name(i->first); >- if (name == qpidFedOp || >- name == qpidFedTags || >- name == qpidFedOrigin) >+ if (i->first.find(QPID_RESERVED) == 0) > { > continue; > } >diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h >index d10892b..f8a13bd 100644 >--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h >+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h >@@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange { > struct BoundKey > { > Binding::shared_ptr binding; >+ qpid::framing::FieldTable args; > FedBinding fedBinding; >- BoundKey(Binding::shared_ptr binding_) : binding(binding_) {} >+ BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {} > }; > > struct MatchArgs >diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp >index b33d0ef..c4d363b 100644 >--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp >+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp >@@ -856,9 +856,17 @@ void SemanticState::detached() > void SemanticState::addBinding(const string& queueName, const string& exchangeName, > const string& routingKey, const framing::FieldTable& arguments) > { >+ QPID_LOG (debug, "SemanticState::addBinding [" >+ << "queue=" << queueName << ", " >+ << "exchange=" << exchangeName << ", " >+ << "key=" << routingKey << ", " >+ << "args=" << arguments << "]"); > std::string fedOp = arguments.getAsString(qpidFedOp); >+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { >+ fedOp = fedOpBind; >+ } > std::string fedOrigin = arguments.getAsString(qpidFedOrigin); >- if (((fedOp.empty()) && (arguments.getAsString(X_SCOPE) == SESSION)) || (fedOp == fedOpBind)) { >+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { > bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); > } > else if (fedOp == fedOpUnbind) { >@@ -869,6 +877,10 @@ void SemanticState::addBinding(const string& queueName, const string& exchangeNa > void SemanticState::removeBinding(const string& queueName, const string& exchangeName, > const string& routingKey) > { >+ QPID_LOG (debug, "SemanticState::removeBinding [" >+ << "queue=" << queueName << ", " >+ << "exchange=" << exchangeName << ", " >+ << "key=" << routingKey) > bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); > } > >@@ -876,7 +888,7 @@ void SemanticState::unbindSessionBindings() > { > //unbind session-scoped bindings > for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { >- QPID_LOG (debug, "Unbinding session-scoped bindings [" >+ QPID_LOG (debug, "SemanticState::unbindSessionBindings [" > << "queue=" << i->get<0>() << ", " > << "exchange=" << i->get<1>()<< ", " > << "key=" << i->get<2>() << ", " >diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp >index c11389b..d49464b 100644 >--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp >+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp >@@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons > } > } > >- Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); >+ Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin)); > binding->startManagement(); > bk->bindingVector.push_back(binding); > nBindings++; >diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp >index e70c81e..154a76a 100644 >--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp >+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp >@@ -26,6 +26,7 @@ > #include "qpid/broker/ConnectionObserver.h" > #include "qpid/broker/Queue.h" > #include "qpid/broker/Link.h" >+#include "qpid/broker/FedOps.h" > #include "qpid/framing/FieldTable.h" > #include "qpid/framing/FieldValue.h" > #include "qpid/log/Statement.h" >@@ -540,17 +541,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { > exchanges.find(values[EXNAME].asString()); > boost::shared_ptr<Queue> queue = > queues.find(values[QNAME].asString()); >+ framing::FieldTable args; >+ amqp_0_10::translate(asMapVoid(values[ARGS]), args); > // We only replicate binds for a replicated queue to replicated > // exchange that both exist locally. > if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && >- queue && replicationTest.replicateLevel(queue->getSettings())) >+ queue && replicationTest.replicateLevel(queue->getSettings()) && >+ replicationTest.replicateLevel(args)) > { >- framing::FieldTable args; >- amqp_0_10::translate(asMapVoid(values[ARGS]), args); > string key = values[KEY].asString(); > QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() > << " queue=" << queue->getName() >- << " key=" << key); >+ << " key=" << key >+ << " args=" << args); > queue->bind(exchange, key, args); > } > } >@@ -565,13 +568,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { > if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && > queue && replicationTest.replicateLevel(queue->getSettings())) > { >- framing::FieldTable args; >- amqp_0_10::translate(asMapVoid(values[ARGS]), args); > string key = values[KEY].asString(); > QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() > << " queue=" << queue->getName() > << " key=" << key); >- exchange->unbind(queue, key, &args); >+ exchange->unbind(queue, key, 0); > } > } > >@@ -695,16 +696,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { > boost::shared_ptr<Exchange> exchange = exchanges.find(exName); > boost::shared_ptr<Queue> queue = queues.find(qName); > >+ framing::FieldTable args; >+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); >+ > // Automatically replicate binding if queue and exchange exist and are replicated > if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && >- queue && replicationTest.replicateLevel(queue->getSettings())) >+ queue && replicationTest.replicateLevel(queue->getSettings()) && >+ replicationTest.replicateLevel(args)) > { > string key = values[BINDING_KEY].asString(); > QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName > << " queue:" << qName >- << " key:" << key); >- framing::FieldTable args; >- amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); >+ << " key:" << key >+ << " args:" << args); > queue->bind(exchange, key, args); > } > } >diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py >index 01de484..60e0c1b 100755 >--- a/qpid/cpp/src/tests/ha_tests.py >+++ b/qpid/cpp/src/tests/ha_tests.py >@@ -473,6 +473,23 @@ class ReplicationTests(HaBrokerTest): > self.fail("Excpected no-such-queue exception") > except NotFound: pass > >+ def test_replicate_binding(self): >+ """Verify that binding replication can be disabled""" >+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) >+ primary.promote() >+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) >+ ps = primary.connect().session() >+ ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") >+ ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") >+ backup.wait_backup("q") >+ >+ primary.kill() >+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die >+ backup.promote() >+ bs = backup.connect_admin().session() >+ bs.sender("ex").send(Message("msg")) >+ self.assert_browse_retry(bs, "q", []) >+ > def test_invalid_replication(self): > """Verify that we reject an attempt to declare a queue with invalid replication value.""" > cluster = HaCluster(self, 1, ha_replicate="all") >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 876688
: 645040