Source code for oio.cli.cluster.cluster

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from logging import getLogger
from cliff import lister, show, command
from oio.common.exceptions import OioException


class ClusterShow(show.ShowOne):
    """Show information of all services in the cluster"""

    log = getLogger(__name__ + '.ClusterShow')

    def take_action(self, parsed_args):
        """Fetch the cluster information and flatten it for display.

        :param parsed_args: arguments parsed by the command's parser
        :returns: the result of transposing the list of (name, value)
            pairs, i.e. the field names followed by the matching values
        """
        self.log.debug('take_action(%s)', parsed_args)
        data = self.app.client_manager.cluster.info()

        output = [('namespace', data['ns']),
                  ('chunksize', data['chunksize'])]

        # Each of these sections is flattened into "<prefix>.<key>" fields.
        # Note the singular "service_pool" prefix for "service_pools".
        sections = (('storage_policy', 'storage_policy'),
                    ('data_security', 'data_security'),
                    ('service_pools', 'service_pool'))
        for section, prefix in sections:
            # items() instead of iteritems(): the latter only exists on
            # Python 2 dictionaries, items() works on both major versions.
            for key, value in data[section].items():
                output.append(('%s.%s' % (prefix, key), value))

        # Options are displayed without any prefix, in sorted order.
        output.extend(sorted(data['options'].items()))
        return zip(*output)
class ClusterList(lister.Lister):
    """List services of the namespace"""

    log = getLogger(__name__ + '.ClusterList')

    def get_parser(self, prog_name):
        """Declare the service types and the statistics switch."""
        parser = super(ClusterList, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_type>',
            nargs='*',
            help='Type of services to list')
        parser.add_argument(
            '--stats', '--full',
            action='store_true',
            help='Display service statistics')
        return parser

    def _list_services(self, parsed_args):
        """Generate one row per service of each requested type.

        When no type has been requested, the list of types is asked to
        the cluster client and stored in `parsed_args.srv_types`.
        Types whose listing raises an OioException are logged and skipped.

        :param parsed_args: arguments parsed by the command's parser
        :returns: a generator of tuples matching the columns declared
            by `take_action`
        """
        cluster = self.app.client_manager.cluster
        if not parsed_args.srv_types:
            parsed_args.srv_types = cluster.service_types()

        for srv_type in parsed_args.srv_types:
            try:
                services = cluster.all_services(srv_type, parsed_args.stats)
            except OioException:
                self.log.exception("Failed to list services of type %s",
                                   srv_type)
                continue

            for srv in services:
                tags = srv['tags']
                row = (srv_type,
                       srv['addr'],
                       tags.get('tag.service_id', 'n/a'),
                       tags.get('tag.vol', 'n/a'),
                       tags.get('tag.loc', 'n/a'),
                       tags.get('tag.slots', 'n/a'),
                       tags.get('tag.up', 'n/a'),
                       srv['score'])
                if parsed_args.stats:
                    # Only the tags prefixed with "stat." are statistics.
                    stats = " ".join("%s=%s" % (k, v)
                                     for k, v in tags.items()
                                     if k.startswith('stat.'))
                    row += (stats,)
                yield row

    def take_action(self, parsed_args):
        """Return the column names and a lazy generator of service rows."""
        self.log.debug('take_action(%s)', parsed_args)
        columns = ('Type', 'Addr', 'Service Id', 'Volume', 'Location',
                   'Slots', 'Up', 'Score')
        if parsed_args.stats:
            columns += ('Stats',)
        return columns, self._list_services(parsed_args)
class ClusterLocalList(lister.Lister):
    """List local services"""

    log = getLogger(__name__ + '.ClusterLocalList')

    def get_parser(self, prog_name):
        """Declare the optional list of service types to keep."""
        parser = super(ClusterLocalList, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_types>',
            nargs='*',
            help='Service Type(s)')
        return parser

    def take_action(self, parsed_args):
        """List the local services, optionally filtered by type.

        :param parsed_args: arguments parsed by the command's parser
        :returns: the column names and a generator over the rows of the
            services whose type has been requested (all of them when no
            type has been requested)
        """
        self.log.debug('take_action(%s)', parsed_args)
        rows = []
        wanted = parsed_args.srv_types
        for srv in self.app.client_manager.cluster.local_services():
            tags = srv['tags']
            addr = srv['addr']
            score = srv['score']
            srv_type = srv['type']
            row = (srv_type,
                   addr,
                   tags.get('tag.service_id', 'n/a'),
                   tags.get('tag.vol', 'n/a'),
                   tags.get('tag.loc', 'n/a'),
                   tags.get('tag.slots', 'n/a'),
                   tags.get('tag.up', 'n/a'),
                   score)
            # An empty selection means "every type".
            if not wanted or srv_type in wanted:
                rows.append(row)

        columns = ('Type', 'Addr', 'Service Id', 'Volume', 'Location',
                   'Slots', 'Up', 'Score')
        return columns, (row for row in rows)
class ClusterUnlock(lister.Lister):
    """Unlock score"""

    log = getLogger(__name__ + '.ClusterUnlock')

    def get_parser(self, prog_name):
        """Declare the type and the address of the targeted service."""
        parser = super(ClusterUnlock, self).get_parser(prog_name)
        parser.add_argument(
            'srv_type',
            metavar='<srv_type>',
            help='Service Type')
        parser.add_argument(
            'srv_addr',
            metavar='<srv_addr>',
            help='Network address of the service'
        )
        return parser

    def _unlock_one(self, type_, addr):
        """Unlock the score of one service and report the outcome.

        :param type_: type of the service
        :param addr: address of the service
        :returns: a generator of a single (type, address, result) tuple,
            the result being "unlocked" or the text of the exception
            raised by the unlock request
        """
        try:
            self.app.client_manager.cluster.unlock_score(
                {'type': type_, 'addr': addr})
            result = "unlocked"
        except Exception as exc:
            # Best effort: the failure is reported in the result column.
            result = str(exc)
        yield type_, addr, result

    def take_action(self, parsed_args):
        """Return the column names and the generator of the single row."""
        self.log.debug('take_action(%s)', parsed_args)
        columns = ('Type', 'Service', 'Result')
        rows = self._unlock_one(parsed_args.srv_type, parsed_args.srv_addr)
        return (columns, rows)
def _batches_boundaries(srclen, size):
    """Generate the (start, end) index pairs of consecutive batches.

    :param srclen: length of the sequence to split
    :param size: maximum number of items per batch
    :returns: a generator of (start, end) tuples, `end` being exclusive
        and never greater than `srclen`
    """
    for begin in range(0, srclen, size):
        stop = begin + size
        # The last batch may be shorter than the others.
        if stop > srclen:
            stop = srclen
        yield begin, stop


def _bounded_batches(src, size):
    """Generate consecutive slices of `src` holding at most `size` items.

    :param src: a sequence supporting `len()` and slicing
    :param size: maximum number of items per slice
    """
    total = len(src)
    for begin, stop in _batches_boundaries(total, size):
        yield src[begin:stop]
class ClusterUnlockAll(lister.Lister):
    """Unlock all services of the cluster"""

    log = getLogger(__name__ + '.ClusterUnlockAll')

    def get_parser(self, prog_name):
        """Declare the optional list of service types to unlock."""
        parser = super(ClusterUnlockAll, self).get_parser(prog_name)
        parser.add_argument(
            'types',
            metavar='<types>',
            nargs='*',
            help='Service Type(s) to unlock (or all if unset)')
        return parser

    def _unlock_all(self, parsed_args):
        """Unlock every service of the requested types, batch by batch.

        When no type has been requested, the list of types is asked to
        the cluster client. Types whose listing raises an OioException
        are logged and skipped.

        :param parsed_args: arguments parsed by the command's parser
        :returns: a generator of (type, address, result) tuples, the
            result being "unlocked" or the text of the exception raised
            while unlocking the batch the service belongs to
        """
        cluster = self.app.client_manager.cluster
        selected = parsed_args.types
        if not selected:
            selected = cluster.service_types()

        for type_ in selected:
            try:
                services = cluster.all_services(type_)
            except OioException:
                self.log.exception("Failed to list services of type %s",
                                   type_)
                continue

            # Each description is tagged with its type before being sent
            # back in the unlock request.
            for srv in services:
                srv['type'] = type_

            # Services are unlocked by groups of at most 4096.
            for group in _bounded_batches(services, 4096):
                try:
                    cluster.unlock_score(group)
                    for srv in group:
                        yield type_, srv['addr'], "unlocked"
                except Exception as exc:
                    # The whole batch is reported with the same error.
                    for srv in group:
                        yield type_, srv['addr'], str(exc)

    def take_action(self, parsed_args):
        """Return the column names and a lazy generator of result rows."""
        return ('Type', 'Service', 'Result'), self._unlock_all(parsed_args)
class ClusterWait(lister.Lister):
    """Wait for services to get a score above specified value"""

    log = getLogger(__name__ + '.ClusterWait')

    def get_parser(self, prog_name):
        """Declare the service types and the waiting conditions."""
        parser = super(ClusterWait, self).get_parser(prog_name)
        parser.add_argument(
            'types',
            metavar='<types>',
            nargs='*',
            help='Service Type(s) to wait for (or all if unset)')
        parser.add_argument(
            '-n', '--count',
            metavar='<count>',
            type=int,
            default=0,
            help='How many services are expected')
        parser.add_argument(
            '-d', '--delay',
            metavar='<delay>',
            type=float,
            default=15.0,
            help='How long to wait for a score')
        parser.add_argument(
            '-s', '--score',
            metavar='<score>',
            type=int,
            default=0,
            help='Minimum score value required for the chosen services')
        parser.add_argument(
            '-u', '--unlock',
            action='store_true',
            default=False,
            help='Should the service be unlocked.')
        return parser

    def _wait(self, parsed_args):
        """Poll the services until they all have a high enough score.

        The services of the requested types (or of all the types known
        by the cluster client when none is requested) are listed about
        once per second. The wait ends when none of them has a score
        lower than or equal to `parsed_args.score` and, if
        `parsed_args.count` is set, when at least that many services
        are listed.

        :param parsed_args: arguments parsed by the command's parser
        :returns: a generator of (type, address, score) tuples, one per
            service, generated once the conditions are met
        :raises Exception: when `parsed_args.delay` seconds have elapsed
            and the conditions are still not met
        """
        from time import time as now, sleep

        cluster = self.app.client_manager.cluster
        types = parsed_args.types
        if not types:
            types = cluster.service_types()

        min_score = parsed_args.score
        delay = parsed_args.delay
        deadline = now() + delay

        def maybe_unlock(allsrv):
            """Unlock the services, only if it has been requested."""
            if parsed_args.unlock:
                cluster.unlock_score(allsrv)

        def check_deadline(message):
            """Raise an exception carrying `message` after the deadline."""
            if now() > deadline:
                raise Exception(message)

        while True:
            descr = []
            for type_ in types:
                services = cluster.all_services(type_)
                for srv in services:
                    srv['type'] = type_
                descr += services

            ko = sum(1 for srv in descr if srv['score'] <= min_score)
            if ko:
                self.log.debug("Still %d services down", ko)
                message = (
                    "Timeout ({0}s) while waiting for the services to get a "
                    "score > {1}, still {2} are not"
                ).format(delay, min_score, ko)
            else:
                # No service is below the threshold, hence all the listed
                # services are counted as up.
                ok = len(descr)
                if not parsed_args.count or ok >= parsed_args.count:
                    # No service down, and enough services, we are done.
                    for srv in descr:
                        yield srv['type'], srv['addr'], srv['score']
                    return
                self.log.debug("Only %d services up", ok)
                # A dedicated message: reporting "still 0 are not" would
                # hide the fact that some services are missing.
                message = (
                    "Timeout ({0}s) while waiting for {1} services to get a "
                    "score > {2}, only {3} have"
                ).format(delay, parsed_args.count, min_score, ok)

            check_deadline(message)
            maybe_unlock(descr)
            sleep(1.0)

    def take_action(self, parsed_args):
        """Return the column names and a lazy generator of service rows."""
        columns = ('Type', 'Service', 'Score')
        return columns, self._wait(parsed_args)
class ClusterLock(ClusterUnlock):
    """Lock score"""

    log = getLogger(__name__ + '.ClusterLock')

    def get_parser(self, prog_name):
        """Add the score option to the arguments of the parent command."""
        parser = super(ClusterLock, self).get_parser(prog_name)
        parser.add_argument(
            '-s', '--score',
            metavar='<score>',
            type=int,
            default=0,
            help='score to set'
        )
        return parser

    def _lock_one(self, type_, addr, score):
        """Lock the score of one service and report the outcome.

        :param type_: type of the service
        :param addr: address of the service
        :param score: score to lock the service to
        :returns: a generator of a single (type, address, result) tuple,
            the result being "locked to <score>" or the text of the
            exception raised while locking
        """
        try:
            self.app.client_manager.cluster.lock_score(
                {'type': type_, 'addr': addr, 'score': score})
            result = "locked to %d" % int(score)
        except Exception as exc:
            # Best effort: the failure is reported in the result column.
            result = str(exc)
        yield type_, addr, result

    def take_action(self, parsed_args):
        """Return the column names and the generator of the single row."""
        self.log.debug('take_action(%s)', parsed_args)
        columns = ('Type', 'Service', 'Result')
        rows = self._lock_one(parsed_args.srv_type,
                              parsed_args.srv_addr,
                              parsed_args.score)
        return (columns, rows)
class ClusterFlush(command.Command):
    """Flush all services of the cluster"""

    log = getLogger(__name__ + '.ClusterFlush')

    def get_parser(self, prog_name):
        """Declare the mandatory list of service types to flush."""
        parser = super(ClusterFlush, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_types>',
            nargs='+',
            help='Service Type(s)')
        return parser

    def take_action(self, parsed_args):
        """Flush each requested service type, stopping at the first error.

        :param parsed_args: arguments parsed by the command's parser
        :raises Exception: when the flush request of a type fails, with
            a message naming the type and the original error
        """
        for srv_type in parsed_args.srv_types:
            try:
                self.app.client_manager.cluster.flush(srv_type)
            except Exception as e:
                raise Exception('Error while flushing service %s: %s' %
                                (srv_type, str(e)))
            else:
                # Logger.warn() is a deprecated alias of warning().
                # The message is formatted lazily by the logging module.
                self.log.warning('services %s flushed', srv_type)
class ClusterResolve(show.ShowOne):
    """Resolve a service ID to an IP address and port"""

    # The logger is named after this class (it used to be named after
    # ClusterFlush, which mixed the log records of both commands).
    log = getLogger(__name__ + '.ClusterResolve')

    def get_parser(self, prog_name):
        """Declare the type and the ID of the service to resolve."""
        parser = super(ClusterResolve, self).get_parser(prog_name)
        parser.add_argument(
            'srv_type',
            help='Service type')
        parser.add_argument(
            'srv_id',
            help='ID of the service'
        )
        return parser

    def take_action(self, parsed_args):
        """Resolve the service and return its description for display.

        :param parsed_args: arguments parsed by the command's parser
        :returns: the result of transposing the items of the resolved
            description, i.e. its keys followed by the matching values
        """
        resolved = self.app.client_manager.cluster.resolve(
            parsed_args.srv_type, parsed_args.srv_id)
        return zip(*resolved.items())
class LocalNSConf(show.ShowOne):
    """show namespace configuration values locally configured"""

    log = getLogger(__name__ + '.LocalNSConf')

    def get_parser(self, prog_name):
        """Return the parser of the base class, without extra argument."""
        return super(LocalNSConf, self).get_parser(prog_name)

    def take_action(self, parsed_args):
        """Load the local configuration of the current namespace.

        :param parsed_args: arguments parsed by the command's parser
        :returns: the result of transposing the list of
            ("<namespace>/<key>", value) pairs
        """
        from oio.common.configuration import load_namespace_conf

        self.log.debug('take_action(%s)', parsed_args)
        namespace = self.app.client_manager.cluster.conf['namespace']
        sds_conf = load_namespace_conf(namespace)
        # Each key is displayed prefixed by the name of the namespace.
        pairs = [("%s/%s" % (namespace, key), sds_conf[key])
                 for key in sds_conf]
        return zip(*pairs)