Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 879889 Details for
Bug 1051444
[neutron]: neutron-dhcp-agent and neutron-l3-agent won't respawn child processes if something goes wrong
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
Neutron agent watch code
neutron-agent-watch (text/x-python), 25.12 KB, created by
Miguel Angel Ajo
on 2014-03-28 14:59:43 UTC
(
hide
)
Description:
Neutron agent watch code
Filename:
MIME Type:
Creator:
Miguel Angel Ajo
Created:
2014-03-28 14:59:43 UTC
Size:
25.12 KB
patch
obsolete
#!/usr/bin/python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2014 Red Hat
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Miguel Angel Ajo, Red Hat
#
# This is a little daemon to watch over agents status, and child status.
# In some conditions an agent could run un-synchronized from its master,
# or children could die unexpectedly and not be recovered.
#
# It emulates the status files described in the deployment workaround
# (a CSV file per watched agent with active error codes).

import atexit
import csv
import fcntl
import os
import signal
import StringIO
import sys
import tempfile
import time

import eventlet

from neutron.agent.common import config as agent_config
from neutron.agent import l3_agent
from neutron.agent.linux import dhcp
from neutron.agent.linux import ip_lib
from neutron.api.v2 import attributes
from neutron.common import config
from neutron.common import utils
from neutron.openstack.common import excutils
from neutron.openstack.common.fileutils import delete_if_exists
# BUG FIX: `_` (the gettext translation marker) was used throughout this
# module but never imported, which made every `_("...")` call a NameError.
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging
from neutronclient.common import exceptions
from neutronclient.v2_0 import client
from oslo.config import cfg


LOG = logging.getLogger("neutron_agent_watch")

# error levels, higher = more critical
INFO = 0
WARNING = 10
ERROR = 20
CRITICAL = 30

ERROR_LEVEL_NAMES = {INFO: 'INFO', WARNING: 'WARNING',
                     ERROR: 'ERROR', CRITICAL: 'CRITICAL'}

# error codes
OK = 0
RPC_BROKER_DISCONNECTED = 10
HEALTH_CHECK_FAILED = 20


class ProcessStatus(object):
    """Tracks the set of active error conditions for one watched agent.

    Status is persisted as CSV to a status file every time it changes,
    so external tools (monitoring, init scripts) can poll it.
    """

    # error codes to default messages and levels
    STATUS = {OK: {'level': INFO,
                   'message': _('Service OK')},
              RPC_BROKER_DISCONNECTED: {'level': CRITICAL,
                                        'message': _('RPC Broker connectivity '
                                                     'lost')},
              HEALTH_CHECK_FAILED: {'level': ERROR,
                                    'message': _('Health check failed')}
              }

    def __init__(self, status_file):
        self._status_file = StatusFile(status_file)
        # maps status code -> {'level': ..., 'message': ...}
        self._active_status = {}
        # write the initial (OK) status immediately
        self._write_status()

    def _get_default_status_info(self, status_code):
        """Return the (level, default message) pair for a status code."""
        status_level = self.STATUS[status_code]['level']
        status_message = self.STATUS[status_code]['message']
        return status_level, status_message

    def set_status(self, status_code, extended_message=""):
        """Activate a status code, optionally extending its message."""
        level, message = self._get_default_status_info(status_code)
        if extended_message != "":
            message = "%s: %s" % (message, extended_message)
        self._active_status[status_code] = {'level': level,
                                            'message': message}
        self._write_status()

    def clear_status(self, status_code):
        """Deactivate a status code.

        :returns: True if the code was active and has been cleared,
                  False if it was not active.
        """
        try:
            self._active_status.pop(status_code)
        except KeyError:
            return False
        self._write_status()
        return True

    def get_status_csv(self):
        """Render the active status list as a UTF-8 CSV string."""
        status_list_by_level = self._render_sorted_status_list()
        csv_str = StringIO.StringIO()
        writer = csv.writer(csv_str)
        writer.writerow(self._list_to_utf8(["code", "level", "message"]))

        for status in status_list_by_level:
            row = [status[0], ERROR_LEVEL_NAMES[status[1]], status[2]]
            writer.writerow(self._list_to_utf8(row))

        return csv_str.getvalue()

    def _list_to_utf8(self, item_list):
        string_list = [str(item) for item in item_list]
        return [string.encode("utf-8") for string in string_list]

    def _render_sorted_status_list(self):
        """Return status tuples sorted most-critical first."""
        status_list = self._render_unsorted_status_list()

        return sorted(status_list,
                      key=lambda status: status[1],
                      reverse=True)

    def _render_unsorted_status_list(self):
        """Build (code, level, message) tuples; OK when nothing is active."""
        status_list = []

        for status_code, status in self._active_status.items():
            status_list.append((status_code,
                                status['level'], status['message']))

        if len(status_list) == 0:
            level, message = self._get_default_status_info(OK)
            status_list.append((OK, level, message))

        return status_list

    def _write_status(self):
        status_csv = self.get_status_csv()
        self._status_file.update(status_csv)

    def check_if_it_was_deleted(self):
        """This function is only used for the D/S workaround: as we have
        no control of the status file when the agent is restarted, the
        status file must be deleted at the same moment the agent process
        exits; we rewrite a clear status until we poll it again."""
        if self._status_file.was_deleted:
            self._active_status.clear()
            self._write_status()


class StatusFile(object):
    """Atomically-updated, flock-protected status file on disk.

    The file is replaced via tempfile + rename so readers never observe
    a partially-written status, and it is removed at interpreter exit.
    """

    def __init__(self, status_file):
        self._status_filename = status_file
        self._fd = None
        self._register_file_cleanup()

    def _register_file_cleanup(self):
        atexit.register(self._delete_file)

    def _delete_file(self):
        LOG.info(_("closing and deleting %s") % self._status_filename)
        # BUG FIX: guard against _fd still being None (update() never
        # succeeded), which would make os.close raise at exit time
        if self._fd is not None:
            os.close(self._fd)
        delete_if_exists(self._status_filename)

    @property
    def was_deleted(self):
        return not os.path.isfile(self._status_filename)

    def update(self, status_data):
        """Replace the status file content atomically with status_data."""

        # create a temporary file with the new content
        base_dir = os.path.dirname(os.path.abspath(self._status_filename))
        tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir,
                                               delete=False)
        tmp_file.write(status_data)
        tmp_file.close()
        os.chmod(tmp_file.name, 0o644)

        # open & lock it to avoid any other process writing to the same file
        try:
            new_fd = os.open(tmp_file.name, os.O_CREAT | os.O_RDWR)
        # BUG FIX: os.open raises OSError, not IOError; the original
        # `except IOError` would never catch the failure
        except OSError:
            LOG.exception(_("Failed to open status file: %s") %
                          self._status_filename)
            sys.exit(1)

        # BUG FIX: flock() returns None on success and raises IOError on
        # failure, so the original `if flock(...) is not None` check was
        # dead code that could never detect a locking error
        try:
            fcntl.flock(new_fd, fcntl.LOCK_EX)
        except IOError:
            LOG.exception(_('Unable to change lock status on file: %s') %
                          self._status_filename)
            sys.exit(1)

        # rename the tmp filename into the old one, atomically
        os.rename(tmp_file.name, self._status_filename)

        # if we had an old file descriptor, close it
        if self._fd is not None:
            os.close(self._fd)

        self._fd = new_fd


class AgentWatcher(object):
    """Base class watching one neutron agent and its child processes.

    Subclasses implement _run() and _get_ns_for_resource() for the
    specific agent type (dhcp / l3).
    """

    # BUG FIX: mutable default argument {} replaced with None sentinel
    def __init__(self, conf, neutron_client, agent_type, options=None):
        if options is None:
            options = {}
        self.host = options['host']
        self.options = options
        self.conf = conf
        self._neutron_client = neutron_client
        self._agent_id = None
        self.agent_type = agent_type
        # pid_file -> {'propagation_delay': ..., 'object_id': ..., 'type': ...}
        self._known_pid_files = {}
        self._process_status = ProcessStatus(options['state_file'])
        self._pid_file = options['pid_file']
        self._pid = None
        self._offline_seconds = 0

    @property
    def state_path(self):
        # per-agent override wins over the global neutron state_path
        if 'state_path' in self.options:
            return self.options['state_path']
        else:
            return self.conf.state_path

    def _find_my_agent_id(self):
        """Look up this host's agent of our type on the neutron server."""
        if self.agent_type == 'l3':
            agent_host_pair = (self.host, 'L3 agent')
        elif self.agent_type == 'dhcp':
            agent_host_pair = (self.host, 'DHCP agent')

        for agent in self._neutron_client.list_agents()['agents']:
            if ((agent['host'], agent['agent_type']) == agent_host_pair):
                LOG.info(_("agent ID %s found for %s @ %s") % (agent['id'],
                                                               self.agent_type,
                                                               self.host))
                return agent['id']
        return None

    @property
    def agent_id(self):
        # lazily resolved and cached; may legitimately be None while the
        # agent has not yet registered with the server
        if not self._agent_id:
            self._agent_id = self._find_my_agent_id()

        if not self._agent_id:
            LOG.warning(_("No agent id for %s @ %s") % (self.agent_type,
                                                        self.host))
        return self._agent_id

    @property
    def alive(self):
        """Agent liveness as reported by the neutron server."""
        agent_info = self._neutron_client.show_agent(self.agent_id)['agent']
        return agent_info['alive']

    def _is_pid_path(self, file_path):
        return file_path.endswith('/pid') or file_path.endswith('.pid')

    def _is_pid_active_for_object_id(self, pid_file, object_id):
        """True if pid_file points to a live process whose cmdline
        mentions object_id."""
        if not os.path.isfile(pid_file):
            return False

        pid = self._read_pid_file(pid_file)

        if pid is None:
            return False

        cmdline_file = '/proc/%s/cmdline' % str(pid)

        try:
            with open(cmdline_file, "r") as f:
                return object_id in f.readline()
        except IOError:
            # process is gone (no /proc entry)
            return False

    def _read_pid_file(self, filename):
        try:
            with open(filename, "r") as f:
                return f.readline().replace("\r", "").replace("\n", "")
        except IOError:
            return None

    def _agent_pid_changed(self):
        """Detect agent restarts by watching its pid file.

        :returns: True only on an actual change from a previously known
                  pid (first observation does not count as a change).
        """
        pid = self._read_pid_file(self._pid_file)
        if pid != self._pid:
            previous_pid = self._pid
            self._pid = pid
            return previous_pid is not None
        return False

    def _is_agent_alive(self):
        cmdline = "neutron-%s-agent" % self.agent_type
        return self._is_pid_active_for_object_id(self._pid_file, cmdline)

    def ensure_expected_pid_file(self, expected_file, object_id, type):
        """Register a pid file we expect to appear (after a grace delay)."""
        if not expected_file in self._known_pid_files:
            self._known_pid_files[expected_file] = \
                {'propagation_delay': self.conf.max_propagation_delay,
                 'object_id': object_id,
                 'type': type}

    def _handle_seconds(self, seconds_passed):
        """Update the propagation delay on every known file until
        it reaches zero; we use this method to inform the class how
        many seconds passed since the last call."""
        for pid_file, data in self._known_pid_files.items():
            prop_delay = max(data['propagation_delay'] - seconds_passed, 0)
            data['propagation_delay'] = prop_delay

        self._check_expected_pid_files_exist()

    def _check_expected_pid_files_exist(self):
        """Set or clear HEALTH_CHECK_FAILED based on expected children."""
        expected_files = self._get_expected_files_after_prop_delay()

        if expected_files['pid_files'] is None:
            self._process_status.clear_status(HEALTH_CHECK_FAILED)
        else:
            for expected in expected_files['pid_files']:
                is_active = self._is_pid_active_for_object_id(
                    expected['pid_file'],
                    expected['object_id'])

                if not is_active:
                    message = _("missing child %(type)s for "
                                "network/router %(object_id)s") % expected
                    self._process_status.set_status(
                        HEALTH_CHECK_FAILED, message)
                    self._log_warning(message)
                    break
            else:
                # no missing children: clear the failure if it was set
                cleared = self._process_status.clear_status(
                    HEALTH_CHECK_FAILED)
                if cleared:
                    self._log_info(_("Status cleared, health recovered"))

        if len(expected_files['missing_netns']) > 0:
            self._log_warning(_("Waiting for namespaces %s") %
                              expected_files['missing_netns'])

    def _log_warning(self, warning_str):
        LOG.warning("[%(agent)s : %(id)s] : %(msg)s",
                    {"agent": self.agent_type,
                     "id": self.agent_id,
                     "msg": warning_str})

    def _log_info(self, warning_str):
        LOG.info("[%(agent)s : %(id)s] : %(msg)s",
                 {"agent": self.agent_type,
                  "id": self.agent_id,
                  "msg": warning_str})

    def _get_expected_files_after_prop_delay(self):
        """Split known pid files into those we now expect to exist and
        resources whose namespace has not appeared yet."""

        pid_files = []
        missing_netns = []

        for pid_file, data in self._known_pid_files.items():
            # if the namespace doesn't exist yet we don't expect the
            # pid file, because 1: that wouldn't work, and 2: if
            # many resources are created together, that may require
            # some time
            if not self._does_netns_exist_for_resource(data['object_id']):
                missing_netns.append(self._get_ns_for_resource(
                    data['object_id']))
                data['propagation_delay'] = self.conf.max_propagation_delay
                continue
            if data['propagation_delay'] == 0:
                pid_file_data = {'pid_file': pid_file}
                pid_file_data.update(data)
                pid_files.append(pid_file_data)

        return {'pid_files': pid_files,
                'missing_netns': missing_netns}

    def _remove_expected_pid_file(self, pid_file):
        # BUG FIX: this method was called by _remove_old_known_pidfiles
        # but never defined, raising AttributeError on any unscheduled
        # resource
        self._known_pid_files.pop(pid_file, None)

    def _remove_old_known_pidfiles(self, expected_pid_files):
        """Forget pid files for resources no longer scheduled to us."""
        # BUG FIX: iterate over a copy of the keys, since we remove
        # entries while looping
        for known in list(self._known_pid_files):
            if not known in expected_pid_files:
                self._remove_expected_pid_file(known)

    def _does_netns_exist_for_resource(self, resource_uuid):
        return self._get_ns_for_resource(resource_uuid) in self._netns_list

    def _reset_propagation_delays(self):
        for pid_file, data in self._known_pid_files.items():
            data['propagation_delay'] = self.conf.max_propagation_delay

    def _check_agent_alive(self):
        """Track server-reported liveness and raise/clear the
        RPC_BROKER_DISCONNECTED status after max_agent_downtime."""
        if not self.alive:
            self._offline_seconds += self.conf.check_interval
            if self._offline_seconds > self.conf.max_agent_downtime:
                self._log_warning(_("%d seconds offline, "
                                    "Max agent downtime exceeded") %
                                  self._offline_seconds)
                self._process_status.set_status(RPC_BROKER_DISCONNECTED)
        else:
            if self._offline_seconds > 0:
                self._log_info(_("agent back alive after %d seconds") %
                               self._offline_seconds)
                self._offline_seconds = 0
                self._process_status.clear_status(RPC_BROKER_DISCONNECTED)

    def run(self, context):
        """One watch iteration; context carries the current netns list."""

        self._netns_list = context['netns']

        if self._agent_pid_changed():
            self._reset_propagation_delays()
            self._log_info(_("Agent pid changed"))

        if not self._is_agent_alive():
            self._reset_propagation_delays()
            self._log_warning(_("Agent process doesn't seem to be alive"))

        self._check_agent_alive()

        self._run()  # run method implemented in child class
        self._handle_seconds(self.conf.check_interval)

        # we force the rewrite in this workaround as we have no tight control
        # of, when process exits, removing the file with old status
        self._process_status.check_if_it_was_deleted()


class DhcpAgentWatcher(AgentWatcher):
    """Watches the DHCP agent: one dnsmasq (and optionally one isolated
    metadata proxy) per scheduled network with DHCP-enabled subnets."""

    # BUG FIX: mutable default argument {} replaced with None sentinel
    def __init__(self, conf, neutron_client, options=None):
        super(DhcpAgentWatcher, self).__init__(conf=conf,
                                               neutron_client=neutron_client,
                                               agent_type='dhcp',
                                               options=options)

    def _get_ns_for_resource(self, resource_uuid):
        return (dhcp.NS_PREFIX + resource_uuid)

    def _is_isolated_metadata_enabled(self):
        if 'isolated_metadata' in self.options:
            return self.options['isolated_metadata'].lower() in ['true', '1']
        else:
            return False

    def _get_network_ids(self):
        """Return ids of active networks scheduled to this agent that
        have at least one DHCP-enabled subnet."""
        sched_networks = [network['id'] for network in
                          self._neutron_client.list_networks_on_dhcp_agent(
                              self.agent_id)['networks']]

        active_networks = self._neutron_client.list_networks(
            admin_state_up=True,
            status='ACTIVE')

        dhcp_subnets = [subnet['id'] for subnet in
                        self._neutron_client.list_subnets(enable_dhcp=True)[
                            'subnets']]

        sched_networks_with_dhcp = []

        # we only want scheduled networks to this agent, which have at least
        # a subnet
        for network in active_networks['networks']:
            if not network['id'] in sched_networks:
                continue
            for subnet_id in network['subnets']:
                if subnet_id in dhcp_subnets:
                    sched_networks_with_dhcp.append(network['id'])
                    break

        return sched_networks_with_dhcp

    def _check_dhcp_pids(self, network_ids):
        """Register the dnsmasq pid files we expect for these networks."""
        expected_pid_files = [
            os.path.join(self.state_path, 'dhcp', net_id, 'pid')
            for net_id in network_ids]

        for index, expected_pid_file in enumerate(expected_pid_files):
            self.ensure_expected_pid_file(expected_pid_file,
                                          network_ids[index],
                                          'dnsmasq dhcp server')

        return expected_pid_files

    def _check_isolated_metadata_pids(self, network_ids):
        """Register the isolated-metadata-proxy pid files we expect."""

        external_path = os.path.join(self.state_path, 'external')

        expected_pid_files = [
            os.path.join(external_path, 'pids', net_id + ".pid")
            for net_id in network_ids]

        for index, expected_pid_file in enumerate(expected_pid_files):
            self.ensure_expected_pid_file(expected_pid_file,
                                          network_ids[index],
                                          'isolated metadata proxy')

        return expected_pid_files

    def _run(self):
        if not self.agent_id:
            return

        network_ids = self._get_network_ids()

        expected_pid_files = self._check_dhcp_pids(network_ids)
        if self._is_isolated_metadata_enabled():
            expected_pid_files += self._check_isolated_metadata_pids(
                network_ids)

        self._remove_old_known_pidfiles(expected_pid_files)


class L3AgentWatcher(AgentWatcher):
    """Watches the L3 agent: one metadata proxy per scheduled router."""

    # BUG FIX: mutable default argument {} replaced with None sentinel
    def __init__(self, conf, neutron_client, options=None):
        super(L3AgentWatcher, self).__init__(conf=conf,
                                             neutron_client=neutron_client,
                                             agent_type='l3',
                                             options=options)

    def _get_ns_for_resource(self, resource_uuid):
        return (l3_agent.NS_PREFIX + resource_uuid)

    def _get_router_ids(self):
        response = self._neutron_client.list_routers_on_l3_agent(self.agent_id)
        return [router['id'] for router in response['routers']]

    def _check_metadata_proxy_pids(self, router_ids):
        """Register the router metadata-proxy pid files we expect."""
        external_path = os.path.join(self.state_path, 'external')

        expected_pid_files = [
            os.path.join(external_path, 'pids', router_id + ".pid")
            for router_id in router_ids]

        for index, expected_pid_file in enumerate(expected_pid_files):
            self.ensure_expected_pid_file(expected_pid_file,
                                          router_ids[index],
                                          'router metadata proxy')
        return expected_pid_files

    def _run(self):
        # CONSISTENCY FIX: guard on agent_id like DhcpAgentWatcher._run
        # does, otherwise list_routers_on_l3_agent(None) is called before
        # the agent has registered
        if not self.agent_id:
            return

        router_ids = self._get_router_ids()
        expected_pid_files = self._check_metadata_proxy_pids(router_ids)
        self._remove_old_known_pidfiles(expected_pid_files)


class AgentWatchDaemon(object):
    """Top-level daemon: builds one watcher per configured agent and
    polls them forever at conf.check_interval."""

    def __init__(self, conf):
        self.conf = conf
        self.auth_info = {}
        self._neutron_client = self._get_neutron_client()
        self.agent_watchers = self._build_agent_watchers(conf.watched_agents)

    def _build_agent_watchers(self, watched_agents):
        agent_watchers = []
        for agent in watched_agents:
            agent_options = self._parse_agent_options(agent)

            if agent_options['type'] == 'dhcp':
                watcher = DhcpAgentWatcher(conf=self.conf,
                                           neutron_client=self._neutron_client,
                                           options=agent_options)
            elif agent_options['type'] == 'l3':
                watcher = L3AgentWatcher(conf=self.conf,
                                         neutron_client=self._neutron_client,
                                         options=agent_options)
            else:
                # BUG FIX: original referenced undefined name `agent_type`
                LOG.error(_("Unknown agent type %s, exiting") %
                          agent_options['type'])
                return []

            agent_watchers.append(watcher)

        return agent_watchers

    def _parse_agent_options(self, options_str):
        """Parse '<type>:<pid_file>:<state_file>[:key=value...]'."""

        options = options_str.split(':')

        agent = {'type': options[0],
                 'pid_file': options[1],
                 'state_file': options[2],
                 'host': utils.get_hostname(),
                 'max_down_time': 60}

        for option in options[3:]:
            try:
                key, value = [kv.strip() for kv in option.split('=')]
                agent[key] = value
            except ValueError:
                # BUG FIX: format arguments were swapped (option vs type)
                LOG.error(_("not well formed option %s for agent type %s") %
                          (option, agent['type']))
                sys.exit(1)

        return agent

    def _get_neutron_client(self):
        neutron_client = client.Client(
            username=self.conf.admin_user,
            password=self.conf.admin_password,
            tenant_name=self.conf.admin_tenant_name,
            auth_url=self.conf.auth_url,
            auth_strategy=self.conf.auth_strategy,
            region_name=self.conf.auth_region,
            auth_token=self.auth_info.get('auth_token'),
            endpoint_url=self.auth_info.get('endpoint_url'),
            endpoint_type=self.conf.endpoint_type
        )
        return neutron_client

    @excutils.forever_retry_uncaught_exceptions
    def run(self, root_helper):
        if len(self.agent_watchers) < 1:
            LOG.error("no agents to watch, exiting")
            return

        while True:
            try:
                context = {'netns':
                           ip_lib.IPWrapper.get_namespaces(root_helper)}

                for watcher in self.agent_watchers:
                    watcher.run(context)
            except exceptions.ConnectionFailed:
                # transient: keep retrying at the next interval
                LOG.error(_("Connection to neutron-server failed"))
            except exceptions.EndpointNotFound:
                LOG.error(_("Check auth region"))
                sys.exit(1)
            except exceptions.Unauthorized:
                # BUG FIX: message typo "pPCheck" corrected
                LOG.error(_("Unauthorized: Check authentication details"))
                sys.exit(1)
            time.sleep(self.conf.check_interval)


def signal_handler(signum, frame):
    # sys.exit triggers the atexit handlers that delete the status files
    LOG.info(_("exiting"))
    sys.exit(0)


def main():
    eventlet.monkey_patch()
    opts = [
        cfg.MultiStrOpt('watched_agents', default=[],
                        help=_('Defines agents to be watched locally'
                               ' using the format: '
                               '<agent>:<emulated_status_file>:'
                               '[[host=..][,isolated_metadata=True]'
                               '[,max_down_time=60]]')),
        cfg.IntOpt('check_interval',
                   default=10,
                   help=_("Interval, in seconds, for checking agent asigned"
                          " resources from the neutron-server")),
        cfg.IntOpt('max_propagation_delay',
                   default=20,
                   help=_("Interval, in seconds, for exposing that a process"
                          " must be existing, to allow propagation from"
                          " netns creation to child process existance")),
        cfg.IntOpt('max_agent_downtime',
                   default=30,
                   help=_("Maximum time for an agent to be allowed as down")),
        cfg.StrOpt('admin_user',
                   help=_("Admin user")),
        cfg.StrOpt('admin_password',
                   help=_("Admin password"),
                   secret=True),
        cfg.StrOpt('admin_tenant_name',
                   help=_("Admin tenant name")),
        cfg.StrOpt('auth_url',
                   help=_("Authentication URL")),
        cfg.StrOpt('auth_strategy', default='keystone',
                   help=_("The type of authentication to use")),
        cfg.StrOpt('auth_region',
                   help=_("Authentication region")),
        cfg.StrOpt('endpoint_type',
                   default='adminURL',
                   help=_("Network service endpoint type to pull from "
                          "the keystone catalog"))
    ]

    cfg.CONF.register_cli_opts(opts)
    cfg.CONF(project='neutron')
    config.setup_logging(cfg.CONF)
    agent_config.register_root_helper(cfg.CONF)
    utils.log_opt_values(LOG)

    root_helper = agent_config.get_root_helper(cfg.CONF)

    # when terminated externally, we need to call sys.exit to make
    # sure all the atexit handlers are called, and we cleanup all
    # status files
    signal.signal(signal.SIGINT, signal_handler)   # keyboard
    signal.signal(signal.SIGTERM, signal_handler)  # init.d killproc

    agent_watcher = AgentWatchDaemon(cfg.CONF)
    agent_watcher.run(root_helper)


if __name__ == "__main__":
    main()
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 1051444
: 879889 |
879890
|
879891
|
879892
|
879893