Red Hat Bugzilla – Attachment 892568 Details for Bug 1093948
Traits and call time measurements stored in Cassandra, RHQ storage node
Description: Patch for master (commit c53bdea0)
Filename: 0001-Support-for-metric-traits-and-call-times-in-Cassandr.patch
MIME Type: text/plain
Creator: Elias Ross
Created: 2014-05-05 14:46:16 UTC
Size: 219.23 KB
Flags: patch, obsolete
From 0d422d13349cb49f7df6751fabef0e96557792fe Mon Sep 17 00:00:00 2001
From: Elias Ross <elias_ross@apple.com>
Date: Tue, 29 Apr 2014 12:21:58 -0700
Subject: [PATCH] Support for metric traits and call times in Cassandra storage

Traits are stored in one table, call times in another.

There is a secondary index on call name in Cassandra. (Unclear if this is
really needed, but required when doing filtered queries. Unfortunately
the UI)

TTL is used for expiry. There is no more need to purge this data hourly.

Migration tooling is added but not really tested.

Things yet to do:

Paging and custom sorting don't work at all. There is lots of work to
support sorting, less for paging.

The UI shows the latest trait value and the timestamp when data was last
reported, not when it changed. To display this properly, we would need to
access the entire history of a schedule, but that might be too many round
trips for this piece of data. The UI can probably change to remove this, or
the UI can simply retrieve all history.

Possible issues:

Data paging is hard to do in Cassandra. Since custom sorting isn't supported,
the entire result set needs to be retrieved and sorted in memory. (This is
what a 'real database' does in memory.)

Cassandra does have some support for ranges on hashed fields, but we would
need to build infrastructure around this. PageControl could hold this token,
possibly.

Purging of duplicate history items happens weekly. The reason is that traits
are usually updated every hour to 24 hours. With a small window, there are
few duplicates. The window to look back is about 8 days. Even if a trait
doesn't change, the history table will have duplicates.

How to support runtime change of TTL? Currently the server must be restarted
to change the TTL. Older data is not changed. (The same issue exists for
metrics, though.)

TODOs:
* Data is pulled from Oracle. Can fields (display name) be duplicated in the traits table?
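The TTL-based expiry described above can be sketched in CQL against the rhq.measurement_data_traits table this patch creates. The schedule id, trait value, and the 691200-second (8-day) TTL below are illustrative only, chosen to match the roughly 8-day look-back window mentioned, not necessarily the patch's configured default:

```cql
-- Insert a trait value that Cassandra expires automatically after ~8 days,
-- removing the need for an hourly purge job (schedule_id 10001 is hypothetical).
INSERT INTO rhq.measurement_data_traits (schedule_id, time, value)
VALUES (10001, '2014-05-05 14:46:16+0000', 'example-trait-value')
USING TTL 691200;

-- Because the table clusters by time DESC, the latest reported value
-- for a schedule is simply the first row:
SELECT time, value
FROM rhq.measurement_data_traits
WHERE schedule_id = 10001
LIMIT 1;
```

Note that this only gives the time the value was last reported; recovering when the value last *changed* would require scanning further back in the row, which is exactly the round-trip concern raised above.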
* Documentation (of course)
---
 .../src/main/resources/schema/update/0003.xml      |  30 ++
 .../org/rhq/core/domain/alert/AlertCondition.java  |  34 +-
 .../AlertConditionTraitCategoryComposite.java      |  15 +-
 .../core/domain/criteria/CallTimeDataCriteria.java |  51 +++
 .../criteria/MeasurementDataTraitCriteria.java     |  58 ++-
 .../criteria/MeasurementScheduleCriteria.java      |  47 +++
 .../domain/measurement/MeasurementDataTrait.java   |  75 +---
 .../measurement/MeasurementScheduleRequest.java    |  19 +-
 .../domain/measurement/calltime/CallTimeData.java  |  13 +-
 .../calltime/CallTimeDataComposite.java            |  11 +
 .../measurement/calltime/CallTimeDataKey.java      |   6 -
 .../measurement/calltime/CallTimeDataValue.java    |  36 --
 .../org/rhq/core/domain/resource/Resource.java     |   2 +-
 .../java/org/rhq/core/domain/util/PageList.java    |  13 +-
 .../domain/measurement/test/MeasurementTest.java   |  41 --
 modules/enterprise/server/data-migration/pom.xml   |   5 +
 .../rhq/server/metrics/migrator/DataMigrator.java  |  75 +++-
 .../metrics/migrator/DataMigratorRunner.java       |  22 +
 .../migrator/workers/AbstractMigrationWorker.java  |  68 +++-
 .../migrator/workers/AggregateDataMigrator.java    |  43 +-
 .../migrator/workers/CallTimeDataMigrator.java     | 140 +++++++
 .../metrics/migrator/workers/DeleteAllData.java    |  23 +-
 .../workers/MetricsIndexUpdateAccumulator.java     |   5 +-
 .../metrics/migrator/workers/MigrationQuery.java   |  13 +-
 .../metrics/migrator/workers/RawDataMigrator.java  |  54 +--
 .../migrator/workers/TraitDataMigrator.java        | 133 ++++++
 .../test/MeasurementDataManagerTest.java           |  46 +--
 .../server/scheduler/jobs/DataPurgeJobTest.java    |  97 -----
 .../alert/engine/internal/AgentConditionCache.java |  22 +-
 .../rhq/enterprise/server/core/StartupBean.java    |  12 +-
 .../measurement/CallTimeDataManagerBean.java       | 449 +++++++--------
 .../measurement/CallTimeDataManagerLocal.java      |   8 -
 .../measurement/MeasurementDataManagerBean.java    | 304 ++++++--------
 .../measurement/MeasurementDataManagerLocal.java   |  21 +-
 .../MeasurementScheduleManagerLocal.java           |   2 +-
 .../server/resource/ResourceManagerBean.java       |  21 +-
 .../server/scheduler/jobs/DataPurgeJob.java        |  47 ---
 .../server/scheduler/jobs/TraitCleanup.java        |  38 ++
 .../server/storage/StorageClientManager.java       |  54 ++-
 .../server/system/SystemManagerBean.java           |  10 +-
 .../rhq/server/metrics/CallTimeConfiguration.java  |  67 +++
 .../java/org/rhq/server/metrics/CallTimeDAO.java   | 236 +++++++++++
 .../java/org/rhq/server/metrics/CallTimeRow.java   | 110 +++++
 .../java/org/rhq/server/metrics/MetricsDAO.java    |   5 +-
 .../server/metrics/StorageClientThreadFactory.java |  11 +-
 .../rhq/server/metrics/StorageResultSetFuture.java |  13 +
 .../java/org/rhq/server/metrics/TraitsCleanup.java | 129 ++++++
 .../rhq/server/metrics/TraitsConfiguration.java    |  43 ++
 .../java/org/rhq/server/metrics/TraitsDAO.java     | 257 ++++++++++++
 .../org/rhq/server/metrics/CallTimeDAOTest.java    | 147 +++++++
 .../java/org/rhq/server/metrics/TraitsDAOTest.java | 175 ++++++++
 51 files changed, 2339 insertions(+), 1017 deletions(-)
 create mode 100644 modules/common/cassandra-schema/src/main/resources/schema/update/0003.xml
 create mode 100644 modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/CallTimeDataMigrator.java
 create mode 100644 modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/TraitDataMigrator.java
 create mode 100644 modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/TraitCleanup.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeConfiguration.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeDAO.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeRow.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsCleanup.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsConfiguration.java
 create mode 100644 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsDAO.java
 create mode 100644 modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CallTimeDAOTest.java
 create mode 100644 modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/TraitsDAOTest.java

diff --git a/modules/common/cassandra-schema/src/main/resources/schema/update/0003.xml b/modules/common/cassandra-schema/src/main/resources/schema/update/0003.xml
new file mode 100644
index 0000000..608e6aa
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/resources/schema/update/0003.xml
@@ -0,0 +1,30 @@
+<updatePlan>
+  <step>
+    CREATE TABLE rhq.measurement_data_traits (
+      schedule_id int,
+      time timestamp,
+      value text,
+      PRIMARY KEY (schedule_id, time)
+    ) WITH CLUSTERING ORDER BY (time desc)
+      AND compression={'sstable_compression': 'LZ4Compressor'};
+  </step>
+
+  <step>
+    CREATE TABLE rhq.calltime (
+      schedule_id int,
+      start timeuuid,
+      end timestamp,
+      dest text,
+      min double,
+      max double,
+      total double,
+      count bigint,
+      PRIMARY KEY (schedule_id, start)
+    ) WITH CLUSTERING ORDER BY (start asc);
+  </step>
+  <step>
+    CREATE INDEX calltime_dest ON rhq.calltime(dest);
+  </step>
+
+</updatePlan>
+
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/alert/AlertCondition.java b/modules/core/domain/src/main/java/org/rhq/core/domain/alert/AlertCondition.java
index a43816f..908cdc1 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/alert/AlertCondition.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/alert/AlertCondition.java
@@ -52,7 +52,7 @@

 /**
  * An alert condition (e.g. ActiveThreads > 100) as configured in an alert definition.
- *
+ *
  * @author Joseph Marques
 */
 @Entity
@@ -97,7 +97,7 @@
         + "   AND ms.definition = md " //
         + "   AND ms.resource = res " //
         + "   AND mb IS NOT NULL " //
-        + "   AND ac.category = 'BASELINE' "), //
+        + "   AND ac.category = 'BASELINE' "), //
     @NamedQuery(name = AlertCondition.QUERY_BY_CATEGORY_CHANGE, query = "" //
         + " SELECT new org.rhq.core.domain.alert.composite.AlertConditionChangesCategoryComposite " //
         + " ( " //
@@ -121,18 +121,8 @@
         + " SELECT new org.rhq.core.domain.alert.composite.AlertConditionTraitCategoryComposite " //
         + " ( " //
         + "   ac, " //
-        + "   ms.id, " //
-        + "   (" //
-        + "     SELECT md.value " //
-        + "     FROM MeasurementDataTrait md " //
-        + "     WHERE md.schedule = ms " //
-        + "       AND md.id.timestamp = " //
-        + "       ( " //
-        + "         SELECT max(imd.id.timestamp) " //
-        + "         FROM MeasurementDataTrait imd " //
-        + "         WHERE ms.id = imd.schedule.id " //
-        + "       ) " //
-        + "   ) " //
+        + "   res.id, " //
+        + "   ms.id " //
         + " ) " //
         + " FROM AlertCondition AS ac " //
         + " JOIN ac.alertDefinition ad " //
@@ -414,7 +404,7 @@ public void setCategory(AlertConditionCategory category) {
      * Identifies the measurement definition of the metric that is to be compared when determining
      * if the condition is true. This is null if the condition category is not a metric-related one
      * (metric related categories are THRESHOLD, TRAIT, BASELINE and CHANGE; others are not).
-     *
+     *
      * @return measurement definition or null
      */
     public MeasurementDefinition getMeasurementDefinition() {
@@ -427,7 +417,7 @@ public void setMeasurementDefinition(MeasurementDefinition measurementDefinition

     /**
      * The name of the condition whose semantics are different based on this condition's category:
-     *
+     *
      * AVAILABILITY: The relevant Avail AlertConditionOperator name
      * THRESHOLD: the name of the metric (TODO: today its the display name, very bad for i18n purposes)
      * BASELINE: the name of the metric (TODO: today its the display name, very bad for i18n purposes)
@@ -441,9 +431,9 @@ public void setMeasurementDefinition(MeasurementDefinition measurementDefinition
      * DRIFT: the name of the drift definition that triggered the drift detection. This is actually a
      * regex that allows the user to match more than one drift definition if they so choose.
      * (this value may be null, in which case it doesn't matter which drift definition were the ones
-     * in which the drift was detected)
+     * in which the drift was detected)
      * RANGE: the name of the metric (TODO: today its the display name, very bad for i18n purposes)
-     *
+     *
      * @return additional information about the condition
      */
     public String getName() {
@@ -455,12 +445,12 @@ public void setName(String name) {
     }

     /**
-     * THRESHOLD and BASELINE: one of these comparators: "<", ">" or "="
+     * THRESHOLD and BASELINE: one of these comparators: "<", ">" or "="
      * For calltime alert conditions (i.e. category CHANGE for calltime metric definitions),
      * comparator will be one of these comparators: "HI", "LO", "CH" (where "CH" means "change").
      * RANGE: one of these comparators "<", ">" (meaning inside and outside the range respectively)
      * or one of these "<=", ">=" (meaning inside and outside inclusive respectively)
-     *
+     *
      * Other types of conditions will return <code>null</code> (i.e. this will be
      * null if the condition does not compare values).
      *
@@ -479,10 +469,10 @@ public void setComparator(String comparator) {
      * This is only valid for conditions of category THRESHOLD, BASELINE, RANGE and CHANGE (but
      * only where CHANGE is for a calltime metric alert condition). All other
      * condition types will return <code>null</code>.
-     *
+     *
      * Note: If RANGE condition, this threshold is always the LOW end of the range.
      * The high end of the range is in {@link #getOption()}.
-     *
+     *
      * @return threshold value or null
      */
     public Double getThreshold() {
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/alert/composite/AlertConditionTraitCategoryComposite.java b/modules/core/domain/src/main/java/org/rhq/core/domain/alert/composite/AlertConditionTraitCategoryComposite.java
index bf05fd7..7c7b176 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/alert/composite/AlertConditionTraitCategoryComposite.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/alert/composite/AlertConditionTraitCategoryComposite.java
@@ -30,14 +30,21 @@
 */
 public class AlertConditionTraitCategoryComposite extends AlertConditionScheduleCategoryComposite {

-    private final String value;
+    /**
+     * This is set when calculating the cache.
+     */
+    private String value;

-    public AlertConditionTraitCategoryComposite(AlertCondition condition, Integer scheduleId, String value) {
+    public AlertConditionTraitCategoryComposite(AlertCondition condition, int scheduleId) {
         super(condition, scheduleId, DataType.TRAIT);
-        this.value = value;
     }

     public String getValue() {
         return value;
     }
-}
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+}
\ No newline at end of file
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/CallTimeDataCriteria.java b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/CallTimeDataCriteria.java
index 8d0116e..4defc3d 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/CallTimeDataCriteria.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/CallTimeDataCriteria.java
@@ -22,6 +22,8 @@
 */
 package org.rhq.core.domain.criteria;

+import java.util.Date;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
@@ -216,4 +218,53 @@ public void addSortCount(PageOrdering sortCount) {
     public boolean hasCustomizedSorting() {
         return true;
     }
+
+    public Integer getFilterResourceId() {
+        return filterResourceId;
+    }
+
+    public Integer getFilterResourceGroupId() {
+        return filterResourceGroupId;
+    }
+
+    public Integer getFilterAutoGroupResourceTypeId() {
+        return filterAutoGroupResourceTypeId;
+    }
+
+    public Integer getFilterAutoGroupParentResourceId() {
+        return filterAutoGroupParentResourceId;
+    }
+
+    public DataType getFilterDataType() {
+        return filterDataType;
+    }
+
+    public Long getFilterBeginTime() {
+        return filterBeginTime;
+    }
+
+    public Long getFilterEndTimeDate() {
+        return filterEndTime;
+    }
+
+    public String getFilterDestination() {
+        return filterDestination;
+    }
+
+    public Double getFilterMinimum() {
+        return filterMinimum;
+    }
+
+    public Double getFilterMaximum() {
+        return filterMaximum;
+    }
+
+    public Double getFilterTotal() {
+        return filterTotal;
+    }
+
+    public Long getFilterCount() {
+        return filterCount;
+    }
+
 }
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementDataTraitCriteria.java b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementDataTraitCriteria.java
index 3abcbaf..693f351 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementDataTraitCriteria.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementDataTraitCriteria.java
@@ -67,24 +67,6 @@
     private PageOrdering sortResourceName; // requires overrides

     public MeasurementDataTraitCriteria() {
-        filterOverrides.put(FILTER_FIELD_SCHEDULE_ID, "id.scheduleId = ?");
-        filterOverrides.put(FILTER_FIELD_RESOURCE_ID, "schedule.resource.id = ?");
-        filterOverrides.put(FILTER_FIELD_GROUP_ID, "schedule.resource.id IN " //
-            + "( SELECT res.id " //
-            + "  FROM Resource res " //
-            + "  JOIN res.explicitGroups grp " //
-            + "  WHERE grp.id = ? )");
-        filterOverrides.put(FILTER_FIELD_DEFINITION_ID, "schedule.definition.id = ?");
-        filterOverrides.put(FILTER_FIELD_MAX_TIMESTAMP, "id.timestamp = " //
-            + "( SELECT MAX(mdt.id.timestamp) "
-            + "  FROM MeasurementDataTrait mdt "
-            + "  WHERE mdt.id.scheduleId = " + getAlias() + ".id.scheduleId ) " //
-            + " AND 1 = ?");
-        filterOverrides.put(FILTER_FIELD_ENABLED, "schedule.enabled = ?");
-
-        sortOverrides.put(SORT_FIELD_TIMESTAMP, "id.timestamp");
-        sortOverrides.put(SORT_FIELD_DISPLAY_NAME, "schedule.definition.displayName");
-        sortOverrides.put(SORT_FIELD_RESOURCE_NAME, "schedule.resource.name");
     }

     @Override
@@ -127,17 +109,14 @@ public void fetchSchedule(boolean fetchSchedule) {
     }

     public void addSortTimestamp(PageOrdering sortTimestamp) {
-        addSortField(SORT_FIELD_TIMESTAMP);
         this.sortTimestamp = sortTimestamp;
     }

     public void addSortName(PageOrdering sortName) {
-        addSortField(SORT_FIELD_DISPLAY_NAME);
         this.sortDisplayName = sortName;
     }

     public void addSortResourceName(PageOrdering sortResourceName) {
-        addSortField(SORT_FIELD_RESOURCE_NAME);
         this.sortResourceName = sortResourceName;
     }

@@ -155,4 +134,41 @@ public boolean isSupportsAddFilterId() {
     public boolean isSupportsAddFilterIds() {
         return false;
     }
+
+    Integer getFilterScheduleId() {
+        return filterScheduleId;
+    }
+
+    Integer getFilterResourceId() {
+        return filterResourceId;
+    }
+
+    public Integer getFilterGroupId() {
+        return filterGroupId;
+    }
+
+    Integer getFilterDefinitionId() {
+        return filterDefinitionId;
+    }
+
+    public boolean isFilterMaxTimestamp() {
+        return filterMaxTimestamp != null;
+    }
+
+    Boolean getFilterEnabled() {
+        return filterEnabled;
+    }
+
+    public PageOrdering getSortTimestamp() {
+        return sortTimestamp;
+    }
+
+    PageOrdering getSortDisplayName() {
+        return sortDisplayName;
+    }
+
+    PageOrdering getSortResourceName() {
+        return sortResourceName;
+    }
+
 }
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementScheduleCriteria.java b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementScheduleCriteria.java
index 1980d01..d3f0ba8 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementScheduleCriteria.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/criteria/MeasurementScheduleCriteria.java
@@ -22,6 +22,8 @@
 */
 package org.rhq.core.domain.criteria;

+import static java.util.Collections.singletonList;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,6 +31,8 @@
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;

+import org.rhq.core.domain.measurement.DataType;
+import org.rhq.core.domain.measurement.DisplayType;
 import org.rhq.core.domain.measurement.MeasurementSchedule;
 import org.rhq.core.domain.resource.InventoryStatus;
 import org.rhq.core.domain.util.PageOrdering;
@@ -51,6 +55,7 @@
     public static final String SORT_FIELD_NAME = "name";
     public static final String SORT_FIELD_DISPLAY_NAME = "displayName";
     public static final String SORT_FIELD_DATA_TYPE = "dataType";
+    public static final String SORT_FIELD_RESOURCE_NAME = "resourceName";

     /**
      * @deprecated Sorting by this field has never been supported. This constant has been introduced in error and will
@@ -81,6 +86,8 @@
     public static final String FILTER_FIELD_RESOURCE_TYPE_ID = "resourceTypeId";
     public static final String FILTER_FIELD_AUTO_GROUP_RESOURCE_TYPE_ID = "autoGroupResourceTypeId";
     public static final String FILTER_FIELD_AUTO_GROUP_PARENT_RESOURCE_ID = "autoGroupParentResourceId";
+    public static final String FILTER_FIELD_DISPLAY_TYPE = "displayType";
+    public static final String FILTER_FIELD_DATA_TYPE = "dataType";

     private Boolean filterEnabled;
     private List<Integer> filterDefinitionIds; // requires overrides
@@ -90,6 +97,8 @@
     private Integer filterAutoGroupResourceTypeId; // requires overrides
     private Integer filterAutoGroupParentResourceId; // requires overrides
     private Integer filterResourceTypeId; // requires overrides
+    private DisplayType filterDisplayType; // requires overrides
+    private DataType filterDataType; // requires overrides

     private boolean fetchBaseline;
     private boolean fetchDefinition;
@@ -98,6 +107,7 @@
     private PageOrdering sortName; // requires overrides
     private PageOrdering sortDisplayName; // requires overrides
     private PageOrdering sortDataType; // requires overrides
+    private PageOrdering sortResourceName; // overrides

     public MeasurementScheduleCriteria() {
         filterOverrides.put(FILTER_FIELD_DEFINITION_IDS, "definition.id IN ( ? )");
@@ -119,11 +129,14 @@ public MeasurementScheduleCriteria() {
             + "  JOIN res.parentResource parent " //
             + "  WHERE parent.id = ? )");
         filterOverrides.put(FILTER_FIELD_RESOURCE_TYPE_ID, "resource.type.id = ?");
+        filterOverrides.put(FILTER_FIELD_DISPLAY_TYPE, "definition.displayType = ?");
+        filterOverrides.put(FILTER_FIELD_DATA_TYPE, "definition.dataType = ?");

         sortOverrides.put(SORT_FIELD_DEFINITION_ID, "definition.id");
         sortOverrides.put(SORT_FIELD_NAME, "definition.name");
         sortOverrides.put(SORT_FIELD_DISPLAY_NAME, "definition.displayName");
         sortOverrides.put(SORT_FIELD_DATA_TYPE, "definition.dataType");
+        sortOverrides.put(SORT_FIELD_RESOURCE_NAME, "resource.name");

         // by default, we want to only return schedules for committed resources
         ArrayList<InventoryStatus> defaults = new ArrayList<InventoryStatus>(1);
@@ -131,6 +144,32 @@ public MeasurementScheduleCriteria() {
         addFilterResourceInventoryStatuses(defaults);
     }

+    public MeasurementScheduleCriteria(MeasurementDataTraitCriteria criteria) {
+        this();
+        Integer def = criteria.getFilterDefinitionId();
+        if (def != null) {
+            filterDefinitionIds = singletonList(def);
+        }
+        filterEnabled = criteria.getFilterEnabled();
+        filterResourceGroupId = criteria.getFilterGroupId();
+        filterResourceId = criteria.getFilterResourceId();
+        // max timestamp filtering done by the bean
+        filterId = criteria.getFilterScheduleId();
+        sortDisplayName = criteria.getSortDisplayName();
+        sortResourceName = criteria.getSortResourceName();
+        // this is done by the bean
+        // criteria.getSortTimestamp();
+    }
+
+    public MeasurementScheduleCriteria(CallTimeDataCriteria criteria) {
+        this();
+        filterResourceGroupId = criteria.getFilterResourceGroupId();
+        filterResourceId = criteria.getFilterResourceId();
+        filterAutoGroupParentResourceId = criteria.getFilterAutoGroupParentResourceId();
+        filterAutoGroupResourceTypeId = criteria.getFilterAutoGroupResourceTypeId();
+        filterDataType = criteria.getFilterDataType();
+    }
+
     @Override
     public Class<MeasurementSchedule> getPersistentClass() {
         return MeasurementSchedule.class;
@@ -168,6 +207,14 @@ public void addFilterResourceTypeId(Integer filterResourceTypeId) {
         this.filterResourceTypeId = filterResourceTypeId;
     }

+    public void addFilterDisplayType(DisplayType filterDisplayType) {
+        this.filterDisplayType = filterDisplayType;
+    }
+
+    public void addFilterDataType(DataType filterDataType) {
+        this.filterDataType = filterDataType;
+    }
+
     public void fetchBaseline(boolean fetchBaseline) {
         this.fetchBaseline = fetchBaseline;
     }
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementDataTrait.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementDataTrait.java
index b8e4757..d8aca62 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementDataTrait.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementDataTrait.java
@@ -23,80 +23,12 @@
 package org.rhq.core.domain.measurement;

 import javax.persistence.Entity;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
 import javax.persistence.Table;

 @Entity
-@NamedQueries( {
-    @NamedQuery(name = MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE_AND_DISPLAY_TYPE, query = "SELECT trait, d.displayName "
-        + "FROM MeasurementDataTrait trait JOIN trait.schedule s join s.definition d JOIN s.resource r"
-        + " WHERE r.id = :resourceId "
-        + " AND trait.id.timestamp = "
-        + " (SELECT max(mdt.id.timestamp) "
-        + " FROM MeasurementDataTrait mdt "
-        + " WHERE s.id = mdt.schedule.id "
-        + " ) "
-        + " AND d.displayType = :displayType "),
-    @NamedQuery(name = MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE, query = "SELECT trait, d.displayName "
-        + "FROM MeasurementDataTrait trait JOIN trait.schedule s join s.definition d JOIN s.resource r"
-        + " WHERE r.id = :resourceId " + " AND trait.id.timestamp = " + " (SELECT max(mdt.id.timestamp) "
-        + " FROM MeasurementDataTrait mdt " + " WHERE s.id = mdt.schedule.id) "),
-    @NamedQuery(name = MeasurementDataTrait.FIND_CURRENT_FOR_SCHEDULES, query = "SELECT mdt, d.displayName FROM MeasurementDataTrait mdt JOIN mdt.schedule s JOIN s.definition d "
-        + "WHERE mdt.id.scheduleId IN ( :scheduleIds ) "
-        + " AND mdt.id.timestamp = "
-        + " ( SELECT MAX(x.id.timestamp) "
-        + " FROM MeasurementDataTrait x "
-        + " WHERE x.id.scheduleId = mdt.id.scheduleId" + " ) "),
-    @NamedQuery(name = MeasurementDataTrait.FIND_ALL_FOR_RESOURCE_AND_DEFINITION, query = "SELECT trait, d.displayName "
-        + "FROM MeasurementDataTrait trait JOIN trait.schedule s JOIN s.definition d JOIN s.resource r "
-        + " WHERE r.id = :resourceId " + " AND d.id = :definitionId " + "ORDER BY trait.id.timestamp DESC "),
-    @NamedQuery(name = MeasurementDataTrait.QUERY_DELETE_BY_RESOURCES, query = "DELETE MeasurementDataTrait t WHERE t.schedule IN ( SELECT ms FROM MeasurementSchedule ms WHERE ms.resource.id IN ( :resourceIds ) )") })
 @Table(name = "RHQ_MEASUREMENT_DATA_TRAIT")
 public class MeasurementDataTrait extends MeasurementData {

-    /**
-     * Find current traits for a Resource in :resourceId that have a certain displayType in :displayType
-     */
-    public static final String FIND_CURRENT_FOR_RESOURCE_AND_DISPLAY_TYPE = "MeasurementDataTrait.FindCurrentForResourceAndDislayType";
-
-    /**
-     * Find all current traits for a Resource in :resourceId
-     */
-    public static final String FIND_CURRENT_FOR_RESOURCE = "MeasurementDataTrait.FindCurrentForResource";
-
-    /**
-     * Find the current traits for the schedule ids passed in :scheduleIds
-     */
-    public static final String FIND_CURRENT_FOR_SCHEDULES = "MeasurementDataTrait.FIND_CURRENT_FOR_SCHEDULES";
-
-    /**
-     * Find all trait data for the provided resource id and definition id resource id is first parameter definition id
-     * is second parameter
-     */
-    public static final String FIND_ALL_FOR_RESOURCE_AND_DEFINITION = "MeasurementDataTrait.FIND_ALL_FOR_RESOURCE_AND_DEFINITION";
-
-    public static final String QUERY_DELETE_BY_RESOURCES = "MeasurementDataTrait.deleteByResources";
-
-    /*
-     * NOTE: Avoid using the AS keyword in the FROM clauses in this query, because Oracle barfs on it
-     * (see http://download.oracle.com/docs/cd/B19306_01/server.102/b14200/ap_standard_sql003.htm, subfeature id
-     * E051-08).
-     */
-    public static final String NATIVE_QUERY_PURGE = "" //
-        + "DELETE FROM rhq_measurement_data_trait " // SQL Server doesn't like aliases, use full table name
-        + "WHERE EXISTS " // rewritten as exists because H2 doesn't support multi-column conditions
-        + " (SELECT t2.schedule_id, t2.time_stamp " //
-        + "  FROM rhq_measurement_data_trait t2, " //
-        + "   (SELECT max(t4.time_stamp) as mx, t4.schedule_id as schedule_id " //
-        + "    FROM rhq_measurement_data_trait t4 " //
-        + "    WHERE t4.time_stamp < ? " //
-        + "    GROUP BY t4.schedule_id) t3 " //
-        + "  WHERE t2.schedule_id = t3.schedule_id " //
-        + "   AND t2.time_stamp < t3.mx " //
-        + "   AND rhq_measurement_data_trait.time_stamp = t2.time_stamp " // rewrote multi-column conditions as additional
-        + "   AND rhq_measurement_data_trait.schedule_id = t2.schedule_id) "; // correlated restrictions to the delete table
-
     private static final long serialVersionUID = 1L;

     private String value;
@@ -139,6 +71,13 @@ protected MeasurementDataTrait() {
         /* JPA use only */
     }

+    /**
+     * Sets the schedule this trait is derived from.
+     */
+    public void setSchedule(MeasurementSchedule schedule) {
+        this.schedule = schedule;
+    }
+
     @Override
     public String getValue() {
         return value;
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementScheduleRequest.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementScheduleRequest.java
index 795a400..764bd79 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementScheduleRequest.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementScheduleRequest.java
@@ -40,7 +40,13 @@
      */
     public static final int NO_SCHEDULE_ID = 1;

+    /**
+     * Indicates that no resource ID was set.
+     */
+    public static final int NO_RESOURCE_ID = -1;
+
     private int scheduleId;
+
     private String name;
     private int interval;
     private boolean enabled;
@@ -48,7 +54,7 @@
     byte dataNumType;

     public MeasurementScheduleRequest(MeasurementSchedule schedule) {
-        this(schedule.getId(), schedule.getDefinition().getName(), schedule.getInterval(), schedule.isEnabled(),
+        this(schedule.getId(), schedule.getResource().getId(), schedule.getDefinition().getName(), schedule.getInterval(), schedule.isEnabled(),
             schedule.getDefinition().getDataType(), schedule.getDefinition().getRawNumericType());
     }

@@ -63,11 +69,17 @@ public MeasurementScheduleRequest(MeasurementScheduleRequest scheduleRequest) {

     public MeasurementScheduleRequest(int scheduleId, String name, long interval, boolean enabled, DataType dataType,
         NumericType rawNumericType) {
+        this(scheduleId, NO_RESOURCE_ID, name, interval, enabled, dataType, rawNumericType);
+    }
+
+    public MeasurementScheduleRequest(int scheduleId, int resourceId, String name, long interval, boolean enabled, DataType dataType,
+        NumericType rawNumericType) {
         this.scheduleId = scheduleId;
         if (name != null) {
             this.name = name.intern();
-        } else
+        } else {
             this.name = null;
+        }
         this.interval = (int) (interval / 1000);
         this.enabled = enabled;
         this.dataNumType = toDataNumType(dataType, rawNumericType);
@@ -187,4 +199,5 @@ private byte toDataNumType(DataType dataType, NumericType numericType) {
         byte nTmp = (byte) (numericType != null ? numericType.ordinal() + 1 : 0);
         return (byte) (dTmp * 16 + nTmp);
     }
-}
\ No newline at end of file
+
+}
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeData.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeData.java
index d53627d..4fce501 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeData.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeData.java
@@ -29,7 +29,6 @@
 import java.util.Map;

 import org.jetbrains.annotations.NotNull;
-
 import org.rhq.core.domain.measurement.MeasurementScheduleRequest;

 /**
@@ -43,6 +42,7 @@
     private static final long serialVersionUID = 1L;

     private int scheduleId;
+
     private Map<String, CallTimeDataValue> values = new HashMap<String, CallTimeDataValue>();

     /**
@@ -51,7 +51,14 @@
      * @param schedule the schedule for which this data was collected
      */
     public CallTimeData(MeasurementScheduleRequest schedule) {
-        this.scheduleId = schedule.getScheduleId();
+        this(schedule.getScheduleId());
+    }
+
+    /**
+     * Constructs a new <code>CallTimeData</code>.
+     */
+    public CallTimeData(int scheduleId) {
+        this.scheduleId = scheduleId;
     }

     /**
@@ -152,4 +159,4 @@ public int hashCode() {
         return result;
     }

-}
\ No newline at end of file
+}
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataComposite.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataComposite.java
index 37909ca..18f6c10 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataComposite.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataComposite.java
@@ -47,6 +47,17 @@
     private CallTimeDataComposite() {
     }

+    public CallTimeDataComposite(String callDestination, double minimum,
+        double maximum, double total, long count, double average)
+    {
+        this.callDestination = callDestination;
+        this.minimum = minimum;
+        this.maximum = maximum;
+        this.total = total;
+        this.count = count;
+        this.average = average;
+    }
+
     public CallTimeDataComposite(@NotNull String callDestination, @NotNull Number minimum, @NotNull Number maximum,
         @NotNull Number total, @NotNull Number count, @NotNull Number average) {
         this.callDestination = callDestination;
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataKey.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataKey.java
index 3ee9adc..c1319d5 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataKey.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataKey.java
@@ -32,14 +32,10 @@
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.SequenceGenerator;
 import javax.persistence.Table;

 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-
 import org.rhq.core.domain.measurement.MeasurementSchedule;

 /**
@@ -49,8 +45,6 @@
  * @author Ian Springer
 */
 @Entity
-@NamedQueries( { @NamedQuery(name = CallTimeDataKey.QUERY_DELETE_BY_RESOURCES, query = "DELETE CallTimeDataKey ctdk WHERE ctdk.schedule IN ( SELECT ms FROM MeasurementSchedule ms WHERE ms.resource.id IN ( :resourceIds ) )") })
-@SequenceGenerator(allocationSize = org.rhq.core.domain.util.Constants.ALLOCATION_SIZE, name = "RHQ_CALLTIME_DATA_KEY_ID_SEQ", sequenceName = "RHQ_CALLTIME_DATA_KEY_ID_SEQ")
 @Table(name = "RHQ_CALLTIME_DATA_KEY")
 public class CallTimeDataKey implements Serializable {
     private static final long serialVersionUID = 1L;
diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataValue.java b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataValue.java
index 47779a2..1e23b1f 100644
--- a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataValue.java
+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/calltime/CallTimeDataValue.java
@@ -32,9 +32,6 @@
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.SequenceGenerator;
 import javax.persistence.Table;

 import org.jetbrains.annotations.Nullable;
@@ -45,39 +42,6 @@
  * @author Ian Springer
 */
 @Entity
-@NamedQueries( {
-    // NOTE: This query only includes data chunks that are fully within the specified time interval, because it would
-    // not be possible to extrapolate the stats only for the overlapping portion of partially overlapping chunks.
>- @NamedQuery(name = CallTimeDataValue.QUERY_FIND_COMPOSITES_FOR_RESOURCE, query = "SELECT new org.rhq.core.domain.measurement.calltime.CallTimeDataComposite(" >- + "key.callDestination, " >- + "MIN(value.minimum), " >- + "MAX(value.maximum), " >- + "SUM(value.total), " >- + "SUM(value.count), " >- + "SUM(value.total) / SUM(value.count)) " >- + "FROM CallTimeDataValue value " >- + "JOIN value.key key " >- + "WHERE key.schedule.id = :scheduleId " >- + "AND value.count != 0 " >- + "AND value.minimum != -1 " >- + "AND value.beginTime >= :beginTime " >- + "AND value.endTime <= :endTime " >- + "GROUP BY key.callDestination "), >- @NamedQuery(name = CallTimeDataValue.QUERY_FIND_RAW_FOR_RESOURCE, query = "SELECT new org.rhq.core.domain.measurement.calltime.CallTimeDataComposite(" >- + "key.callDestination, " >- + "value.minimum, " >- + "value.maximum, " >- + "value.total, " >- + "value.count, " >- + "value.total / value.count) " >- + "FROM CallTimeDataValue value " >- + "WHERE key.schedule.id = :scheduleId " >- + "AND value.count != 0 " >- + "AND value.minimum != -1 " >- + "AND value.beginTime >= :beginTime " >- + "AND value.endTime <= :endTime "), >- @NamedQuery(name = CallTimeDataValue.QUERY_DELETE_BY_RESOURCES, query = "DELETE CallTimeDataValue ctdv WHERE ctdv.key IN ( SELECT ctdk.id FROM CallTimeDataKey ctdk WHERE ctdk.schedule.resource.id IN ( :resourceIds ) )") }) >-@SequenceGenerator(allocationSize = org.rhq.core.domain.util.Constants.ALLOCATION_SIZE, name = "RHQ_CALLTIME_DATA_VALUE_ID_SEQ", sequenceName = "RHQ_CALLTIME_DATA_VALUE_ID_SEQ") > @Table(name = "RHQ_CALLTIME_DATA_VALUE") > public class CallTimeDataValue implements Serializable { > private static final long serialVersionUID = 1L; >diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/resource/Resource.java b/modules/core/domain/src/main/java/org/rhq/core/domain/resource/Resource.java >index 4ae23dc..93dd7f8 100644 >--- 
a/modules/core/domain/src/main/java/org/rhq/core/domain/resource/Resource.java >+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/resource/Resource.java >@@ -1174,7 +1174,7 @@ > private Set<DriftDefinition> driftDefinitions = null; > > @Transient >- private final boolean customChildResourcesCollection; >+ private boolean customChildResourcesCollection; > > public Resource() { > customChildResourcesCollection = false; >diff --git a/modules/core/domain/src/main/java/org/rhq/core/domain/util/PageList.java b/modules/core/domain/src/main/java/org/rhq/core/domain/util/PageList.java >index 3425f21..ee0df62 100644 >--- a/modules/core/domain/src/main/java/org/rhq/core/domain/util/PageList.java >+++ b/modules/core/domain/src/main/java/org/rhq/core/domain/util/PageList.java >@@ -38,7 +38,7 @@ > > private static final long serialVersionUID = 1L; > >- private int totalSize = 0; >+ private int totalSize; > private boolean isUnbounded; // Is the total size of the list known? > private PageControl pageControl; > >@@ -94,7 +94,7 @@ public PageList(int totalSize, PageControl pageControl) { > */ > public PageList(Collection<? extends E> collection, PageControl pageControl) { > super(collection); >- isUnbounded = true; >+ this.isUnbounded = true; > this.pageControl = pageControl; > } > >@@ -124,6 +124,15 @@ public PageList(Collection<? extends E> collection, int totalSize, PageControl p > this.pageControl = pageControl; > } > >+ /** >+ * Pages the given collection, using the supplied page list data from an >+ * existing wrapped collection. >+ */ >+ public PageList(Collection<? 
extends E> collection, PageList<?> results) { >+ this(collection, results.getTotalSize(), results.getPageControl()); >+ this.isUnbounded = results.isUnbounded; >+ } >+ > public PageControl getPageControl() { > return pageControl; > } >diff --git a/modules/core/domain/src/test/java/org/rhq/core/domain/measurement/test/MeasurementTest.java b/modules/core/domain/src/test/java/org/rhq/core/domain/measurement/test/MeasurementTest.java >index a13f0c5..8ed69c9 100644 >--- a/modules/core/domain/src/test/java/org/rhq/core/domain/measurement/test/MeasurementTest.java >+++ b/modules/core/domain/src/test/java/org/rhq/core/domain/measurement/test/MeasurementTest.java >@@ -303,47 +303,6 @@ public void testAddAvailability() throws Exception { > } > } > >- @SuppressWarnings("unchecked") >- @Test(groups = "integration.ejb3") >- public void testTraitQuery() throws Exception { >- System.out.println("testTraitQuery ..."); >- getTransactionManager().begin(); >- EntityManager em = getEntityManager(); >- try { >- MeasurementDefinition def = setupTables(em); >- MeasurementSchedule sched = def.getSchedules().get(0); >- Resource resource = sched.getResource(); >- int resourceId = resource.getId(); >- >- MeasurementDataPK pk = new MeasurementDataPK(sched.getId()); >- MeasurementDataTrait mdt = new MeasurementDataTrait(pk, "Hello World"); >- em.persist(mdt); >- em.flush(); >- >- Query q = em.createNamedQuery(MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE); >- q.setParameter("resourceId", resourceId); >- List<Object[]> res = q.getResultList(); >- System.out.println("testTraitQuery: found " + res.size() + " item(s)"); >- if (res.size() > 0) { >- MeasurementDataTrait foo = (MeasurementDataTrait) res.get(0)[0]; >- String name = (String) res.get(0)[1]; >- System.out.println(" and it is " + foo.toString() + " and name " + name); >- } >- >- Query q2 = em.createNamedQuery(MeasurementDataTrait.FIND_CURRENT_FOR_SCHEDULES); >- List<Integer> ids = new ArrayList<Integer>(); >- ids.add(1); >- ids.add(2); 
>- ids.add(3); >- ids.add(sched.getId()); >- q2.setParameter("scheduleIds", ids); >- List<MeasurementDataTrait> traits = q.getResultList(); >- assert traits.size() >= 1; // at least the one for the schedule above should be found >- } finally { >- getTransactionManager().rollback(); >- } >- } >- > /** > * Setup some entities to check query against them etc. > * >diff --git a/modules/enterprise/server/data-migration/pom.xml b/modules/enterprise/server/data-migration/pom.xml >index 59a44d4..8c5b9fc 100644 >--- a/modules/enterprise/server/data-migration/pom.xml >+++ b/modules/enterprise/server/data-migration/pom.xml >@@ -149,6 +149,11 @@ > <artifactId>rhq-server-metrics</artifactId> > <version>${project.version}</version> > </artifactItem> >+ <artifactItem> >+ <groupId>${project.groupId}</groupId> >+ <artifactId>rhq-core-domain</artifactId> >+ <version>${project.version}</version> >+ </artifactItem> > <artifactItem> > <groupId>${project.groupId}</groupId> > <artifactId>rhq-core-util</artifactId> >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigrator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigrator.java >index 9578e73..792a4f2 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigrator.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigrator.java >@@ -22,16 +22,19 @@ > > import javax.persistence.EntityManager; > >-import com.datastax.driver.core.Session; >- > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; >- >+import org.rhq.server.metrics.CallTimeDAO; >+import org.rhq.server.metrics.TraitsDAO; > import org.rhq.server.metrics.domain.MetricsTable; > import org.rhq.server.metrics.migrator.workers.AggregateDataMigrator; >+import org.rhq.server.metrics.migrator.workers.CallTimeDataMigrator; > import 
org.rhq.server.metrics.migrator.workers.CallableMigrationWorker; > import org.rhq.server.metrics.migrator.workers.DeleteAllData; > import org.rhq.server.metrics.migrator.workers.RawDataMigrator; >+import org.rhq.server.metrics.migrator.workers.TraitDataMigrator; >+ >+import com.datastax.driver.core.Session; > > > /** >@@ -75,6 +78,8 @@ public DataMigrator(EntityManager entityManager, Session session, DatabaseType d > config.setRun1HAggregateDataMigration(true); > config.setRun6HAggregateDataMigration(true); > config.setRun1DAggregateDataMigration(true); >+ config.setRunCallTimeMigration(true); >+ config.setRunTraitMigration(true); > } > > public void runRawDataMigration(boolean value) { >@@ -93,6 +98,14 @@ public void run1DAggregateDataMigration(boolean value) { > config.setRun1DAggregateDataMigration(value); > } > >+ public void runTraitMigration(boolean value) { >+ config.setRunTraitMigration(value); >+ } >+ >+ public void runCallTimeMigration(boolean value) { >+ config.setRunCallTimeMigration(value); >+ } >+ > public void deleteDataImmediatelyAfterMigration() { > config.setDeleteDataImmediatelyAfterMigration(true); > config.setDeleteAllDataAtEndOfMigration(false); >@@ -126,6 +139,14 @@ public long estimate() throws Exception { > retryOnFailure(new AggregateDataMigrator(MetricsTable.TWENTY_FOUR_HOUR, config), Task.Estimate); > } > >+ if (config.isRunTraitMigration()) { >+ retryOnFailure(new TraitDataMigrator(TraitsDAO.TABLE, config.getTraitTTLDays(), config), Task.Estimate); >+ } >+ >+ if (config.isRunCallTimeMigration()) { >+ retryOnFailure(new CallTimeDataMigrator(CallTimeDAO.TABLE, config.getCallTimeDays(), config), Task.Estimate); >+ } >+ > if (config.isDeleteAllDataAtEndOfMigration()) { > retryOnFailure(new DeleteAllData(config), Task.Estimate); > } >@@ -152,6 +173,14 @@ public void migrateData() throws Exception { > retryOnFailure(new AggregateDataMigrator(MetricsTable.TWENTY_FOUR_HOUR, config), Task.Migrate); > } > >+ if (config.isRunTraitMigration()) { >+ 
retryOnFailure(new TraitDataMigrator(TraitsDAO.TABLE, config.getTraitTTLDays(), config), Task.Migrate); >+ } >+ >+ if (config.isRunCallTimeMigration()) { >+ retryOnFailure(new CallTimeDataMigrator(CallTimeDAO.TABLE, config.getCallTimeDays(), config), Task.Migrate); >+ } >+ > if (config.isDeleteAllDataAtEndOfMigration()) { > retryOnFailure(new DeleteAllData(config), Task.Migrate); > } >@@ -226,6 +255,8 @@ public Exception getException() { > private final DatabaseType databaseType; > private final boolean experimentalDataSource; > >+ private static final int TTL_DEFAULT = 180; >+ > private boolean deleteDataImmediatelyAfterMigration; > private boolean deleteAllDataAtEndOfMigration; > >@@ -233,6 +264,10 @@ public Exception getException() { > private boolean run1HAggregateDataMigration; > private boolean run6HAggregateDataMigration; > private boolean run1DAggregateDataMigration; >+ private boolean runTraitMigration = true; >+ private boolean runCallTimeMigration = true; >+ private int traitTTLDays = TTL_DEFAULT; >+ private int callTimeDays = TTL_DEFAULT; > > public DataMigratorConfiguration(EntityManager entityManager, Session session, DatabaseType databaseType, > boolean experimentalDataSource) { >@@ -290,6 +325,38 @@ private void setRun1DAggregateDataMigration(boolean run1dAggregateDataMigration) > run1DAggregateDataMigration = run1dAggregateDataMigration; > } > >+ public boolean isRunTraitMigration() { >+ return runTraitMigration; >+ } >+ >+ public void setRunTraitMigration(boolean runTraitMigration) { >+ this.runTraitMigration = runTraitMigration; >+ } >+ >+ public boolean isRunCallTimeMigration() { >+ return runCallTimeMigration; >+ } >+ >+ public void setRunCallTimeMigration(boolean runCallTimeMigration) { >+ this.runCallTimeMigration = runCallTimeMigration; >+ } >+ >+ public int getTraitTTLDays() { >+ return traitTTLDays; >+ } >+ >+ public void setTraitTTLDays(int traitTTLDays) { >+ this.traitTTLDays = traitTTLDays; >+ } >+ >+ public int getCallTimeDays() { >+ return callTimeDays; 
>+ } >+ >+ public void setCallTimeDays(int callTimeDays) { >+ this.callTimeDays = callTimeDays; >+ } >+ > public EntityManager getEntityManager() { > return entityManager; > } >@@ -305,5 +372,7 @@ public DatabaseType getDatabaseType() { > public boolean isExperimentalDataSource() { > return experimentalDataSource; > } >+ > } >+ > } >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigratorRunner.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigratorRunner.java >index c064971..857c0eb 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigratorRunner.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/DataMigratorRunner.java >@@ -127,6 +127,12 @@ > private final Option disable1DOption = OptionBuilder.withLongOpt("disable-1d-migration").hasOptionalArg() > .withType(Boolean.class) > .withDescription("Disable 24 hours aggregates table migration (default: false)").create(); >+ private final Option disableTraitOption = OptionBuilder.withLongOpt("disable-trait-migration").hasOptionalArg() >+ .withType(Boolean.class) >+ .withDescription("Disable rhq_measurement_data_trait table migration (default: false)").create(); >+ private final Option disableCallTimeOption = OptionBuilder.withLongOpt("disable-call-time-migration").hasOptionalArg() >+ .withType(Boolean.class) >+ .withDescription("Disable rhq_call_time table migration (default: false)").create(); > private final Option deleteDataOption = OptionBuilder.withLongOpt("delete-data").hasOptionalArg() > .withType(Boolean.class) > .withDescription("Delete SQL data at the end of migration (default: false)").create(); >@@ -243,6 +249,8 @@ private void configure(String args[]) throws Exception { > options.addOption(disable1HOption); > options.addOption(disable6HOption); > options.addOption(disable1DOption); >+ 
options.addOption(disableTraitOption); >+ options.addOption(disableCallTimeOption); > options.addOption(deleteDataOption); > options.addOption(estimateOnlyOption); > options.addOption(deleteOnlyOption); >@@ -325,6 +333,8 @@ private void loadDefaultConfiguration() throws Exception { > configuration.put(disable1HOption, false); > configuration.put(disable6HOption, false); > configuration.put(disable1DOption, false); >+ configuration.put(disableTraitOption, false); >+ configuration.put(disableCallTimeOption, false); > configuration.put(estimateOnlyOption, false); > configuration.put(deleteDataOption, false); > configuration.put(deleteOnlyOption, false); >@@ -534,6 +544,16 @@ private void parseMigrationOptions(CommandLine commandLine) { > configuration.put(disable1DOption, value); > } > >+ if (commandLine.hasOption(disableCallTimeOption.getLongOpt())) { >+ value = tryParseBoolean(commandLine.getOptionValue(disableCallTimeOption.getLongOpt()), true); >+ configuration.put(disableCallTimeOption, value); >+ } >+ >+ if (commandLine.hasOption(disableTraitOption.getLongOpt())) { >+ value = tryParseBoolean(commandLine.getOptionValue(disableTraitOption.getLongOpt()), true); >+ configuration.put(disableTraitOption, value); >+ } >+ > if (commandLine.hasOption(deleteDataOption.getLongOpt())) { > value = tryParseBoolean(commandLine.getOptionValue(deleteDataOption.getLongOpt()), true); > configuration.put(deleteDataOption, value); >@@ -579,6 +599,8 @@ private void run() throws Exception { > migrator.run1HAggregateDataMigration(!(Boolean) configuration.get(disable1HOption)); > migrator.run6HAggregateDataMigration(!(Boolean) configuration.get(disable6HOption)); > migrator.run1DAggregateDataMigration(!(Boolean) configuration.get(disable1DOption)); >+ migrator.runTraitMigration(!(Boolean) configuration.get(disableTraitOption)); >+ migrator.runCallTimeMigration(!(Boolean) configuration.get(disableCallTimeOption)); > > System.out.println("Estimation process - starting\n"); > long estimate 
= migrator.estimate(); >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AbstractMigrationWorker.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AbstractMigrationWorker.java >index c2ab85c..bebf949 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AbstractMigrationWorker.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AbstractMigrationWorker.java >@@ -20,10 +20,11 @@ > > package org.rhq.server.metrics.migrator.workers; > >+import java.util.Date; >+ > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.hibernate.StatelessSession; >- > import org.rhq.server.metrics.migrator.DataMigrator; > import org.rhq.server.metrics.migrator.DataMigrator.DataMigratorConfiguration; > import org.rhq.server.metrics.migrator.DataMigrator.DatabaseType; >@@ -37,8 +38,15 @@ > * > */ > public abstract class AbstractMigrationWorker { >+ > private final Log log = LogFactory.getLog(AbstractMigrationWorker.class); > >+ protected final DataMigratorConfiguration config; >+ >+ public AbstractMigrationWorker(DataMigratorConfiguration config) { >+ this.config = config; >+ } >+ > /** > * Returns a list of all the raw SQL metric tables. > * There is no equivalent in Cassandra, all raw data is stored in a single column family. 
>@@ -61,7 +69,7 @@ > return tables; > } > >- protected ExistingDataSource getExistingDataSource(String query, Task task, DataMigratorConfiguration config) { >+ protected ExistingDataSource getExistingDataSource(String query, Task task) { > if (Task.Migrate.equals(task)) { > if (DatabaseType.Oracle.equals(config.getDatabaseType())) { > return new ScrollableDataSource(config.getEntityManager(), config.getDatabaseType(), query); >@@ -90,7 +98,7 @@ protected ExistingDataSource getExistingDataSource(String query, Task task, Data > return new ScrollableDataSource(config.getEntityManager(), config.getDatabaseType(), query); > } > >- protected void prepareSQLSession(StatelessSession session, DataMigratorConfiguration config) { >+ protected void prepareSQLSession(StatelessSession session) { > if (DatabaseType.Postgres.equals(config.getDatabaseType())) { > log.debug("Preparing SQL connection with timeout: " + DataMigrator.SQL_TIMEOUT); > >@@ -101,11 +109,11 @@ protected void prepareSQLSession(StatelessSession session, DataMigratorConfigura > } > } > >- protected StatelessSession getSQLSession(DataMigratorConfiguration config) { >+ protected StatelessSession getSQLSession() { > StatelessSession session = ((org.hibernate.Session) config.getEntityManager().getDelegate()) > .getSessionFactory().openStatelessSession(); > >- prepareSQLSession(session, config); >+ prepareSQLSession(session); > > return session; > } >@@ -120,4 +128,54 @@ protected void closeSQLSession(StatelessSession session) { > } > } > >+ protected final long getRowCount(String countQuery) { >+ StatelessSession session = getSQLSession(); >+ >+ org.hibernate.Query query = session.createSQLQuery(countQuery); >+ query.setReadOnly(true); >+ query.setTimeout(DataMigrator.SQL_TIMEOUT); >+ long count = Long.parseLong(query.uniqueResult().toString()); >+ >+ closeSQLSession(session); >+ >+ return count; >+ } >+ >+ protected final void deleteTableData(String deleteQuery) throws Exception { >+ int failureCount = 0; >+ int 
max = CallTimeDataMigrator.MAX_NUMBER_OF_FAILURES; >+ while (failureCount < max) { >+ try { >+ StatelessSession session = getSQLSession(); >+ session.getTransaction().begin(); >+ org.hibernate.Query nativeQuery = session.createSQLQuery(deleteQuery); >+ nativeQuery.executeUpdate(); >+ session.getTransaction().commit(); >+ closeSQLSession(session); >+ log.info("- " + deleteQuery + " - done -"); >+ return; // success; without this the loop would re-run the delete forever >+ } catch (Exception e) { >+ log.error(deleteQuery + " failed. Attempting to delete data one more time..."); >+ >+ failureCount++; >+ if (failureCount == max) { >+ throw e; >+ } >+ } >+ } >+ } >+ >+ /** >+ * Convert an object to a date. >+ */ >+ protected static Date date(Object o) { >+ if (o == null) >+ return null; >+ if (o instanceof Date) >+ return (Date)o; >+ // Postgres uses UNIX epoch time (seconds) >+ if (o instanceof Number) >+ return new Date(((Number)o).longValue() * 1000); >+ throw new IllegalStateException("Cannot convert to date: " + o); >+ } >+ > } >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AggregateDataMigrator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AggregateDataMigrator.java >index 73116a4..acefce5 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AggregateDataMigrator.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/AggregateDataMigrator.java >@@ -49,7 +49,6 @@ > > private final Log log = LogFactory.getLog(AggregateDataMigrator.class); > >- private final DataMigratorConfiguration config; > private final String selectQuery; > private final String deleteQuery; > private final String countQuery; >@@ -62,9 +61,9 @@ > */ > public AggregateDataMigrator(MetricsTable metricsTable, DataMigratorConfiguration config) > throws Exception { >+ super(config); > > this.metricsTable = metricsTable; >- this.config = config; > > if 
(MetricsTable.ONE_HOUR.equals(this.metricsTable)) { > this.selectQuery = MigrationQuery.SELECT_1H_DATA.toString(); >@@ -104,43 +103,7 @@ public long estimate() throws Exception { > public void migrate() throws Exception { > performMigration(Task.Migrate); > if (config.isDeleteDataImmediatelyAfterMigration()) { >- deleteTableData(); >- } >- } >- >- private long getRowCount(String countQuery) { >- StatelessSession session = getSQLSession(config); >- >- org.hibernate.Query query = session.createSQLQuery(countQuery); >- query.setReadOnly(true); >- query.setTimeout(DataMigrator.SQL_TIMEOUT); >- long count = Long.parseLong(query.uniqueResult().toString()); >- >- closeSQLSession(session); >- >- return count; >- } >- >- private void deleteTableData() throws Exception { >- int failureCount = 0; >- while (failureCount < MAX_NUMBER_OF_FAILURES) { >- try { >- StatelessSession session = getSQLSession(config); >- session.getTransaction().begin(); >- org.hibernate.Query nativeQuery = session.createSQLQuery(this.deleteQuery); >- nativeQuery.executeUpdate(); >- session.getTransaction().commit(); >- closeSQLSession(session); >- log.info("- " + metricsTable.toString() + " - Cleaned -"); >- } catch (Exception e) { >- log.error("Failed to delete " + metricsTable.toString() >- + " data. 
Attempting to delete data one more time..."); >- >- failureCount++; >- if (failureCount == MAX_NUMBER_OF_FAILURES) { >- throw e; >- } >- } >+ deleteTableData(this.deleteQuery); > } > } > >@@ -154,7 +117,7 @@ private Telemetry performMigration(Task task) throws Exception { > int failureCount; > > int lastMigratedRecord = 0; >- ExistingDataSource dataSource = getExistingDataSource(selectQuery, task, config); >+ ExistingDataSource dataSource = getExistingDataSource(selectQuery, task); > dataSource.initialize(); > > telemetry.getMigrationTimer().start(); >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/CallTimeDataMigrator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/CallTimeDataMigrator.java >new file mode 100644 >index 0000000..077a078 >--- /dev/null >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/CallTimeDataMigrator.java >@@ -0,0 +1,140 @@ >+package org.rhq.server.metrics.migrator.workers; >+ >+import java.util.ArrayList; >+import java.util.Collection; >+import java.util.Date; >+import java.util.List; >+import java.util.concurrent.Future; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.calltime.CallTimeData; >+import org.rhq.server.metrics.CallTimeConfiguration; >+import org.rhq.server.metrics.CallTimeDAO; >+import org.rhq.server.metrics.StorageSession; >+import org.rhq.server.metrics.migrator.DataMigrator.DataMigratorConfiguration; >+import org.rhq.server.metrics.migrator.DataMigrator.Task; >+import org.rhq.server.metrics.migrator.datasources.ExistingDataSource; >+ >+/** >+ * Migrate call time data. 
>+ */ >+public class CallTimeDataMigrator extends AbstractMigrationWorker implements CallableMigrationWorker { >+ >+ private final Log log = LogFactory.getLog(CallTimeDataMigrator.class); >+ >+ private final String table; >+ private final CallTimeDAO dao; >+ >+ public CallTimeDataMigrator(String table, int ttlDays, DataMigratorConfiguration config) throws Exception { >+ super(config); >+ this.table = table; >+ CallTimeConfiguration ctconfig = new CallTimeConfiguration(); >+ ctconfig.setTTLDays(ttlDays); >+ ctconfig.setIdempotentInsert(true); >+ this.dao = new CallTimeDAO(new StorageSession(config.getSession()), ctconfig); >+ } >+ >+ @Override >+ public long estimate() throws Exception { >+ long recordCount = this.getRowCount(MigrationQuery.COUNT_CALL_TIME_DATA.getQuery()); >+ log.debug("Retrieved record count for table " + table + " -- " + recordCount); >+ >+ Telemetry telemetry = this.performMigration(Task.Estimate); >+ long estimatedTimeToMigrate = telemetry.getMigrationTime(); >+ >+ long estimation = (recordCount / (long) MAX_RECORDS_TO_LOAD_FROM_SQL / (long) NUMBER_OF_BATCHES_FOR_ESTIMATION) >+ * estimatedTimeToMigrate; >+ >+ estimation += telemetry.getNonMigrationTime(); >+ >+ return estimation; >+ } >+ >+ public void migrate() throws Exception { >+ performMigration(Task.Migrate); >+ if (config.isDeleteDataImmediatelyAfterMigration()) { >+ deleteTableData(MigrationQuery.DELETE_CALL_TIME_DATA_VALUE.getQuery()); >+ deleteTableData(MigrationQuery.DELETE_CALL_TIME_DATA_KEY.getQuery()); >+ } >+ } >+ >+ private Telemetry performMigration(Task task) throws Exception { >+ Telemetry telemetry = new Telemetry(); >+ telemetry.getGeneralTimer().start(); >+ >+ long numberOfBatchesMigrated = 0; >+ >+ List<Object[]> existingData; >+ int failureCount; >+ >+ int lastMigratedRecord = 0; >+ ExistingDataSource dataSource = getExistingDataSource(MigrationQuery.SELECT_CALL_TIME_DATA.getQuery(), task); >+ dataSource.initialize(); >+ >+ telemetry.getMigrationTimer().start(); >+ while 
(true) { >+ existingData = dataSource.getData(lastMigratedRecord, MAX_RECORDS_TO_LOAD_FROM_SQL); >+ >+ if (existingData.size() == 0) { >+ break; >+ } >+ >+ lastMigratedRecord += existingData.size(); >+ >+ failureCount = 0; >+ while (failureCount < MAX_NUMBER_OF_FAILURES) { >+ try { >+ insertDataToCassandra(existingData); >+ break; >+ } catch (Exception e) { >+ log.error("Failed to insert " + table >+ + " data. Attempting to insert the current batch of data one more time"); >+ log.error(e); >+ >+ failureCount++; >+ if (failureCount == MAX_NUMBER_OF_FAILURES) { >+ throw e; >+ } >+ } >+ } >+ >+ log.info("- " + table + " - " + lastMigratedRecord + " -"); >+ >+ numberOfBatchesMigrated++; >+ if (Task.Estimate.equals(task) && numberOfBatchesMigrated >= NUMBER_OF_BATCHES_FOR_ESTIMATION) { >+ break; >+ } >+ } >+ >+ telemetry.getMigrationTimer().stop(); >+ >+ dataSource.close(); >+ telemetry.getGeneralTimer().stop(); >+ >+ return telemetry; >+ } >+ >+ private void insertDataToCassandra(List<Object[]> existingData) throws Exception { >+ Collection<Future> futures = new ArrayList<Future>(); >+ for (Object[] o : existingData) { >+ // "select schedule_id, call_destination, begin_time, end_time, minimum, maximum, total, \"count\"" + >+ int i = 0; >+ int scheduleId = ((Number)o[i++]).intValue(); >+ String dest = o[i++].toString(); >+ Date begin = date(o[i++]); >+ Date end = date(o[i++]); >+ double min = ((Number)o[i++]).doubleValue(); >+ double max = ((Number)o[i++]).doubleValue(); >+ double total = ((Number)o[i++]).doubleValue(); // total is fractional; intValue() would truncate >+ long count = ((Number)o[i++]).longValue(); >+ CallTimeData ctd = new CallTimeData(scheduleId); >+ ctd.addAggregatedCallData(dest, begin, end, min, max, total, count); >+ futures.add(dao.insert(ctd)); >+ } >+ for (Future f : futures) { >+ f.get(); >+ } >+ } >+ >+} >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/DeleteAllData.java 
b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/DeleteAllData.java >index c1cb62c..101b930 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/DeleteAllData.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/DeleteAllData.java >@@ -34,15 +34,13 @@ > > private final Log log = LogFactory.getLog(DeleteAllData.class); > >- private final DataMigratorConfiguration config; >- > public DeleteAllData(DataMigratorConfiguration config) { >- this.config = config; >+ super(config); > } > > public void migrate() { > org.hibernate.Query nativeQuery; >- StatelessSession session = getSQLSession(config); >+ StatelessSession session = getSQLSession(); > > if (config.isRun1HAggregateDataMigration()) { > session.getTransaction().begin(); >@@ -79,6 +77,24 @@ public void migrate() { > } > } > >+ if (config.isRunTraitMigration()) { >+ session.getTransaction().begin(); >+ nativeQuery = session.createSQLQuery(MigrationQuery.DELETE_TRAIT_DATA.getQuery()); >+ nativeQuery.executeUpdate(); >+ session.getTransaction().commit(); >+ log.info("- trait data - Cleaned -"); >+ } >+ >+ if (config.isRunCallTimeMigration()) { >+ session.getTransaction().begin(); >+ nativeQuery = session.createSQLQuery(MigrationQuery.DELETE_CALL_TIME_DATA_VALUE.getQuery()); >+ nativeQuery.executeUpdate(); >+ nativeQuery = session.createSQLQuery(MigrationQuery.DELETE_CALL_TIME_DATA_KEY.getQuery()); >+ nativeQuery.executeUpdate(); >+ session.getTransaction().commit(); >+ log.info("- call time - Cleaned -"); >+ } >+ > closeSQLSession(session); > } > >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MetricsIndexUpdateAccumulator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MetricsIndexUpdateAccumulator.java >index d60ecf6d..8772c8f 100644 >--- 
a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MetricsIndexUpdateAccumulator.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MetricsIndexUpdateAccumulator.java >@@ -58,7 +58,6 @@ > private final Map<Integer, Set<Long>> accumulator = new HashMap<Integer, Set<Long>>(); > > private final MetricsTable table; >- private final DataMigratorConfiguration config; > > private final long timeLimit; > private final PreparedStatement updateMetricsIndex; >@@ -68,8 +67,8 @@ > private int currentCount = 0; > > public MetricsIndexUpdateAccumulator(MetricsTable table, DataMigratorConfiguration config) { >+ super(config); > this.table = table; >- this.config = config; > > if (MetricsTable.RAW.equals(table) || MetricsTable.ONE_HOUR.equals(table) > || MetricsTable.SIX_HOUR.equals(table)) { >@@ -136,7 +135,7 @@ public void drain() throws Exception { > } > > private long getLastAggregationTime(MetricsTable migratedTable) { >- StatelessSession session = getSQLSession(config); >+ StatelessSession session = getSQLSession(); > > long aggregationSlice = Integer.MAX_VALUE; > Duration duration = null; >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MigrationQuery.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MigrationQuery.java >index 7f13349..013183a 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MigrationQuery.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/MigrationQuery.java >@@ -32,11 +32,22 @@ > > DELETE_1H_DATA("DELETE FROM RHQ_MEASUREMENT_DATA_NUM_1H"), DELETE_6H_DATA("DELETE FROM RHQ_MEASUREMENT_DATA_NUM_6H"), DELETE_1D_DATA( > "DELETE FROM RHQ_MEASUREMENT_DATA_NUM_1D"), >+ DELETE_TRAIT_DATA("TRUNCATE TABLE RHQ_MEASUREMENT_DATA_TRAIT"), >+ 
DELETE_CALL_TIME_DATA_KEY("TRUNCATE TABLE RHQ_CALLTIME_DATA_KEY"), >+ DELETE_CALL_TIME_DATA_VALUE("TRUNCATE TABLE RHQ_CALLTIME_DATA_VALUE"), > > COUNT_1H_DATA("SELECT COUNT(*) FROM RHQ_MEASUREMENT_DATA_NUM_1H"), COUNT_6H_DATA( > "SELECT COUNT(*) FROM RHQ_MEASUREMENT_DATA_NUM_6H"), COUNT_1D_DATA( > "SELECT COUNT(*) FROM RHQ_MEASUREMENT_DATA_NUM_1D"), > >+ COUNT_TRAIT_DATA("select count(*) from RHQ_MEASUREMENT_DATA_TRAIT"), >+ COUNT_CALL_TIME_DATA("select count(*) from RHQ_CALLTIME_DATA_KEY k, RHQ_CALLTIME_DATA_VALUE v where k.id = v.key_id"), >+ SELECT_CALL_TIME_DATA( >+ "select schedule_id, call_destination, begin_time, end_time, minimum, maximum, total, \"count\"" + >+ "from rhq_calltime_data_key k, rhq_calltime_data_value v where k.id = v.key_id"), >+ SELECT_TRAIT_DATA( >+ "select time_stamp, schedule_id, value from RHQ_MEASUREMENT_DATA_TRAIT"), >+ > MAX_TIMESTAMP_1H_DATA("SELECT MAX(time_stamp) FROM RHQ_MEASUREMENT_DATA_NUM_1H"), MAX_TIMESTAMP_6H_DATA( > "SELECT MAX(time_stamp) FROM RHQ_MEASUREMENT_DATA_NUM_6H"), MAX_TIMESTAMP_1D_DATA( > "SELECT MAX(time_stamp) FROM RHQ_MEASUREMENT_DATA_NUM_1D"), >@@ -68,4 +79,4 @@ public String getQuery() { > public String toString() { > return query; > } >-} >\ No newline at end of file >+} >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/RawDataMigrator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/RawDataMigrator.java >index fd44a6f..ac16e17 100644 >--- a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/RawDataMigrator.java >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/RawDataMigrator.java >@@ -29,20 +29,18 @@ > import java.util.List; > import java.util.Queue; > >-import com.datastax.driver.core.ResultSetFuture; >-import com.datastax.driver.core.querybuilder.Batch; >-import 
com.datastax.driver.core.querybuilder.QueryBuilder; >- > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; >-import org.hibernate.StatelessSession; >- > import org.rhq.server.metrics.domain.MetricsTable; >-import org.rhq.server.metrics.migrator.DataMigrator; > import org.rhq.server.metrics.migrator.DataMigrator.DataMigratorConfiguration; > import org.rhq.server.metrics.migrator.DataMigrator.Task; > import org.rhq.server.metrics.migrator.datasources.ExistingDataSource; > >+import com.datastax.driver.core.ResultSetFuture; >+import com.datastax.driver.core.querybuilder.Batch; >+import com.datastax.driver.core.querybuilder.QueryBuilder; >+import org.rhq.server.metrics.migrator.workers.MigrationQuery; >+ > /** > * @author Stefan Negrea > * >@@ -52,10 +50,9 @@ > > private final Queue<String> tablesNotProcessed = new LinkedList<String>(Arrays.asList(getRawDataTables())); > private final MetricsIndexUpdateAccumulator metricsIndexAccumulator; >- private final DataMigratorConfiguration config; > > public RawDataMigrator(DataMigratorConfiguration config) { >- this.config = config; >+ super(config); > this.metricsIndexAccumulator = new MetricsIndexUpdateAccumulator(MetricsTable.RAW, config); > } > >@@ -83,20 +80,6 @@ public void migrate() throws Exception { > performMigration(Task.Migrate); > } > >- private long getRowCount(String countQuery) { >- StatelessSession session = getSQLSession(config); >- >- org.hibernate.Query query = session.createSQLQuery(countQuery); >- query.setReadOnly(true); >- query.setTimeout(DataMigrator.SQL_TIMEOUT); >- >- long count = Long.parseLong(query.uniqueResult().toString()); >- >- closeSQLSession(session); >- >- return count; >- } >- > private Telemetry performMigration(Task task) throws Exception { > Telemetry telemetry = new Telemetry(); > telemetry.getGeneralTimer().start(); >@@ -114,7 +97,7 @@ private Telemetry performMigration(Task task) throws Exception { > > String selectQuery = 
String.format(MigrationQuery.SELECT_RAW_DATA.toString(), table); > >- ExistingDataSource dataSource = getExistingDataSource(selectQuery, task, config); >+ ExistingDataSource dataSource = getExistingDataSource(selectQuery, task); > dataSource.initialize(); > > log.info("Start migrating raw table: " + table); >@@ -178,29 +161,6 @@ private Telemetry performMigration(Task task) throws Exception { > return telemetry; > } > >- private void deleteTableData(String table) throws Exception { >- String deleteQuery = String.format(MigrationQuery.DELETE_RAW_ENTRY.toString(), table); >- int failureCount = 0; >- while (failureCount < MAX_NUMBER_OF_FAILURES) { >- try { >- StatelessSession session = getSQLSession(config); >- session.getTransaction().begin(); >- org.hibernate.Query nativeQuery = session.createSQLQuery(deleteQuery); >- nativeQuery.executeUpdate(); >- session.getTransaction().commit(); >- closeSQLSession(session); >- log.info("- " + table + " - Cleaned -"); >- } catch (Exception e) { >- log.error("Failed to delete " + table + " data. 
Attempting to delete data one more time..."); >- >- failureCount++; >- if (failureCount == MAX_NUMBER_OF_FAILURES) { >- throw e; >- } >- } >- } >- } >- > private void insertDataToCassandra(List<Object[]> existingData) throws Exception { > List<ResultSetFuture> resultSetFutures = new ArrayList<ResultSetFuture>(); > Batch batch = QueryBuilder.batch(); >diff --git a/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/TraitDataMigrator.java b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/TraitDataMigrator.java >new file mode 100644 >index 0000000..d7e887a >--- /dev/null >+++ b/modules/enterprise/server/data-migration/src/main/java/org/rhq/server/metrics/migrator/workers/TraitDataMigrator.java >@@ -0,0 +1,133 @@ >+package org.rhq.server.metrics.migrator.workers; >+ >+import java.util.ArrayList; >+import java.util.Collection; >+import java.util.Date; >+import java.util.List; >+import java.util.concurrent.Future; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.MeasurementDataPK; >+import org.rhq.core.domain.measurement.MeasurementDataTrait; >+import org.rhq.server.metrics.StorageSession; >+import org.rhq.server.metrics.TraitsConfiguration; >+import org.rhq.server.metrics.TraitsDAO; >+import org.rhq.server.metrics.migrator.DataMigrator.DataMigratorConfiguration; >+import org.rhq.server.metrics.migrator.DataMigrator.Task; >+import org.rhq.server.metrics.migrator.datasources.ExistingDataSource; >+ >+/** >+ * Migrate trait data. 
>+ */
>+public class TraitDataMigrator extends AbstractMigrationWorker implements CallableMigrationWorker {
>+
>+ private final Log log = LogFactory.getLog(TraitDataMigrator.class);
>+
>+ private final String table;
>+ private final TraitsDAO dao;
>+
>+ public TraitDataMigrator(String table, int ttlDays, DataMigratorConfiguration config) throws Exception {
>+ super(config);
>+ this.table = table;
>+ TraitsConfiguration ctconfig = new TraitsConfiguration();
>+ ctconfig.setTTLDays(ttlDays);
>+ this.dao = new TraitsDAO(new StorageSession(config.getSession()), ctconfig);
>+ }
>+
>+ @Override
>+ public long estimate() throws Exception {
>+ long recordCount = this.getRowCount(MigrationQuery.COUNT_TRAIT_DATA.getQuery());
>+ log.debug("Retrieved record count for table " + table + " -- " + recordCount);
>+
>+ Telemetry telemetry = this.performMigration(Task.Estimate);
>+ long estimatedTimeToMigrate = telemetry.getMigrationTime();
>+
>+ long estimation = (recordCount / (long) MAX_RECORDS_TO_LOAD_FROM_SQL / (long) NUMBER_OF_BATCHES_FOR_ESTIMATION)
>+ * estimatedTimeToMigrate;
>+
>+ estimation += telemetry.getNonMigrationTime();
>+
>+ return estimation;
>+ }
>+
>+ public void migrate() throws Exception {
>+ performMigration(Task.Migrate);
>+ if (config.isDeleteDataImmediatelyAfterMigration()) {
>+ deleteTableData(MigrationQuery.DELETE_TRAIT_DATA.getQuery());
>+ }
>+ }
>+
>+ private Telemetry performMigration(Task task) throws Exception {
>+ Telemetry telemetry = new Telemetry();
>+ telemetry.getGeneralTimer().start();
>+
>+ long numberOfBatchesMigrated = 0;
>+
>+ List<Object[]> existingData;
>+ int failureCount;
>+
>+ int lastMigratedRecord = 0;
>+ ExistingDataSource dataSource = getExistingDataSource(MigrationQuery.SELECT_TRAIT_DATA.getQuery(), task);
>+ dataSource.initialize();
>+
>+ telemetry.getMigrationTimer().start();
>+ while (true) {
>+ existingData = dataSource.getData(lastMigratedRecord, MAX_RECORDS_TO_LOAD_FROM_SQL);
>+
>+ if (existingData.size() == 0) {
>+ break;
>+ }
>+
>+ lastMigratedRecord += existingData.size();
>+
>+ failureCount = 0;
>+ while (failureCount < MAX_NUMBER_OF_FAILURES) {
>+ try {
>+ insertDataToCassandra(existingData);
>+ break;
>+ } catch (Exception e) {
>+ log.error("Failed to insert " + table
>+ + " data. Attempting to insert the current batch of data one more time");
>+ log.error(e);
>+
>+ failureCount++;
>+ if (failureCount == MAX_NUMBER_OF_FAILURES) {
>+ throw e;
>+ }
>+ }
>+ }
>+
>+ log.info("- " + table + " - " + lastMigratedRecord + " -");
>+
>+ numberOfBatchesMigrated++;
>+ if (Task.Estimate.equals(task) && numberOfBatchesMigrated >= NUMBER_OF_BATCHES_FOR_ESTIMATION) {
>+ break;
>+ }
>+ }
>+
>+ telemetry.getMigrationTimer().stop();
>+
>+ dataSource.close();
>+ telemetry.getGeneralTimer().stop();
>+
>+ return telemetry;
>+ }
>+
>+ private void insertDataToCassandra(List<Object[]> existingData) throws Exception {
>+ Collection<Future> futures = new ArrayList<Future>();
>+ for (Object[] o : existingData) {
>+ int i = 0;
>+ // "select time_stamp, schedule_id, value from RHQ_MEASUREMENT_DATA_TRAIT"),
>+ Date time = date(o[i++]);
>+ int scheduleId = ((Number)o[i++]).intValue();
>+ String value = o[i++].toString();
>+ MeasurementDataTrait trait = new MeasurementDataTrait(new MeasurementDataPK(time.getTime(), scheduleId), value);
>+ futures.add(dao.insertTrait(trait));
>+ }
>+ for (Future f : futures) {
>+ f.get();
>+ }
>+ }
>+
>+}
>diff --git a/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/measurement/test/MeasurementDataManagerTest.java b/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/measurement/test/MeasurementDataManagerTest.java
>index cd1ece8..99058fc 100644
>--- a/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/measurement/test/MeasurementDataManagerTest.java
>+++ b/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/measurement/test/MeasurementDataManagerTest.java
>@@ -36,7 +36,6 @@
> import javax.persistence.Query;
>
> import org.testng.annotations.Test;
>-
> import org.rhq.core.clientapi.agent.measurement.MeasurementAgentService;
> import org.rhq.core.domain.auth.Subject;
> import org.rhq.core.domain.criteria.MeasurementDataTraitCriteria;
>@@ -108,19 +107,8 @@ protected void beforeMethod() {
> protected void afterMethod() {
>
> try {
>- // delete values
>- callTimeDataManager.purgeCallTimeData(new Date());
>-
> beginTx();
>
>- // delete keys
>- List<Integer> resourceIds = new ArrayList<Integer>();
>- resourceIds.add(resource1.getId());
>- resourceIds.add(resource2.getId());
>- Query q = em.createNamedQuery(CallTimeDataKey.QUERY_DELETE_BY_RESOURCES);
>- q.setParameter("resourceIds", resourceIds);
>- q.executeUpdate();
>-
> resource1 = em.merge(resource1);
> for (MeasurementSchedule sched : resource1.getSchedules()) {
> em.remove(sched);
>@@ -433,13 +421,13 @@ private void setupGroupOfResources(EntityManager em) {
> String value4 = "test-value4";
>
> // method findLiveDataForGroup adds prefix with resource id which is part of equals
>- MeasurementData expectedData1 = makeMeasurement(time1, schedule1.getId(), value1, name1);
>+ MeasurementData expectedData1 = makeMeasurement(time1, schedule1, value1, name1);
> expectedData1.setName(resource1.getId() + ":" + name1);
>- MeasurementData expectedData2 = makeMeasurement(time2, schedule2.getId(), value2, name2);
>+ MeasurementData expectedData2 = makeMeasurement(time2, schedule2, value2, name2);
> expectedData2.setName(resource2.getId() + ":" + name2);
>- MeasurementData expectedData3 = makeMeasurement(time3, schedule3.getId(), value3, name3);
>+ MeasurementData expectedData3 = makeMeasurement(time3, schedule3, value3, name3);
> expectedData3.setName(resource2.getId() + ":" + name3);
>- MeasurementData expectedData4 = makeMeasurement(time4, schedule2.getId(), value4, name4);
>+ MeasurementData expectedData4 = makeMeasurement(time4, schedule2, value4, name4);
> expectedData4.setName(resource2.getId() + ":" + name4);
>
> expectedResult1 = new HashSet<MeasurementData>(1);
>@@ -457,10 +445,10 @@ private void setupGroupOfResources(EntityManager em) {
> // mock the MeasurementAgentService
> MeasurementAgentService mockedMeasurementService = mock(MeasurementAgentService.class);
> when(mockedMeasurementService.getRealTimeMeasurementValue(eq(resource1.getId()), any(Set.class))).thenReturn(
>- new HashSet<MeasurementData>(Arrays.asList(makeMeasurement(time1, schedule1.getId(), value1, name1))));
>+ new HashSet<MeasurementData>(Arrays.asList(makeMeasurement(time1, schedule1, value1, name1))));
> when(mockedMeasurementService.getRealTimeMeasurementValue(eq(resource2.getId()), any(Set.class))).thenReturn(
>- new HashSet<MeasurementData>(Arrays.asList(makeMeasurement(time2, schedule2.getId(), value2, name2),
>- makeMeasurement(time3, schedule3.getId(), value3, name3))));
>+ new HashSet<MeasurementData>(Arrays.asList(makeMeasurement(time2, schedule2, value2, name2),
>+ makeMeasurement(time3, schedule3, value3, name3))));
> TestServerCommunicationsService agentServiceContainer = prepareForTestAgents();
> agentServiceContainer.measurementService = mockedMeasurementService;
> }
>@@ -556,7 +544,7 @@ public void testAddAndFindTrait1() throws Exception {
> expectedResult.add((MeasurementDataTrait) data);
> }
> // add the trait data (it stores it in db (without name field))
>- measurementDataManager.addTraitData(expectedResult);
>+ addTraitData(expectedResult);
>
> // get back the trait data
> List<MeasurementDataTrait> actualResult = measurementDataManager.findTraits(overlord, resource1.getId(), definitionCt1.getId());
>@@ -570,6 +558,12 @@ public void testAddAndFindTrait1() throws Exception {
> }
> }
>
>+ private void addTraitData(Set<MeasurementDataTrait> traits) throws InterruptedException {
>+ measurementDataManager.addTraitData(traits);
>+ // TODO make the above method synchronous?
>+ Thread.sleep(500);
>+ }
>+
> @Test
> public void testAddAndFindTrait2() throws Exception {
> // prepare DB
>@@ -615,7 +609,7 @@ public void testAddAndFindTrait3() throws Exception {
> }
>
> // add the trait data (it stores it in db (without name field))
>- measurementDataManager.addTraitData(traitsData);
>+ addTraitData(traitsData);
>
> // get back the trait data
> List<MeasurementDataTrait> actualResult = measurementDataManager.findTraits(overlord, resource2.getId(), definitionCt2.getId());
>@@ -666,7 +660,7 @@ public void testAddAndFindByCriteria() throws Exception {
> expectedResult.add((MeasurementDataTrait) data);
> }
> // add the trait data (it stores it in db (without name field))
>- measurementDataManager.addTraitData(expectedResult);
>+ addTraitData(expectedResult);
>
> // get back the trait data by schedule id
> MeasurementDataTraitCriteria criteria = new MeasurementDataTraitCriteria();
>@@ -733,7 +727,7 @@ public void testAddAndFindCurrentTraitByResourceIdAcrossMoreSchedules() throws E
> expectedResult.add((MeasurementDataTrait) data);
> }
> // add the trait data (it stores it in db (without name field))
>- measurementDataManager.addTraitData(expectedResult);
>+ addTraitData(expectedResult);
>
> // get back the trait data
> List<MeasurementDataTrait> actualResult = measurementDataManager.findCurrentTraitsForResource(overlord, resource2.getId(), null);
>@@ -760,7 +754,7 @@ public void testFindNonExistentCurrentTraitByResourceId() throws Exception {
> expectedResult.add((MeasurementDataTrait) data);
> }
> // add the trait data (it stores it in db (without name field))
>- measurementDataManager.addTraitData(expectedResult);
>+ addTraitData(expectedResult);
>
> // get back the trait data
> List<MeasurementDataTrait> actualResult = measurementDataManager.findCurrentTraitsForResource(overlord, resource2.getId(), null);
>@@ -775,8 +769,8 @@ public void testFindNonExistentCurrentTraitByResourceId() throws Exception {
> }
>
>
>- private MeasurementData makeMeasurement(long time, int scheduleId, String value, String name) {
>- MeasurementData measurement = new MeasurementDataTrait(new MeasurementDataPK(time, scheduleId), value);
>+ private MeasurementData makeMeasurement(long time, MeasurementSchedule schedule, String value, String name) {
>+ MeasurementData measurement = new MeasurementDataTrait(new MeasurementDataPK(time, schedule.getId()), value);
> measurement.setName(name);
> return measurement;
> }
>diff --git a/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJobTest.java b/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJobTest.java
>index 9cc4bc9..33c357c 100644
>--- a/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJobTest.java
>+++ b/modules/enterprise/server/itests-2/src/test/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJobTest.java
>@@ -59,7 +59,6 @@
> import org.rhq.core.domain.measurement.DataType;
> import org.rhq.core.domain.measurement.DisplayType;
> import org.rhq.core.domain.measurement.MeasurementCategory;
>-import org.rhq.core.domain.measurement.MeasurementDataTrait;
> import org.rhq.core.domain.measurement.MeasurementDefinition;
> import org.rhq.core.domain.measurement.MeasurementSchedule;
> import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
>@@ -74,7 +73,6 @@
> import org.rhq.core.util.exception.ThrowableUtil;
> import org.rhq.enterprise.server.event.EventManagerLocal;
> import org.rhq.enterprise.server.measurement.CallTimeDataManagerLocal;
>-import org.rhq.enterprise.server.measurement.MeasurementDataManagerLocal;
> import org.rhq.enterprise.server.resource.ResourceManagerLocal;
> import org.rhq.enterprise.server.scheduler.SchedulerLocal;
> import org.rhq.enterprise.server.test.AbstractEJB3Test;
>@@ -222,12 +220,6 @@ private void addDataToBePurged() throws NotSupportedException, SystemException,
> // add events
> createNewEvents(newResource, 0, 1000);
>
>- // add calltime/response times
>- createNewCalltimeData(newResource, 0, 1000);
>-
>- // add trait data
>- createNewTraitData(newResource, 0L, 100);
>-
> getTransactionManager().commit();
> } catch (Throwable t) {
> getTransactionManager().rollback();
>@@ -245,7 +237,6 @@ private void makeSureDataIsPurged() throws NotSupportedException, SystemExceptio
> getTransactionManager().begin();
>
> try {
>- Subject overlord = LookupUtil.getSubjectManager().getOverlord();
> Resource res = em.find(Resource.class, newResource.getId());
>
> // check alerts
>@@ -264,33 +255,6 @@ private void makeSureDataIsPurged() throws NotSupportedException, SystemExceptio
> EventSource es = res.getEventSources().iterator().next();
> assert es.getEvents().size() == 0 : "didn't purge all events";
>
>- // check calltime data
>- int calltimeScheduleId = 0;
>- for (MeasurementSchedule sched : res.getSchedules()) {
>- if (sched.getDefinition().getDataType() == DataType.CALLTIME) {
>- calltimeScheduleId = sched.getId();
>- break;
>- }
>- }
>- assert calltimeScheduleId > 0 : "why don't we have a calltime schedule?";
>- PageList<CallTimeDataComposite> calltimeData = LookupUtil.getCallTimeDataManager()
>- .findCallTimeDataForResource(overlord, calltimeScheduleId, 0, Long.MAX_VALUE, new PageControl());
>- assert calltimeData.getTotalSize() == 0 : "didn't purge all calltime data";
>-
>- // check trait data
>- MeasurementSchedule traitSchedule = null;
>- for (MeasurementSchedule sched : res.getSchedules()) {
>- if (sched.getDefinition().getDataType() == DataType.TRAIT) {
>- traitSchedule = sched;
>- break;
>- }
>- }
>- assert traitSchedule != null : "why don't we have a trait schedule?";
>-
>- List<MeasurementDataTrait> persistedTraits = LookupUtil.getMeasurementDataManager().findTraits(overlord,
>- res.getId(), traitSchedule.getDefinition().getId());
>- assert persistedTraits.size() == 1 : "bad purge of trait data: " + persistedTraits.size();
>-
> } finally {
> getTransactionManager().rollback();
> }
>@@ -333,67 +297,6 @@ public void jobWasExecuted(JobExecutionContext c, JobExecutionException e) {
> return;
> }
>
>- private void createNewTraitData(Resource res, long timestamp, int count) {
>- MeasurementSchedule traitSchedule = null;
>- for (MeasurementSchedule sched : res.getSchedules()) {
>- if (sched.getDefinition().getDataType() == DataType.TRAIT) {
>- traitSchedule = sched;
>- break;
>- }
>- }
>- assert traitSchedule != null : "why don't we have a trait schedule?";
>-
>- MeasurementDataManagerLocal mgr = LookupUtil.getMeasurementDataManager();
>-
>- MeasurementScheduleRequest msr = new MeasurementScheduleRequest(traitSchedule);
>-
>- Set<MeasurementDataTrait> dataset = new HashSet<MeasurementDataTrait>();
>- for (int i = 0; i < count; i++) {
>- dataset.add(new MeasurementDataTrait(timestamp + i, msr, "DataPurgeJobTestTraitValue" + i));
>- }
>- mgr.addTraitData(dataset);
>-
>- List<MeasurementDataTrait> persistedTraits = mgr.findTraits(LookupUtil.getSubjectManager().getOverlord(),
>- res.getId(), traitSchedule.getDefinition().getId());
>- assert persistedTraits.size() == count : "did not persist trait data:" + persistedTraits.size() + ":"
>- + persistedTraits;
>- }
>-
>- private void createNewCalltimeData(Resource res, long timestamp, int count) {
>- MeasurementSchedule calltimeSchedule = null;
>- for (MeasurementSchedule sched : res.getSchedules()) {
>- if (sched.getDefinition().getDataType() == DataType.CALLTIME) {
>- calltimeSchedule = sched;
>- break;
>- }
>- }
>- assert calltimeSchedule != null : "why don't we have a calltime schedule?";
>-
>- MeasurementScheduleRequest msr = new MeasurementScheduleRequest(calltimeSchedule);
>-
>- Set<CallTimeData> dataset = new HashSet<CallTimeData>();
>- CallTimeData data = new CallTimeData(msr);
>-
>- for (int i = 0; i < count; i++) {
>- for (int j = 0; j < count; j++) {
>- data.addCallData("DataPurgeJobTestCalltimeData" + j, new Date(timestamp), 777);
>- }
>- }
>-
>- dataset.add(data);
>-
>- CallTimeDataManagerLocal mgr = LookupUtil.getCallTimeDataManager();
>- mgr.addCallTimeData(dataset);
>-
>- PageList<CallTimeDataComposite> persistedData = mgr.findCallTimeDataForResource(LookupUtil.getSubjectManager()
>- .getOverlord(), calltimeSchedule.getId(), timestamp - 1L, timestamp + count + 1L, new PageControl());
>- // just a few sanity checks
>- assert persistedData.getTotalSize() == count : "did not persist all calltime data, only persisted: "
>- + persistedData.getTotalSize();
>- assert persistedData.get(0).getCount() == count : "did not persist all endpoint calltime data, only persisted: "
>- + persistedData.get(0).getCount();
>- }
>-
> private void createNewEvents(Resource res, long timestamp, int count) {
> EventDefinition ed = res.getResourceType().getEventDefinitions().iterator().next();
> EventSource source = new EventSource("datapurgejobtest", ed, res);
>diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/alert/engine/internal/AgentConditionCache.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/alert/engine/internal/AgentConditionCache.java
>index 556351e..18209c2 100644
>--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/alert/engine/internal/AgentConditionCache.java
>+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/alert/engine/internal/AgentConditionCache.java
>@@ -76,17 +76,17 @@
> */
> class AgentConditionCache extends AbstractConditionCache {
>
>- private Map<Integer, List<NumericDoubleCacheElement>> measurementDataCache; // key: schedule ID
>- private Map<Integer, List<MeasurementTraitCacheElement>> measurementTraitCache; // key: schedule ID
>- private Map<Integer, List<CallTimeDataCacheElement>> callTimeCache; // key: schedule ID
>- private Map<Integer, List<EventCacheElement>> eventsCache; // key: resource ID
>- private Map<Integer, List<DriftCacheElement>> driftCache; // key: resource ID
>+ private final Map<Integer, List<NumericDoubleCacheElement>> measurementDataCache; // key: schedule ID
>+ private final Map<Integer, List<MeasurementTraitCacheElement>> measurementTraitCache; // key: schedule ID
>+ private final Map<Integer, List<CallTimeDataCacheElement>> callTimeCache; // key: schedule ID
>+ private final Map<Integer, List<EventCacheElement>> eventsCache; // key: resource ID
>+ private final Map<Integer, List<DriftCacheElement>> driftCache; // key: resource ID
>
>- private AlertConditionManagerLocal alertConditionManager;
>- private MeasurementDataManagerLocal measurementDataManager;
>- private SubjectManagerLocal subjectManager;
>+ private final AlertConditionManagerLocal alertConditionManager;
>+ private final MeasurementDataManagerLocal measurementDataManager;
>+ private final SubjectManagerLocal subjectManager;
>
>- private int agentId;
>+ private final int agentId;
>
> public AgentConditionCache(int agentId) {
> super();
>@@ -261,6 +261,10 @@ private void insertAlertConditionComposite(int agentId, AbstractAlertConditionCa
>
> switch (alertConditionOperator) {
> case CHANGES:
>+ MeasurementDataTrait trait = measurementDataManager.getCurrentTraitForSchedule(traitsComposite.getScheduleId());
>+ if (trait != null) {
>+ traitsComposite.setValue(trait.getValue());
>+ }
> value = traitsComposite.getValue();
> break;
> case REGEX:
>diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/StartupBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/StartupBean.java
>index 746d127..1ce66a1 100644
>--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/StartupBean.java
>+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/StartupBean.java
>@@ -49,7 +49,6 @@
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.quartz.SchedulerException;
>-
> import org.rhq.core.db.DatabaseTypeFactory;
> import org.rhq.core.domain.auth.Subject;
> import org.rhq.core.domain.cloud.Server;
>@@ -87,6 +86,7 @@
> import org.rhq.enterprise.server.scheduler.jobs.PurgeResourceTypesJob;
> import org.rhq.enterprise.server.scheduler.jobs.SavedSearchResultCountRecalculationJob;
> import org.rhq.enterprise.server.scheduler.jobs.StorageClusterReadRepairJob;
>+import org.rhq.enterprise.server.scheduler.jobs.TraitCleanup;
> import org.rhq.enterprise.server.storage.StorageClientManager;
> import org.rhq.enterprise.server.system.SystemManagerLocal;
> import org.rhq.enterprise.server.util.LookupUtil;
>@@ -106,7 +106,7 @@
> @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
> @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
> public class StartupBean implements StartupLocal {
>- private Log log = LogFactory.getLog(this.getClass());
>+ private final Log log = LogFactory.getLog(this.getClass());
>
> private volatile boolean initialized = false;
>
>@@ -716,6 +716,14 @@ private void scheduleJobs() throws RuntimeException {
> log.error("Cannot schedule data purge job.", e);
> }
>
>+ // Trait Cleanup Job
>+ try {
>+ String cronString = "0 0 0 ? * SUN *"; // weekly
>+ schedulerBean.scheduleSimpleCronJob(TraitCleanup.class, true, false, cronString);
>+ } catch (Exception e) {
>+ log.error("Cannot schedule trait cleanup job.", e);
>+ }
>+
> // Server Plugin Jobs
> try {
> ServerPluginServiceMBean mbean = LookupUtil.getServerPluginService();
>diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerBean.java
>index 337679b..6de338a 100644
>--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerBean.java
>+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerBean.java
>@@ -19,13 +19,17 @@
>
> package org.rhq.enterprise.server.measurement;
>
>-import java.sql.Connection;
>-import java.sql.PreparedStatement;
>-import java.sql.SQLException;
>-import java.sql.Statement;
>+import java.util.ArrayList;
>+import java.util.Collections;
>+import java.util.Comparator;
> import java.util.Date;
>+import java.util.Iterator;
>+import java.util.LinkedList;
> import java.util.List;
>+import java.util.Map;
>+import java.util.Map.Entry;
> import java.util.Set;
>+import java.util.TreeMap;
>
> import javax.ejb.EJB;
> import javax.ejb.Stateless;
>@@ -33,42 +37,32 @@
> import javax.ejb.TransactionAttributeType;
> import javax.persistence.EntityManager;
> import javax.persistence.PersistenceContext;
>-import javax.persistence.Query;
>-import javax.sql.DataSource;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.jetbrains.annotations.NotNull;
>-
>-import org.jboss.ejb3.annotation.TransactionTimeout;
>-
>-import org.rhq.core.db.DatabaseType;
>-import org.rhq.core.db.DatabaseTypeFactory;
>-import org.rhq.core.db.H2DatabaseType;
>-import org.rhq.core.db.OracleDatabaseType;
>-import org.rhq.core.db.Postgresql83DatabaseType;
>-import org.rhq.core.db.PostgresqlDatabaseType;
>-import org.rhq.core.db.SQLServerDatabaseType;
> import org.rhq.core.domain.auth.Subject;
> import org.rhq.core.domain.common.EntityContext;
> import org.rhq.core.domain.criteria.CallTimeDataCriteria;
>+import org.rhq.core.domain.criteria.MeasurementScheduleCriteria;
>+import org.rhq.core.domain.measurement.DataType;
> import org.rhq.core.domain.measurement.MeasurementSchedule;
> import org.rhq.core.domain.measurement.calltime.CallTimeData;
> import org.rhq.core.domain.measurement.calltime.CallTimeDataComposite;
>-import org.rhq.core.domain.measurement.calltime.CallTimeDataValue;
>-import org.rhq.core.domain.server.PersistenceUtility;
> import org.rhq.core.domain.util.PageControl;
> import org.rhq.core.domain.util.PageList;
>-import org.rhq.core.domain.util.PageOrdering;
>-import org.rhq.core.util.jdbc.JDBCUtil;
> import org.rhq.enterprise.server.RHQConstants;
> import org.rhq.enterprise.server.alert.engine.AlertConditionCacheManagerLocal;
> import org.rhq.enterprise.server.alert.engine.AlertConditionCacheStats;
>+import org.rhq.enterprise.server.auth.SubjectManagerLocal;
> import org.rhq.enterprise.server.authz.AuthorizationManagerLocal;
> import org.rhq.enterprise.server.authz.PermissionException;
> import org.rhq.enterprise.server.measurement.instrumentation.MeasurementMonitor;
>-import org.rhq.enterprise.server.util.CriteriaQueryGenerator;
>-import org.rhq.enterprise.server.util.CriteriaQueryRunner;
>+import org.rhq.enterprise.server.storage.StorageClientManager;
>+import org.rhq.server.metrics.CallTimeDAO;
>+import org.rhq.server.metrics.CallTimeRow;
>+
>+import com.google.common.util.concurrent.FutureCallback;
>
> /**
> * The manager for call-time metric data.
>@@ -78,37 +72,12 @@ > @Stateless > @javax.annotation.Resource(name = "RHQ_DS", mappedName = RHQConstants.DATASOURCE_JNDI_NAME) > public class CallTimeDataManagerBean implements CallTimeDataManagerLocal, CallTimeDataManagerRemote { >- private static final String DATA_VALUE_TABLE_NAME = "RHQ_CALLTIME_DATA_VALUE"; >- private static final String DATA_KEY_TABLE_NAME = "RHQ_CALLTIME_DATA_KEY"; >- >- private static final String CALLTIME_KEY_INSERT_STATEMENT = "INSERT INTO " + DATA_KEY_TABLE_NAME >- + "(id, schedule_id, call_destination) " + "SELECT %s, ?, ? FROM RHQ_numbers WHERE i = 42 " >- + "AND NOT EXISTS (SELECT * FROM " + DATA_KEY_TABLE_NAME + " WHERE schedule_id = ? AND call_destination = ?)"; >- >- private static final String CALLTIME_KEY_INSERT_STATEMENT_AUTOINC = "INSERT INTO " + DATA_KEY_TABLE_NAME >- + "(schedule_id, call_destination) " + "SELECT ?, ? FROM RHQ_numbers WHERE i = 42 " >- + "AND NOT EXISTS (SELECT * FROM " + DATA_KEY_TABLE_NAME + " WHERE schedule_id = ? AND call_destination = ?)"; >- >- private static final String CALLTIME_VALUE_INSERT_STATEMENT = "INSERT /*+ APPEND */ INTO " + DATA_VALUE_TABLE_NAME >- + "(id, key_id, begin_time, end_time, minimum, maximum, total, count) " >- + "SELECT %s, key.id, ?, ?, ?, ?, ?, ? FROM " + DATA_KEY_TABLE_NAME >- + " key WHERE key.schedule_id = ? AND key.call_destination = ?"; >- >- private static final String CALLTIME_VALUE_INSERT_STATEMENT_AUTOINC = "INSERT INTO " + DATA_VALUE_TABLE_NAME >- + "(key_id, begin_time, end_time, minimum, maximum, total, count) SELECT key.id, ?, ?, ?, ?, ?, ? FROM " >- + DATA_KEY_TABLE_NAME + " key WHERE key.schedule_id = ? 
AND key.call_destination = ?"; >- >- private static final String CALLTIME_VALUE_PURGE_STATEMENT = "DELETE FROM " + DATA_VALUE_TABLE_NAME >- + " WHERE end_time < ?"; > > private final Log log = LogFactory.getLog(CallTimeDataManagerBean.class); > > @PersistenceContext(unitName = RHQConstants.PERSISTENCE_UNIT_NAME) > private EntityManager entityManager; > >- @javax.annotation.Resource(name = "RHQ_DS") >- private DataSource rhqDs; >- > @EJB > private AuthorizationManagerLocal authorizationManager; > >@@ -118,27 +87,43 @@ > @EJB > private AlertConditionCacheManagerLocal alertConditionCacheManager; > >+ @EJB >+ private StorageClientManager storageClientManager; >+ >+ @EJB >+ private MeasurementScheduleManagerLocal measurementScheduleManager; >+ >+ @EJB >+ private SubjectManagerLocal subjectManager; >+ > @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) >- public void addCallTimeData(@NotNull Set<CallTimeData> callTimeDataSet) { >- if (callTimeDataSet.isEmpty()) { >+ public void addCallTimeData(@NotNull final Set<CallTimeData> data) { >+ if (data.isEmpty()) { > return; > } > >- log.debug("Persisting call-time data for " + callTimeDataSet.size() + " schedules..."); >- long startTime = System.currentTimeMillis(); >+ final long startTime = System.currentTimeMillis(); >+ CallTimeDAO dao = storageClientManager.getCallTimeDAO(); >+ dao.insert(data, new FutureCallback<Object>() { > >- // First make sure a single row exists in the key table for each reported call destination. >- callTimeDataManager.insertCallTimeDataKeys(callTimeDataSet); >+ @Override >+ public void onFailure(Throwable e) { >+ log.error("Error persisting calltime data " + data.size(), e); >+ } > >- // Finally, add the stats themselves to the value table. 
>- callTimeDataManager.insertCallTimeDataValues(callTimeDataSet); >- MeasurementMonitor.getMBean().incrementCallTimeInsertTime(System.currentTimeMillis() - startTime); >+ @Override >+ public void onSuccess(Object result) { >+ MeasurementMonitor.getMBean().incrementCallTimeInsertTime(System.currentTimeMillis() - startTime); >+ notifyAlertConditionCacheManager("insertCallTimeDataValues", >+ data.toArray(new CallTimeData[data.size()])); >+ } >+ }); > > } > >- public PageList<CallTimeDataComposite> findCallTimeDataRawForResource(Subject subject, int scheduleId, long beginTime, >- long endTime, PageControl pageControl) { >- pageControl.initDefaultOrderingField("value.beginTime", PageOrdering.ASC); >+ public PageList<CallTimeDataComposite> findCallTimeDataRawForResource(Subject subject, >+ int scheduleId, long beginTime, long endTime, PageControl pageControl) >+ { > MeasurementSchedule schedule = entityManager.find(MeasurementSchedule.class, scheduleId); > int resourceId = schedule.getResource().getId(); > if (authorizationManager.canViewResource(subject, resourceId) == false) { >@@ -146,33 +131,29 @@ public void addCallTimeData(@NotNull Set<CallTimeData> callTimeDataSet) { > + "] does not have permission to view call time data for measurementSchedule[id=" + scheduleId > + "] and resource[id=" + resourceId + "]"); > } >- String query = CallTimeDataValue.QUERY_FIND_RAW_FOR_RESOURCE; > >- Query queryWithOrderBy = PersistenceUtility.createQueryWithOrderBy(entityManager, query, pageControl); >- Query queryCount = PersistenceUtility.createCountQuery(this.entityManager, query); >+ CallTimeDAO dao = storageClientManager.getCallTimeDAO(); >+ List<CallTimeRow> select = dao.select(scheduleId, new Date(beginTime), new Date(endTime)); > >- queryWithOrderBy.setParameter("scheduleId", scheduleId); >- queryWithOrderBy.setParameter("beginTime", beginTime); >- queryWithOrderBy.setParameter("endTime", endTime); >+ // TODO sorting fields >+ // TODO paging > >- 
queryCount.setParameter("scheduleId", scheduleId); >- queryCount.setParameter("beginTime", beginTime); >- queryCount.setParameter("endTime", endTime); >- >- @SuppressWarnings("unchecked") >- List<CallTimeDataComposite> results = queryWithOrderBy.getResultList(); >- long count = (Long)queryCount.getSingleResult(); >+ List<CallTimeDataComposite> comps = new ArrayList<CallTimeDataComposite>(); >+ for (CallTimeRow row : select) { >+ comps.add(row.toComposite()); >+ } > >- return new PageList<CallTimeDataComposite>(results, (int) count, pageControl); >+ return new PageList<CallTimeDataComposite>(comps, (int) comps.size(), pageControl); > } > >- @SuppressWarnings("unchecked") > public PageList<CallTimeDataComposite> findCallTimeDataForResource(Subject subject, int scheduleId, long beginTime, > long endTime, PageControl pageControl) { >- pageControl.initDefaultOrderingField("SUM(value.total)/SUM(value.count)", PageOrdering.DESC); // only set if no ordering yet specified >- pageControl.addDefaultOrderingField("key.callDestination", PageOrdering.ASC); // add this to sort, if not already specified > > MeasurementSchedule schedule = entityManager.find(MeasurementSchedule.class, scheduleId); >+ if (schedule == null) { >+ return new PageList<CallTimeDataComposite>(pageControl); >+ } >+ > int resourceId = schedule.getResource().getId(); > if (authorizationManager.canViewResource(subject, resourceId) == false) { > throw new PermissionException("User [" + subject >@@ -180,26 +161,38 @@ public void addCallTimeData(@NotNull Set<CallTimeData> callTimeDataSet) { > + "] and resource[id=" + resourceId + "]"); > } > >- String query = CallTimeDataValue.QUERY_FIND_COMPOSITES_FOR_RESOURCE; >+ CallTimeDAO dao = storageClientManager.getCallTimeDAO(); >+ List<CallTimeRow> select = dao.select(scheduleId, null, new Date(beginTime), new Date(endTime)); > >- Query queryWithOrderBy = PersistenceUtility.createQueryWithOrderBy(entityManager, query, pageControl); >- Query queryCount = 
PersistenceUtility.createCountQuery(this.entityManager, query); >+ Aggregator a = new Aggregator(); >+ a.aggregate(select); >+ List<CallTimeDataComposite> comps = a.result(); > >- queryWithOrderBy.setParameter("scheduleId", scheduleId); >- queryWithOrderBy.setParameter("beginTime", beginTime); >- queryWithOrderBy.setParameter("endTime", endTime); >- >- List<CallTimeDataComposite> results = queryWithOrderBy.getResultList(); >+ return new PageList<CallTimeDataComposite>(comps, comps.size(), pageControl); >+ } > >- queryCount.setParameter("scheduleId", scheduleId); >- queryCount.setParameter("beginTime", beginTime); >- queryCount.setParameter("endTime", endTime); >+ static class Aggregator { >+ Map<String, List<CallTimeRow>> rows = new TreeMap<String, List<CallTimeRow>>(); > >- // Because of the use of the GROUP BY clause, the result list count will be returned as >- // the number of rows, rather than as a single number. >- long count = queryCount.getResultList().size(); >+ private void aggregate(List<CallTimeRow> select) { >+ for (CallTimeRow row : select) { >+ String d = row.getDest(); >+ List<CallTimeRow> list = rows.get(d); >+ if (list == null) { >+ rows.put(d, list = new ArrayList<CallTimeRow>()); >+ } >+ list.add(row); >+ } >+ } > >- return new PageList<CallTimeDataComposite>(results, (int) count, pageControl); >+ private List<CallTimeDataComposite> result() { >+ List<CallTimeDataComposite> comps = new LinkedList<CallTimeDataComposite>(); >+ for (Entry<String, List<CallTimeRow>> rowlist : rows.entrySet()) { >+ CallTimeRow aggregate = CallTimeRow.aggregate(rowlist.getValue()); >+ comps.add(aggregate.toComposite()); >+ } >+ return comps; >+ } > } > > public PageList<CallTimeDataComposite> findCallTimeDataForCompatibleGroup(Subject subject, int groupId, >@@ -232,12 +225,18 @@ public void addCallTimeData(@NotNull Set<CallTimeData> callTimeDataSet) { > public PageList<CallTimeDataComposite> findCallTimeDataForContext(Subject subject, EntityContext context, > 
CallTimeDataCriteria criteria) { > >+ if (log.isDebugEnabled()) { >+ log.debug("find " + subject + " " + context + " " + criteria); >+ } >+ >+ /* > PageControl pageControl = criteria.getPageControlOverrides(); > if (pageControl != null) { > pageControl.initDefaultOrderingField("SUM(calltimedatavalue.total)/SUM(calltimedatavalue.count)", > PageOrdering.DESC); // only set if no ordering yet specified > pageControl.addDefaultOrderingField("calltimedatavalue.key.callDestination", PageOrdering.ASC); // add this to sort, if not already specified > } >+ */ > > if (context.type == EntityContext.Type.Resource) { > criteria.addFilterResourceId(context.resourceId); >@@ -247,237 +246,91 @@ public void addCallTimeData(@NotNull Set<CallTimeData> callTimeDataSet) { > criteria.addFilterAutoGroupParentResourceId(context.parentResourceId); > criteria.addFilterAutoGroupResourceTypeId(context.resourceTypeId); > } >- criteria.setSupportsAddSortId(false); >- >- CriteriaQueryGenerator generator = new CriteriaQueryGenerator(subject, criteria); >- String replacementSelectList = "" // >- + " new org.rhq.core.domain.measurement.calltime.CallTimeDataComposite( " // >- + " calltimedatavalue.key.callDestination, " // >- + " MIN(calltimedatavalue.minimum), " // >- + " MAX(calltimedatavalue.maximum), " // >- + " SUM(calltimedatavalue.total), " // >- + " SUM(calltimedatavalue.count), " // >- + " SUM(calltimedatavalue.total) / SUM(calltimedatavalue.count) ) "; >- generator.alterProjection(replacementSelectList); >- generator.setGroupByClause("calltimedatavalue.key.callDestination"); >- >- if (authorizationManager.isInventoryManager(subject) == false) { >- generator.setAuthorizationResourceFragment(CriteriaQueryGenerator.AuthorizationTokenType.RESOURCE, >- "key.schedule.resource", subject.getId()); >- } > >- //log.info(generator.getParameterReplacedQuery(false)); >- //log.info(generator.getParameterReplacedQuery(true)); >+ MeasurementScheduleCriteria criteria2 = new 
MeasurementScheduleCriteria(criteria); > >- CriteriaQueryRunner<CallTimeDataComposite> queryRunner = new CriteriaQueryRunner<CallTimeDataComposite>( >- criteria, generator, entityManager); >- PageList<CallTimeDataComposite> results = queryRunner.execute(); >- return results; >- } >+ criteria2.addFilterDataType(DataType.CALLTIME); >+ PageList<MeasurementSchedule> schedules = >+ measurementScheduleManager.findSchedulesByCriteria(subjectManager.getOverlord(), criteria2); > >- /** >- * Deletes call-time data older than the specified time. >- * >- * @param deleteUpToTime call-time data older than this time will be deleted >- */ >- @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) >- @TransactionTimeout(6 * 60 * 60) >- public int purgeCallTimeData(Date deleteUpToTime) throws SQLException { >- // NOTE: Apparently, Hibernate does not support DML JPQL queries, so we're stuck using JDBC. >- Connection conn = null; >- PreparedStatement stmt = null; >- try { >- conn = rhqDs.getConnection(); >- >- // Purge old rows from RHQ_CALLTIME_DATA_VALUE. >- stmt = conn.prepareStatement(CALLTIME_VALUE_PURGE_STATEMENT); >- stmt.setLong(1, deleteUpToTime.getTime()); >- >- long startTime = System.currentTimeMillis(); >- int deletedRowCount = stmt.executeUpdate(); >- MeasurementMonitor.getMBean().incrementPurgeTime(System.currentTimeMillis() - startTime); >- MeasurementMonitor.getMBean().setPurgedCallTimeData(deletedRowCount); >- return deletedRowCount; >- >- // NOTE: We do not purge unreferenced rows from RHQ_CALLTIME_DATA_KEY, because this can cause issues >- // (see http://jira.jboss.com/jira/browse/JBNADM-1606). Once we limit the number of keys per >- // resource at insertion time (see http://jira.jboss.com/jira/browse/JBNADM-2618), the key >- // table will not require truncation. 
>- } finally { >- JDBCUtil.safeClose(conn, stmt, null); >+ if (log.isDebugEnabled()) { >+ log.debug("found schedules " + schedules); > } >- } > >- /* >- * internal method, do not expose to the remote API >- */ >- @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) >- public void insertCallTimeDataKeys(Set<CallTimeData> callTimeDataSet) { >- >- int[] results; >- String insertKeySql; >- PreparedStatement ps = null; >- Connection conn = null; >- >- try { >- conn = rhqDs.getConnection(); >- DatabaseType dbType = DatabaseTypeFactory.getDatabaseType(conn); >- >- if (dbType instanceof Postgresql83DatabaseType) { >- Statement st = null; >- try { >- // Take advantage of async commit here >- st = conn.createStatement(); >- st.execute("SET synchronous_commit = off"); >- } finally { >- JDBCUtil.safeClose(st); >- } >- } >+ Aggregator a = new Aggregator(); >+ CallTimeDAO dao = storageClientManager.getCallTimeDAO(); > >- if (dbType instanceof PostgresqlDatabaseType || dbType instanceof OracleDatabaseType >- || dbType instanceof H2DatabaseType) { >- String keyNextvalSql = JDBCUtil.getNextValSql(conn, "RHQ_calltime_data_key"); >- insertKeySql = String.format(CALLTIME_KEY_INSERT_STATEMENT, keyNextvalSql); >- } else if (dbType instanceof SQLServerDatabaseType) { >- insertKeySql = CALLTIME_KEY_INSERT_STATEMENT_AUTOINC; >- } else { >- throw new IllegalArgumentException("Unknown database type, can't continue: " + dbType); >- } >+ for (MeasurementSchedule schedule : schedules) { >+ int scheduleId = schedule.getId(); >+ Long begin = criteria.getFilterBeginTime(); >+ Date beginD = begin == null ? new Date(0) : new Date(begin); >+ Long end = criteria.getFilterEndTimeDate(); >+ Date endD = end == null ? 
new Date(Long.MAX_VALUE) : new Date(end); > >- ps = conn.prepareStatement(insertKeySql); >- for (CallTimeData callTimeData : callTimeDataSet) { >- ps.setInt(1, callTimeData.getScheduleId()); >- ps.setInt(3, callTimeData.getScheduleId()); >- Set<String> callDestinations = callTimeData.getValues().keySet(); >- for (String callDestination : callDestinations) { >- ps.setString(2, callDestination); >- ps.setString(4, callDestination); >- ps.addBatch(); >- } >- } >+ // note that destination filter is exact match >+ List<CallTimeRow> select = dao.select(scheduleId, /*criteria.getFilterDestination(),*/ beginD, endD); >+ log.info("selected " + select); >+ a.aggregate(select); >+ } > >- results = ps.executeBatch(); >+ List<CallTimeDataComposite> comps = a.result(); >+ if (log.isDebugEnabled()) { >+ log.debug("found comps " + comps); >+ } > >- int insertedRowCount = 0; >- for (int i = 0; i < results.length; i++) { >- if (((results[i] < 0) || (results[i] > 1)) && (results[i] != -2)) // oracle returns -2 because it can't count updated rows >- { >- throw new MeasurementStorageException("Failed to insert call-time data key rows - result [" >- + results[i] + "] for batch command [" + i + "] is less than 0 or greater than 1."); >- } >+ filter(criteria, comps); > >- insertedRowCount += results[i] == -2 ? 1 : results[i]; // If Oracle returns -2, just count 1 row >+ Collections.sort(comps, new Comparator<CallTimeDataComposite>() { >+ >+ @Override >+ public int compare(CallTimeDataComposite o1, CallTimeDataComposite o2) { >+ if (o1.getAverage() == o2.getAverage()) { // zero? >+ return o1.getCallDestination().compareTo(o2.getCallDestination()); >+ } >+ // sort by average (descending) >+ return o1.getAverage() > o2.getAverage() ? -1 : 1; > } > >- log.debug("Inserted new call-time data key rows for " + ((insertedRowCount >= 0) ? 
insertedRowCount : "?") >- + " out of " + results.length + " reported key-value pairs."); >- } catch (SQLException e) { >- logSQLException("Failed to persist call-time data keys", e); >- } catch (Throwable t) { >- log.error("Failed to persist call-time data keys", t); >- } finally { >- JDBCUtil.safeClose(conn, ps, null); >+ }); >+ >+ if (log.isDebugEnabled()) { >+ log.debug("results are " + comps); > } >+ PageList<CallTimeDataComposite> results2 = new PageList<CallTimeDataComposite>(comps, PageControl.getUnlimitedInstance()); >+ return results2; > } > >- /* >- * internal method, do not expose to the remote API >- */ >- @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) >- public void insertCallTimeDataValues(Set<CallTimeData> callTimeDataSet) { >- int[] results; >- String insertValueSql; >- PreparedStatement ps = null; >- Connection conn = null; >- >- try { >- conn = rhqDs.getConnection(); >- DatabaseType dbType = DatabaseTypeFactory.getDatabaseType(conn); >- >- if (dbType instanceof Postgresql83DatabaseType) { >- Statement st = null; >- try { >- // Take advantage of async commit here >- st = conn.createStatement(); >- st.execute("SET synchronous_commit = off"); >- } finally { >- JDBCUtil.safeClose(st); >- } >+ private void filter(CallTimeDataCriteria criteria, List<CallTimeDataComposite> comps) { >+ String dest = criteria.getFilterDestination(); >+ Double min = criteria.getFilterMinimum(); >+ Double max = criteria.getFilterMaximum(); >+ Double total = criteria.getFilterTotal(); >+ Long count = criteria.getFilterCount(); >+ for (Iterator<CallTimeDataComposite> i = comps.iterator(); i.hasNext(); ) { >+ CallTimeDataComposite comp = i.next(); >+ if (dest != null && !comp.getCallDestination().contains(dest)) { >+ i.remove(); > } >- >- if (dbType instanceof PostgresqlDatabaseType || dbType instanceof OracleDatabaseType >- || dbType instanceof H2DatabaseType) { >- String valueNextvalSql = JDBCUtil.getNextValSql(conn, "RHQ_calltime_data_value"); >- 
insertValueSql = String.format(CALLTIME_VALUE_INSERT_STATEMENT, valueNextvalSql); >- } else if (dbType instanceof SQLServerDatabaseType) { >- insertValueSql = CALLTIME_VALUE_INSERT_STATEMENT_AUTOINC; >- } else { >- throw new IllegalArgumentException("Unknown database type, can't continue: " + dbType); >+ if (min != null && comp.getMinimum() < min) { >+ i.remove(); > } >- >- ps = conn.prepareStatement(insertValueSql); >- for (CallTimeData callTimeData : callTimeDataSet) { >- ps.setInt(7, callTimeData.getScheduleId()); >- Set<String> callDestinations = callTimeData.getValues().keySet(); >- for (String callDestination : callDestinations) { >- CallTimeDataValue callTimeDataValue = callTimeData.getValues().get(callDestination); >- ps.setLong(1, callTimeDataValue.getBeginTime()); >- ps.setLong(2, callTimeDataValue.getEndTime()); >- ps.setDouble(3, callTimeDataValue.getMinimum()); >- ps.setDouble(4, callTimeDataValue.getMaximum()); >- ps.setDouble(5, callTimeDataValue.getTotal()); >- ps.setLong(6, callTimeDataValue.getCount()); >- ps.setString(8, callDestination); >- ps.addBatch(); >- } >+ if (max != null && comp.getMaximum() > max) { >+ i.remove(); > } >- >- results = ps.executeBatch(); >- >- int insertedRowCount = 0; >- for (int i = 0; i < results.length; i++) { >- if ((results[i] != 1) && (results[i] != -2)) // Oracle likes to return -2 becuase it doesn't track batch update counts >- { >- throw new MeasurementStorageException("Failed to insert call-time data value rows - result [" >- + results[i] + "] for batch command [" + i + "] does not equal 1."); >- } >- >- insertedRowCount += results[i] == -2 ? 
1 : results[i]; // If Oracle returns -2, just count 1 row; >+ if (total != null && comp.getTotal() < total) { >+ i.remove(); > } >- >- notifyAlertConditionCacheManager("insertCallTimeDataValues", callTimeDataSet >- .toArray(new CallTimeData[callTimeDataSet.size()])); >- >- if (insertedRowCount > 0) { >- MeasurementMonitor.getMBean().incrementCalltimeValuesInserted(insertedRowCount); >- >- log.debug("Inserted " + insertedRowCount + " call-time data value rows."); >+ if (count != null && comp.getCount() < count) { >+ i.remove(); > } >- >- } catch (SQLException e) { >- logSQLException("Failed to persist call-time data values", e); >- } catch (Throwable t) { >- log.error("Failed to persist call-time data values", t); >- } finally { >- JDBCUtil.safeClose(conn, ps, null); > } >- > } > > private void notifyAlertConditionCacheManager(String callingMethod, CallTimeData... data) { > AlertConditionCacheStats stats = alertConditionCacheManager.checkConditions(data); >- >- log.debug(callingMethod + ": " + stats.toString()); >- } >- >- private void logSQLException(String message, SQLException e) { >- SQLException mainException = e; >- StringBuilder causes = new StringBuilder(); >- int i = 1; >- while ((e = e.getNextException()) != null) { >- causes.append(i++).append("\n\t").append(e); >+ if (log.isDebugEnabled()) { >+ log.debug(callingMethod + ": " + stats.toString()); > } >- >- log.error(message + " - causes: " + causes, mainException); > } >-} >\ No newline at end of file >+ >+} >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerLocal.java >index 7cb2416..bba594a 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerLocal.java >+++ 
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/CallTimeDataManagerLocal.java >@@ -51,12 +51,4 @@ > PageList<CallTimeDataComposite> findCallTimeDataForContext(Subject subject, EntityContext context, long beginTime, > long endTime, String destination, PageControl pageControl); > >- int purgeCallTimeData(Date deleteUpToTime) throws SQLException; >- >- /* >- * internal methods that are exposed here so as to enable finer-grained manipulation of transactional boundaries >- */ >- void insertCallTimeDataKeys(Set<CallTimeData> callTimeDataSet); >- >- void insertCallTimeDataValues(Set<CallTimeData> callTimeDataSet); > } >\ No newline at end of file >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerBean.java >index 93fd9f4..d3d0868 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerBean.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerBean.java >@@ -18,6 +18,9 @@ > */ > package org.rhq.enterprise.server.measurement; > >+import static java.util.Collections.emptyList; >+import static java.util.Collections.emptySet; >+ > import java.sql.Connection; > import java.sql.PreparedStatement; > import java.sql.ResultSet; >@@ -27,6 +30,7 @@ > import java.util.Collection; > import java.util.Collections; > import java.util.Comparator; >+import java.util.Date; > import java.util.HashMap; > import java.util.HashSet; > import java.util.Iterator; >@@ -45,20 +49,17 @@ > import javax.ejb.TransactionAttributeType; > import javax.persistence.EntityManager; > import javax.persistence.FlushModeType; >-import javax.persistence.NoResultException; > import javax.persistence.PersistenceContext; > import javax.persistence.Query; > import 
javax.sql.DataSource; > > import com.google.common.base.Stopwatch; >+import com.google.common.util.concurrent.FutureCallback; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.jetbrains.annotations.Nullable; >- >-import org.jboss.ejb3.annotation.TransactionTimeout; > import org.jboss.remoting.CannotConnectException; >- > import org.rhq.core.domain.auth.Subject; > import org.rhq.core.domain.common.EntityContext; > import org.rhq.core.domain.criteria.MeasurementDataTraitCriteria; >@@ -80,13 +81,10 @@ > import org.rhq.core.domain.resource.ResourceType; > import org.rhq.core.domain.resource.composite.ResourceIdWithAgentComposite; > import org.rhq.core.domain.resource.group.ResourceGroup; >-import org.rhq.core.domain.server.PersistenceUtility; >-import org.rhq.core.domain.util.OrderingField; > import org.rhq.core.domain.util.PageControl; > import org.rhq.core.domain.util.PageList; > import org.rhq.core.domain.util.PageOrdering; > import org.rhq.core.util.collection.ArrayUtils; >-import org.rhq.core.util.exception.ThrowableUtil; > import org.rhq.core.util.jdbc.JDBCUtil; > import org.rhq.enterprise.server.RHQConstants; > import org.rhq.enterprise.server.agentclient.AgentClient; >@@ -102,10 +100,9 @@ > import org.rhq.enterprise.server.resource.group.ResourceGroupManagerLocal; > import org.rhq.enterprise.server.rest.ResourceHandlerBean; > import org.rhq.enterprise.server.storage.StorageClientManager; >-import org.rhq.enterprise.server.util.CriteriaQueryGenerator; >-import org.rhq.enterprise.server.util.CriteriaQueryRunner; > import org.rhq.server.metrics.MetricsServer; > import org.rhq.server.metrics.RawDataInsertedCallback; >+import org.rhq.server.metrics.TraitsDAO; > import org.rhq.server.metrics.domain.AggregateNumericMetric; > import org.rhq.server.metrics.domain.RawNumericMetric; > >@@ -119,21 +116,6 @@ > @Stateless > @javax.annotation.Resource(name = "RHQ_DS", mappedName = RHQConstants.DATASOURCE_JNDI_NAME) > 
public class MeasurementDataManagerBean implements MeasurementDataManagerLocal, MeasurementDataManagerRemote { >- // time_stamp, schedule_id, value, schedule_id, schedule_id, value, value, value, value >- private static final String TRAIT_INSERT_STATEMENT = "INSERT INTO RHQ_measurement_data_trait \n" >- + " SELECT ?, ?, ? FROM RHQ_numbers n \n" >- + " WHERE n.i = 42 \n" >- + " AND NOT EXISTS \n" >- + " ( \n" >- + " SELECT 1 \n" >- + " FROM (SELECT dt2.value as v \n" >- + " FROM RHQ_measurement_data_trait dt2 \n" >- + " WHERE dt2.schedule_id = ? \n" >- + " AND dt2.time_stamp = \n" >- + " (SELECT max(dt3.time_stamp) FROM RHQ_measurement_data_trait dt3 WHERE dt3.schedule_id = ?)) lastValue \n" >- + " WHERE NOT ((? is null AND lastValue.v is not null) \n" >- + " OR (? is not null AND lastValue.v is null) \n" >- + " OR (? is not null AND lastValue.v is not null AND ? <> lastValue.v)) \n" + " )"; > > private final Log log = LogFactory.getLog(MeasurementDataManagerBean.class); > >@@ -172,29 +154,6 @@ > @EJB > private SubjectManagerLocal subjectManager; > >- // doing a bulk delete in here, need to be in its own tx >- @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) >- @TransactionTimeout(6 * 60 * 60) >- public int purgeTraits(long oldest) { >- Connection conn = null; >- PreparedStatement stmt = null; >- >- try { >- conn = rhqDs.getConnection(); >- stmt = conn.prepareStatement(MeasurementDataTrait.NATIVE_QUERY_PURGE); >- stmt.setLong(1, oldest); >- long startTime = System.currentTimeMillis(); >- int deleted = stmt.executeUpdate(); >- MeasurementMonitor.getMBean().incrementPurgeTime(System.currentTimeMillis() - startTime); >- MeasurementMonitor.getMBean().setPurgedMeasurementTraits(deleted); >- return deleted; >- } catch (Exception e) { >- throw new RuntimeException("Failed to purge traits older than [" + oldest + "]", e); >- } finally { >- JDBCUtil.safeClose(conn, stmt, null); >- } >- } >- > @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) > public 
void mergeMeasurementReport(MeasurementReport report) { > long start = System.currentTimeMillis(); >@@ -240,9 +199,9 @@ public void addNumericData(final Set<MeasurementDataNumeric> data) { > MetricsServer metricsServer = storageClientManager.getMetricsServer(); > metricsServer.addNumericData(data, new RawDataInsertedCallback() { > >- private ReentrantLock lock = new ReentrantLock(); >+ private final ReentrantLock lock = new ReentrantLock(); > >- private Set<MeasurementData> insertedData = new TreeSet<MeasurementData>(new Comparator<MeasurementData>() { >+ private final Set<MeasurementData> insertedData = new TreeSet<MeasurementData>(new Comparator<MeasurementData>() { > @Override > public int compare(MeasurementData d1, MeasurementData d2) { > return (d1.getTimestamp() < d2.getTimestamp()) ? -1 : ((d1.getTimestamp() == d2.getTimestamp()) ? 0 : 1); >@@ -271,46 +230,22 @@ public void onFailure(Throwable throwable) { > }); > } > >- @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) >- public void addTraitData(Set<MeasurementDataTrait> data) { >- if ((data == null) || (data.isEmpty())) { >- return; >- } >+ @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) >+ public void addTraitData(final Set<MeasurementDataTrait> data) { >+ TraitsDAO dao = storageClientManager.getTraitsDAO(); >+ dao.insertTraits(data, new FutureCallback<Object>() { > >- Connection conn = null; >- PreparedStatement ps = null; >- try { >- conn = rhqDs.getConnection(); >- ps = conn.prepareStatement(TRAIT_INSERT_STATEMENT); >- >- for (MeasurementDataTrait aData : data) { >- // time_stamp, schedule_id, value, schedule_id, schedule_id, value, value, value, value >- ps.setLong(1, aData.getTimestamp()); >- ps.setInt(2, aData.getScheduleId()); >- ps.setString(3, aData.getValue()); >- ps.setInt(4, aData.getScheduleId()); >- ps.setInt(5, aData.getScheduleId()); >- ps.setString(6, aData.getValue()); >- ps.setString(7, aData.getValue()); >- ps.setString(8, aData.getValue()); >- 
ps.setString(9, aData.getValue()); >- ps.addBatch(); >+ @Override >+ public void onFailure(Throwable e) { >+ log.error("Error persisting trait data " + data.size(), e); > } > >- int[] res = ps.executeBatch(); >- if (res.length != data.size()) { >- throw new MeasurementStorageException("Failure to store measurement trait data."); >- // It is expected that some of these batch updates didn't update anything as the previous value was the same >+ @Override >+ public void onSuccess(Object result) { >+ notifyAlertConditionCacheManager("mergeMeasurementReport", >+ data.toArray(new MeasurementData[data.size()])); > } >- >- notifyAlertConditionCacheManager("mergeMeasurementReport", data.toArray(new MeasurementData[data.size()])); >- } catch (SQLException e) { >- log.warn("Failure saving measurement trait data:\n" + ThrowableUtil.getAllMessages(e)); >- } catch (Exception e) { >- log.error("Error persisting trait data", e); >- } finally { >- JDBCUtil.safeClose(conn, ps, null); >- } >+ }); > } > > /** >@@ -545,50 +480,10 @@ public void addTraitData(Set<MeasurementDataTrait> data) { > return resMap; > } > >- /** >- * Helper to fill the name of the trait from the passed array into the MeasurementDataTrait object. The input is a >- * tuple [MeasurementDataTrait,String name, Short displayOrder]. >- * >- * @param objs Tuple {@link MeasurementDataTrait},String,Short >- * >- * @return {@link MeasurementDataTrait} where the name property is set. Or null if the input was null. >- */ >- private MeasurementDataTrait fillMeasurementDataTraitFromObjectArray(Object[] objs) { >- if (objs == null) { >- return null; >- } >- >- MeasurementDataTrait mdt = (MeasurementDataTrait) objs[0]; >- String name = (String) objs[1]; >- >- mdt.setName(name); >- return mdt; >- } >- >- /** >- * Return the current trait value for the passed schedule >- * >- * @param scheduleId id of a MeasurementSchedule that 'points' to a Trait >- * >- * @return One trait or null if nothing was found in the db. 
>- */ > @Nullable > public MeasurementDataTrait getCurrentTraitForSchedule(int scheduleId) { >- Query q = entityManager.createNamedQuery(MeasurementDataTrait.FIND_CURRENT_FOR_SCHEDULES); >- q.setParameter("scheduleIds", Collections.singletonList(scheduleId)); >- Object[] res; >- try { >- res = (Object[]) q.getSingleResult(); >- MeasurementDataTrait trait = fillMeasurementDataTraitFromObjectArray(res); >- >- return trait; >- } catch (NoResultException nre) { >- if (log.isDebugEnabled()) { >- log.debug("No current trait data for schedule with id [" + scheduleId + "] found"); >- } >- >- return null; >- } >+ MeasurementDataTrait trait = storageClientManager.getTraitsDAO().currentTrait(scheduleId); >+ return trait; > } > > @Nullable >@@ -728,7 +623,6 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > * > * @return a List of MeasurementDataTrait > */ >- @SuppressWarnings("unchecked") > public List<MeasurementDataTrait> findCurrentTraitsForResource(Subject subject, int resourceId, > DisplayType displayType) { > if (authorizationManager.canViewResource(subject, resourceId) == false) { >@@ -736,38 +630,40 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > + "] does not have permission to view traits for resource[id=" + resourceId + "]"); > } > >- Query query; >- List<Object[]> qres; >+ MeasurementScheduleCriteria criteria = new MeasurementScheduleCriteria(); >+ criteria.addFilterResourceId(resourceId); >+ criteria.addFilterDisplayType(displayType); >+ criteria.setPageControl(PageControl.getUnlimitedInstance()); >+ PageList<MeasurementSchedule> list = measurementScheduleManager.findSchedulesByCriteria(subjectManager.getOverlord(), criteria); > >- if (displayType == null) { >- // query = entityManager.createNamedQuery(MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE); >- query = PersistenceUtility.createQueryWithOrderBy(entityManager, >- MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE, new 
OrderingField("d.displayOrder", PageOrdering.ASC)); >- } else { >- query = PersistenceUtility.createQueryWithOrderBy(entityManager, >- MeasurementDataTrait.FIND_CURRENT_FOR_RESOURCE_AND_DISPLAY_TYPE, new OrderingField("d.displayOrder", >- PageOrdering.ASC)); >- query.setParameter("displayType", displayType); >- } >+ TraitsDAO dao = storageClientManager.getTraitsDAO(); > >- query.setParameter("resourceId", resourceId); >- qres = query.getResultList(); >+ List<MeasurementDataTrait> traits = dao.queryAll(list, false); >+ for (MeasurementDataTrait trait : traits) { >+ MeasurementSchedule sched = entityManager.find(MeasurementSchedule.class, trait.getScheduleId()); >+ // use display name here not definition name >+ trait.setSchedule(sched); >+ trait.setName(sched.getDefinition().getDisplayName()); >+ } > >- /* >- * Now that we have everything from the query (it returns a tuple <MeasurementDataTrait,DislayName> of the >- * definition), lets create the method output. >- */ >- List<MeasurementDataTrait> result = new ArrayList<MeasurementDataTrait>(qres.size()); >- for (Object[] objs : qres) { >- MeasurementDataTrait mdt = fillMeasurementDataTraitFromObjectArray(objs); >- result.add(mdt); >+ Collections.sort(traits, new Comparator<MeasurementDataTrait>() { >+ public int compare(MeasurementDataTrait t1, MeasurementDataTrait t2) { >+ int o1 = t1.getSchedule().getDefinition().getDisplayOrder(); >+ int o2 = t2.getSchedule().getDefinition().getDisplayOrder(); >+ return o1 - o2; >+ } >+ }); >+ >+ // null out schedule -- not needed for display >+ for (MeasurementDataTrait trait : traits) { >+ trait.setSchedule(null); > } > > if (log.isDebugEnabled()) { >- log.debug("getCurrentTraitsForResource(" + resourceId + ") -> result is " + result); >+ log.debug("getCurrentTraitsForResource(" + resourceId + ") -> result is " + traits); > } > >- return result; >+ return traits; > } > > @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) >@@ -882,7 +778,7 @@ public 
MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > } > // return an empty collection if no definition ids were provided > if (definitionIds == null || definitionIds.length == 0) { >- return Collections.<MeasurementData>emptySet(); >+ return emptySet(); > } > > Query query = entityManager.createNamedQuery(Agent.QUERY_FIND_BY_RESOURCE_ID); >@@ -892,7 +788,7 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > // return empty data if the agent is the dummy one > if (agent.getName().startsWith(ResourceHandlerBean.DUMMY_AGENT_NAME_PREFIX) > && agent.getAgentToken().startsWith(ResourceHandlerBean.DUMMY_AGENT_TOKEN_PREFIX)) { >- return Collections.<MeasurementData> emptySet(); >+ return emptySet(); > } > > query = entityManager.createNamedQuery(MeasurementSchedule.FIND_BY_RESOURCE_IDS_AND_DEFINITION_IDS); >@@ -928,7 +824,7 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > //[BZ 760139] always return non-null value even when there are errors on the server side. Avoids cryptic > // Global UI Exceptions when attempting to serialize null responses. 
> if (null == result) { >- result = Collections.<MeasurementData>emptySet(); >+ result = emptySet(); > } > > return result; >@@ -944,9 +840,9 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > } > // return an empty collection if no definition ids were provided > if (definitionIds == null || definitionIds.length == 0) { >- return Collections.<MeasurementData>emptySet(); >+ return emptySet(); > } >- >+ > Set<MeasurementData> values = new HashSet<MeasurementData>(); > if (resourceIds != null) { > Query query = entityManager.createNamedQuery(Agent.QUERY_FIND_RESOURCE_IDS_WITH_AGENTS_BY_RESOURCE_IDS); >@@ -958,7 +854,7 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > if (resourceIdWithAgent.getAgent().getName().startsWith(ResourceHandlerBean.DUMMY_AGENT_NAME_PREFIX) > && resourceIdWithAgent.getAgent().getAgentToken() > .startsWith(ResourceHandlerBean.DUMMY_AGENT_TOKEN_PREFIX)) { >- values.addAll(Collections.<MeasurementData> emptySet()); >+ values.addAll(Collections.<MeasurementData>emptySet()); > continue; > } > >@@ -1042,52 +938,74 @@ public MeasurementAggregate getAggregate(Subject subject, int groupId, int defin > + "] and definition[id=" + definitionId + "]"); > } > >- Query q = entityManager.createNamedQuery(MeasurementDataTrait.FIND_ALL_FOR_RESOURCE_AND_DEFINITION); >- q.setParameter("resourceId", resourceId); >- q.setParameter("definitionId", definitionId); >- List<Object[]> queryResult = q.getResultList(); >- >- List<MeasurementDataTrait> result = new ArrayList<MeasurementDataTrait>(queryResult.size()); >- >- for (Object[] objs : queryResult) { >- MeasurementDataTrait mdt = fillMeasurementDataTraitFromObjectArray(objs); >- result.add(mdt); >+ // todo optimize this >+ List<MeasurementSchedule> scheds = measurementScheduleManager. 
>+ findSchedulesByResourceIdsAndDefinitionIds(new int[] { resourceId }, new int[] { definitionId}); >+ if (scheds.isEmpty()) { >+ return emptyList(); > } >- >- return result; >+ MeasurementSchedule sched = scheds.get(0); >+ List<MeasurementDataTrait> traits = storageClientManager.getTraitsDAO().historyFor(sched.getId()); >+ return traits; > } > > public PageList<MeasurementDataTrait> findTraitsByCriteria(Subject subject, MeasurementDataTraitCriteria criteria) { >- CriteriaQueryGenerator generator = new CriteriaQueryGenerator(subject, criteria); > >- Map<String, Object> filterFields = generator.getFilterFields(criteria); >- if (!this.authorizationManager.isInventoryManager(subject)) { >- generator.setAuthorizationResourceFragment(CriteriaQueryGenerator.AuthorizationTokenType.RESOURCE, >- "schedule.resource", subject.getId()); >+ // Query everything by schedule, then load the traits from the RHQ storage >+ MeasurementScheduleCriteria scriteria = new MeasurementScheduleCriteria(criteria); >+ // Used to get trait names >+ scriteria.fetchDefinition(true); >+ >+ // If the query is filtered by group id, also fetch the Resource for >+ // each schedule, so the results include the Resource names. 
>+ boolean filterGroup = criteria.getFilterGroupId() != null; >+ scriteria.fetchResource(filterGroup); >+ PageList<MeasurementSchedule> schedules = measurementScheduleManager.findSchedulesByCriteria( >+ subjectManager.getOverlord(), scriteria); >+ >+ List<MeasurementDataTrait> traits; >+ TraitsDAO traitsDAO = storageClientManager.getTraitsDAO(); >+ >+ // for permission checking (later) >+ Set<Integer> resources = new HashSet<Integer>(); >+ >+ if (criteria.isFilterMaxTimestamp()) { >+ traits = traitsDAO.queryAll(schedules, false); >+ } else { >+ traits = traitsDAO.queryAll(schedules, true); > } > >- CriteriaQueryRunner<MeasurementDataTrait> queryRunner = new CriteriaQueryRunner(criteria, generator, >- this.entityManager); >- PageList<MeasurementDataTrait> results = queryRunner.execute(); >+ for (MeasurementSchedule schedule : schedules) { >+ int resourceId = schedule.getResource().getId(); >+ resources.add(resourceId); >+ >+ if (filterGroup) { >+ schedule.getResource().getName(); >+ } >+ } > > // Fetch the metric definition for each schedule, so the results include the trait names. >- for (MeasurementDataTrait result : results) { >- result.getSchedule().getDefinition().getName(); >+ for (MeasurementDataTrait trait : traits) { >+ MeasurementSchedule schedule = entityManager.find(MeasurementSchedule.class, trait.getScheduleId()); >+ trait.setSchedule(schedule); >+ // trait.setName(...) -- do not set the name >+ schedule.getDefinition().getDisplayName(); > } > >- // If the query is filtered by group id, also fetch the Resource for each schedule, so the results include the >- // Resource names. 
>- if (filterFields.get(MeasurementDataTraitCriteria.FILTER_FIELD_GROUP_ID) != null) { >- for (MeasurementDataTrait result : results) { >- result.getSchedule().getResource().getName(); >- } >+ if (criteria.getSortTimestamp() == PageOrdering.ASC) { >+ Collections.reverse(traits); > } > >- return results; >- } >+ for (int resourceId : resources) { >+ if (!authorizationManager.canViewResource(subject, resourceId)) { >+ throw new PermissionException("User[" + subject.getName() >+ + "] cannot view resource[id=" + resourceId + "]"); >+ } >+ } > >- private MeasurementDataManagerUtility getConnectedUtilityInstance() { >- return MeasurementDataManagerUtility.getInstance(rhqDs); >+ PageList<MeasurementDataTrait> tresults = new PageList<MeasurementDataTrait>(traits, PageControl.getUnlimitedInstance()); >+ tresults.setTotalSize(traits.size()); >+ return tresults; > } > > private void pushToAlertSubsystem(Set<MeasurementData> data) { >@@ -1102,4 +1020,10 @@ private void pushToAlertSubsystem(Set<MeasurementData> data) { > > this.measurementDataManager.mergeMeasurementReport(fakeReport); > } >+ >+ public void cleanupTraitHistory(Date after) { >+ TraitsDAO dao = storageClientManager.getTraitsDAO(); >+ dao.cleanup(after); >+ } >+ > } >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerLocal.java >index 74318aa..9613f9a 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerLocal.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementDataManagerLocal.java >@@ -19,6 +19,7 @@ > package org.rhq.enterprise.server.measurement; > > import java.util.Collection; >+import java.util.Date; > import java.util.List; > import java.util.Map; > import java.util.Set; >@@ -44,7 +45,11 @@ > @Local > public 
interface MeasurementDataManagerLocal extends MeasurementDataManagerRemote { > >- int purgeTraits(long oldest); >+ /** >+ * Remove duplicate traits after this date. >+ * Should be run once a week or so. >+ */ >+ void cleanupTraitHistory(Date after); > > void mergeMeasurementReport(MeasurementReport report); > >@@ -53,13 +58,11 @@ > void addTraitData(Set<MeasurementDataTrait> data); > > /** >- * Return the current trait value for the passed schedule >+ * Return the current trait value for the passed schedule. > * > * @param scheduleId id of a MeasurementSchedule that 'points' to a Trait >- * >- * @return One trait > */ >- public MeasurementDataTrait getCurrentTraitForSchedule(int scheduleId); >+ MeasurementDataTrait getCurrentTraitForSchedule(int scheduleId); > > /** > * Return the current numeric value for the passed schedule >@@ -79,7 +82,7 @@ > * @param group compatible group > * > * @return a Map of resource id, List of summaries for this resource >- * >+ * > * @deprecated portal-war > */ > Map<Integer, List<MetricDisplaySummary>> findNarrowedMetricsDisplaySummariesForCompGroup(Subject subject, >@@ -89,7 +92,7 @@ > * Get the {@link MetricDisplaySummary}s for the resources passed in, that all need to be of the same > * {@link ResourceType}. Summaries only contain a basic selection of fields for the purpose of filling the Child > * resource popups. 
>- * >+ * > * @deprecated portal-war > */ > public Map<Integer, List<MetricDisplaySummary>> findNarrowedMetricDisplaySummariesForCompatibleResources( >@@ -111,7 +114,7 @@ > * @param resourceIds List of primary keys of the resources we are interested in > * @param begin begin time > * @param end end time >- * >+ * > * @deprecated portal-war > */ > public Map<Integer, List<MetricDisplaySummary>> findNarrowedMetricDisplaySummariesForResourcesAndParent( >@@ -132,7 +135,7 @@ > * @param resourceId the id of the resource > * @param definitionIds the array of ids of schedule definitions > * @param timeout the amount of time in milliseconds before timing out the request. Should be > 0. If null then default >- * is applied. Default agent connection failures can be long. >+ * is applied. Default agent connection failures can be long. > * > * @return MeasurementData for this Schedule. Not null. Returns empty set if agent connection can not be established or > * component fails to report live data. >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementScheduleManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementScheduleManagerLocal.java >index fc1e027..be7dde3 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementScheduleManagerLocal.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementScheduleManagerLocal.java >@@ -197,7 +197,7 @@ void updateSchedulesForAutoGroup(Subject subject, int parentResourceId, int chil > * @param resourceId PK of the resource we're interested in > * @param dataType DataType of the desired results use null for no filtering > * @param displayType the display type desired or null for no filtering >- * @param enabledOnly should we restrict the query to certain enablement state? null means "don't care". 
>+ * @param enabledOnly true to restrict the query to enabled metrics only, false means any > * > * @return List of MeasuremenSchedules for the given resource > */ >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/resource/ResourceManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/resource/ResourceManagerBean.java >index 84b85bf..33df45d 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/resource/ResourceManagerBean.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/resource/ResourceManagerBean.java >@@ -56,7 +56,6 @@ > import org.quartz.Scheduler; > import org.quartz.SchedulerException; > import org.quartz.SimpleTrigger; >- > import org.rhq.core.db.DatabaseType; > import org.rhq.core.db.DatabaseTypeFactory; > import org.rhq.core.domain.alert.Alert; >@@ -88,12 +87,9 @@ > import org.rhq.core.domain.measurement.Availability; > import org.rhq.core.domain.measurement.AvailabilityType; > import org.rhq.core.domain.measurement.MeasurementBaseline; >-import org.rhq.core.domain.measurement.MeasurementDataTrait; > import org.rhq.core.domain.measurement.MeasurementOOB; > import org.rhq.core.domain.measurement.MeasurementSchedule; > import org.rhq.core.domain.measurement.ResourceAvailability; >-import org.rhq.core.domain.measurement.calltime.CallTimeDataKey; >-import org.rhq.core.domain.measurement.calltime.CallTimeDataValue; > import org.rhq.core.domain.operation.ResourceOperationHistory; > import org.rhq.core.domain.operation.ResourceOperationScheduleEntity; > import org.rhq.core.domain.resource.Agent; >@@ -142,6 +138,7 @@ > import org.rhq.enterprise.server.resource.group.ResourceGroupDeleteException; > import org.rhq.enterprise.server.resource.group.ResourceGroupManagerLocal; > import org.rhq.enterprise.server.rest.ResourceHandlerBean; >+import org.rhq.enterprise.server.storage.StorageClientManager; > import 
org.rhq.enterprise.server.util.CriteriaQueryGenerator; > import org.rhq.enterprise.server.util.CriteriaQueryRunner; > import org.rhq.enterprise.server.util.LookupUtil; >@@ -183,6 +180,8 @@ > private MeasurementScheduleManagerLocal measurementScheduleManager; > @EJB > private AvailabilityManagerLocal availabilityManager; >+ @EJB >+ StorageClientManager storageClientManager; > > public void createResource(Subject user, Resource resource, int parentId) throws ResourceAlreadyExistsException { > Resource parent = null; >@@ -665,13 +664,21 @@ private boolean uninventoryResourcesBulkDelete(Subject overlord, List<Integer> r > } > > private boolean uninventoryResourceBulkDeleteAsyncWork(Subject overlord, int resourceId) { >+ >+ // storage manager cleanup >+ >+ Resource resource = entityManager.find(Resource.class, resourceId); >+ for (MeasurementSchedule sched : resource.getSchedules()) { >+ // could restrict by measurement type? >+ int scheduleId = sched.getId(); >+ storageClientManager.getCallTimeDAO().deleteSchedule(scheduleId); >+ storageClientManager.getTraitsDAO().deleteSchedule(scheduleId); >+ } >+ > String[] namedQueriesToExecute = new String[] { // > StorageNode.QUERY_UPDATE_REMOVE_LINKED_RESOURCES, //remove storage node resource links > ResourceRepo.DELETE_BY_RESOURCES, // > MeasurementBaseline.QUERY_DELETE_BY_RESOURCES, // baseline BEFORE schedules >- MeasurementDataTrait.QUERY_DELETE_BY_RESOURCES, // traits BEFORE schedules >- CallTimeDataValue.QUERY_DELETE_BY_RESOURCES, // call time data values BEFORE schedules & call time data keys >- CallTimeDataKey.QUERY_DELETE_BY_RESOURCES, // call time data keys BEFORE schedules > MeasurementOOB.DELETE_FOR_RESOURCES, // > MeasurementSchedule.DELETE_BY_RESOURCES, // schedules AFTER baselines, traits, and calltime data > Availability.QUERY_DELETE_BY_RESOURCES, // >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJob.java 
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJob.java >index 231ff99..bfd65a4 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJob.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/DataPurgeJob.java >@@ -30,7 +30,6 @@ > import org.quartz.JobExecutionException; > import org.quartz.SimpleTrigger; > import org.quartz.StatefulJob; >- > import org.rhq.core.domain.auth.Subject; > import org.rhq.enterprise.server.RHQConstants; > import org.rhq.enterprise.server.alert.AlertConditionManagerLocal; >@@ -40,9 +39,7 @@ > import org.rhq.enterprise.server.drift.DriftManagerLocal; > import org.rhq.enterprise.server.event.EventManagerLocal; > import org.rhq.enterprise.server.measurement.AvailabilityManagerLocal; >-import org.rhq.enterprise.server.measurement.CallTimeDataManagerLocal; > import org.rhq.enterprise.server.measurement.MeasurementBaselineManagerLocal; >-import org.rhq.enterprise.server.measurement.MeasurementDataManagerLocal; > import org.rhq.enterprise.server.measurement.MeasurementOOBManagerLocal; > import org.rhq.enterprise.server.scheduler.SchedulerLocal; > import org.rhq.enterprise.server.storage.StorageClientManager; >@@ -120,42 +117,15 @@ public void executeJobCode(JobExecutionContext context) throws JobExecutionExcep > } > > private void purgeEverything(Properties systemConfig) { >- purgeCallTimeData(LookupUtil.getCallTimeDataManager(), systemConfig); > purgeEventData(LookupUtil.getEventManager(), systemConfig); > purgeAlertData(LookupUtil.getAlertManager(), systemConfig); > purgeUnusedAlertDefinitions(LookupUtil.getAlertDefinitionManager()); > purgeOrphanedAlertConditions(LookupUtil.getAlertConditionManager()); > purgeOrphanedAlertNotifications(LookupUtil.getAlertNotificationManager()); >- purgeMeasurementTraitData(LookupUtil.getMeasurementDataManager(), systemConfig); > 
purgeAvailabilityData(LookupUtil.getAvailabilityManager(), systemConfig); > purgeOrphanedDriftFiles(LookupUtil.getDriftManager(), systemConfig); > } > >- private void purgeMeasurementTraitData(MeasurementDataManagerLocal measurementDataManager, Properties systemConfig) { >- long timeStart = System.currentTimeMillis(); >- LOG.info("Trait data purge starting at " + new Date(timeStart)); >- int traitsPurged = 0; >- >- try { >- long threshold; >- String traitPurgeThresholdStr = systemConfig.getProperty(RHQConstants.TraitPurge); >- if (traitPurgeThresholdStr == null) { >- threshold = timeStart - (1000L * 60 * 60 * 24 * 365); >- LOG.debug("No purge traits threshold found - will purge traits older than one year"); >- } else { >- threshold = timeStart - Long.parseLong(traitPurgeThresholdStr); >- } >- >- LOG.info("Purging traits that are older than " + new Date(threshold)); >- traitsPurged = measurementDataManager.purgeTraits(threshold); >- } catch (Exception e) { >- LOG.error("Failed to purge trait data. 
Cause: " + e, e); >- } finally { >- long duration = System.currentTimeMillis() - timeStart; >- LOG.info("Traits data purged [" + traitsPurged + "] - completed in [" + duration + "]ms"); >- } >- } >- > private void purgeAvailabilityData(AvailabilityManagerLocal availabilityManager, Properties systemConfig) { > long timeStart = System.currentTimeMillis(); > LOG.info("Availability data purge starting at " + new Date(timeStart)); >@@ -180,23 +150,6 @@ private void purgeAvailabilityData(AvailabilityManagerLocal availabilityManager, > } > } > >- private void purgeCallTimeData(CallTimeDataManagerLocal callTimeDataManager, Properties systemConfig) { >- long timeStart = System.currentTimeMillis(); >- LOG.info("Measurement calltime data purge starting at " + new Date(timeStart)); >- int calltimePurged = 0; >- >- try { >- long threshold = timeStart - Long.parseLong(systemConfig.getProperty(RHQConstants.RtDataPurge)); >- LOG.info("Purging calltime data that is older than " + new Date(threshold)); >- calltimePurged = callTimeDataManager.purgeCallTimeData(new Date(threshold)); >- } catch (Exception e) { >- LOG.error("Failed to purge calltime data. 
Cause: " + e, e); >- } finally { >- long duration = System.currentTimeMillis() - timeStart; >- LOG.info("Calltime purged [" + calltimePurged + "] - completed in [" + duration + "]ms"); >- } >- } >- > private void purgeEventData(EventManagerLocal eventManager, Properties systemConfig) { > long timeStart = System.currentTimeMillis(); > LOG.info("Event data purge starting at " + new Date(timeStart)); >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/TraitCleanup.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/TraitCleanup.java >new file mode 100644 >index 0000000..8baf35f >--- /dev/null >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/TraitCleanup.java >@@ -0,0 +1,38 @@ >+package org.rhq.enterprise.server.scheduler.jobs; >+ >+import java.util.Calendar; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.quartz.JobExecutionContext; >+import org.quartz.JobExecutionException; >+import org.rhq.enterprise.server.storage.StorageClientManager; >+import org.rhq.enterprise.server.util.LookupUtil; >+import org.rhq.server.metrics.TraitsDAO; >+ >+/** >+ * Cleans up duplicate traits within a weekly window. 
>+ */
>+public class TraitCleanup extends AbstractStatefulJob {
>+ private static final Log LOG = LogFactory.getLog(TraitCleanup.class);
>+
>+ @Override
>+ public void executeJobCode(JobExecutionContext context) throws JobExecutionException {
>+ long timeStart = System.currentTimeMillis();
>+ LOG.info("Trait cleanup starting");
>+ try {
>+ StorageClientManager storageClientManager = LookupUtil.getStorageClientManager();
>+ TraitsDAO traitsDAO = storageClientManager.getTraitsDAO();
>+ // Since we run once a week, clean up data in an ~7 day history window
>+ Calendar c = Calendar.getInstance();
>+ c.add(Calendar.DATE, -8);
>+ traitsDAO.cleanup(c.getTime());
>+ } catch (Exception e) {
>+ LOG.error("Failed to cleanup trait data. Cause: " + e, e);
>+ } finally {
>+ long duration = System.currentTimeMillis() - timeStart;
>+ LOG.info("Trait cleanup completed in [" + duration + "]ms");
>+ }
>+ }
>+
>+}
>diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java
>index f5c355c..529bfc5 100644
>--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java
>+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java
>@@ -32,6 +32,7 @@
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.List;
>+import java.util.Properties;
>
> import javax.annotation.Resource;
> import javax.ejb.ConcurrencyManagement;
>@@ -61,26 +62,33 @@
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>-
> import org.rhq.cassandra.schema.SchemaManager;
> import org.rhq.cassandra.util.ClusterBuilder;
>+import org.rhq.core.domain.auth.Subject;
> import org.rhq.core.domain.cloud.StorageNode;
>+import org.rhq.core.domain.common.SystemConfiguration;
>+import 
org.rhq.core.domain.common.composite.SystemSetting; > import org.rhq.core.domain.common.composite.SystemSettings; > import org.rhq.core.util.ObjectNameFactory; > import org.rhq.core.util.PropertiesFileUpdate; > import org.rhq.core.util.exception.ThrowableUtil; >+import org.rhq.enterprise.server.RHQConstants; > import org.rhq.enterprise.server.auth.SubjectManagerLocal; > import org.rhq.enterprise.server.cloud.StorageNodeManagerLocal; > import org.rhq.enterprise.server.core.CoreServer; > import org.rhq.enterprise.server.system.SystemManagerLocal; > import org.rhq.enterprise.server.util.JMXUtil; >+import org.rhq.enterprise.server.util.LookupUtil; >+import org.rhq.server.metrics.CallTimeConfiguration; >+import org.rhq.server.metrics.CallTimeDAO; > import org.rhq.server.metrics.DateTimeService; > import org.rhq.server.metrics.MetricsConfiguration; > import org.rhq.server.metrics.MetricsConstants; > import org.rhq.server.metrics.MetricsDAO; > import org.rhq.server.metrics.MetricsServer; > import org.rhq.server.metrics.StorageSession; >+import org.rhq.server.metrics.TraitsConfiguration; >+import org.rhq.server.metrics.TraitsDAO; > > /** > * @author John Sanda >@@ -112,12 +120,15 @@ > private MetricsConfiguration metricsConfiguration; > private MetricsDAO metricsDAO; > private MetricsServer metricsServer; >+ private TraitsDAO traitsDAO; >+ private CallTimeDAO callTimeDAO; > private boolean initialized; > private StorageClusterMonitor storageClusterMonitor; > > private String cachedStorageUsername; > private String cachedStoragePassword; > >+ > public void scheduleStorageSessionMaintenance() { > // each time the webapp is reloaded, we don't want to create duplicate jobs > Collection<Timer> timers = timerService.getTimers(); >@@ -179,9 +190,26 @@ public synchronized boolean init() { > metricsConfiguration = new MetricsConfiguration(); > metricsDAO = new MetricsDAO(session, metricsConfiguration); > >+ // TODO make this work at runtime... 
>+ Subject subject = LookupUtil.getSubjectManager().getOverlord();
>+ SystemSettings settings = systemManager.getSystemSettings(subject);
>+ String traitPurgeThresholdStr = settings.get(SystemSetting.TRAIT_PURGE_PERIOD.getInternalName());
>+ TraitsConfiguration config = new TraitsConfiguration();
>+ if (traitPurgeThresholdStr != null) {
>+ config.setTTLDays(Integer.parseInt(traitPurgeThresholdStr));
>+ }
>+ traitsDAO = new TraitsDAO(session, config);
>+
>+ // TODO same issue
>+ String callTimeStr = settings.get(SystemSetting.RT_DATA_PURGE_PERIOD.getInternalName());
>+ CallTimeConfiguration ctconfig = new CallTimeConfiguration();
>+ if (callTimeStr != null) {
>+ ctconfig.setTTLDays(Integer.parseInt(callTimeStr));
>+ }
>+ callTimeDAO = new CallTimeDAO(session, ctconfig);
>+
> initMetricsServer();
> JMXUtil.registerMBean(this, OBJECT_NAME);
>- initialized = true;
>
> initialized = true;
> LOG.info("Storage client subsystem is now initialized");
>@@ -247,6 +275,8 @@ public synchronized boolean refreshCredentialsAndSession() {
>
> session.registerNewSession(wrappedSession);
> metricsDAO.initPreparedStatements();
>+ traitsDAO.initPreparedStatements();
>+ callTimeDAO.initPreparedStatements();
> return true;
> }
>
>@@ -289,6 +319,16 @@ public synchronized void shutdown() {
>
> metricsDAO = null;
>
>+ if (traitsDAO != null) {
>+ traitsDAO.shutdown();
>+ traitsDAO = null;
>+ }
>+
>+ if (callTimeDAO != null) {
>+ callTimeDAO.shutdown();
>+ callTimeDAO = null;
>+ }
>+
> try {
> if (cluster != null) {
> cluster.shutdown();
>@@ -309,6 +349,16 @@ public MetricsDAO getMetricsDAO() {
> }
>
> @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
>+ public CallTimeDAO getCallTimeDAO() {
>+ return callTimeDAO;
>+ }
>+
>+ @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
>+ public TraitsDAO getTraitsDAO() {
>+ return traitsDAO;
>+ }
>+
>+ @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
> public MetricsServer getMetricsServer() {
> return metricsServer;
} >diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/system/SystemManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/system/SystemManagerBean.java >index 610c38d..168791f 100644 >--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/system/SystemManagerBean.java >+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/system/SystemManagerBean.java >@@ -87,12 +87,10 @@ > > private final String[] TABLES_TO_VACUUM = { "RHQ_RESOURCE", "RHQ_CONFIG", "RHQ_CONFIG_PROPERTY", "RHQ_AGENT" }; > >- private final String[] TABLES_TO_REINDEX = { "RHQ_MEASUREMENT_DATA_NUM_1D", "RHQ_MEASUREMENT_DATA_NUM_6H", >- "RHQ_MEASUREMENT_DATA_NUM_1H", "RHQ_MEASUREMENT_DATA_TRAIT", "RHQ_CALLTIME_DATA_KEY", >- "RHQ_CALLTIME_DATA_VALUE", "RHQ_AVAILABILITY" }; >+ private final String[] TABLES_TO_REINDEX = { "RHQ_AVAILABILITY" }; > >- private final String[] ORA_INDEXES_TO_REBUILD = { "RHQ_MEAS_DATA_1H_ID_TIME_PK", "RHQ_MEAS_DATA_6H_ID_TIME_PK", >- "RHQ_MEAS_DATA_1D_ID_TIME_PK", "RHQ_MEAS_BASELINE_CTIME_IDX", "RHQ_MEAS_DATA_TRAIT_ID_TIME_PK" }; >+ private final String[] ORA_INDEXES_TO_REBUILD = { >+ "RHQ_MEAS_BASELINE_CTIME_IDX" }; > > private static final Log LOG = LogFactory.getLog(SystemManagerBean.class); > >@@ -371,7 +369,7 @@ private boolean isStorageSetting(SystemSetting setting) { > case STORAGE_AUTOMATIC_DEPLOYMENT: > case STORAGE_PASSWORD: > case STORAGE_USERNAME: >- return true; >+ return true; > default: > return false; > } >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeConfiguration.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeConfiguration.java >new file mode 100644 >index 0000000..ecea1d5 >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeConfiguration.java >@@ -0,0 +1,67 @@ >+package org.rhq.server.metrics; >+ 
>+import java.util.concurrent.TimeUnit; >+ >+/** >+ * Configuration, mainly for call time retention period. >+ */ >+public class CallTimeConfiguration { >+ >+ private int ttl; >+ >+ /** >+ * Mainly for data migration use; ensures the same UUID is >+ * produced each time for a call time and destination. >+ */ >+ private boolean idempotent; >+ >+ /** >+ * Construct a new instance. >+ * The default TTL is 30 days. >+ */ >+ public CallTimeConfiguration() { >+ setTTLDays(30); >+ } >+ >+ /** >+ * Returns TTL in days. >+ */ >+ public int getTTLDays() { >+ return ttl; >+ } >+ >+ /** >+ * Returns TTL in seconds. >+ */ >+ public int getTTLSeconds() { >+ return (int) TimeUnit.SECONDS.convert(ttl, TimeUnit.DAYS); >+ } >+ >+ /** >+ * Setter for TTL in days. >+ */ >+ public void setTTLDays(int ttl) { >+ if (ttl <= 0) >+ throw new IllegalArgumentException(); >+ this.ttl = ttl; >+ } >+ >+ /** >+ * Returns true if multiple inserts of a call time with the same schedule ID, >+ * begin time, and destination should overwrite one another. Note there is a >+ * small chance two destination strings will hash to the same value, which >+ * is why this defaults to false. This is mainly useful for data migration, >+ * which may need to be run multiple times. >+ */ >+ public boolean isIdempotentInsert() { >+ return idempotent; >+ } >+ >+ /** >+ * See {@link #isIdempotentInsert()}.
>+ */ >+ public void setIdempotentInsert(boolean idempotentInsert) { >+ this.idempotent = idempotentInsert; >+ } >+ >+} >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeDAO.java >new file mode 100644 >index 0000000..2e0dc97 >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeDAO.java >@@ -0,0 +1,236 @@ >+package org.rhq.server.metrics; >+ >+import java.nio.ByteBuffer; >+import java.util.ArrayList; >+import java.util.Collection; >+import java.util.Date; >+import java.util.List; >+import java.util.Map.Entry; >+import java.util.UUID; >+import java.util.concurrent.ExecutorService; >+import java.util.concurrent.Executors; >+import java.util.concurrent.atomic.AtomicInteger; >+ >+import org.apache.cassandra.utils.UUIDGen; >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.calltime.CallTimeData; >+import org.rhq.core.domain.measurement.calltime.CallTimeDataValue; >+ >+import com.datastax.driver.core.BoundStatement; >+import com.datastax.driver.core.PreparedStatement; >+import com.datastax.driver.core.ResultSet; >+import com.datastax.driver.core.Row; >+import com.google.common.util.concurrent.FutureCallback; >+import com.google.common.util.concurrent.Futures; >+import com.google.common.util.concurrent.ListenableFuture; >+ >+ >+/** >+ * Manages call time persistence.
>+ */ >+public class CallTimeDAO { >+ >+ private static final long MIN_CLOCK_SEQ_AND_NODE = 0x8080808080808080L; >+ >+ private final Log log = LogFactory.getLog(CallTimeDAO.class); >+ >+ private final StorageSession storageSession; >+ >+ private final CallTimeConfiguration configuration; >+ >+ private PreparedStatement insert; >+ >+ private PreparedStatement selectRange; >+ >+ private PreparedStatement selectRangeDest; >+ >+ private PreparedStatement deleteSchedule; >+ >+ /** >+ * Cassandra table name. >+ */ >+ public static final String TABLE = "calltime"; >+ >+ /** >+ * UUID counter. >+ */ >+ private static final AtomicInteger nanos = new AtomicInteger(); >+ >+ /** >+ * Unclear if this needs to be configurable or not. >+ * These threads are used to notify the alert cache. >+ */ >+ private final ExecutorService executor = >+ Executors.newFixedThreadPool(10, new StorageClientThreadFactory("CallTimeDAO")); >+ >+ /** >+ * Constructs a new instance with session and configuration. >+ */ >+ public CallTimeDAO(StorageSession session, CallTimeConfiguration configuration) { >+ this.storageSession = session; >+ this.configuration = configuration; >+ initPreparedStatements(); >+ } >+ >+ /** >+ * Called to shutdown. >+ */ >+ public void shutdown() { >+ executor.shutdown(); >+ } >+ >+ /** >+ * Initialize statements before use. >+ */ >+ public void initPreparedStatements() { >+ log.info("Initializing prepared statements"); >+ long startTime = System.currentTimeMillis(); >+ int ttl = configuration.getTTLSeconds(); >+ >+ // CallTimeDataComposite >+ insert = storageSession.prepare( >+ "INSERT INTO " + TABLE + " (schedule_id, dest, start, end, min, max, total, count) " + >+ "VALUES (?,?,?,?, ?,?,?,?) USING TTL " + ttl); >+ >+ String s = "SELECT dest, start, end, min, max, total, count from " + TABLE + >+ " WHERE schedule_id = ? AND start >= ? 
AND start <= ?"; >+ selectRange = storageSession.prepare(s); >+ selectRangeDest = storageSession.prepare(s + " AND dest = ?"); >+ >+ deleteSchedule = storageSession.prepare( >+ "DELETE FROM " + TABLE + " WHERE schedule_id = ?"); >+ >+ long endTime = System.currentTimeMillis(); >+ log.info("Finished initializing prepared statements in " + (endTime - startTime) + " ms"); >+ } >+ >+ /** >+ * Selects a range of call times. >+ * The list will be sorted by begin time for an event, i.e. oldest first. >+ * >+ * @param scheduleId the measurement schedule ID >+ * @param destination if not null, destination name to filter with >+ * @param start start of the time range, inclusive >+ * @param end end of the time range, inclusive >+ * @return list of rows >+ */ >+ public List<CallTimeRow> select(int scheduleId, String destination, Date start, Date end) { >+ >+ List<CallTimeRow> list = new ArrayList<CallTimeRow>(); >+ BoundStatement bind; >+ UUID startU = UUIDGen.minTimeUUID(start.getTime()); >+ UUID endU = UUIDGen.minTimeUUID(end.getTime()); >+ if (destination == null) { >+ bind = selectRange.bind(scheduleId, startU, endU); >+ } else { >+ bind = selectRangeDest.bind(scheduleId, startU, endU, destination); >+ } >+ ResultSet result = storageSession.execute(bind); >+ for (Row row : result) { >+ String dest = row.getString(0); >+ long t1 = UUIDGen.getAdjustedTimestamp(row.getUUID(1)); >+ Date t2 = row.getDate(2); >+ if (t2.after(end)) { >+ break; >+ } >+ double min = row.getDouble(3); >+ double max = row.getDouble(4); >+ double total = row.getDouble(5); >+ long count = row.getLong(6); >+ CallTimeRow ct = new CallTimeRow(scheduleId, dest, new Date(t1), t2, min, max, total, count); >+ list.add(ct); >+ } >+ return list; >+ } >+ >+ /** >+ * Inserts call times. >+ * The callback will be called when the insert is complete.
>+ */ >+ public void insert(Collection<CallTimeData> calltime, final FutureCallback<Object> callback) { >+ >+ if (calltime.isEmpty()) { >+ executor.submit(new Runnable() { >+ public void run() { callback.onSuccess(null); } >+ }); >+ return; >+ } >+ >+ final AtomicInteger count = new AtomicInteger(calltime.size()); >+ for (CallTimeData data : calltime) { >+ ListenableFuture<List<ResultSet>> future = insert(data); >+ Futures.addCallback(future, new FutureCallback<List<ResultSet>>() { >+ @Override >+ public void onSuccess(List<ResultSet> result) { >+ if (count.decrementAndGet() == 0) { >+ log.trace("completed inserts"); >+ callback.onSuccess(result); >+ } >+ } >+ >+ @Override >+ public void onFailure(Throwable throwable) { >+ log.trace("failure inserting", throwable); >+ callback.onFailure(throwable); >+ } >+ >+ }, this.executor); >+ } >+ } >+ >+ /** >+ * Inserts one call time, returning a future. >+ */ >+ public ListenableFuture<List<ResultSet>> insert(CallTimeData data) { >+ BoundStatement statement; >+ List<StorageResultSetFuture> insertFutures = new ArrayList<StorageResultSetFuture>(3); >+ for (Entry<String, CallTimeDataValue> call : data.getValues().entrySet()) { >+ int scheduleId = data.getScheduleId(); >+ String dest = call.getKey(); >+ CallTimeDataValue value = call.getValue(); >+ UUID begin = uuidForCall(value.getBeginTime(), dest); >+ statement = insert.bind( >+ // " (schedule_id, dest, start, end, min, max, total, count) " + >+ scheduleId, dest, begin, new Date(value.getEndTime()), >+ value.getMinimum(), value.getMaximum(), >+ value.getTotal(), value.getCount()); >+ StorageResultSetFuture future = storageSession.executeAsync(statement); >+ insertFutures.add(future); >+ } >+ return Futures.successfulAsList(insertFutures); >+ } >+ >+ /** >+ * Selects schedules by date range only. >+ */ >+ public List<CallTimeRow> select(int scheduleId, Date begin, Date end) { >+ return select(scheduleId, null, begin, end); >+ } >+ >+ /** >+ * Delete call time data by ID. 
>+ */ >+ public StorageResultSetFuture deleteSchedule(int id) { >+ BoundStatement bind = deleteSchedule.bind(id); >+ return storageSession.executeAsync(bind); >+ } >+ >+ /** >+ * Converts time into a UUID, adding a unique 100-nanosecond value. >+ * If configured as an idempotent insert, hashes the destination string. >+ */ >+ private UUID uuidForCall(long time, String dest) { >+ if (configuration.isIdempotentInsert()) { >+ UUID uuid = UUIDGen.minTimeUUID(time); >+ int hash = Math.abs(dest.hashCode()); >+ return new UUID(uuid.getMostSignificantBits(), MIN_CLOCK_SEQ_AND_NODE + hash); >+ } >+ >+ int nano = nanos.incrementAndGet(); >+ nano = Math.abs(nano % 10000); >+ byte[] raw = UUIDGen.getTimeUUIDBytes(time, nano); >+ return UUIDGen.getUUID(ByteBuffer.wrap(raw)); >+ } >+ >+} >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeRow.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeRow.java >new file mode 100644 >index 0000000..7a983bc >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/CallTimeRow.java >@@ -0,0 +1,109 @@ >+package org.rhq.server.metrics; >+ >+import java.util.Date; >+import java.util.List; >+import java.util.Map.Entry; >+ >+import org.rhq.core.domain.measurement.calltime.CallTimeDataComposite; >+ >+/** >+ * Used by the DAO to return a row. >+ */ >+public class CallTimeRow { >+ >+ private final int scheduleId; >+ private final String dest; >+ private final Date begin; >+ private final Date end; >+ private final double min; >+ private final double max; >+ private final double total; >+ private final long count; >+ >+ /** >+ * Row value.
>+ */ >+ public CallTimeRow(int scheduleId, String dest, Date begin, Date end, double min, double max, double total, long count) { >+ this.scheduleId = scheduleId; >+ this.dest = dest; >+ this.begin = begin; >+ this.end = end; >+ this.min = min; >+ this.max = max; >+ this.total = total; >+ this.count = count; >+ } >+ >+ /** >+ * Converts this object to a composite. >+ */ >+ public CallTimeDataComposite toComposite() { >+ return new CallTimeDataComposite(dest, min, max, total, count, total/count); >+ } >+ >+ public int getScheduleId() { >+ return scheduleId; >+ } >+ >+ public String getDest() { >+ return dest; >+ } >+ >+ public Date getBegin() { >+ return begin; >+ } >+ >+ public Date getEnd() { >+ return end; >+ } >+ >+ public double getMin() { >+ return min; >+ } >+ >+ public double getMax() { >+ return max; >+ } >+ >+ public double getTotal() { >+ return total; >+ } >+ >+ public long getCount() { >+ return count; >+ } >+ >+ @Override >+ public String toString() { >+ return "CallTimeRow [scheduleId=" + scheduleId + ", dest=" + dest >+ + ", begin=" + begin + ", end=" + end + ", min=" + min >+ + ", max=" + max + ", total=" + total + ", count=" + count + "]"; >+ } >+ >+ /** >+ * Aggregate this row. 
>+ * @param rowlist rows to aggregate, oldest entry first >+ * @return one aggregate >+ */ >+ public static CallTimeRow aggregate(List<CallTimeRow> rowlist) { >+ if (rowlist.isEmpty()) { >+ throw new IllegalArgumentException(); >+ } >+ CallTimeRow row0 = rowlist.get(0); >+ double min = Double.MAX_VALUE; >+ double max = -Double.MAX_VALUE; >+ long count = 0; >+ double total = 0; >+ Date end = row0.end; >+ for (CallTimeRow row : rowlist) { >+ min = Math.min(min, row.min); >+ max = Math.max(max, row.max); >+ count += row.count; >+ total += row.total; >+ end = row.end; >+ } >+ // assume first entry is oldest >+ return new CallTimeRow(row0.scheduleId, row0.dest, row0.begin, end, min, max, total, count); >+ } >+ >+} >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java >index 66d0493..a8c2459 100644 >--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java >@@ -60,9 +60,9 @@ > > private final Log log = LogFactory.getLog(MetricsDAO.class); > >- private StorageSession storageSession; >+ private final StorageSession storageSession; > >- private MetricsConfiguration configuration; >+ private final MetricsConfiguration configuration; > > private PreparedStatement insertRawData; > private PreparedStatement rawMetricsQuery; >@@ -315,4 +315,5 @@ public StorageResultSetFuture deleteMetricsIndexEntriesAsync(MetricsTable table, > BoundStatement statement = deleteIndexEntries.bind(table.getTableName(), new Date(timestamp)); > return storageSession.executeAsync(statement); > } >+ > } >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageClientThreadFactory.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageClientThreadFactory.java >index
8829f86..3102d26 100644 >--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageClientThreadFactory.java >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageClientThreadFactory.java >@@ -13,12 +13,17 @@ > > private final Log log; > >- private AtomicInteger threadNumber = new AtomicInteger(0); >+ private final AtomicInteger threadNumber = new AtomicInteger(0); > >- private String poolName = "StorageClientThreadPool"; >+ private final String poolName; > >- public StorageClientThreadFactory() { >+ public StorageClientThreadFactory(String poolName) { > log = LogFactory.getLog(poolName); >+ this.poolName = poolName; >+ } >+ >+ public StorageClientThreadFactory() { >+ this("StorageClientThreadPool"); > } > > @Override >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java >index d6b8598..444fa54 100644 >--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java >@@ -18,6 +18,12 @@ > > private StorageSession session; > >+ private static Executor direct = new Executor() { >+ public void execute(Runnable run) { >+ run.run(); >+ } >+ }; >+ > public StorageResultSetFuture(ResultSetFuture resultSetFuture, StorageSession session) { > wrapperFuture = resultSetFuture; > this.session = session; >@@ -28,6 +34,13 @@ public void addListener(Runnable listener, Executor executor) { > wrapperFuture.addListener(listener, executor); > } > >+ /** >+ * Add a listener that runs a task in the completion thread. 
>+ */ >+ public void addListener(Runnable listener) { >+ wrapperFuture.addListener(listener, direct); >+ } >+ > @Override > public boolean cancel(boolean mayInterruptIfRunning) { > return wrapperFuture.cancel(mayInterruptIfRunning); >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsCleanup.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsCleanup.java >new file mode 100644 >index 0000000..a15f9d8 >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsCleanup.java >@@ -0,0 +1,129 @@ >+package org.rhq.server.metrics; >+ >+import static org.rhq.server.metrics.TraitsDAO.TABLE; >+ >+import java.util.ArrayList; >+import java.util.Date; >+import java.util.List; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+ >+import com.datastax.driver.core.PreparedStatement; >+import com.datastax.driver.core.ResultSet; >+import com.datastax.driver.core.Row; >+import com.google.common.util.concurrent.Futures; >+ >+class TraitsCleanup { >+ >+ private final Log log = LogFactory.getLog(TraitsCleanup.class); >+ >+ /** >+ * For cleanup, how many delete statements to wait for at submit. >+ */ >+ public static final int BULK_COUNT = 25; >+ >+ /** >+ * Limit size. Cassandra 2.0 doesn't need this. >+ */ >+ static int LIMIT = 1000; >+ >+ private StorageSession storageSession; >+ private int scheduleIdOld = -1; >+ private String preValue = null; >+ private Date preTime = null; >+ private List<StorageResultSetFuture> futures = new ArrayList<StorageResultSetFuture>(BULK_COUNT); >+ >+ private PreparedStatement selectCleanupIds; >+ >+ private PreparedStatement selectCleanupIds2; >+ >+ private PreparedStatement deleteHistory; >+ >+ /** >+ * Constructs a new instance. 
>+ */ >+ public TraitsCleanup(StorageSession session) { >+ this.storageSession = session; >+ >+ String s = "SELECT schedule_id, time, value from " + TABLE + >+ " WHERE time > ? LIMIT " + LIMIT + " ALLOW FILTERING"; >+ selectCleanupIds = storageSession.prepare(s); >+ >+ s = "SELECT schedule_id, time, value from " + TABLE + >+ " WHERE time > ? AND token(schedule_id) > token(?) LIMIT " + LIMIT + " ALLOW FILTERING"; >+ selectCleanupIds2 = storageSession.prepare(s); >+ >+ deleteHistory = storageSession.prepare( >+ "DELETE FROM " + TABLE + " WHERE schedule_id = ? AND time = ?"); >+ } >+ >+ /** >+ * Run cleanup. >+ */ >+ public void cleanup(Date after) { >+ log.debug("cleanup traits after " + after); >+ ResultSet result = storageSession.execute(selectCleanupIds.bind(after)); >+ while (cleanup(result)) { >+ log.debug("fetching more rows..."); >+ result = storageSession.execute(selectCleanupIds2.bind(after, scheduleIdOld)); >+ } >+ complete(futures); >+ log.debug("done"); >+ } >+ >+ /** >+ * Returns false if no more results. >+ */ >+ private boolean cleanup(ResultSet result) { >+ boolean debug = log.isDebugEnabled(); >+ if (result.isExhausted()) >+ return false; >+ for (Row row : result) { >+ int scheduleId = row.getInt(0); >+ if (scheduleId != scheduleIdOld) { >+ preValue = null; >+ } >+ scheduleIdOld = scheduleId; >+ >+ if (debug) { >+ log.debug("cleanup sid=" + scheduleId); >+ } >+ >+ /* >+ * newer -> older >+ * (T4, a), (T3, a), (T2, b), (T1, b) >+ * remove T3, T1 >+ */ >+ Date time = row.getDate(1); >+ String value = row.getString(2); >+ if (preValue != null && preValue.equals(value)) { >+ if (debug) { >+ log.debug("remove time=" + preTime); >+ } >+ StorageResultSetFuture future = storageSession.executeAsync( >+ deleteHistory.bind(scheduleId, preTime)); >+ futures.add(future); >+ if (futures.size() > BULK_COUNT) { >+ complete(futures); >+ } >+ } >+ preValue = value; >+ preTime = time; >+ } >+ return true; >+ } >+ >+ /** >+ * Complete these tasks, ignoring the result.
>+ */ >+ private void complete(List<StorageResultSetFuture> futures) { >+ try { >+ log.debug("wait for results " + futures.size()); >+ Futures.successfulAsList(futures).get(); >+ } catch (Exception e) { >+ log.error("clean failed", e); >+ } >+ futures.clear(); >+ } >+} >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsConfiguration.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsConfiguration.java >new file mode 100644 >index 0000000..1197b6d >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsConfiguration.java >@@ -0,0 +1,43 @@ >+package org.rhq.server.metrics; >+ >+import java.util.concurrent.TimeUnit; >+ >+/** >+ * Configuration, mainly for trait retention period. >+ */ >+public class TraitsConfiguration { >+ >+ private int ttl; >+ >+ /** >+ * Construct a new instance. >+ * The default TTL is 365 days. >+ */ >+ public TraitsConfiguration() { >+ setTTLDays(365); >+ } >+ >+ /** >+ * Returns TTL in days. >+ */ >+ public int getTTLDays() { >+ return ttl; >+ } >+ >+ /** >+ * Returns TTL in seconds. >+ */ >+ public int getTTLSeconds() { >+ return (int) TimeUnit.SECONDS.convert(ttl, TimeUnit.DAYS); >+ } >+ >+ /** >+ * Setter for TTL in days.
>+ */ >+ public void setTTLDays(int ttl) { >+ if (ttl <= 0) >+ throw new IllegalArgumentException(); >+ this.ttl = ttl; >+ } >+ >+} >diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsDAO.java >new file mode 100644 >index 0000000..b8898bb >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/TraitsDAO.java >@@ -0,0 +1,257 @@ >+package org.rhq.server.metrics; >+ >+import java.util.ArrayList; >+import java.util.Collection; >+import java.util.Date; >+import java.util.List; >+import java.util.Set; >+import java.util.concurrent.ExecutorService; >+import java.util.concurrent.Executors; >+import java.util.concurrent.atomic.AtomicInteger; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.MeasurementDataPK; >+import org.rhq.core.domain.measurement.MeasurementDataTrait; >+import org.rhq.core.domain.measurement.MeasurementSchedule; >+ >+import com.datastax.driver.core.BoundStatement; >+import com.datastax.driver.core.PreparedStatement; >+import com.datastax.driver.core.ResultSet; >+import com.datastax.driver.core.Row; >+import com.google.common.util.concurrent.FutureCallback; >+import com.google.common.util.concurrent.Futures; >+import com.google.common.util.concurrent.ListenableFuture; >+ >+ >+/** >+ * Manages trait persistence. >+ */ >+public class TraitsDAO { >+ >+ private final Log log = LogFactory.getLog(TraitsDAO.class); >+ >+ private final StorageSession storageSession; >+ >+ private final TraitsConfiguration configuration; >+ >+ private PreparedStatement insertTrait; >+ >+ PreparedStatement deleteHistory; >+ >+ PreparedStatement selectCleanupIds; >+ >+ /** >+ * Table for traits. 
>+ */ >+ public static final String TABLE = "measurement_data_traits"; >+ >+ /** >+ * Unclear if this needs to be configurable or not. >+ * These threads are used to notify the alert cache. >+ */ >+ private final ExecutorService executor = >+ Executors.newFixedThreadPool(10, new StorageClientThreadFactory("TraitsDAO")); >+ >+ private PreparedStatement deleteTrait; >+ >+ private PreparedStatement selectHistory; >+ >+ private PreparedStatement selectLatest; >+ >+ /** >+ * Constructs a new instance with session and configuration. >+ */ >+ public TraitsDAO(StorageSession session, TraitsConfiguration configuration) { >+ this.storageSession = session; >+ this.configuration = configuration; >+ initPreparedStatements(); >+ } >+ >+ /** >+ * Called to shutdown the thread pools. >+ */ >+ public void shutdown() { >+ executor.shutdown(); >+ } >+ >+ /** >+ * Initialize statements before use. >+ */ >+ public void initPreparedStatements() { >+ log.info("Initializing prepared statements"); >+ int ttl = configuration.getTTLSeconds(); >+ >+ insertTrait = storageSession.prepare( >+ "INSERT INTO " + TABLE + " (schedule_id, time, value) " + >+ "VALUES (?, ?, ?) USING TTL " + ttl); >+ >+ String s = "SELECT schedule_id, time, value from " + TABLE + " WHERE schedule_id = ?"; >+ selectHistory = storageSession.prepare(s); >+ selectLatest = storageSession.prepare(s + " LIMIT 1"); >+ deleteHistory = storageSession.prepare( >+ "DELETE FROM " + TABLE + " WHERE schedule_id = ? AND time = ?"); >+ deleteTrait = storageSession.prepare( >+ "DELETE FROM " + TABLE + " WHERE schedule_id = ?"); >+ >+ log.info("Finished initializing prepared statements"); >+ } >+ >+ /** >+ * Removes duplicated history entries after this date. Duplicates may still >+ * exist in the history table before this date. >+ * >+ * New traits may be added before cleanup is completed. This is okay as >+ * duplicate history entries are expected and filtered. 
>+ */ >+ public void cleanup(Date after) { >+ TraitsCleanup traitsCleanup = new TraitsCleanup(storageSession); >+ traitsCleanup.cleanup(after); >+ } >+ >+ /** >+ * Returns all traits for these schedules using bulk async query. >+ * The order is not preserved, however if history is obtained, the >+ * history is ordered from latest to oldest. >+ */ >+ public List<MeasurementDataTrait> queryAll(Collection<MeasurementSchedule> schedules, boolean history) { >+ List<MeasurementDataTrait> traits = new ArrayList<MeasurementDataTrait>(schedules.size()); >+ List<StorageResultSetFuture> futures = new ArrayList<StorageResultSetFuture>(); >+ for (MeasurementSchedule sched : schedules) { >+ BoundStatement bind; >+ if (history) { >+ bind = selectHistory.bind(sched.getId()); >+ } else { >+ bind = selectLatest.bind(sched.getId()); >+ } >+ futures.add(storageSession.executeAsync(bind)); >+ } >+ for (StorageResultSetFuture future : futures) { >+ ResultSet resultSet = future.get(); >+ if (history) { >+ traits.addAll(historyFor(resultSet)); >+ } else { >+ MeasurementDataTrait trait = currentTrait(resultSet); >+ if (trait != null) { >+ traits.add(trait); >+ } >+ } >+ } >+ return traits; >+ } >+ >+ /** >+ * Returns the current trait for this resource and schedule, >+ * or null if not found. 
>+ */ >+ public MeasurementDataTrait currentTrait(int scheduleId) { >+ BoundStatement bind = selectLatest.bind(scheduleId); >+ ResultSet result = storageSession.execute(bind); >+ return currentTrait(result); >+ } >+ >+ private MeasurementDataTrait currentTrait(ResultSet result) { >+ Row row = result.one(); >+ if (row == null) { >+ return null; >+ } >+ int scheduleId = row.getInt(0); >+ Date time = row.getDate(1); >+ String value = row.getString(2); >+ MeasurementDataPK pk = new MeasurementDataPK(time.getTime(), scheduleId); >+ MeasurementDataTrait trait = new MeasurementDataTrait(pk, value); >+ return trait; >+ } >+ >+ /** >+ * Returns the history for a particular schedule, where the >+ * first entry is the latest value, etc. >+ */ >+ public List<MeasurementDataTrait> historyFor(int scheduleId) { >+ ResultSet result = storageSession.execute(selectHistory.bind(scheduleId)); >+ return historyFor(result); >+ } >+ >+ private List<MeasurementDataTrait> historyFor(ResultSet result) { >+ List<MeasurementDataTrait> history = new ArrayList<MeasurementDataTrait>(); >+ /* >+ * newer --> older >+ * (T4, a), (T3, a), (T2, b), (T1, b) >+ */ >+ MeasurementDataTrait trait = null; >+ for (Row hrow : result) { >+ int scheduleId = hrow.getInt(0); >+ Date time = hrow.getDate(1); >+ String value = hrow.getString(2); >+ if (trait != null && !trait.getValue().equals(value)) { >+ history.add(trait); >+ } >+ MeasurementDataPK pk = new MeasurementDataPK(time.getTime(), scheduleId); >+ trait = new MeasurementDataTrait(pk, value); >+ } >+ if (trait != null) { >+ history.add(trait); >+ } >+ return history; >+ } >+ >+ /** >+ * Inserts a measurement trait. 
>+ */ >+ public StorageResultSetFuture insertTrait(MeasurementDataTrait data) { >+ BoundStatement statement; >+ int scheduleId = data.getScheduleId(); >+ statement = insertTrait.bind( >+ scheduleId, new Date(data.getTimestamp()), >+ data.getValue()); >+ return storageSession.executeAsync(statement); >+ } >+ >+ /** >+ * Inserts multiple traits, calling the given callback function when done. >+ * Note that the callback is executed in the DAO thread pool. >+ */ >+ public void insertTraits(Set<MeasurementDataTrait> traits, final FutureCallback<Object> callback) { >+ >+ if (log.isDebugEnabled()) { >+ log.debug("Inserting " + traits.size() + " traits"); >+ } >+ >+ if (traits.isEmpty()) { >+ executor.submit(new Runnable() { >+ public void run() { callback.onSuccess(null); } >+ }); >+ return; >+ } >+ >+ final AtomicInteger count = new AtomicInteger(traits.size()); >+ for (MeasurementDataTrait trait : traits) { >+ ListenableFuture<ResultSet> future = insertTrait(trait); >+ Futures.addCallback(future, new FutureCallback<ResultSet>() { >+ @Override >+ public void onSuccess(ResultSet result) { >+ if (count.decrementAndGet() == 0) { >+ log.trace("completed inserts"); >+ callback.onSuccess(result); >+ } >+ } >+ >+ @Override >+ public void onFailure(Throwable throwable) { >+ log.trace("failure inserting", throwable); >+ callback.onFailure(throwable); >+ } >+ >+ }, this.executor); >+ >+ } >+ } >+ >+ /** >+ * Deletes a trait by schedule ID, returning a future. 
>+ */ >+ public StorageResultSetFuture deleteSchedule(int scheduleId) { >+ return storageSession.executeAsync(deleteTrait.bind(scheduleId)); >+ } >+ >+} >diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CallTimeDAOTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CallTimeDAOTest.java >new file mode 100644 >index 0000000..8c2e67a >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CallTimeDAOTest.java >@@ -0,0 +1,147 @@ >+package org.rhq.server.metrics; >+ >+import static java.util.Collections.singleton; >+import static org.testng.Assert.assertEquals; >+ >+import java.util.Calendar; >+import java.util.Date; >+import java.util.List; >+import java.util.concurrent.CountDownLatch; >+import java.util.concurrent.TimeUnit; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.calltime.CallTimeData; >+import org.testng.annotations.AfterClass; >+import org.testng.annotations.BeforeClass; >+import org.testng.annotations.BeforeMethod; >+import org.testng.annotations.Test; >+ >+import com.google.common.util.concurrent.FutureCallback; >+ >+/** >+ * Tests {@link CallTimeDAO}.
>+ */ >+public class CallTimeDAOTest extends CassandraIntegrationTest { >+ >+ private final Log log = LogFactory.getLog(CallTimeDAOTest.class); >+ >+ private CallTimeDAO dao; >+ >+ private CallTimeConfiguration config; >+ >+ @BeforeClass >+ public void initDAO() throws Exception { >+ config = new CallTimeConfiguration(); >+ dao = new CallTimeDAO(storageSession, config); >+ } >+ >+ @AfterClass >+ public void after() { >+ dao.shutdown(); >+ } >+ >+ @BeforeMethod >+ public void resetDB() throws Exception { >+ session.execute("TRUNCATE " + CallTimeDAO.TABLE); >+ } >+ >+ @Test >+ public void simpleUpdate() throws Exception { >+ log.info("simpleUpdate"); >+ int scheduleId = 1; >+ >+ Calendar c = Calendar.getInstance(); >+ c.add(Calendar.DATE, -10); >+ >+ c.add(Calendar.HOUR, 1); >+ Date d0 = c.getTime(); >+ c.add(Calendar.HOUR, 1); >+ Date d1 = c.getTime(); >+ c.add(Calendar.HOUR, 1); >+ Date d2 = c.getTime(); >+ c.add(Calendar.HOUR, 1); >+ Date d3 = c.getTime(); >+ c.add(Calendar.HOUR, 1); >+ Date d4 = c.getTime(); >+ >+ CallTimeData data = new CallTimeData(scheduleId); >+ String destination = "dest"; >+ String destination2 = "dest2"; >+ double minimum = 4; >+ double maximum = 5; >+ double total = 100.5; >+ long count = 3; >+ data.addAggregatedCallData(destination, d1, d2, minimum, maximum, total, count); >+ data.addAggregatedCallData(destination2, d2, d3, minimum + 1, maximum + 1, total - 1, count + 2); >+ dao.insert(data).get(); >+ >+ log.info("select two rows"); >+ List<CallTimeRow> rows = dao.select(scheduleId, d1, d3); >+ assertEquals(rows.size(), 2, "two rows"); >+ CallTimeRow row0 = rows.get(0); >+ assertEquals(destination, row0.getDest()); >+ assertEquals(d1, row0.getBegin()); >+ assertEquals(d2, row0.getEnd()); >+ assertEquals(minimum, row0.getMin()); >+ assertEquals(maximum, row0.getMax()); >+ assertEquals(total, row0.getTotal()); >+ assertEquals(count, row0.getCount()); >+ CallTimeRow row1 = rows.get(1); >+ assertEquals(destination2, row1.getDest()); >+ >+
CallTimeRow agg = CallTimeRow.aggregate(rows); >+ assertEquals(agg.getBegin(), row0.getBegin()); >+ assertEquals(agg.getEnd(), row1.getEnd()); >+ assertEquals(agg.getMin(), row0.getMin()); >+ assertEquals(agg.getMax(), row1.getMax()); >+ assertEquals(agg.getTotal(), row0.getTotal() + row1.getTotal()); >+ assertEquals(agg.getCount(), row0.getCount() + row1.getCount()); >+ >+ log.info("select one row"); >+ rows = dao.select(scheduleId, d2, d3); >+ assertEquals(rows.size(), 1, "one row"); >+ >+ log.info("select one row based on destination"); >+ rows = dao.select(scheduleId, destination2, d0, d4); >+ assertEquals(rows.size(), 1, "one row"); >+ assertEquals(destination2, rows.get(0).getDest()); >+ >+ log.info("delete schedule"); >+ dao.deleteSchedule(scheduleId).get(); >+ rows = dao.select(scheduleId, d0, d4); >+ assertEquals(rows.size(), 0, "no rows"); >+ >+ log.info("test async"); >+ CallTimeData data2 = new CallTimeData(scheduleId + 1); >+ data2.addAggregatedCallData(destination, d2, d3, minimum - 1, maximum + 2, 0, 3); >+ >+ final CountDownLatch latch = new CountDownLatch(1); >+ FutureCallback<Object> callback = new FutureCallback<Object>() { >+ public void onSuccess(Object result) { >+ latch.countDown(); >+ } >+ public void onFailure(Throwable t) { >+ log.error("failure", t); >+ latch.countDown(); >+ } >+ }; >+ dao.insert(singleton(data2), callback); >+ boolean await = latch.await(1, TimeUnit.MINUTES); >+ assertEquals(await, true, "should have gotten success"); >+ >+ log.info("test async result"); >+ rows = dao.select(data2.getScheduleId(), d0, d4); >+ assertEquals(rows.size(), 1, "one row"); >+ >+ log.info("test idempotent"); >+ config.setIdempotentInsert(true); >+ dao.deleteSchedule(scheduleId).get(); >+ dao.insert(data).get(); >+ dao.insert(data).get(); >+ log.info("duplicate insert, but find one row only"); >+ rows = dao.select(scheduleId, d1, d2); >+ assertEquals(rows.size(), 1, "one row"); >+ } >+ >+} >diff --git 
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/TraitsDAOTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/TraitsDAOTest.java >new file mode 100644 >index 0000000..6e69f8d >--- /dev/null >+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/TraitsDAOTest.java >@@ -0,0 +1,175 @@ >+package org.rhq.server.metrics; >+ >+import static java.util.Collections.emptySet; >+import static org.testng.Assert.assertEquals; >+ >+import java.util.ArrayList; >+import java.util.Collection; >+import java.util.Date; >+import java.util.HashSet; >+import java.util.List; >+import java.util.Set; >+import java.util.concurrent.CountDownLatch; >+import java.util.concurrent.TimeUnit; >+ >+import org.apache.commons.logging.Log; >+import org.apache.commons.logging.LogFactory; >+import org.rhq.core.domain.measurement.MeasurementDataPK; >+import org.rhq.core.domain.measurement.MeasurementDataTrait; >+import org.rhq.core.domain.measurement.MeasurementSchedule; >+import org.testng.annotations.AfterClass; >+import org.testng.annotations.BeforeClass; >+import org.testng.annotations.BeforeMethod; >+import org.testng.annotations.Test; >+ >+import com.datastax.driver.core.ResultSet; >+import com.google.common.util.concurrent.FutureCallback; >+ >+/** >+ * Tests {@link TraitsDAO}. 
>+ */ >+public class TraitsDAOTest extends CassandraIntegrationTest { >+ >+ private final Log log = LogFactory.getLog(TraitsDAOTest.class); >+ >+ private TraitsDAO dao; >+ >+ @BeforeClass >+ public void initDAO() throws Exception { >+ TraitsCleanup.LIMIT = 25; // for testing paging >+ dao = new TraitsDAO(storageSession, new TraitsConfiguration()); >+ } >+ >+ @AfterClass >+ public void after() { >+ dao.shutdown(); >+ } >+ >+ @BeforeMethod >+ public void resetDB() throws Exception { >+ session.execute("TRUNCATE " + TraitsDAO.TABLE); >+ } >+ >+ @Test >+ public void simpleUpdate() throws Exception { >+ log.info("simpleUpdate"); >+ int scheduleId = 1; >+ >+ assertEquals(dao.currentTrait(scheduleId), null, "no trait data expected"); >+ >+ long timestamp = System.currentTimeMillis(); >+ String value = "testing"; >+ MeasurementDataTrait data = trait(scheduleId, timestamp, value); >+ insertTrait(data); >+ >+ assertEquals(dao.currentTrait(scheduleId), data, "inserted trait data expected"); >+ >+ String value2 = "testing 2"; >+ MeasurementDataTrait data2 = trait(scheduleId, timestamp + 10000, value2); >+ insertTrait(data2); >+ assertEquals(dao.currentTrait(scheduleId), data2, "inserted trait data expected"); >+ >+ value2 = "testing 2"; >+ MeasurementDataTrait data3 = trait(scheduleId, timestamp + 20000, value2); >+ insertTrait(data3); // duplicate trait value (should be ignored) >+ >+ List<MeasurementDataTrait> history = dao.historyFor(scheduleId); >+ assertEquals(2, history.size(), "only should have two records"); >+ assertEquals(data2, history.get(0), "newest value first"); >+ assertEquals(data, history.get(1), "oldest"); >+ >+ log.debug("do cleanup"); >+ dao.cleanup(new Date(0)); // remove duplicated row 'data3' >+ history = dao.historyFor(scheduleId); >+ assertEquals(2, history.size(), "still have two records"); >+ assertEquals(data2, history.get(0), "newest"); >+ assertEquals(data, history.get(1), "oldest"); >+ dao.deleteSchedule(scheduleId); >+ >+ history = 
dao.historyFor(scheduleId); >+ assertEquals(0, history.size(), "no history"); >+ >+ assertEquals(null, dao.currentTrait(scheduleId), "no trait data"); >+ >+ Set<MeasurementDataTrait> traits = new HashSet<MeasurementDataTrait>(); >+ traits.add(data); >+ traits.add(data2); >+ traits.add(data3); >+ final CountDownLatch latch = new CountDownLatch(1); >+ FutureCallback<Object> callback = new FutureCallback<Object>() { >+ >+ @Override >+ public void onSuccess(Object result) { >+ latch.countDown(); >+ } >+ >+ @Override >+ public void onFailure(Throwable t) { >+ log.error("failure", t); >+ latch.countDown(); >+ } >+ }; >+ dao.insertTraits(traits, callback); >+ boolean await = latch.await(1, TimeUnit.MINUTES); >+ assertEquals(await, true, "should have gotten success"); >+ >+ Collection<MeasurementSchedule> schedules = new ArrayList<MeasurementSchedule>(); >+ MeasurementSchedule s1 = new MeasurementSchedule(); >+ s1.setId(data2.getScheduleId()); >+ schedules.add(s1); >+ history = dao.queryAll(schedules, true); >+ assertEquals(history.size(), 2); >+ history = dao.queryAll(schedules, false); >+ assertEquals(history.size(), 1); >+ } >+ >+ private MeasurementDataTrait trait(int scheduleId, long timestamp, >+ String value) { >+ return new MeasurementDataTrait(new MeasurementDataPK(timestamp + 10000, scheduleId), value); >+ } >+ >+ private void insertTrait(MeasurementDataTrait data) throws Exception { >+ dao.insertTrait(data).get(); >+ } >+ >+ @Test >+ public void testCleanup() throws Exception { >+ >+ long start = System.currentTimeMillis() - 100000; >+ int count = 50; >+ for (int i = 0; i < count; i++) { >+ long timestamp = start + (5000 * i); >+ String value = "testing"; >+ insertTrait(trait(i, timestamp, value)); >+ insertTrait(trait(i, timestamp + 2000, value)); >+ insertTrait(trait(i, timestamp + 3000, value + "x")); >+ } >+ >+ assertEquals(count(), count * 3); >+ dao.cleanup(new Date(start)); >+ // the count isn't exactly * 2 because of how the result paging works. 
>+ // by selecting rows with token greater than token(x), a schedule's rows are sometimes split across pages >+ int count2 = count(); >+ assertEquals(count2 >= count * 2, true, "retain all rows " + count2); >+ assertEquals(count2 <= count * 2 + 10, true, "delete most rows " + count2); >+ } >+ >+ private int count() { >+ ResultSet result = storageSession.execute("select count(*) from " + TraitsDAO.TABLE); >+ return (int) result.one().getLong(0); >+ } >+ >+ @Test >+ public void testEmptySet() throws Exception { >+ Set<MeasurementDataTrait> traits = emptySet(); >+ final CountDownLatch latch = new CountDownLatch(1); >+ FutureCallback<Object> callback = new FutureCallback<Object>() { >+ public void onSuccess(Object result) { latch.countDown(); } >+ public void onFailure(Throwable t) { log.error("failure", t); latch.countDown(); } >+ }; >+ dao.insertTraits(traits, callback); >+ boolean await = latch.await(1, TimeUnit.MINUTES); >+ assertEquals(await, true, "should have gotten success"); >+ } >+ >+} >-- >1.8.1.2 >
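The testCleanup comment above notes that paging via `token(x)` means a schedule's rows can sometimes be split across pages, which is why the post-cleanup row count is only bounded, not exact. The underlying token-range paging idiom (resume each page strictly after the last token seen) can be sketched with an in-memory stand-in for the table; the class and names below are illustrative, not part of the patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Minimal sketch of token-range paging: fetch up to 'limit' rows whose
// token is strictly greater than the last token seen, then repeat.
public class TokenPagingSketch {

    // In-memory stand-in for a table ordered by partition token.
    static List<Map.Entry<Long, String>> page(NavigableMap<Long, String> table,
                                              long afterToken, int limit) {
        List<Map.Entry<Long, String>> out = new ArrayList<Map.Entry<Long, String>>();
        for (Map.Entry<Long, String> e : table.tailMap(afterToken, false).entrySet()) {
            if (out.size() == limit) {
                break;
            }
            out.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<Long, String>();
        for (long t = 0; t < 100; t += 7) {
            table.put(t, "row@" + t); // 15 rows at tokens 0, 7, ..., 98
        }

        long last = Long.MIN_VALUE;
        int seen = 0;
        while (true) {
            List<Map.Entry<Long, String>> batch = page(table, last, 4);
            if (batch.isEmpty()) {
                break;
            }
            seen += batch.size();
            last = batch.get(batch.size() - 1).getKey();
        }
        // Every row is visited exactly once, in pages of at most 4.
        System.out.println(seen == table.size());
    }
}
```

Resuming with a strict greater-than comparison guarantees no row is revisited, but a page boundary can land in the middle of one schedule's rows, which is the split the test comment accounts for with its loose count bounds.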
Attachments on bug 1093948: 892568 | 893002