Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 901795 Details for
Bug 849087
[RFE] Improve journal operations scaling and resource consumption
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
inspiration for journalling.py
log.py (text/x-python), 37.14 KB, created by
Dalibor Pospíšil
on 2014-06-03 13:50:32 UTC
(
hide
)
Description:
inspiration for journalling.py
Filename:
MIME Type:
Creator:
Dalibor Pospíšil
Created:
2014-06-03 13:50:32 UTC
Size:
37.14 KB
patch
obsolete
>#!/usr/bin/env python > ># Copyright (c) 2009 Red Hat, Inc. All rights reserved. This copyrighted ># material is made available to anyone wishing to use, modify, copy, or ># redistribute it subject to the terms and conditions of the GNU General ># Public License v.2. ># ># This program is distributed in the hope that it will be useful, but WITHOUT ># ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS ># FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. ># ># You should have received a copy of the GNU General Public License ># along with this program; if not, write to the Free Software ># Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ># ># Author: Lubos Trilety <ltrilety@redhat.com> > >""" Log API """ > ># takes name convention from logging and from previous code ># pylint: disable=C0103 > >import tempfile >import os >from datetime import datetime >import types >import sys >import socket >import xml.sax >import xml.sax.saxutils >import logging >import copy >import mrg.utils >from mrg.contrib.compat import OrderedDict > >## remove extra from kwargs >#if sys.version_info < (2, 5): ># class LoggerCompat(object): ># def __init__(self, logger): ># self._logger = logger ># ># def __getattr__(self, name): ># val = getattr(self._logger, name) ># if callable(val): ># def _wrapper(*args, **kwargs): ># # Python 2.4 logging module doesn't support 'extra' parameter to ># # methods of Logger ># kwargs.pop('extra', None) ># return val(*args, **kwargs) ># return _wrapper ># else: ># return val ># ># def getLogger(name=None): ># return LoggerCompat(MRGLogger(name=name)) >#else: >def getLogger(name=None, no_log=False): > """ own getLogger """ > return MRGLogger(name=name, no_log=no_log) > >LOG_LEVEL = 60 >PASS_LEVEL = 61 >FAIL_LEVEL = 62 >INFO = logging.INFO >CRITICAL = logging.CRITICAL >DEBUG = logging.DEBUG >ERROR = logging.ERROR >FATAL = logging.FATAL >WARNING = logging.WARNING >WARN = 
WARNING >NOTSET = logging.NOTSET > >logging.addLevelName(LOG_LEVEL, 'LOG') >logging.addLevelName(PASS_LEVEL, 'PASS') >logging.addLevelName(FAIL_LEVEL, 'FAIL') > > >def passed(self, message, *args, **kws): > """ log pass """ > # Yes, logger takes its '*args' as 'args'. > self._log(PASS_LEVEL, message, args, **kws) # pylint: disable=W0212 >logging.Logger.passed = passed > > >def failed(self, message, *args, **kws): > """ log fail """ > self._log(FAIL_LEVEL, message, args, **kws) # pylint: disable=W0212 >logging.Logger.failed = failed > >TIME_FORMAT = "%H:%M:%S" ># use for example "%H:%M:%S:%f" to enable millisecond logging >if sys.version_info < (2, 5): > TIME_FORMAT_MS = TIME_FORMAT >else: > TIME_FORMAT_MS = TIME_FORMAT + ",%f" > >STD_VARIABLE = ' [ %(levelname)-9s] %(name)s::' >STD_FORMAT = '[%(asctime)s,%(msecs).03d]' + STD_VARIABLE + ' %(message)s' >XML_FORMAT = '%(message)s' >STD_FORMATTER = logging.Formatter(STD_FORMAT, TIME_FORMAT) >XML_FORMATTER = logging.Formatter(XML_FORMAT) > > >class XmlHandler(logging.FileHandler): > """ Handler for xml file """ > def __init__(self, xml_file): > """ init """ > logging.FileHandler.__init__(self, xml_file) > > def emit(self, record): > """ emit message to xml """ > if sys.version_info < (2, 5): > if extra: > for key in extra.keys(): > setattr(record, key, extra[key]) > > # need to run self.formatter.format otherwise record.message is not filled > formatted_message = self.formatter.format(record) > if hasattr(record, 'tag'): > # there will be used tag > attr_str = '' > if hasattr(record, 'attrs'): > for attr_name in record.attrs: > attr_str += ' %s=%s' % (attr_name, xml.sax.saxutils.quoteattr(record.attrs[attr_name])) > if hasattr(record, 'starttag'): > if record.starttag: > self.stream.write('<%s%s>\n' % (record.tag, attr_str)) > else: > self.stream.write('</%s>\n' % record.tag) > else: > self.stream.write('<%s%s>%s</%s>\n' %\ > (record.tag, attr_str, xml.sax.saxutils.escape(record.message), record.tag)) > else: > if 
record.levelname == 'PASS' or record.levelname == 'FAIL': > # assert, or result of test > self.stream.write('<test message=%s>%s</test>\n' %\ > (xml.sax.saxutils.quoteattr(record.message), xml.sax.saxutils.escape(record.levelname))) > else: > # typical message > self.stream.write('<message severity=%s timestamp=%s>%s</message>\n' %\ > (xml.sax.saxutils.quoteattr(record.levelname),\ > xml.sax.saxutils.quoteattr(datetime.strftime(datetime.now(), TIME_FORMAT_MS)),\ > xml.sax.saxutils.escape(formatted_message))) > self.flush() > > >class MRGLogger(logging.Logger, object): > """ Special MRG Logger """ > def __txt_format_log(self, txt_format, lvl, msg, *args, **kwargs): > """ use different format """ > if self.handlers: > # set format > aux_std_formatter = logging.Formatter(txt_format, TIME_FORMAT) > for handler in self.handlers: > if not isinstance(handler, XmlHandler): > handler.setFormatter(aux_std_formatter) > logging.Logger.log(self, lvl, msg, *args, **kwargs) > # put format back > for handler in self.handlers: > if not isinstance(handler, XmlHandler): > handler.setFormatter(STD_FORMATTER) > > def log(self, lvl, msg, *args, **kwargs): > """ log message """ > if lvl == LOG_LEVEL: > std_format = STD_FORMAT > > if sys.version_info < (2, 5): > # remove extra if python version is less than 2.5 > global extra > extra = kwargs.pop('extra', None) > else: > extra = kwargs.get('extra', None) > > if extra and 'tag' in extra: > # print special format > # if beginning or ending tag log only xml > if extra['tag'] == 'phase' or extra['tag'] == 'BEAKER_TEST' or extra['tag'] == 'log': > handlers = copy.copy(self.handlers) > txtonly_handlers = [] > for handler in handlers: > if not isinstance(handler, XmlHandler): > self.removeHandler(handler) > txtonly_handlers.append(handler) > if self.handlers: > logging.Logger.log(self, lvl, msg, *args, **kwargs) > # if phase beginning log starttime to txt handlers > if extra['tag'] == 'phase' and 'attrs' in extra and 'starttime' in 
extra['attrs']: > self.handlers = copy.copy(txtonly_handlers) > txt_format = STD_FORMAT.replace(STD_VARIABLE, "") > self.__txt_format_log(txt_format, lvl, 'Phase started: %s', extra['attrs']['starttime'], **kwargs) > self.handlers = copy.copy(handlers) > return > else: > # if needed print tag as special string, otherwise print only formatted message > prints_subst = { > 'test_id': 'Test run ID :', > 'package': 'Package :', > 'release': 'Distro: :', > 'starttime': 'Test started :', > 'arch': 'Architecture :', > 'hostname': 'Hostname :', > 'endtime': 'Test or Phase finished:', > 'result': 'Phase result :' > } > if extra['tag'] in prints_subst: > std_format = STD_FORMAT.replace(STD_VARIABLE, ' %s' % prints_subst[extra['tag']]) > elif extra['tag'] == 'testid': > std_format = STD_FORMAT.replace(STD_VARIABLE, ' TCMS') + ' result : %s' % extra['attrs']['result'] > else: > std_format = STD_FORMAT.replace(STD_VARIABLE, "") > else: > # remove level from output > std_format = STD_FORMAT.replace(STD_VARIABLE, "") > handlers = copy.copy(self.handlers) > if 'noxml' in kwargs and kwargs['noxml']: > del(kwargs['noxml']) > # remove xml handler > for handler in handlers: > if isinstance(handler, XmlHandler): > self.removeHandler(handler) > # log to txt outputs with different format - std_format > self.__txt_format_log(std_format, lvl, msg, *args, **kwargs) > self.handlers = copy.copy(handlers) > else: > logging.Logger.log(self, lvl, msg, *args, **kwargs) > > >class MRGLog(MRGLogger): > """ Basic MRG Log class """ > def __init__(self, > testid='', > logdir='', > debug=False, > handlers=None, > module=False, > output=None, > level=logging.INFO, > main_logger=None, > no_log=False): > """ initialization > Key Parameters: > testid - name of the test or module > logdir - log directory > debug - stdout only log level threshold is set to logging.DEBUG > handlers - list of handlers instances > module - if it is called by module or by main test > output - list of which handler should be 
initialized, by default all will be used > - possible handlers: 'xml', 'std', 'txt' > - only when handlers are not provided and it is not module instance > - not used if debug mode > - e.g. ['xml', 'std', 'txt'] > """ > # find if it is module or not - module is True or empty testid > if not testid: > self.__module = True > else: > self.__module = module > > self.__handlers_level = level > # note: logging.DEBUG is 10 > # logger will not block anything - xml needs debug > # level of logs will be filtered on handlers > self.level = logging.DEBUG > > if not self.__module: > super(MRGLog, self).__init__('main_test') > else: > super(MRGLog, self).__init__(testid) > > self.disabled = no_log > > self.__package = 'unknown' > > # logger for counting results > if main_logger: > # move down for now > self.parent = main_logger > > # test results > self.__fails_number = 0 > self.__pass_number = 0 > self.__inittime = None > > # phase results and ids > self.__act_phase_name = None > self.__act_phase_test_id = None > self.__act_phase_pass = 0 > self.__act_phase_fail = 0 > self.__act_phase_init_time = None > self.__in_phase = False > > # tcms test results and ids > self.__tcms_tests = OrderedDict() > self.__act_test_id = None > self.__act_test_desc = '' > self.__act_test_pass = 0 > self.__act_test_fail = 0 > self.__act_test_init_time = None > > if debug or output == ['std']: > self.__initialize_handlers(None, debug=debug, handlers=handlers, output=output) > else: > self.__initialize_handlers(self.__get_log_dir(logdir, testid), debug=False, > handlers=handlers, output=output) > > if not self.__module: > self.__init_log() > > def __get_log_dir(self, logdir, testid): > """ prepare logging directory if needed """ > if not self.__module: > # try to guess the log directory, or build it > if len(logdir) > 0: > # logdir parameter specified, use it > tmp_logdir = logdir > elif os.environ.get('BEAKERLIB_DIR', None): > # BEAKERLIB_DIR defined and not empty, use it > tmp_logdir = 
os.environ['BEAKERLIB_DIR'] > else: > # use testid as part of the name of the temporary directory > tmp_logdir = os.path.join(tempfile.gettempdir(), 'beakerlib-' + testid) > try: > os.makedirs(tmp_logdir) > finally: > return tmp_logdir > return None > > def __initialize_handlers(self, logdir, debug, handlers, output): > """ create handlers instances """ > if handlers: > # not empty > self.handlers = copy.copy(handlers) > elif handlers == [] or self.__module or logdir is None: > # empty handlers or module or (debug mode or std only) > if debug: > # print on std only > self.__handlers_level = logging.DEBUG > self.propagate = 0 > self.setStdHandler() > # use parent handlers or use stdout only > elif not isinstance(self.parent, MRGLog): > self.setStdHandler() > else: > # initialize handlers > try: > if output is None or 'xml' in output: > path = os.path.join(logdir, 'journal2.xml') > # try to open and clear the xml journal file > xml_file = open(path, 'w') > xml_file.close() > # add correct handler for log to xml > self.setXmlHandler(path) > if output is None or 'txt' in output: > path = os.path.join(logdir, 'journal2.txt') > # try to open and clear the text journal file > text_file = open(path, 'w') > text_file.close() > # add correct handler for log to txt > self.setTextHandler(path, self.handlers_level) > if output is None or 'std' in output: > # add correct handler for printing to stdout > self.setStdHandler(self.handlers_level) > except Exception, error: > raise mrg.utils.MRGException("Log:: Log init error: %s " % str(error)) > > def __init_log(self): > """ log header """ > self.log(LOG_LEVEL, '', extra={'tag': 'BEAKER_TEST', 'starttag': True}) > self.log(LOG_LEVEL, str(self.name), extra={'tag': 'test_id'}) > self.log(LOG_LEVEL, str(self.__package), extra={'tag': 'package'}) > self.__inittime = datetime.now() > self.log(LOG_LEVEL, datetime.strftime(self.__inittime, TIME_FORMAT_MS), extra={'tag': 'starttime'}) > self.log(LOG_LEVEL, str(socket.getfqdn()), 
extra={'tag': 'hostname'}) > if mrg.utils.MRGEnv.isLinux: > self.log(LOG_LEVEL, str(os.uname()[-1]), extra={'tag': 'arch'}) > self.log(LOG_LEVEL, str(open("/etc/redhat-release", 'r').read().strip()), extra={'tag': 'release'}) > else: > self.log(LOG_LEVEL, '', extra={'tag': 'arch'}) > # no error > self.log(LOG_LEVEL, str(sys.getwindowsversion()), extra={'tag': 'release'}) # pylint: disable=E1101 > # error > > purpose = '' > try: > purpose_file = open("PURPOSE", 'r') > purpose = purpose_file.read() > purpose_file.close() > except IOError: > purpose = "Cannot find the PURPOSE file of this test." +\ > "Could be a missing, or rlInitializeJournal wasn't called from appropriate location" > > self.log(LOG_LEVEL, purpose, extra={'tag': 'purpose'}) > self.log(LOG_LEVEL, '', extra={'tag': 'log', 'starttag': True}) > self.log(LOG_LEVEL, '::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::', noxml=True) > > def close(self): > """ close instance """ > if self.__in_phase: > # close phase if running > self.closePhase() > if self.__act_test_id != None: > # close TCMS if running > self.closeTCMS() > if not self.__module: > self.__end_log() > rlJournalEnd = close > > def rlJournalPrint(self, *args, **kwargs): > """ backward compatibility, not needed, do nothing """ > pass > rlJournalPrintText = rlJournalPrint > > def __end_log(self): > """ log ending """ > self.log(LOG_LEVEL, '::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::', noxml=True) > endtime = datetime.now() > self.log(LOG_LEVEL, datetime.strftime(endtime, TIME_FORMAT_MS), extra={'tag': 'endtime'}) > self.log(LOG_LEVEL, '', extra={'tag': 'log', 'starttag': False}) > self.log(LOG_LEVEL, '', extra={'tag': 'BEAKER_TEST', 'starttag': False}) > self.log(LOG_LEVEL, '====================================================================', noxml=True) > self.log(LOG_LEVEL, '====================================================================', noxml=True) > if 
self.__fails_number != 0: > self.log(LOG_LEVEL, 'Test Summary : FAIL #TESTS:%d #FAILS:%d' %\ > (self.__fails_number + self.__pass_number, self.__fails_number), noxml=True) > else: > self.log(LOG_LEVEL, 'Test Summary : PASS #TESTS:%d #FAILS:%d' %\ > (self.__fails_number + self.__pass_number, self.__fails_number), noxml=True) > if mrg.utils.MRGEnv.isLinux: > try: > self.log(LOG_LEVEL, 'Test name : %s' % os.sep.join(os.getcwd().split(os.sep)[-4:]), noxml=True) > except Exception: > self.log(LOG_LEVEL, 'Test name : %s' % self.name, noxml=True) > else: > self.log(LOG_LEVEL, 'Test name : %s' % self.name, noxml=True) > self.log(LOG_LEVEL, 'Duration : %s' % str(endtime - self.__inittime), noxml=True) > if mrg.utils.MRGEnv.isLinux: > self.log(LOG_LEVEL, > 'Test MRG pkgs : <no-repository> on %s %s'\ > % (open("/etc/redhat-release", 'r').read().strip(), > str(os.uname()[-1])), > noxml=True) > else: > # pylint: disable=E1101 > # no error > self.log(LOG_LEVEL, > 'Test MRG pkgs : <no-repository> on Windows(%s)' % str(sys.getwindowsversion()), # pylint: disable=E1101 > noxml=True) > # pylint: enable=E1101 > # error > self.log(LOG_LEVEL, '====================================================================', noxml=True) > # print all tcms test with results (PASS|FAIL) > self.log(LOG_LEVEL, "Test-case[s] Summary: (%i found)" % len(self.__tcms_tests), noxml=True) > for testid in self.__tcms_tests.keys(): > self.log(LOG_LEVEL, > '%s: %s #tests:%d #fails:%d desc.:"%s"'\ > % (str(testid), > self.__tcms_tests[testid]['result'], > self.__tcms_tests[testid]['pass'] + self.__tcms_tests[testid]['fail'], > self.__tcms_tests[testid]['fail'], > self.__tcms_tests[testid]['desc']), > noxml=True) > self.log(LOG_LEVEL, '====================================================================', noxml=True) > self.log(LOG_LEVEL, '====================================================================', noxml=True) > > def setXmlHandler(self, xml_file): > """ prepare xml file logging > NOTE: this handler 
has always log level set to DEBUG > """ > xmlh = XmlHandler(xml_file) > xmlh.setFormatter(XML_FORMATTER) > xmlh.setLevel(logging.DEBUG) > self.addHandler(xmlh) > > def setStdHandler(self, level=logging.INFO): > """ prepare logging to stdout """ > stdh = logging.StreamHandler(sys.stdout) > stdh.setFormatter(STD_FORMATTER) > stdh.setLevel(level) > self.addHandler(stdh) > > def setTextHandler(self, text_file, level=logging.INFO): > """ prepare logging to txt file """ > texth = logging.FileHandler(text_file) > texth.setFormatter(STD_FORMATTER) > texth.setLevel(level) > self.addHandler(texth) > > def getLevel(self): > """ getter for level """ > return self.__handlers_level > > def setLevel(self, level): > """ set level > just for std and text file > """ > self.__handlers_level = level > for handler in self.handlers: > if not isinstance(handler, XmlHandler): > handler.setLevel(level) > > handlers_level = property(getLevel, setLevel) > > rlLogDebug = MRGLogger.debug > rlLogInfo = MRGLogger.info > rlLogWarning = MRGLogger.warning > rlLogError = MRGLogger.error > rlLogFatal = MRGLogger.critical > fatal = MRGLogger.critical > > def rlLog(self, message, priority=LOG_LEVEL): > """ log a message """ > if type(priority) == types.StringType: > # no error > if not priority.strip(): # pylint: disable=E1103 > # error > severities = {"DEBUG": logging.DEBUG, > "INFO": logging.INFO, > "WARNING": logging.WARNING, > "ERROR": logging.ERROR, > "FATAL": logging.CRITICAL, > "LOG": LOG_LEVEL} > level = severities[priority] > # error > elif type(priority) == types.IntType and priority < 10: > if priority == 0: > level = logging.DEBUG > elif priority == 1: > level = logging.INFO > elif priority == 2: > level = logging.WARNING > elif priority == 3: > level = logging.ERROR > elif priority == 4: > level = logging.CRITICAL > elif not priority: > level = LOG_LEVEL > else: > level = priority > self.log(level, message) > > def rlLogLevel(self, message, level=logging.DEBUG): > """ log variable level 
default debug """ > self.rlLog(message, level) > > def rlAddMessage(self, message, severity="LOG"): > """ log any message default log """ > self.rlLog(message, severity) > > def __increase_counters(self, passed=True): > """ increase pass or fail counters for logger """ > if passed: > self.__pass_number += 1 > if self.__in_phase: > self.__act_phase_pass += 1 > if self.__act_test_id != None: > self.__act_test_pass += 1 > else: > self.__fails_number += 1 > if self.__in_phase: > self.__act_phase_fail += 1 > if self.__act_test_id != None: > self.__act_test_fail += 1 > > def passed(self, message, *args, **kwargs): > """ log pass """ > if isinstance(self.parent, MRGLog): > self.parent.__increase_counters() > else: > self.__increase_counters() > MRGLogger.passed(self, message, *args, **kwargs) > rlPass = passed > > def failed(self, message, *args, **kwargs): > """ log fail """ > if isinstance(self.parent, MRGLog): > self.parent.__increase_counters(passed=False) > else: > self.__increase_counters(passed=False) > MRGLogger.failed(self, message, *args, **kwargs) > > def rlFail(self, message, failmsg=''): > """ almost the same as failed, but it needs fail message """ > complete_message = "%s %s" % (message, str(failmsg)) > self.failed(complete_message) > > def getTestState(self): > """ getTestState """ > self.debug('rlGetTestState: %s failed assert(s) in test' % str(self.__fails_number)) > return self.__fails_number > rlGetTestState = getTestState > > def getPhaseState(self): > """ getPhaseState """ > self.debug("rlGetPhaseState: %s failed assert(s) in phase" % str(self.__act_phase_fail)) > return self.__act_phase_fail > rlGetPhaseState = getPhaseState > > def addPhase(self, name, testid=''): > """ add a new phase > only one phase can be active in time > Key Parameters: > name - name of the phase > testid - id of the running tcms test > """ > if self.__in_phase: > # already in phase close it first > self.closePhase() > if testid: > if self.__act_test_id != None: > # already 
running TCMS close it first > self.closeTCMS() > self.__in_phase = True > > self.log(LOG_LEVEL, '::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::', noxml=True) > self.log(LOG_LEVEL, "Phase %s started" % name, noxml=True) > if testid: > self.addTCMS(testid, stat=False, desc=name) > self.__act_phase_test_id = testid > #self.log(LOG_LEVEL, '::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::', noxml=True) > self.__act_phase_init_time = datetime.now() > self.log(LOG_LEVEL, '', extra={'tag': 'phase', 'starttag': True, > 'attrs': {'name': name, > 'result': "unfinished", > 'starttime': datetime.strftime(self.__act_phase_init_time, TIME_FORMAT_MS) > }}) > self.__act_phase_fail = 0 > self.__act_phase_pass = 0 > self.__act_phase_name = name > rlAddPhase = addPhase > phaseStart = addPhase > rlPhaseStart = addPhase > rljPhaseStart = addPhase > > def closePhase(self): > """ finish phase """ > if self.__in_phase: > self.log(LOG_LEVEL, datetime.strftime(datetime.now(), TIME_FORMAT_MS), extra={'tag': 'endtime'}) > self.log(LOG_LEVEL, > 'Duration : %ds' % (datetime.now() - self.__act_phase_init_time).seconds, > noxml=True) > self.log(LOG_LEVEL, > 'Assertions : %d good, %d bad' % (self.__act_phase_pass, self.__act_phase_fail), > noxml=True) > > if self.__act_phase_test_id: > self.closeTCMS(stat=False) > > if self.__act_phase_fail == 0: > self.log(LOG_LEVEL, 'PASS', extra={'tag': 'result'}) > else: > self.log(LOG_LEVEL, 'FAIL', extra={'tag': 'result'}) > > self.log(LOG_LEVEL, '', extra={'tag': 'phase', 'starttag': False}) > self.log(LOG_LEVEL, > '::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::', > noxml=True) > > #self.rlLogDebug("rlClosePhase: Phase %s closed" % self.__act_phase_name) > self.__in_phase = False > self.__act_phase_fail = 0 > self.__act_phase_pass = 0 > self.__act_phase_init_time = None > self.__act_phase_name = None > self.__act_phase_test_id = None > rlClosePhase = 
closePhase > phaseEnd = closePhase > rlPhaseEnd = closePhase > rljPhaseEnd = closePhase > > def addTCMS(self, testid, stat=True, desc=''): > """ start to count results for the TCMS test > NOTE: Only one can be opened in time > The test could be part of phase -- testid in phase > Key Parameters: > stat - if additional information should be printed > - by default it is printed (not to xml) > desc - description of test > """ > if self.__act_test_id != None: > # already running TCMS > self.closeTCMS() > if stat: > self.log(LOG_LEVEL, > '--------------------------------------------------------------------------------', > noxml=True) > self.log(LOG_LEVEL, 'TCMS %s started' % str(testid), noxml=True) > self.__act_test_id = testid > self.__act_test_desc = '' > if desc: > self.__act_test_desc = desc > self.__act_test_fail = 0 > self.__act_test_pass = 0 > self.__act_test_init_time = datetime.now() > testStart = addTCMS > > def closeTCMS(self, stat=True): > """ close TCMS print PASS or FAIL > Key Parameters: > stat - if additional information should be printed > - by default it is printed (not to xml) > """ > if self.__act_test_id != None: > if stat: > self.log(LOG_LEVEL, > 'TCMS %s duration : %ds' % (str(self.__act_test_id), > (datetime.now() - self.__act_test_init_time).seconds), > noxml=True) > self.log(LOG_LEVEL, > 'TCMS %s assertions: %d good, %d bad'\ > % (str(self.__act_test_id), self.__act_test_pass, self.__act_test_fail), > noxml=True) > > if self.__act_test_fail == 0: > self.addTestId(result='PASS') > else: > self.addTestId(result='FAIL') > > self.__tcms_tests[self.__act_test_id]['pass'] = self.__act_test_pass > self.__tcms_tests[self.__act_test_id]['fail'] = self.__act_test_fail > self.__tcms_tests[self.__act_test_id]['desc'] = self.__act_test_desc > > if stat: > self.log(LOG_LEVEL, > '--------------------------------------------------------------------------------', > noxml=True) > > self.__act_test_id = None > self.__act_test_desc = '' > self.__act_test_fail = 0 
> self.__act_test_pass = 0 > self.__act_test_init_time = None > testEnd = closeTCMS > > def addTestId(self, testid=None, result='FAIL'): > """ add reult of the test, for tcms parsing """ > if testid is None: > testid = self.__act_test_id > self.__tcms_tests[testid] = {'result': result, 'pass': 0, 'fail': 0, 'desc': ''} > self.log(LOG_LEVEL, testid, extra={'tag': 'testid', 'attrs': {'result': str(result).upper()}}) > rlAddTestId = addTestId > > @property > def fails(self): > """ getter for number of fails """ > return self.__fails_number > > def rlFailsNumber(self): > """ return number of fails """ > return self.fails > > >class Log(MRGLog): # pylint: disable=R0904 > """ MRG Log class with asserts """ > def __init__(self, > testid='', > logdir='', > debug=False, > handlers=None, > module=False, > output=None, > level=logging.INFO, > main_logger=None, > test_env=None, > no_log=False): > """ initialization > see MRGLog init for more info > Key Parameter: > test_env - dictionary with check ids and expected values > - e.g. 
{'test1': True} > """ > MRGLog.__init__(self, testid, logdir, debug, handlers, module, output, level, main_logger, no_log) > if not test_env is None: > self.test_env = dict(test_env) > else: > self.test_env = {} > > def phaseStartSetup(self, name='Setup', testid=''): > """ phaseStartSetup """ > self.addPhase(name, testid) > rlPhaseStartSetup = phaseStartSetup > > def phaseStartTest(self, name='Test', testid=''): > """ phaseStartTest """ > self.addPhase(name, testid) > rlPhaseStartTest = phaseStartTest > > def phaseStartCleanup(self, name='Cleanup', testid=''): > """ phaseStartCleanup """ > self.addPhase(name, testid) > rlPhaseStartCleanup = phaseStartCleanup > > def conditionalAssert(self, message, status, failmsg=''): > """ check status and then log pass or fail accordingly """ > if status: > self.passed(message) > return True > else: > if failmsg: > self.failed('%s %s', message, failmsg) > else: > self.failed(message) > return False > > def conditionalAssertBool(self, message, status, failmsg=''): > """ check status and then log pass or fail accordingly """ > self.conditionalAssert(message, status is True, failmsg) > > def assertEquals(self, message, real_value, exp_value, fail_msg='', comp=None): > """ compare real value with expected value, log pass or fail accordingly > Key Parameters: > real_value - compared value against exp_value > exp_value - expected value > message - message described the check > fail_msg - message logged when compare fails > comp - compare method > """ > if fail_msg: > fail_message = fail_msg > else: > fail_message = 'Assert fail - Real value: "%s", expected value: "%s"' % (str(real_value), str(exp_value)) > > if comp != None: > fail_message = '%s, used compare method: %s' % (fail_message, str(comp.func_code.co_name)) > ret = comp(real_value, exp_value) > else: > ret = (real_value == exp_value) > > self.conditionalAssert(message, ret, fail_message) > return ret > assertEqual = assertEquals > > def rlAssertEquals(self, message, value1, 
value2, comp=None): > """ works as assertEquals, but checks arguments""" > if value1 == None or len(str(value1)) == 0 or value2 == None or len(str(value2)) == 0: > self.failed('rlAssertEquals called without all needed parameters') > return (False) > self.assertEquals(message, value1, value2, comp=comp) > > def assertEqualsSpecial(self, real_value, exp_value, assertID=None, message='', comp=None): > """ compare real value with expected value, log pass or fail accordingly > Key Parameters: > real_value - compared value against exp_value > exp_value - expected value > - if it is None it means that real_value could be anything > message - message described the check > comp - compare method > assert_id - id of the check > """ > if assertID != None and assertID in self.test_env: > aux_exp_val = self.test_env[assertID] > else: > aux_exp_val = exp_value > > if aux_exp_val == exp_value: > msg = 'Assert - Real value: "%s", expected value: "%s"' % (str(real_value), str(exp_value)) > else: > msg = 'Assert - Real value: "%s", expected value (taken from test_environment): "%s"'\ > % (str(real_value), str(aux_exp_val)) > if message: > msg = '%s %s' % (msg, message) > > if aux_exp_val != None: > return self.assertEquals(msg, real_value, aux_exp_val, fail_msg='', comp=comp) > else: > self.debug("%s, not compared" % message) > return True > assertEqualSpecial = assertEqualsSpecial > > def assertNotEquals(self, message, value1, value2): > """ assertNotEquals """ > fail_msg = "Assert fail - value: '%s' is not different from '%s'" % (str(value1), str(value2)) > return self.assertEquals(message, value1, value2, > comp=(lambda x,y: x != y), fail_msg=fail_msg) # pylint: disable=C0324 > rlAssertNotEqual = assertNotEquals > assertNotEqual = assertNotEquals > > def assertEmpty(self, message, value): > """ assertEmpty """ > return self.conditionalAssert(message, not value, "Assert fail - value: '%s' is not empty" % str(value)) > rlAssertEmpty = assertEmpty > > def assertNotEmpty(self, 
message, value): > """ assertNotEmpty """ > return self.conditionalAssert(message, value, "Assert fail - value: '%s' is empty" % str(value)) > rlAssertNotEmpty = assertNotEmpty > > def assertGreater(self, message, value1, value2): > """ assertGreater """ > fail_msg = "Assert fail - value: %s is not bigger than: %s" % (str(value1), str(value2)) > return (self.assertEquals(message, value1, value2, > comp=(lambda x,y: x > y), fail_msg=fail_msg)) # pylint: disable=C0324 > rlAssertGreater = assertGreater > > def assertLesser(self, message, value1, value2): > """ assertGreater """ > fail_msg = "Assert fail - value: %s is not lesser than: %s" % (str(value1), str(value2)) > return (self.assertEquals(message, value1, value2, > comp=(lambda x,y: x < y), fail_msg=fail_msg)) # pylint: disable=C0324 > rlAssertLesser = assertLesser > > def assertGreaterOrEqual(self, message, value1, value2): > """ assertGreaterOrEqual """ > fail_msg = "Assert fail - value: %s is lesser than: %s" % (str(value1), str(value2)) > return self.assertEquals(message, value1, value2, > comp=(lambda x,y: x >= y), fail_msg=fail_msg) # pylint: disable=C0324 > rlAssertGreaterOrEqual = assertGreaterOrEqual > > def assertFileExists(self, message, checked_file): > """ assertFileExists """ > if not type(checked_file) == types.StringType or len(checked_file) == 0: > raise mrg.utils.MRGException('Log.assertFileExists:: called without parameter') > ret = os.path.exists(checked_file) > self.conditionalAssert(message, ret, "Assert fail - File %s does not exist" % str(checked_file)) > return (ret) > rlAssertFileExists = assertFileExists > > def assertFileNotExists(self, message, checked_file): > """ assertFileNotExists """ > if not type(checked_file) == types.StringType or len(checked_file) == 0: > raise mrg.utils.MRGException('Log.assertFileNotExists:: called without parameter') > ret = not os.path.exists(checked_file) > self.conditionalAssert(message, ret, "File %s exists" % str(checked_file)) > return (ret) > 
rlAssertFileNotExists = assertFileNotExists > > def assertDirectoryExists(self, message, dir_path): > """ assertDirectoryExists """ > if not type(dir_path) is types.StringType or len(dir_path) == 0: > raise mrg.utils.MRGException('rlAssertDirectoryExists called without parameter') > ret = os.path.isdir(dir_path) > self.conditionalAssert(message, ret, "Directory %s does not exist" % str(dir_path)) > return ret > rlAssertDirectoryExists = assertDirectoryExists > > def assertDirectoryNotExists(self, message, dir_path): > """ assertDirectoryNotExists """ > if not type(dir_path) is types.StringType or len(dir_path) == 0: > raise mrg.utils.MRGException('rlAssertDirectoryNotExists called without parameter') > ret = not os.path.isdir(dir_path) > self.conditionalAssert(message, ret, "Directory %s exists" % str(dir_path)) > return ret > rlAssertDirectoryNotExists = assertDirectoryNotExists > > def assertMountpointExists(self, message, dir_path): > """ assertMountpointExists """ > if not type(dir_path) is types.StringType or len(dir_path) == 0: > raise mrg.utils.MRGException('rlAssertMountpointExists called without parameter') > ret = os.path.ismount(dir_path) > self.conditionalAssert(message, ret, "Mountpoint %s does not exist" % str(dir_path)) > return ret > rlAssertMountpointExists = assertMountpointExists > > def assertMountpointNotExists(self, message, dir_path): > """ assertMountpointNotExists """ > if not type(dir_path) is types.StringType or len(dir_path) == 0: > raise mrg.utils.MRGException('rlAssertMountpointNotExists called without parameter') > ret = not os.path.ismount(dir_path) > self.conditionalAssert(message, ret, "Mountpoint %s exists" % str(dir_path)) > return ret > rlAssertMountpointNotExists = assertMountpointNotExists > > ># USAGE >def main(): > """ main """ > ##logger = logging.getLogger('muj_logger') > #logger = getLogger('') > #logger.setStdHandler() > #logger.setTextHandler('myapp.log') > #logger.setXmlHandler('myapp.xml') > 
#logger.setLevel(logging.DEBUG) > # > ## if stdout is not required > ## logger.propagate = False > # > #logger.warning('Protocol problem: %s', 'connection reset') > #logger.debug('Protocol problem: %s', 'connection reset') > #logger.passed('Protocol problem: %s', 'connection reset') > #logger.failed('Proste proto') > #logger.log(LOG_LEVEL, '123', extra={'tag': 'test_id'}) > #logger.log(LOG_LEVEL, '',\ > # extra={'tag': 'phase',\ > # 'starttag': True,\ > # 'attrs': {'name': 'phase_name', 'result':"unfinished", 'starttime':'time', 'type':'nevim'}\ > # }) > #logger.log(LOG_LEVEL, 'result', extra={'tag': 'metric', 'attrs': {'type':'nevim', 'tolerance':'nevim'}}) > #logger.log(LOG_LEVEL, '', extra={'tag': 'phase', 'starttag': False}) > > log = Log('testik') > log2 = Log('remoteconfiguration', module=True, handlers=log.handlers) > log.assertNotEmpty("rovno", "hodnota1") > log.testStart('1120', desc='my test') > log.passed('message') > log.failed('message') > log.testStart('1121') > log.rlAddPhase("My_phase") > log.testEnd() > log2.rlLog('hmmmm1', logging.INFO) > log2.rlLog('neco', logging.DEBUG) > log.testStart('1122') > log.failed('message') > log.rlAddPhase("My_phase2", testid='1124') > log.rlLog('co to tu mame', logging.WARNING) > log2.fatal("ups") > log.addTestId('1123', 'PASS') > log.assertEqual("rovno", "hodnota1", "hodnota2") > log.assertEqual("rovno", "hodnota1", "hodnota1") > log.test_env = {'11': 'hodnota1'} > log.assertEqualSpecial("hodnota2", "hodnota2", assertID="11") > log.assertEqualSpecial("hodnota1", "hodnota2", assertID="11") > log.assertEqualSpecial("hodnota2", "hodnota2") > log.rlClosePhase() > log.rlAddPhase("My_phase3", testid='1125') > log2.close() > log.close() > > #import mrg.utils > #log = mrg.utils.TCEnv.get_logger('testik') > #log2 = mrg.utils.TCEnv.get_logger('remoteconfiguration', module=True) > #log2.rlLog('hmmmm1', logging.INFO) > #log2.rlLog('neco', logging.DEBUG) > #log.rlLog('co to tu mame', logging.WARNING) > #log2.close() > 
#log.close() > >if __name__ == "__main__": > main() > ># pylint: enable=C0103
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page.
View Attachment As Raw
Actions:
View
Attachments on
bug 849087
: 901795