Source code for oio.conscience.agent

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
import os

import pkg_resources
from eventlet import GreenPool, sleep

from oio.common.daemon import Daemon
from oio.common.http_urllib3 import get_pool_manager
from oio.common.easy_value import float_value, int_value, true_value
from oio.common.configuration import parse_config, validate_service_conf
from oio.common.logger import get_logger
from oio.common.client import ProxyClient
from oio.conscience.client import ConscienceClient
from oio.common.exceptions import OioException


def load_modules(group_name):
    """Load every entry point registered under `group_name`.

    :param group_name: name of the setuptools entry point group to scan.
    :returns: a dict mapping each entry point name to the object
        obtained by loading it (requirements are not checked, since
        `require=False` is passed to the loader).
    """
    return {
        ep.name: ep.load(require=False)
        for ep in pkg_resources.iter_entry_points(group_name)
    }
class ServiceWatcher(object):
    """Periodically check one service and register its state in conscience.

    A watcher runs the configured checks, collects the configured stats
    into the service definition tags, and pushes the resulting definition
    through a `ConscienceClient`.
    """

    def __init__(self, conf, service, **kwargs):
        """
        :param conf: agent-wide configuration; must contain 'namespace'.
        :param service: service description; must contain 'host', 'port'
            and 'type', plus the 'checks' and 'stats' lists read by
            `init_checkers` and `init_stats`.
        :raises Exception: if a mandatory field is missing, or if a check
            or stat type is not registered.
        """
        self.conf = conf
        self.running = False

        for k in ['host', 'port', 'type']:
            if k not in service:
                raise Exception(
                    'Missing field "%s" in service configuration' % k)
        self.name = '%s|%s|%s' % \
            (service['type'], service['host'], service['port'])

        self.service = service

        # Per-service settings take precedence over agent-wide ones.
        self.rise = int_value(self._load_item_config('rise'), 1)
        self.fall = int_value(self._load_item_config('fall'), 1)
        self.check_interval = float_value(
            self._load_item_config('check_interval'), 1)
        self.deregister_on_exit = true_value(
            self._load_item_config('deregister_on_exit', False))

        self.logger = get_logger(self.conf)
        self.pool_manager = get_pool_manager()
        self.cs = ConscienceClient(self.conf, pool_manager=self.pool_manager,
                                   logger=self.logger)
        # FIXME: explain that
        self.client = ProxyClient(self.conf, pool_manager=self.pool_manager,
                                  no_ns_in_url=True, logger=self.logger)
        self.last_status = False
        self.status = False
        self.failed = False
        self.service_definition = {
            'ns': self.conf['namespace'],
            'type': self.service['type'],
            'addr': '%s:%s' % (self.service['host'], self.service['port']),
            'score': 0,
            'tags': {}}
        # Optional tags are only set when the service provides a value.
        if self.service.get('location', None):
            self.service_definition['tags']['tag.loc'] = \
                self.service['location']
        if self.service.get('slots', None):
            self.service_definition['tags']['tag.slots'] = \
                ','.join(self.service['slots'])
        if self.service.get('service_id', None):
            self.service_definition['tags']['tag.service_id'] = \
                self.service['service_id']
        self.service_checks = list()
        self.service_stats = list()
        self.init_checkers(service)
        self.init_stats(service)

    def _load_item_config(self, item, default=None):
        """Get `item` from the service description, else from the agent
        configuration, else return `default` (also used when the value
        found is falsy)."""
        return self.service.get(item, self.conf.get(item)) or default

    def start(self):
        """Run the watch loop; returns when the loop exits."""
        self.logger.info('watcher "%s" starting', self.name)
        self.running = True
        self.watch()
        self.running = False

    def stop(self):
        """Stop the watcher, optionally registering the service as down
        first (when `deregister_on_exit` is set)."""
        self.logger.info('watcher "%s" stopping', self.name)
        if self.deregister_on_exit:
            self.logger.info('watcher "%s" deregister service', self.name)
            try:
                self.status = False
                self.last_status = False
                self.register()
            except Exception as e:
                self.logger.warn('Failed to register service: %s', e)
        self.running = False

    def check(self):
        """Perform the registered checks on the service until any of
        them fails or the end of the list is reached."""
        self.status = True
        # The generator re-evaluates self.running for each item, so a
        # stop request interrupts the sequence of checks.
        for service_check in (x for x in self.service_checks if self.running):
            if not service_check.service_status():
                self.status = False
                return

    def get_stats(self):
        """Update service definition with all configured stats.

        Does nothing when the service is down. Any exception raised by
        a stat fetcher is logged and marks the service as down.
        """
        if not self.status:
            return
        try:
            for stat in (x for x in self.service_stats if self.running):
                stats = stat.get_stats()
                self.service_definition['tags'].update(stats)
        except Exception as ex:
            self.logger.debug("get_stats error: %s", ex)
            self.status = False

    def register(self):
        """Push the current service definition to conscience.

        Logs status transitions, and logs (without raising) any
        `OioException` raised by the registration request.
        """
        # only accept a final zero/down-registration when exiting
        if not self.running and self.status:
            return

        # Alert when the status changes
        if self.status != self.last_status:
            if self.status:
                self.logger.info('service "%s" is now up', self.name)
            else:
                self.logger.warn('service "%s" is now down', self.name)
            self.last_status = self.status

        # Use a boolean so we can easily convert it to a number in conscience
        self.service_definition['tags']['tag.up'] = self.status
        try:
            self.cs.register(self.service['type'], self.service_definition,
                             retries=False)
        except OioException as rqe:
            self.logger.warn("Failed to register service %s: %s",
                             self.service_definition["addr"], rqe)

    def watch(self):
        """Loop over check / get_stats / register until stopped.

        On any exception, the watcher is flagged as failed and the
        exception is re-raised.
        """
        try:
            while self.running:
                self.check()
                self.get_stats()
                self.register()
                sleep(self.check_interval)
        except Exception as e:
            self.logger.warn('ERROR in watcher "%s"', e)
            self.failed = True
            # Bare raise keeps the original traceback.
            raise
        finally:
            self.logger.info('watcher "%s" stopped', self.name)

    def init_checkers(self, service):
        """Initialize service checkers.

        Each check description is completed in place with defaults
        taken from the service ('host', 'port') and from this watcher
        ('rise', 'fall').

        :raises Exception: if a check type is not in CHECKERS_MODULES.
        """
        self.service_checks[:] = []
        for check in service['checks']:
            # The type must be resolved first: the default name below
            # is built from it.
            check['type'] = check.get('type') or 'unknown'
            check['host'] = check.get('host') or service['host']
            check['port'] = check.get('port') or service['port']
            check['name'] = check.get('name') or "%s|%s|%s" % \
                (check['type'], check['host'], check['port'])
            check['rise'] = check.get('rise') or self.rise
            check['fall'] = check.get('fall') or self.fall

            service_check_class = CHECKERS_MODULES.get(check['type'])
            if not service_check_class:
                raise Exception(
                    'Invalid check type "%s", valid types: %s' %
                    (check['type'], ', '.join(CHECKERS_MODULES.keys())))
            service_check = service_check_class(self, check, self.logger)
            self.service_checks.append(service_check)

    def init_stats(self, service):
        """Initialize service stat fetchers.

        :raises Exception: if a stat type is not in STATS_MODULES.
        """
        self.service_stats[:] = []
        for stat in service['stats']:
            stat.setdefault('host', service['host'])
            stat.setdefault('port', service['port'])
            stat.setdefault('path', "")
            service_stat_class = STATS_MODULES.get(stat['type'], None)
            if not service_stat_class:
                raise Exception(
                    'Invalid stat type "%s", valid types: %s' %
                    (stat['type'], ', '.join(STATS_MODULES.keys())))
            service_stat = service_stat_class(self, stat, self.logger)
            self.service_stats.append(service_stat)
class ConscienceAgent(Daemon):
    """Daemon running one `ServiceWatcher` per configured service."""

    def __init__(self, conf):
        """
        :param conf: agent configuration; passed to
            `validate_service_conf`, then completed in place with the
            services found in 'include_dir' (if set).
        """
        validate_service_conf(conf)
        self.running = True
        self.conf = conf
        self.logger = get_logger(conf)
        self.load_services()
        self.init_watchers(self.conf['services'])

    def stop(self):
        """Ask the main loop to exit."""
        self.running = False

    def run(self, *args, **kwargs):
        """Spawn all watchers, then replace failed ones once per second
        until stopped. Watchers are stopped when the loop exits."""
        try:
            self.logger.info('conscience agent: starting')

            pool = GreenPool(len(self.watchers))
            for watcher in self.watchers:
                pool.spawn(watcher.start)

            self.running = True
            while self.running:
                sleep(1)
                # Iterate over a snapshot: self.watchers is modified
                # below, and mutating a list while iterating over it
                # skips elements.
                for w in list(self.watchers):
                    if w.failed:
                        self.watchers.remove(w)
                        self.logger.warn('restart watcher "%s"', w.name)
                        new_w = ServiceWatcher(self.conf, w.service)
                        self.watchers.append(new_w)
                        pool.spawn(new_w.start)

        except Exception as e:
            self.logger.error('ERROR in main loop %s', e)
            # Bare raise keeps the original traceback.
            raise
        finally:
            self.logger.warn('conscience agent: stopping')
            self.running = False
            self.stop_watchers()

    def init_watchers(self, services):
        """Build one watcher per service description.

        Services whose watcher cannot be built are logged and skipped.

        :param services: dict of service descriptions (keys are unused).
        """
        watchers = []
        # values() exists on both Python 2 and 3 dicts.
        for conf in services.values():
            try:
                watchers.append(ServiceWatcher(self.conf, conf))
            except Exception:
                self.logger.exception("Failed to load configuration from %s",
                                      conf.get('cfgfile', 'main config file'))
        self.watchers = watchers

    def stop_watchers(self):
        """Call `stop()` on every watcher."""
        for watcher in self.watchers:
            watcher.stop()

    def load_services(self):
        """Ensure `conf['services']` is a dict, and add to it the
        services described by the .json/.yml/.yaml files found in
        'include_dir' (if configured).

        Each loaded service is keyed by its file name without extension
        and gets a 'cfgfile' entry holding the path it was read from.
        """
        include_dir = self.conf.get('include_dir')
        self.conf['services'] = self.conf.get('services') or {}
        if include_dir:
            include_dir = os.path.expanduser(include_dir)

            cfgfiles = [os.path.join(include_dir, f)
                        for f in os.listdir(include_dir)
                        if re.match(r'.+\.(json|yml|yaml)$', f)]
            for cfgfile in cfgfiles:
                name = os.path.basename(cfgfile)
                name = os.path.splitext(name)[0]
                self.conf['services'][name] = parse_config(cfgfile)
                self.conf['services'][name]['cfgfile'] = cfgfile
# Registries built once, when this module is imported: each maps an entry
# point name to the object loaded from the corresponding entry point group.
# ServiceWatcher.init_checkers / init_stats look up the 'type' of each
# check / stat description in them.
CHECKERS_MODULES = load_modules('oio.conscience.checker')
STATS_MODULES = load_modules('oio.conscience.stats')