Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 625107 Details for
Bug 783603
Support for asynchronous reporting of measurements
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly; please enable it.
[patch]
patch that eventually expires obsolete or disabled metrics
0001-For-measurement-collection-runnable-add-expiration-o.patch (text/plain), 14.92 KB, created by
Elias Ross
on 2012-10-10 18:58:47 UTC
(
hide
)
Description:
patch that eventually expires obsolete or disabled metrics
Filename:
MIME Type:
Creator:
Elias Ross
Created:
2012-10-10 18:58:47 UTC
Size:
14.92 KB
patch
obsolete
>From da8648c21e50f958593b10a3df6e2aa295e7703e Mon Sep 17 00:00:00 2001 >From: Elias Ross <elias_ross@apple.com> >Date: Wed, 22 Aug 2012 17:00:53 -0700 >Subject: [PATCH] For measurement collection runnable, add expiration of old > requests > >--- > .../measurement/MeasurementCollectorRunnable.java | 87 +++++++++++--------- > .../org/rhq/core/pc/CollectorThreadPoolTest.java | 53 ++++++++---- > 2 files changed, 83 insertions(+), 57 deletions(-) > >diff --git a/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java b/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java >index bf303b9..c555b3d 100644 >--- a/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java >+++ b/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java >@@ -1,17 +1,20 @@ > package org.rhq.core.pluginapi.measurement; > >+import java.util.Date; >+import java.util.Iterator; >+import java.util.Map; >+import java.util.Map.Entry; > import java.util.Set; >-import java.util.concurrent.CopyOnWriteArraySet; >+import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.Future; > import java.util.concurrent.FutureTask; > import java.util.concurrent.ScheduledExecutorService; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; >-import java.util.concurrent.locks.ReentrantLock; >+import java.util.concurrent.atomic.AtomicReference; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; >- > import org.rhq.core.domain.measurement.MeasurementReport; > import org.rhq.core.domain.measurement.MeasurementScheduleRequest; > import org.rhq.core.pluginapi.availability.AvailabilityCollectorRunnable; >@@ -82,16 +85,16 @@ > private final long initialDelay; > > /** >- * The last known measurements for the resource that this 
collector is monitoring. >- * You must synchronize access to this via the R/W lock. >+ * Holds a reference to the last measurements obtained from the measurement facet. >+ * This reference is reset when new measurements are obtained. > */ >- private MeasurementReport cachedReport = new MeasurementReport(); >- private ReentrantLock cachedReportLock = new ReentrantLock(); >+ private AtomicReference<MeasurementReport> cachedReportHolder = new AtomicReference<MeasurementReport>(new MeasurementReport()); > > /** >- * Accumulated report. >+ * Requested metrics and the date they were last requested. >+ * Eventually entries in this are expired. > */ >- private Set<MeasurementScheduleRequest> requestedMetrics = new CopyOnWriteArraySet<MeasurementScheduleRequest>(); >+ private Map<MeasurementScheduleRequest, Date> requestedMetrics = new ConcurrentHashMap<MeasurementScheduleRequest, Date>(); > > /** > * Just a cache of the facet toString used in log messages. We don't want to keep calling toString on the >@@ -103,7 +106,7 @@ > * Creates a collector instance that will perform measurement reporting for a particular managed resource. > * > * The interval is the time, in milliseconds, this collector will wait between reports. >- * A typically value should be something around 30 minutes, but its minium allowed value is 60 seconds. >+ * A typically value should be something around 30 minutes, but its minimum allowed value is 60 seconds. > * > * @param measured the object that is used to periodically check the managed resource (must not be <code>null</code>) > * @param interval the initial delay, in millis, before the first collection is performed. 
>@@ -136,7 +139,6 @@ public MeasurementCollectorRunnable(MeasurementFacet measured, long initialDelay > this.initialDelay = initialDelay; > this.interval = interval; > this.threadPool = threadPool; >- this.cachedReport = new MeasurementReport(); > this.facetId = measured.toString(); > } > >@@ -147,16 +149,13 @@ public MeasurementCollectorRunnable(MeasurementFacet measured, long initialDelay > * their {@link MeasurementFacet#getValues()} method should simply be calling this method. > */ > public void getLastValues(MeasurementReport report, Set<MeasurementScheduleRequest> metrics) throws Exception { >- requestedMetrics.addAll(metrics); >- cachedReportLock.lock(); >- try { >- // For all metrics being requested, take their cached values last collected and transfer them to the given report. >- // Note that we only remove the metrics that were being requested, leaving any cached data intact so they can >- // be retreived later when they are requested. >- report.add(cachedReport, metrics, true); >- } finally { >- cachedReportLock.unlock(); >- } >+ for (MeasurementScheduleRequest metric : metrics) >+ requestedMetrics.put(metric, new Date()); >+ // For all metrics being requested, take their cached values last collected and transfer them to the given report. >+ // Note that we only remove the metrics that were being requested, leaving any cached data intact so they can >+ // be retrieved later when they are requested. 
>+ MeasurementReport cachedReport = cachedReportHolder.get(); >+ report.add(cachedReport, metrics, true); > } > > /** >@@ -181,14 +180,7 @@ public void start() { > public void stop() { > started.set(false); > task.cancel(true); >- >- cachedReportLock.lock(); >- try { >- cachedReport = new MeasurementReport(); >- } finally { >- cachedReportLock.unlock(); >- } >- >+ cachedReportHolder.set(new MeasurementReport()); > requestedMetrics.clear(); > > log.debug("measurement collector stopped: " + facetId); >@@ -200,30 +192,43 @@ public void stop() { > * You should not be calling this method directly - use {@link #start()} instead. > */ > public void run() { >- log.debug("measurement collector is collecting now: " + facetId); >+ boolean debug = log.isDebugEnabled(); >+ if (debug) { >+ log.debug("measurement collector is collecting now: " + facetId); >+ } > > ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); > Thread.currentThread().setContextClassLoader(contextClassloader); > > try { >+ expireRequested(); > // collect the new data for all metrics previous requested in the past >- MeasurementReport newData = new MeasurementReport(); >- measured.getValues(newData, requestedMetrics); >- if (log.isDebugEnabled()) { >- log.debug("measurement collector latest data: " + newData); >+ MeasurementReport cachedReport = new MeasurementReport(); >+ if (requestedMetrics.isEmpty()) { >+ log.debug("no cached values"); >+ } else { >+ measured.getValues(cachedReport, requestedMetrics.keySet()); > } >- >- // put the new data in our cached report (lastReport) >- cachedReportLock.lock(); >- try { >- cachedReport.add(newData, null, false); >- } finally { >- cachedReportLock.unlock(); >+ if (debug) { >+ log.debug("measurement collector latest data: " + cachedReport); > } >+ cachedReportHolder.set(cachedReport); > } catch (Exception e) { > log.warn("measurement collector failed to get values", e); > } finally { > 
Thread.currentThread().setContextClassLoader(originalClassloader); > } > } >+ >+ private void expireRequested() { >+ long now = System.currentTimeMillis(); >+ for (Iterator<Entry<MeasurementScheduleRequest, Date>> i = requestedMetrics.entrySet().iterator(); i.hasNext(); ) { >+ Entry<MeasurementScheduleRequest, Date> me = i.next(); >+ long until = me.getValue().getTime() + me.getKey().getInterval() * 2; >+ if (now > until) { >+ log.debug("no longer requesting measurement " + me); >+ i.remove(); >+ } >+ } >+ } > } >diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java >index 238d1ef..47b7570 100644 >--- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java >+++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java >@@ -31,6 +31,7 @@ > import org.testng.annotations.AfterTest; > import org.testng.annotations.BeforeTest; > import org.testng.annotations.Test; >+import static org.testng.AssertJUnit.*; > > import org.rhq.core.domain.measurement.AvailabilityType; > import org.rhq.core.domain.measurement.DataType; >@@ -47,6 +48,7 @@ > @Test > public class CollectorThreadPoolTest { > >+ private static long INTERVAL = 30 * 1000; > protected final Log log = LogFactory.getLog(getClass()); > private CollectorThreadPool threadPool; > >@@ -94,11 +96,11 @@ private void log(String string) { > public void testMeasurement() throws Exception { > log("testMeasurement"); > TestMeasumentFacet component = new TestMeasumentFacet(); >- // 0L means do the initial collection immediately with no delay - so our test can run fast >+ // 0L means do the initial collection immediately with no delay - so our test can run fast > MeasurementCollectorRunnable runnable = new MeasurementCollectorRunnable(component, 0L, 500L, null, > this.threadPool.getExecutor()); > Set<MeasurementScheduleRequest> 
metrics = new HashSet<MeasurementScheduleRequest>(); >- metrics.add(new MeasurementScheduleRequest(0, "name", 0, true, DataType.TRAIT)); >+ metrics.add(new MeasurementScheduleRequest(0, "name", INTERVAL, true, DataType.TRAIT)); > MeasurementReport report = new MeasurementReport(); > runnable.getLastValues(report, metrics); > assert 0 == report.getCollectionTime(); >@@ -111,7 +113,7 @@ public void testMeasurement() throws Exception { > > report = new MeasurementReport(); > runnable.getLastValues(report, metrics); >- assert 42 == report.getCollectionTime(); >+ assertEquals(42, report.getCollectionTime()); > assert !report.getTraitData().isEmpty(); > > runnable.stop(); >@@ -176,7 +178,7 @@ public void testMultipleMeasurements() throws Exception { > // now we only ask for individual metrics, not all of them at once > report = new MeasurementReport(); > runnable.getLastValues(report, onlyNumericMetric); >- assert 1000 == report.getCollectionTime(); >+ assertEquals(1000L, report.getCollectionTime()); > assert report.getNumericData().size() == 1; > assert report.getTraitData().isEmpty() : "we didn't ask for the trait data"; > assert report.getCallTimeData().isEmpty() : "we didn't ask for the calltime data"; >@@ -256,7 +258,7 @@ public void testMultipleCollectionPeriods() throws Exception { > } > System.out.println("done."); > >- assert 1001L == report.getCollectionTime(); >+ assertEquals(1001L, report.getCollectionTime()); > assert report.getCallTimeData().size() == 1 : report.getCallTimeData(); > assert report.getNumericData().size() == 1 : report.getNumericData(); > nextNumeric = report.getNumericData().iterator().next(); >@@ -267,6 +269,7 @@ public void testMultipleCollectionPeriods() throws Exception { > } > > public void testLotsOfDataMeasurements() throws Exception { >+ log.info("testLotsOf"); > TestLotsOfDataMeasurementFacet component = new TestLotsOfDataMeasurementFacet(); > Set<MeasurementScheduleRequest> allMetrics = new HashSet<MeasurementScheduleRequest>(); > 
allMetrics.add(component.getNumericMetricSchedule()); >@@ -276,31 +279,49 @@ public void testLotsOfDataMeasurements() throws Exception { > MeasurementCollectorRunnable runnable = new MeasurementCollectorRunnable(component, 0L, 123L, null, > this.threadPool.getExecutor()); > >- runnable.getLastValues(new MeasurementReport(), allMetrics); // prime the pump so we begin collecting everything immediately >+ log.info("prime the pump so we begin collecting everything immediately"); >+ runnable.getLastValues(new MeasurementReport(), allMetrics); > runnable.start(); > Thread.sleep(1000L); > >- // this tests to make sure we can have multiple data points for a single metric >+ log.info("this tests to make sure we can have multiple data points for a single metric"); > MeasurementReport report = new MeasurementReport(); > runnable.getLastValues(report, allMetrics); >- assert 1000L == report.getCollectionTime(); >- assert report.getCallTimeData().size() == 1; >+ assertEquals(1000L, report.getCollectionTime()); >+ assertEquals(1, report.getCallTimeData().size()); > CallTimeData nextCalltime = report.getCallTimeData().iterator().next(); >- assert nextCalltime.getValues().size() == 2; >- assert report.getNumericData().size() == 3; >+ assertEquals(2, nextCalltime.getValues().size()); >+ assertEquals(3, report.getNumericData().size()); > MeasurementDataNumeric nextNumeric = report.getNumericData().iterator().next(); > assert nextNumeric.getValue() == (double) 1; > assert report.getTraitData().size() == 3; > MeasurementDataTrait nextTrait = report.getTraitData().iterator().next(); > assert nextTrait.getValue().equals("1"); > >- // make sure all the data has been flushed now >+ log.info("make sure data is not returned again"); > report = new MeasurementReport(); > runnable.getLastValues(report, allMetrics); >- assert 1000L == report.getCollectionTime(); >- assert report.getNumericData().isEmpty(); >- assert report.getTraitData().isEmpty(); >- assert report.getCallTimeData().isEmpty(); 
>+ assertEquals(1000L, report.getCollectionTime()); >+ assertEquals(0, report.getCallTimeData().size()); >+ assertEquals(0, report.getNumericData().size()); >+ assertEquals(0, report.getTraitData().size()); >+ >+ log.info("test expiration"); >+ for (MeasurementScheduleRequest msr : allMetrics) { >+ msr.setInterval(100); >+ } >+ // updates expiration >+ runnable.getLastValues(new MeasurementReport(), allMetrics); >+ Thread.sleep(200); >+ log.info("... expire"); >+ runnable.run(); >+ report = new MeasurementReport(); >+ log.info("... getLastValues"); >+ runnable.getLastValues(report, allMetrics); >+ assertEquals(0, report.getCollectionTime()); >+ assertEquals(0, report.getCallTimeData().size()); >+ assertEquals(0, report.getTraitData().size()); >+ assertEquals(0, report.getNumericData().size()); > } > > protected class TestAvailabilityFacet implements AvailabilityFacet { >-- >1.7.9.3 >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 783603
:
566971
|
593489
|
593545
| 625107