Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 604098 Details for
Bug 836141
destination cluster de-sync when federation link used for a longer time
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
Patch based on 0.10-8, does not work.
cluster-fed-link-0.10.diff (text/plain), 11.25 KB, created by
Alan Conway
on 2012-08-13 21:56:09 UTC
(
hide
)
Description:
Patch based on 0.10-8, does not work.
Filename:
MIME Type:
Creator:
Alan Conway
Created:
2012-08-13 21:56:09 UTC
Size:
11.25 KB
patch
obsolete
>89fa97d Bug 836141: Destination cluster de-sync when federation link used for a longer time > >diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h >index 6d585bf..022a9b7 100644 >--- a/qpid/cpp/src/qpid/broker/Broker.h >+++ b/qpid/cpp/src/qpid/broker/Broker.h >@@ -230,6 +230,7 @@ public: > > SessionManager& getSessionManager() { return sessionManager; } > const std::string& getFederationTag() const { return federationTag; } >+ void setFederationTag(const std::string& tag) { federationTag = tag; } > > management::ManagementObject* GetManagementObject (void) const; > management::Manageable* GetVhostObject (void) const; >diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp >index c07e63e..27caf97 100644 >--- a/qpid/cpp/src/qpid/broker/Connection.cpp >+++ b/qpid/cpp/src/qpid/broker/Connection.cpp >@@ -103,8 +103,7 @@ Connection::Connection(ConnectionOutputHandler* out_, > outboundTracker(*this) > { > outboundTracker.wrap(out); >- if (isLink) >- links.notifyConnection(mgmtId, this); >+ links.notifyConnection(mgmtId, this); > // In a cluster, allow adding the management object to be delayed. > if (!delayManagement) addManagementObject(); > if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); >diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp >index 91861ad..6f50920 100644 >--- a/qpid/cpp/src/qpid/broker/Link.cpp >+++ b/qpid/cpp/src/qpid/broker/Link.cpp >@@ -67,7 +67,8 @@ Link::Link(LinkRegistry* _links, > updateUrls(false), > channelCounter(1), > connection(0), >- agent(0) >+ agent(0), >+ passive(links->isPassive()) > { > if (parent != 0 && broker != 0) > { >@@ -107,7 +108,6 @@ void Link::setStateLH (int newState) > case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; > case STATE_FAILED : mgmtObject->set_state("Failed"); break; > case STATE_CLOSED : mgmtObject->set_state("Closed"); break; >- case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; > } > } > >@@ -117,9 +117,14 @@ void Link::startConnectionLH () > // Set the state before calling connect. It is possible that connect > // will fail synchronously and call Link::closed before returning. > setStateLH(STATE_CONNECTING); >- broker->connect (host, port, transport, >- boost::bind (&Link::closed, this, _1, _2)); >- QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); >+ if (!passive) { >+ // Passive links don't create their own connection. >+ // In a cluster the elder creates the connection and the non-elders >+ // pick up the shadow of this connection in established(). >+ broker->connect (host, port, transport, >+ boost::bind (&Link::closed, this, _1, _2)); >+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); >+ } > } catch(std::exception& e) { > setStateLH(STATE_WAITING); > if (!hideManagement()) >@@ -459,16 +464,10 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te > return Manageable::STATUS_UNKNOWN_METHOD; > } > >-void Link::setPassive(bool passive) >+void Link::setPassive(bool p) > { > Mutex::ScopedLock mutex(lock); >- if (passive) { >- setStateLH(STATE_PASSIVE); >- } else { >- if (state == STATE_PASSIVE) { >- setStateLH(STATE_WAITING); >- } else { >- QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); >- } >- } >+ if (passive && !p) // transition passive->active >+ setStateLH(STATE_WAITING); >+ passive = p; > } >diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h >index 4badd8b..d6cd4b5 100644 >--- a/qpid/cpp/src/qpid/broker/Link.h >+++ b/qpid/cpp/src/qpid/broker/Link.h >@@ -76,10 +76,11 @@ namespace qpid { > static const int STATE_OPERATIONAL = 3; > static const int STATE_FAILED = 4; > static const int STATE_CLOSED = 5; >- static const int STATE_PASSIVE = 6; > > static const uint32_t MAX_INTERVAL = 32; > >+ bool passive; >+ > void setStateLH (int newState); > void startConnectionLH(); // Start the IO Connection > void destroy(); // Called when mgmt deletes this link >diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h >index 4c97e4f..d568461 100644 >--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h >+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h >@@ -142,6 +142,7 @@ namespace broker { > * Called by links failing over to new address > */ > void changeAddress(const Address& oldAddress, const Address& newAddress); >+ > /** > * Called to alter passive state. In passive state the links > * and bridges managed by a link registry will be recorded and >@@ -149,8 +150,8 @@ namespace broker { > * bridges won't therefore pull or push any messages. > */ > void setPassive(bool); >+ bool isPassive() const { return passive; } > >- > /** Iterate over each link in the registry. Used for cluster updates. */ > void eachLink(boost::function<void(boost::shared_ptr<Link>)> f); > /** Iterate over each bridge in the registry. Used for cluster updates. */ >diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp >index a75d135..cacebd2 100644 >--- a/qpid/cpp/src/qpid/cluster/Connection.cpp >+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp >@@ -695,6 +695,7 @@ void Connection::managementSetupState( > agent->setBootSequence(bootSequence); > agent->setUuid(id); > agent->setName(vendor, product, instance); >+ cluster.getBroker().setFederationTag(id.str()); > } > > void Connection::config(const std::string& encoded) { >diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py >index 9f7d1e2..2cc3b6d 100755 >--- a/qpid/cpp/src/tests/cluster_test_logs.py >+++ b/qpid/cpp/src/tests/cluster_test_logs.py >@@ -63,7 +63,8 @@ def filter_log(log): > 'Running in a cluster, marking store', > 'debug Sending keepalive signal to watchdog', # Watchdog timer thread > 'last broker standing joined by 1 replicas, updating queue policies.', >- 'Connection .* timed out: closing' # heartbeat connection close >+ 'Connection .* timed out: closing', # heartbeat connection close >+ "info Connection is a federation link" # out of order with connection mgmt object. > ]) > # Regex to match a UUID > uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' >@@ -95,14 +96,15 @@ def filter_log(log): > out.write(l) > out.close() > >-def verify_logs(): >+def verify_logs(pattern="*.log"): > """Compare log files from cluster brokers, verify that they correspond correctly.""" >- for l in glob.glob("*.log"): filter_log(l) >+ for l in glob.glob(pattern): filter_log(l) > checkpoints = set() >- for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) >+ for l in glob.glob("%s.filter"%(pattern)): >+ checkpoints = checkpoints.union(set(split_log(l))) > errors=[] > for c in checkpoints: >- fragments = glob.glob("*.filter.%s"%(c)) >+ fragments = glob.glob("%s.filter.%s"%(pattern, c)) > fragments.sort(reverse=True, key=os.path.getsize) > while len(fragments) >= 2: > a = fragments.pop(0) >diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py >index da2f47f..d7a8401 100755 >--- a/qpid/cpp/src/tests/cluster_tests.py >+++ b/qpid/cpp/src/tests/cluster_tests.py >@@ -655,6 +655,84 @@ acl allow all all > > self.assert_browse(s1, "q", ["foo"]) > >+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): >+ """ Prove that traffic can pass between two federated brokers. >+ """ >+ tot_time = 0 >+ active = False >+ send_session = src_broker.connect().session() >+ sender = send_session.sender(src) >+ receive_session = dst_broker.connect().session() >+ receiver = receive_session.receiver(dst) >+ while not active and tot_time < timeout: >+ sender.send(Message("Hello from Source!")) >+ try: >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ # Get this far without Empty exception, and the link is good! >+ active = True >+ while True: >+ # Keep receiving msgs, as several may have accumulated >+ receiver.fetch(timeout = 1) >+ receive_session.acknowledge() >+ except Empty: >+ if not active: >+ tot_time += 1 >+ receiver.close() >+ receive_session.close() >+ sender.close() >+ send_session.close() >+ return active >+ >+ def test_federation_bridges_consistent(self): >+ """FIXME JIRA: verify that federation bridges are constructed consistently.""" >+ # 2 node cluster source, 2 node cluster destination >+ args=["--mgmt-pub-interval=1", >+ "--log-enable=trace+:management", >+ "--log-enable=trace+:Bridge" >+ ] >+ src_cluster = self.cluster(2, args=args) >+ for b in src_cluster: b.ready(); >+ dst_cluster = self.cluster(2, args=args) >+ for b in dst_cluster: b.ready(); >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", src_cluster[0].host_port(), >+ "add", "queue", "srcQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "add", "queue", "destQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ cmd = self.popen(["qpid-config", >+ "--broker", dst_cluster[0].host_port(), >+ "bind", "destX", "destQ"], EXPECT_EXIT_OK) >+ cmd.wait() >+ >+ # federate the srcQ to the destination exchange >+ dst_cluster[0].startQmf() >+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] >+ result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN", >+ "guest", "guest", "tcp") >+ self.assertEqual(result.status, 0, result); >+ >+ link = dst_cluster[0].qmf_session.getObjects(_class="link")[0] >+ result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10) >+ self.assertEqual(result.status, 0, result) >+ >+ # check that traffic passes >+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") >+ >+ cluster_test_logs.verify_logs("*%s*.log"%(dst_cluster.name)) >+ cluster_test_logs.verify_logs("*%s*.log"%(src_cluster.name)) >+ > > class LongTests(BrokerTest): > """Tests that can run for a long time if -DDURATION=<minutes> is set"""
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 836141
:
603210
|
603869
|
604098
|
604397
|
606198
|
609884
|
614799