Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 716574 Details for
Bug 921247
FEAT: address prompting quirks from 'hwcert' command refactoring
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly; please enable it.
[patch]
refactor implementation of hwcert and hwcert-backend commands on R22
0001-921247-FEAT-address-prompting-quirks-from-hwcert-com.patch (text/plain), 123.86 KB, created by
Greg Nichols
on 2013-03-26 15:15:05 UTC
(
hide
)
Description:
refactor implementation of hwcert and hwcert-backend commands on R22
Filename:
MIME Type:
Creator:
Greg Nichols
Created:
2013-03-26 15:15:05 UTC
Size:
123.86 KB
patch
obsolete
>From a750f4de7ed7a3815e022086f566693a6f46e960 Mon Sep 17 00:00:00 2001 >From: Greg Nichols <gnichols@redhat.com> >Date: Tue, 26 Mar 2013 11:12:59 -0400 >Subject: [PATCH] 921247 - FEAT: address prompting quirks from 'hwcert' command > refactoring > >--- > Makefile | 2 +- > hwcert-client.spec.in | 4 +- > hwcert/backend.py | 303 +++++++++ > hwcert/certificationtest.py | 2 +- > hwcert/hardwarecertification.py | 53 +- > hwcert/hardwaretest.py | 1414 --------------------------------------- > hwcert/harness.py | 1147 +++++++++++++++++++++++++++++++ > hwcert/test.py | 16 +- > test-env/bin/hwcert | 1 - > test-env/bin/hwcert-backend | 8 +- > 10 files changed, 1515 insertions(+), 1435 deletions(-) > create mode 100644 hwcert/backend.py > delete mode 100644 hwcert/hardwaretest.py > create mode 100644 hwcert/harness.py > >diff --git a/Makefile b/Makefile >index 5880fea..8bc2ca4 100644 >--- a/Makefile >+++ b/Makefile >@@ -14,7 +14,7 @@ > # Author: Greg Nichols > > HWCERT_VERSION := 1.7.0 >-HWCERT_RELEASE := 22 >+HWCERT_RELEASE := 23 > HWCERT_VERSION_RELEASE := $(HWCERT_VERSION)-$(HWCERT_RELEASE) > HWCERT_VERSION_PY := hwcert/version.py > HWCERT_RHEL_VERSION := 7 >diff --git a/hwcert-client.spec.in b/hwcert-client.spec.in >index 8ffc0fc..0baf719 100644 >--- a/hwcert-client.spec.in >+++ b/hwcert-client.spec.in >@@ -76,9 +76,9 @@ DESTDIR=$RPM_BUILD_ROOT make HWCERT_RHEL_VERSION=%{rhel_version} install > > > %changelog >-* Wed Mar 20 2013 Greg Nichols <gnichols@redhat.com> >+* Tue Mar 26 Greg Nichols <gnichols@redhat.com> > >-hwcert-client 1.7.0 R22 >+hwcert-client 1.7.0 R23 > > 921247 - FEAT: address prompting quirks from 'hwcert' command refactoring > >diff --git a/hwcert/backend.py b/hwcert/backend.py >new file mode 100644 >index 0000000..72b728e >--- /dev/null >+++ b/hwcert/backend.py >@@ -0,0 +1,303 @@ >+# Copyright (c) 2006 Red Hat, Inc. All rights reserved. 
This copyrighted material >+# is made available to anyone wishing to use, modify, copy, or >+# redistribute it subject to the terms and conditions of the GNU General >+# Public License v.2. >+# >+# This program is distributed in the hope that it will be useful, but WITHOUT ANY >+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A >+# PARTICULAR PURPOSE. See the GNU General Public License for more details. >+# >+# You should have received a copy of the GNU General Public License >+# along with this program; if not, write to the Free Software >+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. >+# >+# Author: Greg Nichols >+# >+ >+import os, time >+import sys >+import syslog >+import errno >+import tempfile >+import time >+import copy >+import socket >+import urllib2 >+import re >+import commands >+from optparse import OptionParser >+ >+from hwcert.controller import Controller >+from hwcert.tags import Tags, Attributes, Constants, TestTag, SystemCategories >+from hwcert.environment import Environment >+from hwcert.certificationtest import ResultsDocument, CertificationDocument >+from hwcert.testdocument import TestDocument >+from hwcert.test import TestParameters >+from hwcert.report import Report >+from hwcert.reporthtml import ReportHTML >+from hwcert.planner import Planner >+import hwcert.version >+from hwcert.device import Device, UdevDevice >+from hwcert.command import Command, HwCertCommandException >+from hwcert.resultsengine import ResultsEngine >+from hwcert.redhatrelease import RedHatRelease >+from hwcert.daemon import HwCertDaemon >+from hwcert.log import Log >+from hwcert.http import HwCertHttp >+from hwcert.catalog import Catalog >+from hwcert.harness import HardwareTestHarness >+ >+class Backend(HardwareTestHarness): >+ >+ Debugging=True >+ >+ def __init__(self, echoResponses=True): >+ self.getOptions() >+ HardwareTestHarness.__init__(self, self.options, echoResponses) >+ self.command = None 
>+ >+ self.commands = {'plan': self.doPlan, >+ 'verify': self.doVerify, >+ 'certify': self.doCertify, >+ 'run': self.doRun, >+ 'continue': self.doContinue, >+ 'print': self.doPrint, >+ 'save': self.doSave, >+ 'submit': self.doSubmit , >+ 'clean': self.doClean, >+ 'server': self.doServer, >+ 'version': self.doVersion, >+ } >+ >+ def getOptions(self): >+ usage = "usage: %prog <command> [options]\n\ncommand:\n"\ >+ "\tplan \t- plan certification testing\n"\ >+ "\tverify \t- verify that the configuration is ready for testing\n"\ >+ "\tcertify - execute the remaining tests from the certification test plan\n"\ >+ "\tprint \t- print certification test results\n"\ >+ "\tsubmit \t- submit certification test results\n"\ >+ "\tclean [results | all] \t- remove all results and temporary files\n"\ >+ "\trun \t- run specific certification tests\n"\ >+ "\tcontinue \t- continue a test run\n"\ >+ "\tversion\t- prints the version of hwcert\n"\ >+ "\tserver (start | stop | status | daemon) - run hwcert-backend as a test server\n"\ >+ "\tsave\t- save a copy of the current test results\n" >+ >+ parser = OptionParser(usage) >+ >+ parser.add_option("-n","--server",action="store", type="string", dest="server", >+ help="Remote test host") >+ parser.add_option("-i","--udi", action="append", type="string", dest="udi", >+ help="UDI - the unique device ID") >+ # note that "forced" mode is not allowed on the command line, but is valid >+ parser.add_option("-m","--mode", action="store", type="choice", choices=(Constants.normal, Constants.auto), dest="mode", >+ help="Test Mode: normal, auto") >+ parser.add_option("-t","--test", action="append", type="string", dest="test", >+ help="Test Name") >+ parser.add_option("-o", "--report", >+ action="store", type="choice", choices=(Constants.plan, Constants.summary, Constants.full, Constants.device), >+ dest="report", default=Constants.summary, >+ help="The type of report to print.") >+ parser.add_option("-u", "--disable", >+ action="store_true", 
dest="disable", default=False, >+ help="Disable a test") >+ parser.add_option("-e", "--enable", >+ action="store_true", dest="enable", default=False, >+ help="Re-enable a test") >+ parser.add_option("-d", "--model", >+ action="store_true", dest="certification", default=False, >+ help="Edit model and vendor information (deprecated)") >+ parser.add_option("-c", "--certification", >+ action="store_true", dest="certification", default=False, >+ help="Edit certification, model and vendor information") >+ parser.add_option("-a", "--add", >+ action="store_true", dest="add", default=False, >+ help="Add a test") >+ parser.add_option("-b", "--debug", type="choice", choices=(Constants.off, Constants.low, Constants.medium, Constants.high), >+ action="store", dest="debug", default=Constants.off, >+ help="Enable debugging output") >+ parser.add_option("-f", "--failures", >+ action="store_true", dest="failures", default=False, >+ help="Show only test failures") >+ parser.add_option("-l", "--last", >+ action="store_true", dest="latest", default=False, >+ help="Show only the latest test run") >+ parser.add_option("-v", "--device", >+ action="store", type="string", dest="device", >+ help="The logical device to be tested.") >+ # default is TestTag.certification, set explicitly below because optparse doesn't do the right thing >+ parser.add_option("-g", "--tag", >+ action="append", type="choice", choices=TestTag.getAll()) >+ >+ parser.set_defaults(data="/var/hwcert", cfg="config.xml", plan="plan.xml", results="results.xml", mode="normal", source="/usr/share/hwcert/tests", virtResults="virt-results.xml") >+ >+ >+ (self.options,self.args) = parser.parse_args() >+ >+ # set default tag >+ if not self.options.tag: >+ self.options.tag = [TestTag.certification] >+ >+ if len(self.args) > 2 or (len(self.args) == 2 >+ and not (self.args[0] == "server" and self.args[1] in ["start", "stop", "status", "daemon"]) >+ and not (self.args[0] == Constants.clean and self.args[1] in [Constants.results, 
Constants.all])): >+ parser.error("Invalid command") >+ exit(1) >+ >+ self.optionParser = parser >+ >+ def setOptions(self, options): >+ self.webOptions = options >+ for (name, value) in options.items(): >+ self.options.__dict__[name] = value >+ >+ def getArgs(self): >+ >+ # check for valid commands >+ command = None >+ if self.args: >+ command=self.args[0] >+ if not command in self.commands: >+ self.optionParser.error( "Unknown command \"%s\"" % command) >+ sys.exit(1) >+ if command in "server" and len(self.args) > 1 and not self.args[1] in ("start", "stop", "status", "daemon"): >+ self.optionParser.error( "Unknown server command \"%s\"" % self.args[1]) >+ sys.exit(1) >+ >+ return self.args >+ >+ def do(self, args): >+ >+ #server doesn't lock >+ self.command = args[0] >+ if (self.command == "server"): >+ return self.doServer(args) >+ >+ if not self.getLock(): >+ return False >+ >+ result = False >+ # do the command >+ try: >+ result = self.commands[self.command]() >+ except KeyError, e: >+ print "unknown command: " + self.command >+ print e >+ self.releaseLock() >+ return result >+ >+ def doWeb(self): >+ # note - web server does not observe locks >+ command = "print" >+ if self.options.command: >+ command = self.options.command >+ if command == "print": >+ self.doPrint(format="html") >+ elif (command == 'run' or command == 'server' or command == 'daemon'): >+ result = self.addTask() >+ elif command == "status": >+ self.doWebStatus() >+ else: >+ print "Error: unknown/unsupported command %s" % command >+ return False >+ >+ return True >+ >+ def doServer(self, args): >+ >+ if not self.installServerRPM() and not self.Debugging: >+ return False >+ >+ #otherwise >+ if len(args) > 1: >+ subcommand = args[1] >+ else: >+ subcommand = "start" >+ planner = Planner(self.options, redHatRelease=self.redHatRelease) >+ planner.analyse(self.environment.getSourceDirectory()) >+ daemon = HwCertDaemon(self.environment, planner, self.options) >+ >+ # run daemon in foreground >+ if 
subcommand == "daemon": >+ if daemon.do("status"): >+ daemon.do("stop") # kill any in the background >+ print "Starting daemon" >+ sys.stdout.flush() >+ return daemon.run() >+ >+ # otherwise, do test-specific services >+ >+ testCount = 0 >+ tests = planner.getServerModeTests(); >+ result = True >+ >+ for test in tests: >+ print "%s test server %s:" % (test.Name(), subcommand) >+ if not self.runServer(test, subcommand): >+ result = False >+ print "" >+ >+ >+ # do the daemon >+ if not daemon.do(subcommand): >+ result = False >+ return result >+ >+ def addTask(self): >+ print "<pre>" >+ command = None >+ try: >+ command = Constants.hwcert_backend + " " + self.webOptions["command"] + " " + self.webOptions["subcommand"] + " " >+ except KeyError, e: >+ print "Error: missing URL parameter:" >+ print e >+ if command: >+ for option in self.webOptions.keys(): >+ if "command" not in option and self.webOptions[option]: >+ command += "--" + option + " " + self.webOptions[option] + " " >+ command += "\n" >+ print "Adding Task: " + command >+ sys.stdout.flush() >+ taskInFile = os.open(self.environment.getTaskFilePath() + ".in", os.O_WRONLY) >+ print "opened task file" >+ sys.stdout.flush() >+ os.write(taskInFile, command + "\n") >+ os.close(taskInFile) >+ print "Reading Results:" >+ print "</pre><hr><pre>" >+ >+ sys.stdout.flush() >+ taskOutFile = open(self.environment.getTaskFilePath()+".out", "r") >+ while True: >+ line = taskOutFile.readline() >+ if line: >+ sys.stdout.write(line) >+ sys.stdout.flush() >+ else: >+ break >+ taskOutFile.close() >+ print "</pre><hr><pre>" >+ print "done" >+ print "</pre>" >+ sys.stdout.flush() >+ >+ def doWebStatus(self): >+ print "<pre>" >+ planner = Planner(self.options, redHatRelease=self.redHatRelease) >+ planner.analyse(self.environment.getSourceDirectory()) >+ tests = planner.getServerModeTests(); >+ print "</pre>hwcert Server Status:<pre>" >+ self.doVersion() >+ print "" >+ success = True >+ for test in tests: >+ print "%s server:" % 
test.Name() >+ if not test.statusServer(): >+ success = False >+ print "" >+ print "</pre>" >+ if success: >+ print "<!-- %s=%s -->" % (Constants.hwcertserverstatus, Constants.running) >+ else: >+ print "<!-- %s=%s -->" % (Constants.hwcertserverstatus, Constants.failed) >+ return success >\ No newline at end of file >diff --git a/hwcert/certificationtest.py b/hwcert/certificationtest.py >index c22dd45..b37d80d 100644 >--- a/hwcert/certificationtest.py >+++ b/hwcert/certificationtest.py >@@ -366,7 +366,7 @@ class ResultsDocument(CertificationDocument): > # otherwise > return None > >- def getTest(self, otherTest): >+ def getTestMatching(self, otherTest): > """ find a test that is a best match otherTest's important attributes - class, udi, device """ > return self.getDeviceClass(otherTest.getDeviceClassName()).getTest(otherTest) > >diff --git a/hwcert/hardwarecertification.py b/hwcert/hardwarecertification.py >index a040e61..2012b71 100644 >--- a/hwcert/hardwarecertification.py >+++ b/hwcert/hardwarecertification.py >@@ -18,20 +18,65 @@ import os, sys, time > from optparse import OptionParser > > from controller import Controller >-from hardwaretest import HardwareTestHarness >+from harness import HardwareTestHarness > from resultsengine import ResultsEngine > from certificationtest import ResultsDocument > from testdocument import TestDocument >+from tags import Constants > > class HardwareCertification(HardwareTestHarness): > > def __init__(self ): >- # probably need to block old syntax here. 
>- HardwareTestHarness.__init__(self, echoResponses=False) >+ self.getOptions() >+ HardwareTestHarness.__init__(self, self.options, echoResponses=False) > self.markSelected = "->" > self.markUnSelected = " " > self.markMandatory = "<>" > >+ self.commands = { >+ 'continue': self.doContinue, >+ 'print': self.doPrint, >+ 'clean': self.doClean, >+ 'version': self.doVersion, >+ } >+ >+ def getOptions(self): >+ usage = "usage: %prog <command> [options]\n\ncommand:\n"\ >+ "\tprint \t- print certification test results\n"\ >+ "\tclean [results | all] \t- remove all results and temporary files\n"\ >+ "\tcontinue \t- continue a test run\n"\ >+ "\tversion\t- prints the version of hwcert\n" >+ >+ parser = OptionParser(usage) >+ >+ parser.add_option("-n","--server",action="store", type="string", dest="server", >+ help="Remote test host") >+ parser.add_option("-o", "--report", >+ action="store", type="choice", choices=(Constants.plan, Constants.summary, Constants.full, Constants.device), >+ dest="report", default=Constants.summary, >+ help="The type of report to print.") >+ parser.add_option("-b", "--debug", type="choice", choices=(Constants.off, Constants.low, Constants.medium, Constants.high), >+ action="store", dest="debug", default=Constants.off, >+ help="Enable debugging output") >+ >+ # some of these are needed for Backend options defaulting >+ parser.set_defaults(mode="normal", test=None, udi=None, device=None, certification=None) >+ >+ >+ (self.options,self.args) = parser.parse_args() >+ self.command = None >+ if len(self.args) >=2: >+ self.command = self.args[1] >+ >+ >+ if len(self.args) > 2 or (len(self.args) == 2 >+ and not (self.args[0] == Constants.clean and self.args[1] in [Constants.results, Constants.all])): >+ parser.error("Invalid command") >+ exit(1) >+ >+ self.optionParser = parser >+ >+ > def run(self): > print "The Red Hat Hardware Certification Test Suite" > if not self.getLock(): >@@ -216,6 +261,6 @@ class HardwareCertification(HardwareTestHarness): > if 
oldTest.getMandatory(): > continue > if oldTest.hasPassed() or oldTest.isDisabled(): >- test = self.certification.getTest(oldTest) >+ test = self.certification.getTestMatching(oldTest) > if test and (oldTest.hasPassed() or oldTest.isDisabled()): > test.setDisabled(True) >diff --git a/hwcert/hardwaretest.py b/hwcert/hardwaretest.py >deleted file mode 100644 >index 7f38fd1..0000000 >--- a/hwcert/hardwaretest.py >+++ /dev/null >@@ -1,1414 +0,0 @@ >-# Copyright (c) 2006 Red Hat, Inc. All rights reserved. This copyrighted material >-# is made available to anyone wishing to use, modify, copy, or >-# redistribute it subject to the terms and conditions of the GNU General >-# Public License v.2. >-# >-# This program is distributed in the hope that it will be useful, but WITHOUT ANY >-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A >-# PARTICULAR PURPOSE. See the GNU General Public License for more details. >-# >-# You should have received a copy of the GNU General Public License >-# along with this program; if not, write to the Free Software >-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
>-# >-# Author: Greg Nichols >-# >- >-import os, time >-import sys >-import syslog >-import errno >-import tempfile >-import time >-import copy >-import socket >-import urllib2 >-import re >-import commands >-from optparse import OptionParser >- >-from hwcert.controller import Controller >-from hwcert.tags import Tags, Attributes, Constants, TestTag, SystemCategories >-from hwcert.environment import Environment >-from hwcert.certificationtest import ResultsDocument, CertificationDocument >-from hwcert.testdocument import TestDocument >-from hwcert.test import TestParameters >-from hwcert.report import Report >-from hwcert.reporthtml import ReportHTML >-from hwcert.planner import Planner >-import hwcert.version >-from hwcert.device import Device, UdevDevice >-from hwcert.command import Command, HwCertCommandException >-from hwcert.resultsengine import ResultsEngine >-from hwcert.redhatrelease import RedHatRelease >-from hwcert.daemon import HwCertDaemon >-from hwcert.log import Log >-from hwcert.http import HwCertHttp >-from hwcert.catalog import Catalog >- >-class HardwareTestHarness(Controller): >- >- Debugging=True >- >- def __init__(self, echoResponses=True): >- self.getOptions() >- Controller.__init__(self, self.options.debug, echoResponses) >- self.environment = Environment() >- self.makeDirectoryPath(self.environment.getDataDirectory()) >- self.testServer = self.options.server >- self.Debugging = self.options.debug != Constants.off >- self.debugLevel = self.options.debug # "off", "low", "medium", or 'high' >- self.certification = None >- self.runMode = self.options.mode >- self.command = None >- self.virtualization = None >- self.redHatRelease = RedHatRelease() >- self.catalog = Catalog(self.environment, self.options.debug) >- >- self.commands = {'plan': self.doPlan, >- 'verify': self.doVerify, >- 'certify': self.doCertify, >- 'run': self.doRun, >- 'continue': self.doContinue, >- 'print': self.doPrint, >- 'save': self.doSave, >- 'submit': self.doSubmit , >- 
'clean': self.doClean, >- 'server': self.doServer, >- 'version': self.doVersion, >- } >- >- def getOptions(self): >- usage = "usage: %prog <command> [options]\n\ncommand:\n"\ >- "\tplan \t- plan certification testing\n"\ >- "\tverify \t- verify that the configuration is ready for testing\n"\ >- "\tcertify - execute the remaining tests from the certification test plan\n"\ >- "\tprint \t- print certification test results\n"\ >- "\tsubmit \t- submit certification test results\n"\ >- "\tclean [results | all] \t- remove all results and temporary files\n"\ >- "\trun \t- run specific certification tests\n"\ >- "\tcontinue \t- continue a test run\n"\ >- "\tversion\t- prints the version of hwcert\n"\ >- "\tserver (start | stop | status | daemon) - run hwcert-backend as a test server\n"\ >- "\tsave\t- save a copy of the current test results\n" >- >- parser = OptionParser(usage) >- >- parser.add_option("-n","--server",action="store", type="string", dest="server", >- help="Remote test host") >- parser.add_option("-i","--udi", action="append", type="string", dest="udi", >- help="UDI - the unique device ID") >- # note that "forced" mode is not allowed on the command line, but is valid >- parser.add_option("-m","--mode", action="store", type="choice", choices=(Constants.normal, Constants.auto), dest="mode", >- help="Test Mode: normal, auto") >- parser.add_option("-t","--test", action="append", type="string", dest="test", >- help="Test Name") >- parser.add_option("-o", "--report", >- action="store", type="choice", choices=(Constants.plan, Constants.summary, Constants.full, Constants.device), >- dest="report", default=Constants.summary, >- help="The type of report to print.") >- parser.add_option("-u", "--disable", >- action="store_true", dest="disable", default=False, >- help="Disable a test") >- parser.add_option("-e", "--enable", >- action="store_true", dest="enable", default=False, >- help="Re-enable a test") >- parser.add_option("-d", "--model", >- action="store_true", 
dest="certification", default=False, >- help="Edit model and vendor information (deprecated)") >- parser.add_option("-c", "--certification", >- action="store_true", dest="certification", default=False, >- help="Edit certification, model and vendor information") >- parser.add_option("-a", "--add", >- action="store_true", dest="add", default=False, >- help="Add a test") >- parser.add_option("-b", "--debug", type="choice", choices=(Constants.off, Constants.low, Constants.medium, Constants.high), >- action="store", dest="debug", default=Constants.off, >- help="Enable debugging output") >- parser.add_option("-f", "--failures", >- action="store_true", dest="failures", default=False, >- help="Show only test failures") >- parser.add_option("-l", "--last", >- action="store_true", dest="latest", default=False, >- help="Show only the latest test run") >- parser.add_option("-v", "--device", >- action="store", type="string", dest="device", >- help="The logical device to be tested.") >- # default is TestTag.certification, set explicitly below because optparse doesn't do the right thing >- parser.add_option("-g", "--tag", >- action="append", type="choice", choices=TestTag.getAll()) >- >- parser.set_defaults(data="/var/hwcert", cfg="config.xml", plan="plan.xml", results="results.xml", mode="normal", source="/usr/share/hwcert/tests", virtResults="virt-results.xml") >- >- >- (self.options,self.args) = parser.parse_args() >- >- # set default tag >- if not self.options.tag: >- self.options.tag = [TestTag.certification] >- >- if len(self.args) > 2 or (len(self.args) == 2 >- and not (self.args[0] == "server" and self.args[1] in ["start", "stop", "status", "daemon"]) >- and not (self.args[0] == Constants.clean and self.args[1] in [Constants.results, Constants.all])): >- parser.error("Invalid command") >- exit(1) >- >- self.optionParser = parser >- >- def setOptions(self, options): >- self.webOptions = options >- for (name, value) in options.items(): >- self.options.__dict__[name] = value 
>- >- def getArgs(self): >- >- # check for valid commands >- command = None >- if self.args: >- command=self.args[0] >- if not command in self.commands: >- self.optionParser.error( "Unknown command \"%s\"" % command) >- sys.exit(1) >- if command in "server" and len(self.args) > 1 and not self.args[1] in ("start", "stop", "status", "daemon"): >- self.optionParser.error( "Unknown server command \"%s\"" % self.args[1]) >- sys.exit(1) >- >- return self.args >- >- def do(self, args): >- >- #server doesn't lock >- self.command = args[0] >- if (self.command == "server"): >- return self.doServer(args) >- >- if not self.getLock(): >- return False >- >- result = False >- # do the command >- try: >- result = self.commands[self.command]() >- except KeyError, e: >- print "unknown command: " + self.command >- print e >- self.releaseLock() >- return result >- >- def doVersion(self): >- print "hwcert version %s, release %s" % (hwcert.version.version, hwcert.version.release) >- return True >- >- def doWeb(self): >- # note - web server does not observe locks >- command = "print" >- if self.options.command: >- command = self.options.command >- if command == "print": >- self.doPrint(format="html") >- elif (command == 'run' or command == 'server' or command == 'daemon'): >- result = self.addTask() >- elif command == "status": >- self.doWebStatus() >- else: >- print "Error: unknown/unsupported command %s" % command >- return False >- >- return True >- >- >- def load(self): >- try: >- self.certification = ResultsDocument(self.Debugging) >- self.certification.load(self.environment.getResultsPath()) >- except IOError: >- self.certification = None >- return >- >- def doDiscover(self): >- self.certification = ResultsDocument(self.Debugging) >- >- # try and recover old hardware and os info >- if os.path.exists(self.environment.getCertificationPath()): >- certificationDocument = CertificationDocument() >- certificationDocument.load(self.environment.getCertificationPath()) >- 
self.certification.copy(certificationDocument) >- else: >- self.certification.new() >- self.getBiosInfo() >- print "Hardware: %s %s %s" % (self.certification.getHardware(Tags.vendor), >- self.certification.getHardware(Tags.make), >- self.certification.getHardware(Tags.model)) >- self.getOSInfo() >- if self.options.mode != Constants.auto: >- self.editCertification() >- >- # save the certification info off to certification.xml >- certificationDocument = CertificationDocument() >- certificationDocument.copy(self.certification) >- certificationDocument.save(self.environment.getCertificationPath()) >- >- if self.testServer: >- self.checkTestServer(self.testServer) >- self.certification.setTestServer(self.testServer) >- >- self.certification.setDiscoverTime(self.getCurrentUTCTime()) >- self.certification.save(self.environment.getResultsPath()) >- >- return True >- >- def getOSInfo(self): >- self.certification.setOS(Tags.name, self.redHatRelease.getCodeName()) >- if self.redHatRelease.getVersionPointUpdate(): >- self.certification.setOS(Tags.release, self.redHatRelease.getVersionPointUpdate()) >- else: >- print "Error: could not determine OS release number" >- return False >- if self.redHatRelease.getProduct(): >- self.certification.setOS(Tags.product, self.redHatRelease.getProduct()) >- else: >- print "Error: could not determine OS product name" >- return False >- print "OS: %s %s" % (self.certification.getOS(Tags.name), self.certification.getOS(Tags.release)) >- hostname = socket.gethostname() >- if hostname: >- self.certification.setOS(Tags.hostname, hostname) >- >- # set the certification id to blank for now >- self.certification.setCertificationID(0) >- >- def checkVirtualization(self): >- # checks if we happen to be running within a virtualized guest >- # use the method introduced in >- # "/usr/lib/python2.4/site-packages/sos/plugins/xen.py" >- # to check if the system is PV >- self.virtualization = None >- osType,hostname,kernel,others,arch = os.uname() >- if 
kernel.find('xen') > 0 and os.access("/proc/xen/capabilities", os.R_OK): >- (status, output) = commands.getstatusoutput("grep -q control_d /proc/xen/capabilities") >- if status != 0: >- self.virtualization = Constants.paravirtualized >- >- def getVirtualization(self): >- return self.virtualization >- >- def isRealtime(self): >- if os.uname()[3].find(" RT ") == -1: >- self.realtime = False >- else: >- self.realtime = True >- return self.realtime >- >- def doPlan(self): >- self.checkVirtualization() >- if self.getVirtualization() is Constants.paravirtualization: >- if not self.ui.promptContinue("\n Warning !\n\n You're running hwcert in a Para-Virtualized guest. \n\n Test results from a Para-Virtualized guest are not valid for certification, do you wish to"): >- return True >- self.load() >- self.planner = Planner(self.options, redHatRelease=self.redHatRelease) >- self.planner.analyse(self.environment.getSourceDirectory()) >- tests = self.planner.plan() >- if not self.certification: >- self.doDiscover() >- self.saveNewPlan(tests) >- self.checkRequiredRPMs(tests) >- else: >- if not self.checkPlan(): >- return False >- >- editPlan = (self.options.test or self.options.udi or self.options.device) and (self.options.disable or self.options.enable or self.options.add or self.options.server) >- >- if editPlan: >- if self.options.disable or self.options.enable: >- self.setTestsEnabled() >- >- elif self.options.add: >- self.addTest() >- >- if self.options.server: >- self.checkTestServer(self.options.server) >- self.setServerOnTests() >- >- if self.options.certification: >- self.editCertification() >- >- self.certification.save(self.environment.getResultsPath()) >- if self.debugLevel != Constants.off: >- print "saved test plan to %s" % self.environment.getResultsPath() >- return True >- >- def saveNewPlan(self, tests): >- for test in tests: >- if self.Debugging: >- print "Planned %s on %s for udi %s" %(test.Name(), test.getLogicalDeviceName(), test.getUDI()) >- 
self.certification.appendTest(test) >- >- for device in self.planner.getDevices(): >- self.certification.appendDevice(device) >- >- self.certification.setPlanTime(self.getCurrentUTCTime()) >- print "Created a new plan with %u tests on %u devices" % (len(tests), len(self.planner.getDevices())) >- >- def checkPlan(self): >- if not self.certification: >- print "Error: can't check plan since no prior plan exists" >- return False >- (newTests, missingTests, restorableTests) = self.planner.checkPlan(self.certification) >- if len(newTests) > 0 or len(missingTests) > 0: >- report = Report(self.options, self.certification) >- >- print "\nWarning: The hardware devices have changed." >- if len(newTests) > 0: >- print "Additional tests are required:" >- for key in newTests: >- test = newTests[key] >- print "%-10s %-10s %-36s" % (test.Name(), test.getLogicalDeviceName(), test.getShortUDI()) >- >- if len(missingTests) > 0: >- print "Previously planned tests are no longer necessary:" >- for key in missingTests: >- test = missingTests[key] >- print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), test.getShortUDI()) >- if self.options.mode != Constants.auto and not self.ui.promptConfirm("Would you like to continue?"): >- return False >- >- # otherwise, change the plan >- for key in newTests: >- newTest = newTests[key] >- # restore previously-deleted tests >- if key in restorableTests: >- restorableTests[key].unMarkDeleted() >- else: >- self.certification.appendTest(newTests[key]) >- for key in missingTests: >- missingTests[key].markDeleted() >- >- return True >- >- def checkRequiredRPMs(self, tests): >- return self.installRequiredRPMs(self.getRequiredRPMs(tests)) >- >- def getRequiredRPMs(self, tests): >- if self.debugLevel != Constants.off: >- print "Checking for additional required packages based on the test plan:" >- requiredRPMs = dict() >- for test in tests: >- rpms = test.getRequiredRPMs() >- if len(rpms) > 0: >- for rpm in rpms: >- requiredRPMs[rpm] = rpm 
>- if self.debugLevel != Constants.off: >- sys.stdout.write(test.Name() + " requires " + ", ".join(rpms)) >- sys.stdout.write("\n") >- return requiredRPMs >- >- def installRequiredRPMs(self, requiredRPMs): >- >- while True: >- missingRPMs = list() >- if self.debugLevel != Constants.off: >- print "Checking installed rpms:" >- for rpm in requiredRPMs.keys(): >- try: >- rpmQ = Command("rpm -q " + rpm) >- if self.debugLevel != Constants.off: >- rpmQ.echo() >- else: >- rpmQ.run() >- except HwCertCommandException, e: >- missingRPMs.append(rpm) >- if len(missingRPMs) > 0: >- print "The following rpms are required for testing:" >- missingRPMList = "" >- for rpm in missingRPMs: >- missingRPMList = "%s %s" % (missingRPMList, rpm) >- print missingRPMList >- if self.options.mode == Constants.auto or self.ui.promptConfirm("Would you like to install them now?"): >- try: >- yum = Command("yum install -y " + missingRPMList) >- print "Running yum: --------------------------------" >- yum.echo() >- print "---------------------------------------------" >- except HwCertCommandException, e: >- print "Error: Could not install rpm" >- print e >- if self.options.mode == Constants.auto: >- print "Error: could not install required rpms" >- return False >- # otherwise >- if self.debugLevel != Constants.off: >- print "Rechecking required rpms" >- else: # user-abort >- print "Warning: some tests may fail due to missing rpms" >- return False >- else: >- if self.debugLevel != Constants.off: >- print "All required rpms installed" >- return True >- print "" >- >- def installServerRPM(self): >- >- hwcertServerRPM = "hwcert-server" >- try: >- rpmQ = Command("rpm -q " + hwcertServerRPM) >- rpmQ.echo() >- return True >- except HwCertCommandException, e: >- pass # continue >- >- print "Error: the %s package is not installed." 
% hwcertServerRPM >- if self.options.mode == Constants.auto or self.ui.promptConfirm("Would you like to install it now?"): >- try: >- yum = Command("yum install -y " + hwcertServerRPM) >- sys.stdout.write("Running yum... ") >- sys.stdout.flush() >- yum.echo() >- yum.printOutput() >- print "done" >- except HwCertCommandException, e: >- print "\nError: Could not install %s" % hwcertServerRPM >- print e >- print e.command.printErrors() >- return False >- >- # otherwise >- return True >- >- def doServer(self, args): >- >- if not self.installServerRPM() and not self.Debugging: >- return False >- >- #otherwise >- if len(args) > 1: >- subcommand = args[1] >- else: >- subcommand = "start" >- planner = Planner(self.options, redHatRelease=self.redHatRelease) >- planner.analyse(self.environment.getSourceDirectory()) >- daemon = HwCertDaemon(self.environment, planner, self.options) >- >- # run daemon in foreground >- if subcommand == "daemon": >- if daemon.do("status"): >- daemon.do("stop") # kill any in the background >- print "Starting daemon" >- sys.stdout.flush() >- return daemon.run() >- >- # otherwise, do test-specific services >- >- testCount = 0 >- tests = planner.getServerModeTests(); >- result = True >- >- for test in tests: >- print "%s test server %s:" % (test.Name(), subcommand) >- if not self.runServer(test, subcommand): >- result = False >- print "" >- >- >- # do the daemon >- if not daemon.do(subcommand): >- result = False >- return result >- >- def addTask(self): >- print "<pre>" >- command = None >- try: >- command = Constants.hwcert_backend + " " + self.webOptions["command"] + " " + self.webOptions["subcommand"] + " " >- except KeyError, e: >- print "Error: missing URL parameter:" >- print e >- if command: >- for option in self.webOptions.keys(): >- if "command" not in option and self.webOptions[option]: >- command += "--" + option + " " + self.webOptions[option] + " " >- command += "\n" >- print "Adding Task: " + command >- sys.stdout.flush() >- taskInFile 
= os.open(self.environment.getTaskFilePath() + ".in", os.O_WRONLY) >- print "opened task file" >- sys.stdout.flush() >- os.write(taskInFile, command + "\n") >- os.close(taskInFile) >- print "Reading Results:" >- print "</pre><hr><pre>" >- >- sys.stdout.flush() >- taskOutFile = open(self.environment.getTaskFilePath()+".out", "r") >- while True: >- line = taskOutFile.readline() >- if line: >- sys.stdout.write(line) >- sys.stdout.flush() >- else: >- break >- taskOutFile.close() >- print "</pre><hr><pre>" >- print "done" >- print "</pre>" >- sys.stdout.flush() >- >- def doWebStatus(self): >- print "<pre>" >- planner = Planner(self.options, redHatRelease=self.redHatRelease) >- planner.analyse(self.environment.getSourceDirectory()) >- tests = planner.getServerModeTests(); >- print "</pre>hwcert Server Status:<pre>" >- self.doVersion() >- print "" >- success = True >- for test in tests: >- print "%s server:" % test.Name() >- if not test.statusServer(): >- success = False >- print "" >- print "</pre>" >- if success: >- print "<!-- %s=%s -->" % (Constants.hwcertserverstatus, Constants.running) >- else: >- print "<!-- %s=%s -->" % (Constants.hwcertserverstatus, Constants.failed) >- return success >- >- def checkTestServer(self, server): >- if self.debugLevel != Constants.off: >- print "Verifying hwcert Server..." 
>- result = True >- try: >- request = urllib2.Request('http://%s/hwcert/cgi/hwCertWeb.py?command=status' % server) >- opener = urllib2.build_opener() >- response = opener.open(request).readlines() >- nvrPattern = re.compile(Constants.serverNVRRegex) >- statusPattern = re.compile("%s=(?P<status>[a-zA-Z]+)" % Constants.hwcertserverstatus) >- serverRunning = False >- if response: >- for line in response: >- match = nvrPattern.search(line) >- if match: >- hwcertServerVersion = match.group("version") >- hwcertServerRelease = match.group("release") >- if self.debugLevel != Constants.off: >- print "Version %s Release %s" % (hwcertServerVersion, hwcertServerRelease) >- if hwcertServerVersion + "." + hwcertServerRelease < Constants.hwCertServerMinimumVersion: >- print "Error: hwcert server is from a prior release." >- print "This is a likely cause of test failures" >- result = False >- match = statusPattern.search(line) >- if match: >- if self.debugLevel != Constants.off: >- print "Status: %s" % match.group("status") >- if match.group("status") == Constants.running: >- serverRunning = True >- else: >- print "Warning: The hwcert server on %s is reporting errors" % server >- for line in response: >- if "Error:" in line: >- sys.stdout.write("%s says: %s" % (server, line)) >- return False >- else: >- "Warning: The hwcert server on %s is not running" % server >- return False >- >- except urllib2.URLError, exception: >- print "Warning: Could not contact the hwcert server on %s" % server >- print exception >- return False >- >- return result >- >- def doRun(self): >- if not self.doPlan(): >- return False >- >- #check tests >- if self.options.test: >- testNotFound = False >- for testName in self.options.test: >- if not self.certification.getTest(testName): >- print "Error: No such test \"" + testName + "\"" >- testNotFound = True >- if testNotFound: >- return False >- # otherwise >- >- >- tests = self.certification.getFilteredTests(self.options) >- >- if len(tests) > 0: >- if not 
self.__checkResultsSize(): >- return False >- tests = self.addMandatoryTests(tests) >- if not tests: >- return False >- print "\nRunning the following tests:" >- for test in tests: >- udi = test.getShortUDI() >- if not udi: udi = "" >- print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), udi) >- >- verified = self.verify(tests) >- if not verified and self.options.mode != Constants.auto: >- if not self.ui.promptConfirm("Verification failed, would you like to continue testing?"): >- return False >- >- # otherwise overriding verification failure or passed verification >- return self._doRun(tests) >- else: >- print "Error: no tests match the supplied options" >- return False >- >- def doContinue(self): >- self.load() >- tests = self.certification.getIncompleteTestsFromLastRun() >- if len(tests) is 0: >- print "All tests in the last test run are complete" >- return False >- # tests = self.addMandatoryTests(tests) don't re-do mandatory tests on a continue. >- self.planner = Planner(self.options, redHatRelease=self.redHatRelease) >- self.planner.analyse(self.environment.getSourceDirectory()) >- return self._doRun(tests, continueRun=True) >- >- def _doRun(self, tests, continueRun=False): >- >- # don't run suspend on a continue run >- if continueRun: >- for test in tests: >- if test.getName() == 'suspend': >- tests.remove(test) >- break >- >- if len(tests) is 0: >- print "No matching tests found." 
>- return False >- >- if self.debugLevel != Constants.off: >- report = Report(self.options, self.certification) >- for test in tests: >- print report.formatTest(test) >- print "" >- >- # determine test server >- if self.testServer: >- self.certification.setTestServer(self.testServer) >- else: >- if self.certification.getTestServer(): >- self.testServer = self.certification.getTestServer() >- >- >- # Set up our system logging stuff >- syslog.openlog(self.getSystemLogOpen()) >- syslog.syslog("Beginning test run.") >- >- >- # create the new run, mark incomplete >- if continueRun: >- runNumber = self.certification.getNumberOfTestRuns() >- runDirectory = self.getTestRunDirectory() >- else: >- runNumber = self.certification.addTestRun() >- # Make a log dir for this testrun >- runDirectory = self.makeTestRunDirectory() >- >- for test in tests: >- if not test.isDisabled(): >- run = test.getRun(runNumber) >- if not run: >- run = test.newTestRun() >- run.setNumber(runNumber) >- self.certification.save(self.environment.getResultsPath()) >- >- # Start testin'! 
>- combinedResult = True # assume all tests pass >- for test in tests: >- if test.isDisabled(): >- if self.debugLevel != Constants.off: >- print "skipping %s - disabled" % test.getName() >- continue >- >- if self.debugLevel != Constants.off: >- print ("running %s on %s") % (test.getName(),test.getUDI()) >- run = test.getRun(runNumber) >- >- run.setRunTime(self.getCurrentUTCTime()) >- if not run.getMode(): >- run.setMode(self.runMode) >- run.setNumber(self.certification.getNumberOfTestRuns()) >- run.Debugging = self.Debugging >- >- >- >- >- # create log dir >- logDirectory=self.makeLogDirectory(runDirectory,test,continueRun) >- # TODO: RHR2-compat: add tests/ links >- outputFilePath=logDirectory+"/output.log" # set logfile name >- >- # create temp dir >- tmpDirectory=tempfile.mkdtemp('',"hwcert-"+test.getName()+"-") >- # put test files in temp dir >- if not self.installTest(os.path.join(self.environment.getSourceDirectory(), test.getName()), tmpDirectory): >- return False >- >- # Here goes! Actually run the test. >- syslog.syslog("%s: begin" % test.getName()) >- testError = False >- # Walk the tmpdir and find any directory that contains a runtest.sh >- returnValue = 1 >- try: >- summary = Constants.FAIL >- if True: >- # try: >- returnValue = self.runTest(tmpDirectory, test, run, outputFilePath) >- if returnValue is 0: >- summary = Constants.PASS >- if False: >- # except Exception, exception: >- print "Error: test raised exception:" >- print exception >- sys.stdout.flush() >- summary = Constants.ABORT >- # if debugging, re-raise the exception to show tracebacks, etc. >- if self.Debugging: >- raise exception >- >- run.setSummary(summary) >- if summary != Constants.PASS: >- combinedResult = False >- run.getResultsAttachments(self.environment, outputFilePath) >- >- except OSError, e: >- print "Test error: %s" % e >- syslog.syslog("Test error: %s" % e) >- testerror = True >- combinedResult = False >- >- # Log the end of the test. 
>- run.setEndTime(self.getCurrentUTCTime()) >- syslog.syslog("%s: end" % test.getName()) >- # If the test did not run correctly, don't record the results. >- if testError: >- self.removeDirectory(logDirectory) >- continue >- >- # Save the system log >- self.saveSystemLog(test, run) >- # encode return value into output.log >- self.saveOutput(run, returnValue, outputFilePath) >- # print result >- print ("Return value was %u") % returnValue >- # TODO: clean up tmpdir here? >- self.certification.save(self.environment.getResultsPath()) >- if self.debugLevel != Constants.off: >- print "saved to " + self.environment.getResultsPath() >- >- syslog.syslog("Testing complete.") >- syslog.closelog() >- self.certification.save(self.environment.getResultsPath()) >- >- return combinedResult >- >- def logDevices(self, test, run): >- devices = test.logDevices(self.planner, self.certification) >- if devices: >- run.appendDevices(devices) >- return True >- >- #otherwise >- return False >- >- def addMandatoryTests(self, requestedTests): >- if self.debugLevel == Constants.off: >- requestedTestDict = dict(zip(map(TestDocument.getName, requestedTests), requestedTests)) >- >- for mandatoryTest in self.certification.getMandatoryTests(): >- try: >- test = requestedTestDict[mandatoryTest.getName()] >- except KeyError: >- mandatoryTest.setMode(Constants.forced) >- if mandatoryTest.getMandatory() == Constants.pre: >- requestedTests.insert(0,mandatoryTest) >- elif mandatoryTest.getMandatory() == Constants.post: >- requestedTests.append(mandatoryTest) >- >- # reboot test is a special case - really needs to run last >- tests = list() >- rebootTests = list() >- for test in requestedTests: >- if test.getName() != "reboot": >- tests.append(test) >- else: >- rebootTests.append(test) >- if rebootTests: >- tests.extend(rebootTests) >- return tests >- >- # otherwise - debugging - don't add any tests >- print "Warning: Test runs in debug mode are invalid for certification." 
>- if self.certification.getMandatoryTests(): >- print "The following tests will be skipped: %s" % ", ".join(map(TestDocument.getName, self.certification.getMandatoryTests())) >- if not self.ui.promptConfirm("Do you wish to continue?"): >- return None >- return requestedTests >- >- def doCertify(self): >- """ run only tests needed for certification """ >- >- if not self.doPlan(): >- return False >- >- engine = ResultsEngine(self.certification) >- tests = engine.getRemainingTests(self.catalog) >- >- # for realtime kernel, just look for remaining realtime tests >- if self.isRealtime(): >- tests = self.certification.getTaggedTests(tests, [TestTag.realtime]) >- >- # filter remaining tests via tags >- elif self.options.tag: >- tests = self.certification.getTaggedTests(tests, self.options.tag) >- >- # filter by device (AKA component-level certify command) >- if self.options.udi or self.options.device: >- tests = self.certification.getDeviceTests(self.options, tests) >- >- if len(tests) > 0: >- if not self.__checkResultsSize(): >- return False >- tests = self.addMandatoryTests(tests) >- if not tests: >- return False >- >- >- deviceName = "" >- if self.options.device: >- deviceName = self.options.device >- if self.options.udi: >- deviceName += " (" + ", ".join(self.options.udi) + ")" >- if deviceName: >- print "\nThe following component tests for device %s with tag(s) [%s] are recommended:" % (deviceName, ", ".join(self.options.tag)) >- elif self.options.tag == [TestTag.certification]: >- print "\nThe following tests are recommended to complete the certification:" >- else: >- print "\nThe following tests with tag(s) [%s] are recommended:" % (", ".join(self.options.tag)) >- for test in tests: >- print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), test.getShortUDI()) >- >- verified = self.verify(tests) >- if not verified and self.options.mode != Constants.auto: >- if not self.ui.promptConfirm("Verification failed, would you like to continue testing?"): 
>- return False >- >- # otherwise overriding verification failure, or mode auto, or passed verification and user continue >- if not verified or self.options.mode == Constants.auto or self.ui.promptConfirm("Run these tests?"): >- return self._doRun(tests) >- elif self.options.mode == Constants.auto: >- print "There are no outstanding non-interactive tests" >- else: >- print "Testing appears to be complete." >- >- return True >- >- def doVerify(self): >- """ run tests configuration verification code """ >- >- if not self.doPlan(): >- return False >- >- return self.verify(self.certification.getTests()) >- >- def __checkResultsSize(self): >- >- if self.certification.getLoadedFileSize() < self.environment.getResultsWarningSize(): >- return True >- >- # otherwise >- print "Warning: the test results may be too large to submit to the hardware catalog." >- print "File: %s is %u MB" % (self.certification.getLoadedFilePath(), int(self.certification.getLoadedFileSize()/1048576)) >- if self.options.mode != Constants.auto and self.ui.promptConfirm("Would you like to package the current results first?"): >- self.doSubmit() >- return False >- >- # size warning was ignored (or mode auto) >- return True >- >- def verify(self, testDocuments): >- >- result = True >- >- # just call verify on each test directly >- failures = 0 >- >- for testDocument in testDocuments: >- if testDocument.isDisabled(): >- continue >- test = self.planner.getTest(testDocument.getName()) >- testParameters = TestParameters(self, testDocument) >- test.setParameters(testParameters) >- if test.verify(): >- # allow test verify code to update test document >- testDocument.setParameters(testParameters) >- else: >- failures += 1 >- >- if failures == 0: >- print "\nTest Verification Passed" >- return True >- >- # otherwise: >- print "\n%u Tests Failed Verification" % failures >- return False >- >- def doPrint(self, format="text"): >- self.load() >- if format == "html": >- report = ReportHTML(self.options, 
self.certification) >- return report.Write() >- # otherwise >- report = Report(self.options, self.certification) >- return report.Write() >- >- def doSubmit(self): >- self.load() >- return self.__submit() >- >- def __submit(self): >- >- if not self.certification or self.certification.getNumberOfTestRuns() == 0: >- print "Error: no test results to submit" >- return False >- >- if self.catalog.isReachable() and self.options.mode != Constants.auto: >- (submitted, success) = self.catalog.submit(self.certification) >- if success and submitted and self.ui.promptConfirm("Would you like to clean current test results from this system?"): >- self.clean() >- >- # otherwise >- return True >- >- def doSave(self): >- self.load() >- if self.certification and self.certification.getNumberOfTestRuns() > 0: >- return self.__save() >- >- #otherwise >- print "Error: no test results to save" >- return False >- >- >- def __save(self): >- if self.certification: >- path = self.environment.getStoreDirectory() >- serverPath = list() >- self.makeDirectoryPath(path) >- for tag in Tags.vendor, Tags.make, Tags.model, Tags.arch: >- value = self.certification.getHardware(tag) >- if len(value) > 0 and value != "unknown": >- path += "/%s" % value >- self.makeDirectoryPath(path) >- serverPath.append(value) >- for value in self.certification.getOSProductShortName(), self.certification.getOS(Tags.release): >- if len(value) > 0: >- path += "/%s" % value >- self.makeDirectoryPath(path) >- serverPath.append(value) >- >- savedFilename = self.certification.getVerboseFileName() >- print "Saving current results to:" >- savedFilePath = os.path.join(path, savedFilename) >- print savedFilePath + ".gz" >- self.certification.compressToFile(savedFilePath) >- >- # if it's supplied as an option, use that one but don't save it, >- # otherwise, check the certification's test server >- if not self.testServer: >- self.testServer = self.certification.getTestServer() >- if self.testServer == "unknown": >- self.testServer = 
None >- >- if self.testServer and (self.options.mode == Constants.auto or self.ui.promptConfirm("\nCopy results to test server %s?" % self.testServer)): >- try: >- request = HwCertHttp(self.testServer, "/hwcert/cgi/saveFile.py") >- request.addField("server-path", serverPath) >- return request.httpUpload(savedFilePath+".gz") >- except Exception, e: >- print "Error: could not save results to server %s" % self.testServer >- print e >- return False >- >- # otherwise >- return True >- >- #otherwise >- print "Error: no test results to save" >- return False >- >- def doClean(self): >- subcommand = Constants.results >- try: >- subcommand = self.args[1] >- except IndexError: >- pass >- if subcommand not in [Constants.results, Constants.all]: >- print "Error: invalid option %s, should be %s or %s" % (subcommand, Constants.results, Constants.all) >- return False >- >- >- if self.ui.promptConfirm("Are you sure you want to delete all test results?"): >- self.clean() >- if subcommand == Constants.all and self.ui.promptConfirm("Also remove certification data?"): >- self.cleanAll() >- return True >- >- >- >- def getBiosInfo(self): >- arch = self.getArch() >- self.certification.setHardware(Tags.arch, arch) >- if self.Debugging: >- print "Getting Bios Info for %s" % arch >- vendor = "" >- make = "" >- model = "" >- if arch in ['i386','i586','i686','x86_64']: >- dmidecode = os.popen("/usr/sbin/dmidecode") >- >- for line in dmidecode: >- if line.find(':') == -1: >- continue >- (key,val)=[t.strip() for t in line.split(':',1) if line.find(':')] >- >- if key == "Manufacturer" and val: >- vendor = val >- if key == "Product Name" and val: >- model = val >- # try and use the first word as the make >- make = val.split()[0] >- if vendor and model: >- break >- dmidecode.close() >- elif os.path.exists('/proc/device-tree/model'): >- f=open('/proc/device-tree/model') >- model = f.readline().strip() >- # if there's a comma, guess the format is <vendor>, <model> >- # failing that, if there's a 
space, assume it's <vendor> <model> >- try: >- for separator in [',', ' ']: >- if separator in model: >- vendor = model.split(separator)[0] >- model = model.split(separator)[1] >- break >- except: >- pass >- >- f.close() >- # IBM-likely arches >- if arch in ['s390','s390x', "ppc", "ppc64"]: >- if not vendor: >- vendor = 'IBM' >- if not make: >- make = 'IBM' >- if not model: >- model = arch >- >- >- >- self.certification.setHardware(Tags.model, model) >- self.certification.setHardware(Tags.make, make) >- self.certification.setHardware(Tags.vendor, vendor) >- >- >- def getArch(self): >- system,node,release,version,machine = os.uname() >- return machine >- >- def makeTestRunDirectory(self): >- """Create a directory for the current test run""" >- testRunDirectory = self.getTestRunDirectory() >- self.removeDirectory(testRunDirectory) >- self.makeDirectoryPath(testRunDirectory) >- return testRunDirectory >- >- def getTestRunDirectory(self): >- runNumber = self.certification.getNumberOfTestRuns() >- return self.environment.getLogDirectory() + "/runs/%u" % runNumber >- >- def makeLogDirectory(self, directory, test, continueRun=False): >- "Create a directory for logs for the currently-running test" >- logdir=directory+"/"+test.getName() >- if not continueRun: >- self.removeDirectory(logdir) # clean it, if it's there >- self.makeDirectoryPath(logdir) >- return logdir >- >- def installTest(self, testDirectory,tmpDirectory): >- "Install a test's files to the named tmpdir for running" >- if self.Debugging != Constants.off: >- print "installing test from %s into %s" % (testDirectory, tmpDirectory) >- try: >- cwd=os.getcwd() >- os.chdir(testDirectory) >- os.system("make install DEST="+tmpDirectory) >- os.chdir(cwd) >- except OSError, error: >- print "Error: Could not install test sources - %s" % error >- return False >- return True >- >- >- def runTest(self, testDirectory, testDocument, run, outputFilePath): >- "Run a test, sending output to the logfile and stdout" >- if 
self.Debugging: >- print "HardwareTestHarness.runTest:" >- print " test directory %s udi %s outputFile %s " % (testDirectory, testDocument.getUDI(), outputFilePath) >- # change cwd to test scratch area >- cwd=os.getcwd() >- os.chdir(testDirectory) >- >- # call make build to compile/chmod any called scripts >- try: >- buildCommand = Command("make build") >- buildCommand.echo() >- except HwCertCommandException, exception: >- print "Warning: test build produced errors." >- print exception >- buildCommand.printErrors() >- >- #tee stdout and stderr to terminal and log file >- realStdout = sys.stdout >- realStderr = sys.stderr >- sys.stdout = Log(outputFilePath) >- sys.stderr = sys.stdout >- >- # find the test driver python and call it directly >- test = self.planner.getTest(testDocument.getName()) >- testParameters = TestParameters(self, testDocument) >- testParameters.set(Constants.OUTPUTFILE, outputFilePath) >- test.setParameters(testParameters) >- if self.Debugging != Constants.off: >- print "Test Parameters: %s" % testParameters >- rv = test.run() >- >- # restore stdout and stdin, and cwd >- sys.stdout.close() >- sys.stdout = realStdout >- sys.stderr = realStderr >- os.chdir(cwd) >- >- # log devices >- if not self.logDevices(test, run): >- testError = True >- >- if self.Debugging: >- print "...Done: return value = %u." % rv >- return rv >- >- def runServer(self, test, subcommand): >- if subcommand == "start": >- return test.startServer() >- if subcommand == "stop": >- return test.stopServer() >- >- # otherwise >- return test.statusServer() >- >- def saveSystemLog(self, test, run): >- "Save a section of the system log to the named file" >- # It feels a bit wrong to have this stuff hardcoded, but it works. 
>- contents = self.getSystemLog(test.getName()) >- run.setSystemLog(contents) >- >- def saveOutput(self, run, retval, outputFilePath): >- print "saveOutput: %s" % outputFilePath >- run.setReturnValue(retval) >- if not run.setTestOutput(outputFilePath): >- print "Error: could not save test logs, marking as failed." >- run.setSummary(Constants.FAIL) >- >- def _getTestsFromOptions(self): >- if self.options.device: >- return self.certification.getTestsByLogicalDevice(self.options.device) >- if self.options.udi: >- return self.certification.getTestsByUDI(self.options.udi) >- if self.options.test: >- return self.certification.getTests(self.options.test) >- print "Error: no tests specified in options" >- return None >- >- def setTestsEnabled(self): >- tests = self._getTestsFromOptions() >- if self.Debugging: >- print "HardwareTest.setTestsEnabled:" >- disabled = True >- output = "Disabled" >- count = 0 >- if self.options.enable: >- disabled = False >- output = "Enabled" >- for test in tests: >- if disabled and test.getMandatory(): >- print "The %s test must be included in every test run and can not be disabled." 
% test.getName() >- continue >- if test.isDisabled() != disabled: >- test.setDisabled(disabled) >- count = count + 1 >- else: >- print "test %s is already %s" % (test.getName(), output) >- >- if self.Debugging: >- print (output + " test: %s udi: %s") % (test.getName(), test.getUDI()) >- print (output + " %u tests") % count >- >- def setServerOnTests(self): >- tests = self._getTestsFromOptions() >- if self.Debugging: >- print "HardwareTest.setServerOnTests:" >- >- if self.options.server: >- output = "Set server to %s for " % self.options.server >- for test in tests: >- test.setServer(self.options.server) >- if self.Debugging: >- print (output + " test: %s udi: %s") % (test.getName(), test.getUDI()) >- print (output + " %u test(s)") % len(tests) >- >- def addTest(self): >- if not self.options.test: >- print "Must specify a test , use --test <test name>" >- return >- >- # otherwise, get the test to be added >- planner = Planner(self.options, redHatRelease=self.redHatRelease) >- planner.analyse(self.environment.getSourceDirectory()) >- >- try: >- test = planner.getTest(self.options.test[0]) >- test = copy.copy(test) >- except KeyError: >- print "No such test %s" % self.options.test >- return >- >- if not self.options.udi and not self.options.device: >- print "Warning: No device specified with --udi <device udi> or --device <logical device name>" >- if not self.ui.promptConfirm("Are you sure the test does not require a specific device?"): >- return >- else: >- >- device = None >- if self.options.udi: >- key = self.options.udi[0] >- device = planner.getDeviceByUDI(key) >- elif self.options.device: >- key = self.options.device >- #bz 485212 - don't try and match device "name" unless >- # three or more characters are provided >- if len(key) > 2: >- device = planner.getDeviceByName(key) >- >- if not device: >- print "Warning: unknown device: %s " % key >- # create one from scratch >- device = UdevDevice(dict()) >- if self.options.udi and len(self.options.udi) > 0: >- 
device.setUDI(self.options.udi[0]) >- >- test.setDevice(device) >- if self.options.device: >- test.setLogicalDeviceName(self.options.device) >- >- >- if self.Debugging: >- print "added test: %s for device: %s" % (test, device.getProperty('info.product')) >- >- test.setSource(Constants.manual) >- self.certification.appendTest(test) >- self.certification.setPlanTime(self.getCurrentUTCTime()) >- print "Added test" >- >- def getCurrentUTCTime(self): >- return time.gmtime(time.time()) >- >- # YK: to remove HwCert log files after the result rpm is generated >- def clean(self): >- # is the rug going to be pulled out from under us? >- if self.environment.getLogDirectory() in os.getcwd(): >- os.chdir("/var/log") >- if self.debugLevel != Constants.off: >- print "removing HwCert logs ..." >- os.system("rm -rf %s" % self.environment.getLogDirectory()) >- if self.debugLevel != Constants.off: >- print "removing HwCert results..." >- os.system("rm -f %s" % self.environment.getResultsPath()) >- >- def cleanAll(self): >- os.system("rm -f %s" % self.environment.getCertificationPath()) >- >- def getLock(self): >- if os.path.exists(self.environment.getLockFile()): >- print "Error: hwcert is already running (lock file %s found)" % self.environment.getLockFile() >- if self.runMode == Constants.normal: >- if self.ui.promptConfirm("Override?"): >- return True >- # otherwise, can't ask about override >- return False >- # otherwise, no lock - go ahead and lock it >- lock = open(self.environment.getLockFile(), "w") >- return True >- >- def releaseLock(self): >- try: >- os.remove(self.environment.getLockFile()) >- except OSError, e: >- print "Warning: hwcert lock file missing - other instances of hwcert may have run." 
>- >- def editCertification(self): >- editable = [ Tags.vendor, Tags.make, Tags.model, Tags.product_url] >- print "\nPlease verify the hardware product information:" >- for tag in editable: >- value = self.certification.getHardware(tag) >- value = self.ui.promptEdit(" %s:" % tag, value) >- if len(value) > 0: >- self.certification.setHardware(tag, value) >- >- # category >- answers = SystemCategories.getAll() >- value = self.certification.getHardware(Tags.category) >- value = self.ui.promptEdit(" " + Tags.category, value, answers) >- if len(value) > 0: >- self.certification.setHardware(Tags.category, value) >- >- >- certificationID = self.certification.getCertificationID() >- >- # if the certification id isn't set yet - see if they want to use the catalog >- if not certificationID: >- self.catalog.getCertificationID(self.certification) >- # else, let them edit it directly >- else: >- answers = list() >- answers.append(str(certificationID)) >- while True: >- value = self.ui.prompt("\nPlease enter the certification ID:", answers) >- if not value: >- break >- try: >- int(value) >- break >- except ValueError: >- print "Error: %s is not an integer." % value >- if value and int(value) >= 0: >- self.certification.setCertificationID(value) >- >- # set a local test server >- value = self.certification.getTestServer() >- value = self.ui.promptEdit("Local Hardware Certification Test Server: ", value) >- if len(value) > 0: >- self.certification.setTestServer(value) >- >- # copy over to certification.xml >- certificationDocument = CertificationDocument() >- certificationDocument.copy(self.certification) >- certificationDocument.save(self.environment.getCertificationPath()) >diff --git a/hwcert/harness.py b/hwcert/harness.py >new file mode 100644 >index 0000000..eaa5ac7 >--- /dev/null >+++ b/hwcert/harness.py >@@ -0,0 +1,1147 @@ >+# Copyright (c) 2006 Red Hat, Inc. All rights reserved. 
This copyrighted material >+# is made available to anyone wishing to use, modify, copy, or >+# redistribute it subject to the terms and conditions of the GNU General >+# Public License v.2. >+# >+# This program is distributed in the hope that it will be useful, but WITHOUT ANY >+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A >+# PARTICULAR PURPOSE. See the GNU General Public License for more details. >+# >+# You should have received a copy of the GNU General Public License >+# along with this program; if not, write to the Free Software >+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. >+# >+# Author: Greg Nichols >+# >+ >+import os, time, sys, syslog, errno, tempfile, urllib2, re, socket, commands >+ >+from optparse import OptionParser >+ >+from hwcert.controller import Controller >+from hwcert.tags import Tags, Attributes, Constants, TestTag, SystemCategories >+from hwcert.environment import Environment >+from hwcert.certificationtest import ResultsDocument, CertificationDocument >+from hwcert.testdocument import TestDocument >+from hwcert.test import TestParameters >+from hwcert.report import Report >+from hwcert.reporthtml import ReportHTML >+from hwcert.planner import Planner >+import hwcert.version >+from hwcert.device import Device, UdevDevice >+from hwcert.command import Command, HwCertCommandException >+from hwcert.resultsengine import ResultsEngine >+from hwcert.redhatrelease import RedHatRelease >+from hwcert.daemon import HwCertDaemon >+from hwcert.log import Log >+from hwcert.http import HwCertHttp >+from hwcert.catalog import Catalog >+ >+class HardwareTestHarness(Controller): >+ >+ Debugging=True >+ >+ def __init__(self, options, echoResponses=True): >+ self.options = options >+ Controller.__init__(self, self.options.debug, echoResponses) >+ self.environment = Environment() >+ self.makeDirectoryPath(self.environment.getDataDirectory()) >+ self.redHatRelease = RedHatRelease() >+ self.catalog = 
Catalog(self.environment, self.options.debug) >+ >+ def doVersion(self): >+ print "hwcert version %s, release %s" % (hwcert.version.version, hwcert.version.release) >+ return True >+ >+ def load(self): >+ try: >+ self.certification = ResultsDocument(self.Debugging) >+ self.certification.load(self.environment.getResultsPath()) >+ except IOError: >+ self.certification = None >+ return >+ >+ def doDiscover(self): >+ self.certification = ResultsDocument(self.Debugging) >+ >+ # try and recover old hardware and os info >+ if os.path.exists(self.environment.getCertificationPath()): >+ certificationDocument = CertificationDocument() >+ certificationDocument.load(self.environment.getCertificationPath()) >+ self.certification.copy(certificationDocument) >+ else: >+ self.certification.new() >+ self.getBiosInfo() >+ print "Hardware: %s %s %s" % (self.certification.getHardware(Tags.vendor), >+ self.certification.getHardware(Tags.make), >+ self.certification.getHardware(Tags.model)) >+ self.getOSInfo() >+ if self.options.mode != Constants.auto: >+ self.editCertification() >+ >+ # save the certification info off to certification.xml >+ certificationDocument = CertificationDocument() >+ certificationDocument.copy(self.certification) >+ certificationDocument.save(self.environment.getCertificationPath()) >+ >+ if self.options.server: >+ self.checkTestServer(self.options.server) >+ self.certification.setTestServer(self.options.server) >+ >+ self.certification.setDiscoverTime(self.getCurrentUTCTime()) >+ self.certification.save(self.environment.getResultsPath()) >+ >+ return True >+ >+ def getOSInfo(self): >+ self.certification.setOS(Tags.name, self.redHatRelease.getCodeName()) >+ if self.redHatRelease.getVersionPointUpdate(): >+ self.certification.setOS(Tags.release, self.redHatRelease.getVersionPointUpdate()) >+ else: >+ print "Error: could not determine OS release number" >+ return False >+ if self.redHatRelease.getProduct(): >+ self.certification.setOS(Tags.product, 
self.redHatRelease.getProduct())
>+        else:
>+            print "Error: could not determine OS product name"
>+            return False
>+        print "OS: %s %s" % (self.certification.getOS(Tags.name), self.certification.getOS(Tags.release))
>+        hostname = socket.gethostname()
>+        if hostname:
>+            self.certification.setOS(Tags.hostname, hostname)
>+
>+        # set the certification id to blank for now
>+        self.certification.setCertificationID(0)
>+
>+    def checkVirtualization(self):
>+        # checks if we happen to be running within a virtualized guest
>+        # use the method introduced in
>+        # "/usr/lib/python2.4/site-packages/sos/plugins/xen.py"
>+        # to check if the system is PV
>+        self.virtualization = None
>+        osType,hostname,kernel,others,arch = os.uname()
>+        if kernel.find('xen') > 0 and os.access("/proc/xen/capabilities", os.R_OK):
>+            (status, output) = commands.getstatusoutput("grep -q control_d /proc/xen/capabilities")
>+            if status != 0:
>+                self.virtualization = Constants.paravirtualized
>+
>+    def getVirtualization(self):
>+        return self.virtualization
>+
>+    def isRealtime(self):
>+        if os.uname()[3].find(" RT ") == -1:
>+            self.realtime = False
>+        else:
>+            self.realtime = True
>+        return self.realtime
>+
>+    def doPlan(self):
>+        self.checkVirtualization()
>+        if self.getVirtualization() == Constants.paravirtualized:
>+            if not self.ui.promptContinue("\n    Warning !\n\n    You're running hwcert in a Para-Virtualized guest. 
\n\n Test results from a Para-Virtualized guest are not valid for certification, do you wish to"): >+ return True >+ self.load() >+ self.planner = Planner(self.options, redHatRelease=self.redHatRelease) >+ self.planner.analyse(self.environment.getSourceDirectory()) >+ tests = self.planner.plan() >+ if not self.certification: >+ self.doDiscover() >+ self.saveNewPlan(tests) >+ self.checkRequiredRPMs(tests) >+ else: >+ if not self.checkPlan(): >+ return False >+ >+ editPlan = (self.options.test or self.options.udi or self.options.device) and (self.options.disable or self.options.enable or self.options.add or self.options.server) >+ >+ if editPlan: >+ if self.options.disable or self.options.enable: >+ self.setTestsEnabled() >+ >+ elif self.options.add: >+ self.addTest() >+ >+ if self.options.server: >+ self.checkTestServer(self.options.server) >+ self.setServerOnTests() >+ >+ if self.options.certification: >+ self.editCertification() >+ >+ self.certification.save(self.environment.getResultsPath()) >+ if self.options.debug != Constants.off: >+ print "saved test plan to %s" % self.environment.getResultsPath() >+ return True >+ >+ def saveNewPlan(self, tests): >+ for test in tests: >+ if self.Debugging: >+ print "Planned %s on %s for udi %s" %(test.Name(), test.getLogicalDeviceName(), test.getUDI()) >+ self.certification.appendTest(test) >+ >+ for device in self.planner.getDevices(): >+ self.certification.appendDevice(device) >+ >+ self.certification.setPlanTime(self.getCurrentUTCTime()) >+ print "Created a new plan with %u tests on %u devices" % (len(tests), len(self.planner.getDevices())) >+ >+ def checkPlan(self): >+ if not self.certification: >+ print "Error: can't check plan since no prior plan exists" >+ return False >+ (newTests, missingTests, restorableTests) = self.planner.checkPlan(self.certification) >+ if len(newTests) > 0 or len(missingTests) > 0: >+ report = Report(self.options, self.certification) >+ >+ print "\nWarning: The hardware devices have changed." 
>+ if len(newTests) > 0: >+ print "Additional tests are required:" >+ for key in newTests: >+ test = newTests[key] >+ print "%-10s %-10s %-36s" % (test.Name(), test.getLogicalDeviceName(), test.getShortUDI()) >+ >+ if len(missingTests) > 0: >+ print "Previously planned tests are no longer necessary:" >+ for key in missingTests: >+ test = missingTests[key] >+ print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), test.getShortUDI()) >+ if self.options.mode != Constants.auto and not self.ui.promptConfirm("Would you like to continue?"): >+ return False >+ >+ # otherwise, change the plan >+ for key in newTests: >+ newTest = newTests[key] >+ # restore previously-deleted tests >+ if key in restorableTests: >+ restorableTests[key].unMarkDeleted() >+ else: >+ self.certification.appendTest(newTests[key]) >+ for key in missingTests: >+ missingTests[key].markDeleted() >+ >+ return True >+ >+ def checkRequiredRPMs(self, tests): >+ return self.installRequiredRPMs(self.getRequiredRPMs(tests)) >+ >+ def getRequiredRPMs(self, tests): >+ if self.options.debug != Constants.off: >+ print "Checking for additional required packages based on the test plan:" >+ requiredRPMs = dict() >+ for test in tests: >+ rpms = test.getRequiredRPMs() >+ if len(rpms) > 0: >+ for rpm in rpms: >+ requiredRPMs[rpm] = rpm >+ if self.options.debug != Constants.off: >+ sys.stdout.write(test.Name() + " requires " + ", ".join(rpms)) >+ sys.stdout.write("\n") >+ return requiredRPMs >+ >+ def installRequiredRPMs(self, requiredRPMs): >+ >+ while True: >+ missingRPMs = list() >+ if self.options.debug != Constants.off: >+ print "Checking installed rpms:" >+ for rpm in requiredRPMs.keys(): >+ try: >+ rpmQ = Command("rpm -q " + rpm) >+ if self.options.debug != Constants.off: >+ rpmQ.echo() >+ else: >+ rpmQ.run() >+ except HwCertCommandException, e: >+ missingRPMs.append(rpm) >+ if len(missingRPMs) > 0: >+ print "The following rpms are required for testing:" >+ missingRPMList = "" >+ for rpm in 
missingRPMs: >+ missingRPMList = "%s %s" % (missingRPMList, rpm) >+ print missingRPMList >+ if self.options.mode == Constants.auto or self.ui.promptConfirm("Would you like to install them now?"): >+ try: >+ yum = Command("yum install -y " + missingRPMList) >+ print "Running yum: --------------------------------" >+ yum.echo() >+ print "---------------------------------------------" >+ except HwCertCommandException, e: >+ print "Error: Could not install rpm" >+ print e >+ if self.options.mode == Constants.auto: >+ print "Error: could not install required rpms" >+ return False >+ # otherwise >+ if self.options.debug != Constants.off: >+ print "Rechecking required rpms" >+ else: # user-abort >+ print "Warning: some tests may fail due to missing rpms" >+ return False >+ else: >+ if self.options.debug != Constants.off: >+ print "All required rpms installed" >+ return True >+ print "" >+ >+ def installServerRPM(self): >+ >+ hwcertServerRPM = "hwcert-server" >+ try: >+ rpmQ = Command("rpm -q " + hwcertServerRPM) >+ rpmQ.echo() >+ return True >+ except HwCertCommandException, e: >+ pass # continue >+ >+ print "Error: the %s package is not installed." % hwcertServerRPM >+ if self.options.mode == Constants.auto or self.ui.promptConfirm("Would you like to install it now?"): >+ try: >+ yum = Command("yum install -y " + hwcertServerRPM) >+ sys.stdout.write("Running yum... ") >+ sys.stdout.flush() >+ yum.echo() >+ yum.printOutput() >+ print "done" >+ except HwCertCommandException, e: >+ print "\nError: Could not install %s" % hwcertServerRPM >+ print e >+ print e.command.printErrors() >+ return False >+ >+ # otherwise >+ return True >+ >+ def checkTestServer(self, server): >+ if self.options.debug != Constants.off: >+ print "Verifying hwcert Server..." 
>+        result = True
>+        try:
>+            request = urllib2.Request('http://%s/hwcert/cgi/hwCertWeb.py?command=status' % server)
>+            opener = urllib2.build_opener()
>+            response = opener.open(request).readlines()
>+            nvrPattern = re.compile(Constants.serverNVRRegex)
>+            statusPattern = re.compile("%s=(?P<status>[a-zA-Z]+)" % Constants.hwcertserverstatus)
>+            serverRunning = False
>+            if response:
>+                for line in response:
>+                    match = nvrPattern.search(line)
>+                    if match:
>+                        hwcertServerVersion = match.group("version")
>+                        hwcertServerRelease = match.group("release")
>+                        if self.options.debug != Constants.off:
>+                            print "Version %s Release %s" % (hwcertServerVersion, hwcertServerRelease)
>+                        if hwcertServerVersion + "." + hwcertServerRelease < Constants.hwCertServerMinimumVersion:
>+                            print "Error: hwcert server is from a prior release."
>+                            print "This is a likely cause of test failures"
>+                            result = False
>+                    match = statusPattern.search(line)
>+                    if match:
>+                        if self.options.debug != Constants.off:
>+                            print "Status: %s" % match.group("status")
>+                        if match.group("status") == Constants.running:
>+                            serverRunning = True
>+                        else:
>+                            print "Warning: The hwcert server on %s is reporting errors" % server
>+                            for line in response:
>+                                if "Error:" in line:
>+                                    sys.stdout.write("%s says: %s" % (server, line))
>+                            return False
>+            else:
>+                print "Warning: The hwcert server on %s is not running" % server
>+                return False
>+
>+        except urllib2.URLError, exception:
>+            print "Warning: Could not contact the hwcert server on %s" % server
>+            print exception
>+            return False
>+
>+        return result
>+
>+    def doRun(self):
>+        if not self.doPlan():
>+            return False
>+
>+        #check tests
>+        if self.options.test:
>+            testNotFound = False
>+            for testName in self.options.test:
>+                if not self.certification.getTest(testName):
>+                    print "Error: No such test \"" + testName + "\""
>+                    testNotFound = True
>+            if testNotFound:
>+                return False
>+        # otherwise
>+
>+
>+        tests = self.certification.getFilteredTests(self.options)
>+
>+        if len(tests) > 0:
>+            if 
not self.__checkResultsSize(): >+ return False >+ tests = self.addMandatoryTests(tests) >+ if not tests: >+ return False >+ print "\nRunning the following tests:" >+ for test in tests: >+ udi = test.getShortUDI() >+ if not udi: udi = "" >+ print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), udi) >+ >+ verified = self.verify(tests) >+ if not verified and self.options.mode != Constants.auto: >+ if not self.ui.promptConfirm("Verification failed, would you like to continue testing?"): >+ return False >+ >+ # otherwise overriding verification failure or passed verification >+ return self._doRun(tests) >+ else: >+ print "Error: no tests match the supplied options" >+ return False >+ >+ def doContinue(self): >+ self.load() >+ tests = self.certification.getIncompleteTestsFromLastRun() >+ if len(tests) is 0: >+ print "All tests in the last test run are complete" >+ return False >+ # tests = self.addMandatoryTests(tests) don't re-do mandatory tests on a continue. >+ self.planner = Planner(self.options, redHatRelease=self.redHatRelease) >+ self.planner.analyse(self.environment.getSourceDirectory()) >+ return self._doRun(tests, continueRun=True) >+ >+ def _doRun(self, tests, continueRun=False): >+ >+ # don't run suspend on a continue run >+ if continueRun: >+ for test in tests: >+ if test.getName() == 'suspend': >+ tests.remove(test) >+ break >+ >+ if len(tests) is 0: >+ print "No matching tests found." 
>+ return False >+ >+ if self.options.debug != Constants.off: >+ report = Report(self.options, self.certification) >+ for test in tests: >+ print report.formatTest(test) >+ print "" >+ >+ # determine test server >+ if self.options.server: >+ self.certification.setTestServer(self.options.server) >+ else: >+ if self.certification.getTestServer(): >+ self.options.server = self.certification.getTestServer() >+ >+ >+ # Set up our system logging stuff >+ syslog.openlog(self.getSystemLogOpen()) >+ syslog.syslog("Beginning test run.") >+ >+ >+ # create the new run, mark incomplete >+ if continueRun: >+ runNumber = self.certification.getNumberOfTestRuns() >+ runDirectory = self.getTestRunDirectory() >+ else: >+ runNumber = self.certification.addTestRun() >+ # Make a log dir for this testrun >+ runDirectory = self.makeTestRunDirectory() >+ >+ for test in tests: >+ if not test.isDisabled(): >+ run = test.getRun(runNumber) >+ if not run: >+ run = test.newTestRun() >+ run.setNumber(runNumber) >+ self.certification.save(self.environment.getResultsPath()) >+ >+ # Start testin'! 
>+        combinedResult = True # assume all tests pass
>+        for test in tests:
>+            if test.isDisabled():
>+                if self.options.debug != Constants.off:
>+                    print "skipping %s - disabled" % test.getName()
>+                continue
>+
>+            if self.options.debug != Constants.off:
>+                print ("running %s on %s") % (test.getName(),test.getUDI())
>+            run = test.getRun(runNumber)
>+
>+            run.setRunTime(self.getCurrentUTCTime())
>+            if not run.getMode():
>+                run.setMode(self.options.mode)
>+            run.setNumber(self.certification.getNumberOfTestRuns())
>+            run.Debugging = self.Debugging
>+
>+
>+
>+
>+            # create log dir
>+            logDirectory=self.makeLogDirectory(runDirectory,test,continueRun)
>+            # TODO: RHR2-compat: add tests/ links
>+            outputFilePath=logDirectory+"/output.log" # set logfile name
>+
>+            # create temp dir
>+            tmpDirectory=tempfile.mkdtemp('',"hwcert-"+test.getName()+"-")
>+            # put test files in temp dir
>+            if not self.installTest(os.path.join(self.environment.getSourceDirectory(), test.getName()), tmpDirectory):
>+                return False
>+
>+            # Here goes! Actually run the test.
>+            syslog.syslog("%s: begin" % test.getName())
>+            testError = False
>+            # Walk the tmpdir and find any directory that contains a runtest.sh
>+            returnValue = 1
>+            try:
>+                summary = Constants.FAIL
>+                if True:
>+                # try:
>+                    returnValue = self.runTest(tmpDirectory, test, run, outputFilePath)
>+                    if returnValue == 0:
>+                        summary = Constants.PASS
>+                if False:
>+                # except Exception, exception:
>+                    print "Error: test raised exception:"
>+                    print exception
>+                    sys.stdout.flush()
>+                    summary = Constants.ABORT
>+                    # if debugging, re-raise the exception to show tracebacks, etc.
>+                    if self.Debugging:
>+                        raise exception
>+
>+                run.setSummary(summary)
>+                if summary != Constants.PASS:
>+                    combinedResult = False
>+                run.getResultsAttachments(self.environment, outputFilePath)
>+
>+            except OSError, e:
>+                print "Test error: %s" % e
>+                syslog.syslog("Test error: %s" % e)
>+                testError = True
>+                combinedResult = False
>+
>+            # Log the end of the test. 
>+ run.setEndTime(self.getCurrentUTCTime()) >+ syslog.syslog("%s: end" % test.getName()) >+ # If the test did not run correctly, don't record the results. >+ if testError: >+ self.removeDirectory(logDirectory) >+ continue >+ >+ # Save the system log >+ self.saveSystemLog(test, run) >+ # encode return value into output.log >+ self.saveOutput(run, returnValue, outputFilePath) >+ # print result >+ print ("Return value was %u") % returnValue >+ # TODO: clean up tmpdir here? >+ self.certification.save(self.environment.getResultsPath()) >+ if self.options.debug != Constants.off: >+ print "saved to " + self.environment.getResultsPath() >+ >+ syslog.syslog("Testing complete.") >+ syslog.closelog() >+ self.certification.save(self.environment.getResultsPath()) >+ >+ return combinedResult >+ >+ def logDevices(self, test, run): >+ devices = test.logDevices(self.planner, self.certification) >+ if devices: >+ run.appendDevices(devices) >+ return True >+ >+ #otherwise >+ return False >+ >+ def addMandatoryTests(self, requestedTests): >+ if self.options.debug == Constants.off: >+ requestedTestDict = dict(zip(map(TestDocument.getName, requestedTests), requestedTests)) >+ >+ for mandatoryTest in self.certification.getMandatoryTests(): >+ try: >+ test = requestedTestDict[mandatoryTest.getName()] >+ except KeyError: >+ mandatoryTest.setMode(Constants.forced) >+ if mandatoryTest.getMandatory() == Constants.pre: >+ requestedTests.insert(0,mandatoryTest) >+ elif mandatoryTest.getMandatory() == Constants.post: >+ requestedTests.append(mandatoryTest) >+ >+ # reboot test is a special case - really needs to run last >+ tests = list() >+ rebootTests = list() >+ for test in requestedTests: >+ if test.getName() != "reboot": >+ tests.append(test) >+ else: >+ rebootTests.append(test) >+ if rebootTests: >+ tests.extend(rebootTests) >+ return tests >+ >+ # otherwise - debugging - don't add any tests >+ print "Warning: Test runs in debug mode are invalid for certification." 
>+ if self.certification.getMandatoryTests(): >+ print "The following tests will be skipped: %s" % ", ".join(map(TestDocument.getName, self.certification.getMandatoryTests())) >+ if not self.ui.promptConfirm("Do you wish to continue?"): >+ return None >+ return requestedTests >+ >+ def doCertify(self): >+ """ run only tests needed for certification """ >+ >+ if not self.doPlan(): >+ return False >+ >+ engine = ResultsEngine(self.certification) >+ tests = engine.getRemainingTests(self.catalog) >+ >+ # for realtime kernel, just look for remaining realtime tests >+ if self.isRealtime(): >+ tests = self.certification.getTaggedTests(tests, [TestTag.realtime]) >+ >+ # filter remaining tests via tags >+ elif self.options.tag: >+ tests = self.certification.getTaggedTests(tests, self.options.tag) >+ >+ # filter by device (AKA component-level certify command) >+ if self.options.udi or self.options.device: >+ tests = self.certification.getDeviceTests(self.options, tests) >+ >+ if len(tests) > 0: >+ if not self.__checkResultsSize(): >+ return False >+ tests = self.addMandatoryTests(tests) >+ if not tests: >+ return False >+ >+ >+ deviceName = "" >+ if self.options.device: >+ deviceName = self.options.device >+ if self.options.udi: >+ deviceName += " (" + ", ".join(self.options.udi) + ")" >+ if deviceName: >+ print "\nThe following component tests for device %s with tag(s) [%s] are recommended:" % (deviceName, ", ".join(self.options.tag)) >+ elif self.options.tag == [TestTag.certification]: >+ print "\nThe following tests are recommended to complete the certification:" >+ else: >+ print "\nThe following tests with tag(s) [%s] are recommended:" % (", ".join(self.options.tag)) >+ for test in tests: >+ print "%-10s %-10s %-36s" % (test.getName(), test.getLogicalDeviceName(), test.getShortUDI()) >+ >+ verified = self.verify(tests) >+ if not verified and self.options.mode != Constants.auto: >+ if not self.ui.promptConfirm("Verification failed, would you like to continue testing?"): 
>+ return False >+ >+ # otherwise overriding verification failure, or mode auto, or passed verification and user continue >+ if not verified or self.options.mode == Constants.auto or self.ui.promptConfirm("Run these tests?"): >+ return self._doRun(tests) >+ elif self.options.mode == Constants.auto: >+ print "There are no outstanding non-interactive tests" >+ else: >+ print "Testing appears to be complete." >+ >+ return True >+ >+ def doVerify(self): >+ """ run tests configuration verification code """ >+ >+ if not self.doPlan(): >+ return False >+ >+ return self.verify(self.certification.getTests()) >+ >+ def __checkResultsSize(self): >+ >+ if self.certification.getLoadedFileSize() < self.environment.getResultsWarningSize(): >+ return True >+ >+ # otherwise >+ print "Warning: the test results may be too large to submit to the hardware catalog." >+ print "File: %s is %u MB" % (self.certification.getLoadedFilePath(), int(self.certification.getLoadedFileSize()/1048576)) >+ if self.options.mode != Constants.auto and self.ui.promptConfirm("Would you like to package the current results first?"): >+ self.doSubmit() >+ return False >+ >+ # size warning was ignored (or mode auto) >+ return True >+ >+ def verify(self, testDocuments): >+ >+ result = True >+ >+ # just call verify on each test directly >+ failures = 0 >+ >+ for testDocument in testDocuments: >+ if testDocument.isDisabled(): >+ continue >+ test = self.planner.getTest(testDocument.getName()) >+ testParameters = TestParameters(self, testDocument) >+ test.setParameters(testParameters) >+ if test.verify(): >+ # allow test verify code to update test document >+ testDocument.setParameters(testParameters) >+ else: >+ failures += 1 >+ >+ if failures == 0: >+ print "\nTest Verification Passed" >+ return True >+ >+ # otherwise: >+ print "\n%u Tests Failed Verification" % failures >+ return False >+ >+ def doPrint(self, format="text"): >+ self.load() >+ if format == "html": >+ report = ReportHTML(self.options, 
self.certification) >+ return report.Write() >+ # otherwise >+ report = Report(self.options, self.certification) >+ return report.Write() >+ >+ def doSubmit(self): >+ self.load() >+ return self.__submit() >+ >+ def __submit(self): >+ >+ if not self.certification or self.certification.getNumberOfTestRuns() == 0: >+ print "Error: no test results to submit" >+ return False >+ >+ if self.catalog.isReachable() and self.options.mode != Constants.auto: >+ (submitted, success) = self.catalog.submit(self.certification) >+ if success and submitted and self.ui.promptConfirm("Would you like to clean current test results from this system?"): >+ self.clean() >+ >+ # otherwise >+ return True >+ >+ def doSave(self): >+ self.load() >+ if self.certification and self.certification.getNumberOfTestRuns() > 0: >+ return self.__save() >+ >+ #otherwise >+ print "Error: no test results to save" >+ return False >+ >+ >+ def __save(self): >+ if self.certification: >+ path = self.environment.getStoreDirectory() >+ serverPath = list() >+ self.makeDirectoryPath(path) >+ for tag in Tags.vendor, Tags.make, Tags.model, Tags.arch: >+ value = self.certification.getHardware(tag) >+ if len(value) > 0 and value != "unknown": >+ path += "/%s" % value >+ self.makeDirectoryPath(path) >+ serverPath.append(value) >+ for value in self.certification.getOSProductShortName(), self.certification.getOS(Tags.release): >+ if len(value) > 0: >+ path += "/%s" % value >+ self.makeDirectoryPath(path) >+ serverPath.append(value) >+ >+ savedFilename = self.certification.getVerboseFileName() >+ print "Saving current results to:" >+ savedFilePath = os.path.join(path, savedFilename) >+ print savedFilePath + ".gz" >+ self.certification.compressToFile(savedFilePath) >+ >+ # if it's supplied as an option, use that one but don't save it, >+ # otherwise, check the certification's test server >+ if not self.options.server: >+ self.options.server = self.certification.getTestServer() >+ if self.options.server == "unknown": >+ 
self.options.server = None >+ >+ if self.options.server and (self.options.mode == Constants.auto or self.ui.promptConfirm("\nCopy results to test server %s?" % self.options.server)): >+ try: >+ request = HwCertHttp(self.options.server, "/hwcert/cgi/saveFile.py") >+ request.addField("server-path", serverPath) >+ return request.httpUpload(savedFilePath+".gz") >+ except Exception, e: >+ print "Error: could not save results to server %s" % self.options.server >+ print e >+ return False >+ >+ # otherwise >+ return True >+ >+ #otherwise >+ print "Error: no test results to save" >+ return False >+ >+ def doClean(self): >+ subcommand = Constants.results >+ try: >+ subcommand = self.args[1] >+ except IndexError: >+ pass >+ if subcommand not in [Constants.results, Constants.all]: >+ print "Error: invalid option %s, should be %s or %s" % (subcommand, Constants.results, Constants.all) >+ return False >+ >+ >+ if self.ui.promptConfirm("Are you sure you want to delete all test results?"): >+ self.clean() >+ if subcommand == Constants.all and self.ui.promptConfirm("Also remove certification data?"): >+ self.cleanAll() >+ return True >+ >+ def getBiosInfo(self): >+ arch = self.getArch() >+ self.certification.setHardware(Tags.arch, arch) >+ if self.Debugging: >+ print "Getting Bios Info for %s" % arch >+ vendor = "" >+ make = "" >+ model = "" >+ if arch in ['i386','i586','i686','x86_64']: >+ dmidecode = os.popen("/usr/sbin/dmidecode") >+ >+ for line in dmidecode: >+ if line.find(':') == -1: >+ continue >+ (key,val)=[t.strip() for t in line.split(':',1) if line.find(':')] >+ >+ if key == "Manufacturer" and val: >+ vendor = val >+ if key == "Product Name" and val: >+ model = val >+ # try and use the first word as the make >+ make = val.split()[0] >+ if vendor and model: >+ break >+ dmidecode.close() >+ elif os.path.exists('/proc/device-tree/model'): >+ f=open('/proc/device-tree/model') >+ model = f.readline().strip() >+ # if there's a comma, guess the format is <vendor>, <model> >+ # 
failing that, if there's a space, assume it's <vendor> <model> >+ try: >+ for separator in [',', ' ']: >+ if separator in model: >+ vendor = model.split(separator)[0] >+ model = model.split(separator)[1] >+ break >+ except: >+ pass >+ >+ f.close() >+ # IBM-likely arches >+ if arch in ['s390','s390x', "ppc", "ppc64"]: >+ if not vendor: >+ vendor = 'IBM' >+ if not make: >+ make = 'IBM' >+ if not model: >+ model = arch >+ >+ >+ >+ self.certification.setHardware(Tags.model, model) >+ self.certification.setHardware(Tags.make, make) >+ self.certification.setHardware(Tags.vendor, vendor) >+ >+ def getArch(self): >+ system,node,release,version,machine = os.uname() >+ return machine >+ >+ def makeTestRunDirectory(self): >+ """Create a directory for the current test run""" >+ testRunDirectory = self.getTestRunDirectory() >+ self.removeDirectory(testRunDirectory) >+ self.makeDirectoryPath(testRunDirectory) >+ return testRunDirectory >+ >+ def getTestRunDirectory(self): >+ runNumber = self.certification.getNumberOfTestRuns() >+ return self.environment.getLogDirectory() + "/runs/%u" % runNumber >+ >+ def makeLogDirectory(self, directory, test, continueRun=False): >+ "Create a directory for logs for the currently-running test" >+ logdir=directory+"/"+test.getName() >+ if not continueRun: >+ self.removeDirectory(logdir) # clean it, if it's there >+ self.makeDirectoryPath(logdir) >+ return logdir >+ >+ def installTest(self, testDirectory,tmpDirectory): >+ "Install a test's files to the named tmpdir for running" >+ if self.Debugging != Constants.off: >+ print "installing test from %s into %s" % (testDirectory, tmpDirectory) >+ try: >+ cwd=os.getcwd() >+ os.chdir(testDirectory) >+ os.system("make install DEST="+tmpDirectory) >+ os.chdir(cwd) >+ except OSError, error: >+ print "Error: Could not install test sources - %s" % error >+ return False >+ return True >+ >+ def runTest(self, testDirectory, testDocument, run, outputFilePath): >+ "Run a test, sending output to the logfile and 
stdout" >+ if self.Debugging: >+ print "HardwareTestHarness.runTest:" >+ print " test directory %s udi %s outputFile %s " % (testDirectory, testDocument.getUDI(), outputFilePath) >+ # change cwd to test scratch area >+ cwd=os.getcwd() >+ os.chdir(testDirectory) >+ >+ # call make build to compile/chmod any called scripts >+ try: >+ buildCommand = Command("make build") >+ buildCommand.echo() >+ except HwCertCommandException, exception: >+ print "Warning: test build produced errors." >+ print exception >+ buildCommand.printErrors() >+ >+ #tee stdout and stderr to terminal and log file >+ realStdout = sys.stdout >+ realStderr = sys.stderr >+ sys.stdout = Log(outputFilePath) >+ sys.stderr = sys.stdout >+ >+ # find the test driver python and call it directly >+ test = self.planner.getTest(testDocument.getName()) >+ testParameters = TestParameters(self, testDocument) >+ testParameters.set(Constants.OUTPUTFILE, outputFilePath) >+ test.setParameters(testParameters) >+ if self.Debugging != Constants.off: >+ print "Test Parameters: %s" % testParameters >+ rv = test.run() >+ >+ # restore stdout and stdin, and cwd >+ sys.stdout.close() >+ sys.stdout = realStdout >+ sys.stderr = realStderr >+ os.chdir(cwd) >+ >+ # log devices >+ if not self.logDevices(test, run): >+ testError = True >+ >+ if self.Debugging: >+ print "...Done: return value = %u." % rv >+ return rv >+ >+ def runServer(self, test, subcommand): >+ if subcommand == "start": >+ return test.startServer() >+ if subcommand == "stop": >+ return test.stopServer() >+ >+ # otherwise >+ return test.statusServer() >+ >+ def saveSystemLog(self, test, run): >+ "Save a section of the system log to the named file" >+ # It feels a bit wrong to have this stuff hardcoded, but it works. 
# NOTE(review): the two statements below are the tail of a method whose
# `def` line falls outside this chunk; preserved unchanged.
contents = self.getSystemLog(test.getName())
run.setSystemLog(contents)

def saveOutput(self, run, retval, outputFilePath):
    """Record a test run's return value and attach its captured output.

    If the output logs cannot be saved, the run is marked as failed.
    """
    print("saveOutput: %s" % outputFilePath)
    run.setReturnValue(retval)
    if not run.setTestOutput(outputFilePath):
        print("Error: could not save test logs, marking as failed.")
        run.setSummary(Constants.FAIL)

def _getTestsFromOptions(self):
    """Resolve the tests selected via --device, --udi or --test.

    Returns a list of tests, or None when no selection option was given.
    """
    if self.options.device:
        return self.certification.getTestsByLogicalDevice(self.options.device)
    if self.options.udi:
        return self.certification.getTestsByUDI(self.options.udi)
    if self.options.test:
        return self.certification.getTests(self.options.test)
    print("Error: no tests specified in options")
    return None

def setTestsEnabled(self):
    """Enable or disable the selected tests per self.options.enable.

    Mandatory tests can never be disabled.
    """
    tests = self._getTestsFromOptions()
    if tests is None:
        # bug fix: _getTestsFromOptions may return None (no selection
        # options); iterating None raised TypeError here.
        return
    if self.Debugging:
        print("HardwareTest.setTestsEnabled:")
    disabled = True
    output = "Disabled"
    count = 0
    if self.options.enable:
        disabled = False
        output = "Enabled"
    for test in tests:
        if disabled and test.getMandatory():
            print("The %s test must be included in every test run and can not be disabled." % test.getName())
            continue
        if test.isDisabled() != disabled:
            test.setDisabled(disabled)
            count = count + 1
            if self.Debugging:
                # bug fix: this debug line previously sat after the loop,
                # so it referenced the loop variable out of scope (NameError
                # on an empty list) and only reported the last test.
                print((output + " test: %s udi: %s") % (test.getName(), test.getUDI()))
        else:
            print("test %s is already %s" % (test.getName(), output))

    print((output + " %u tests") % count)

def setServerOnTests(self):
    """Point the selected tests at the test server given by --server."""
    tests = self._getTestsFromOptions()
    if tests is None:
        # bug fix: guard against a None selection (see setTestsEnabled)
        return
    if self.Debugging:
        print("HardwareTest.setServerOnTests:")

    if self.options.server:
        output = "Set server to %s for " % self.options.server
        for test in tests:
            test.setServer(self.options.server)
            if self.Debugging:
                # bug fix: moved inside the loop; it previously referenced
                # the loop variable after the loop ended.
                print((output + " test: %s udi: %s") % (test.getName(), test.getUDI()))
        print((output + " %u test(s)") % len(tests))

def addTest(self):
    """Manually add a test (and optionally its device) to the test plan."""
    if not self.options.test:
        print("Must specify a test , use --test <test name>")
        return

    # otherwise, get the test to be added
    planner = Planner(self.options, redHatRelease=self.redHatRelease)
    planner.analyse(self.environment.getSourceDirectory())

    try:
        test = planner.getTest(self.options.test[0])
        test = copy.copy(test)
    except KeyError:
        print("No such test %s" % self.options.test)
        return

    device = None
    if not self.options.udi and not self.options.device:
        print("Warning: No device specified with --udi <device udi> or --device <logical device name>")
        if not self.ui.promptConfirm("Are you sure the test does not require a specific device?"):
            return
    else:
        key = None
        if self.options.udi:
            key = self.options.udi[0]
            device = planner.getDeviceByUDI(key)
        elif self.options.device:
            key = self.options.device
            # bz 485212 - don't try and match device "name" unless
            # three or more characters are provided
            if len(key) > 2:
                device = planner.getDeviceByName(key)

        if not device:
            print("Warning: unknown device: %s " % key)
            # create one from scratch
            device = UdevDevice(dict())
            if self.options.udi and len(self.options.udi) > 0:
                device.setUDI(self.options.udi[0])

        test.setDevice(device)
        if self.options.device:
            test.setLogicalDeviceName(self.options.device)

    if self.Debugging:
        # bug fix: `device` was unbound (NameError) when no --udi/--device
        # option was given; only report device details when one exists.
        if device is not None:
            print("added test: %s for device: %s" % (test, device.getProperty('info.product')))
        else:
            print("added test: %s" % test)

    test.setSource(Constants.manual)
    self.certification.appendTest(test)
    self.certification.setPlanTime(self.getCurrentUTCTime())
    print("Added test")

def getCurrentUTCTime(self):
    """Return the current time as a UTC struct_time."""
    return time.gmtime(time.time())

# YK: to remove HwCert log files after the result rpm is generated
def clean(self):
    """Remove the hwcert log directory and the results file.

    Deliberately best-effort: rm -rf / rm -f ignore missing paths.
    """
    # is the rug going to be pulled out from under us?
    if self.environment.getLogDirectory() in os.getcwd():
        os.chdir("/var/log")
    if self.options.debug != Constants.off:
        print("removing HwCert logs ...")
    os.system("rm -rf %s" % self.environment.getLogDirectory())
    if self.options.debug != Constants.off:
        print("removing HwCert results...")
    os.system("rm -f %s" % self.environment.getResultsPath())

def cleanAll(self):
    """Remove the certification document as well (best-effort)."""
    os.system("rm -f %s" % self.environment.getCertificationPath())

def getLock(self):
    """Acquire the hwcert lock file.

    Returns True when the lock was taken (or the user overrode a stale
    lock in normal mode), False otherwise.
    """
    if os.path.exists(self.environment.getLockFile()):
        print("Error: hwcert is already running (lock file %s found)" % self.environment.getLockFile())
        if self.options.mode == Constants.normal:
            if self.ui.promptConfirm("Override?"):
                return True
        # otherwise, can't ask about override
        return False
    # otherwise, no lock - go ahead and lock it
    # bug fix: close the handle instead of leaking an open file object;
    # only the file's existence matters.
    lock = open(self.environment.getLockFile(), "w")
    lock.close()
    return True

def releaseLock(self):
    """Remove the lock file, warning if it has already vanished."""
    try:
        os.remove(self.environment.getLockFile())
    except OSError:
        # the bound exception was never used; dropping it is also
        # Python 3 compatible
        print("Warning: hwcert lock file missing - other instances of hwcert may have run.")
>+ >+ def editCertification(self): >+ editable = [ Tags.vendor, Tags.make, Tags.model, Tags.product_url] >+ print "\nPlease verify the hardware product information:" >+ for tag in editable: >+ value = self.certification.getHardware(tag) >+ value = self.ui.promptEdit(" %s:" % tag, value) >+ if len(value) > 0: >+ self.certification.setHardware(tag, value) >+ >+ # category >+ answers = SystemCategories.getAll() >+ value = self.certification.getHardware(Tags.category) >+ value = self.ui.promptEdit(" " + Tags.category, value, answers) >+ if len(value) > 0: >+ self.certification.setHardware(Tags.category, value) >+ >+ >+ certificationID = self.certification.getCertificationID() >+ >+ # if the certification id isn't set yet - see if they want to use the catalog >+ if not certificationID: >+ self.catalog.getCertificationID(self.certification) >+ # else, let them edit it directly >+ else: >+ answers = list() >+ answers.append(str(certificationID)) >+ while True: >+ value = self.ui.prompt("\nPlease enter the certification ID:", answers) >+ if not value: >+ break >+ try: >+ int(value) >+ break >+ except ValueError: >+ print "Error: %s is not an integer." 
% value >+ if value and int(value) >= 0: >+ self.certification.setCertificationID(value) >+ >+ # set a local test server >+ value = self.certification.getTestServer() >+ value = self.ui.promptEdit("Local Hardware Certification Test Server: ", value) >+ if len(value) > 0: >+ self.certification.setTestServer(value) >+ >+ # copy over to certification.xml >+ certificationDocument = CertificationDocument() >+ certificationDocument.copy(self.certification) >+ certificationDocument.save(self.environment.getCertificationPath()) >diff --git a/hwcert/test.py b/hwcert/test.py >index 1a9cfe6..0dd01d7 100644 >--- a/hwcert/test.py >+++ b/hwcert/test.py >@@ -574,7 +574,7 @@ class Test(CommandLineUI): > > class TestParameters: > >- def __init__(self, hardwareTest, testDocument = None): >+ def __init__(self, harness, testDocument = None): > self.parameters = dict() > self.testDocument = testDocument > >@@ -587,15 +587,15 @@ class TestParameters: > pass > > # 2 get them from the harness/options >- if hardwareTest.testServer: >- self.parameters[Constants.TESTSERVER] = hardwareTest.testServer >- elif hardwareTest.certification and hardwareTest.certification.getTestServer(): >- self.parameters[Constants.TESTSERVER] = hardwareTest.certification.getTestServer() >+ if harness.options.server: >+ self.parameters[Constants.TESTSERVER] = harness.options.server >+ elif harness.certification and harness.certification.getTestServer(): >+ self.parameters[Constants.TESTSERVER] = harness.certification.getTestServer() > >- self.parameters[Constants.DEBUG] = hardwareTest.options.debug >- self.parameters[Constants.RUNMODE] = hardwareTest.options.mode >+ self.parameters[Constants.DEBUG] = harness.options.debug >+ self.parameters[Constants.RUNMODE] = harness.options.mode > >- if hardwareTest.command == "continue": >+ if harness.command == "continue": > self.parameters[Constants.INCOMPLETE] = "1" > > >diff --git a/test-env/bin/hwcert b/test-env/bin/hwcert >index 7d2422e..826a01d 100755 >--- 
a/test-env/bin/hwcert >+++ b/test-env/bin/hwcert >@@ -20,7 +20,6 @@ hwCertClassLibraryPath = '/usr/share/hwcert/lib/' > sys.path.append(hwCertClassLibraryPath) > os.putenv("PYTHONPATH", hwCertClassLibraryPath) > >-from hwcert.hardwaretest import HardwareTestHarness > from hwcert.hardwarecertification import HardwareCertification > > # bail out if user is not root >diff --git a/test-env/bin/hwcert-backend b/test-env/bin/hwcert-backend >index 2f03ef1..6229a11 100644 >--- a/test-env/bin/hwcert-backend >+++ b/test-env/bin/hwcert-backend >@@ -21,15 +21,15 @@ sys.path.append(hwcertClassLibraryPath) > os.putenv("PYTHONPATH", hwcertClassLibraryPath) > > from hwcert.test import Test >-from hwcert.hardwaretest import HardwareTestHarness >+from hwcert.backend import Backend > > # bail out if user is not root > if os.getuid() > 0: > sys.stderr.write("You need to be root to run this program.\n") > sys.exit(1) > >-hwcert=HardwareTestHarness() >-args = hwcert.getArgs() >+hwcertBackend=Backend() >+args = hwcertBackend.getArgs() > > > command = None >@@ -42,7 +42,7 @@ if command == "test": > success = test.do(args[1:]) > else: > >- success = hwcert.do(args) >+ success = hwcertBackend.do(args) > > if not success: > sys.exit(1) >-- >1.8.1.4 >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 921247
:
712651
|
713444
|
714559
| 716574