Source code for oio.cli.cluster.cluster

# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from six import iteritems
from logging import getLogger

from oio.cli import Lister, ShowOne
from oio.common.easy_value import boolean_value
from oio.common.exceptions import OioException, OioNetworkException, \
    ServiceBusy


def _batches_boundaries(srclen, size):
    for start in range(0, srclen, size):
        end = min(srclen, start + size)
        yield start, end


def _bounded_batches(src, size=1000):
    """
    Split a sliceable sequence into consecutive slices of bounded length.

    :param src: the sequence to split (must support `len()` and slicing).
    :param size: maximum number of items per slice (1000 by default).
    :returns: a generator of slices of `src`.
    """
    total = len(src)
    for lower, upper in _batches_boundaries(total, size):
        yield src[lower:upper]


class ClusterShow(ShowOne):
    """Show general information about the cluster."""

    log = getLogger(__name__ + '.ClusterShow')

    def take_action(self, parsed_args):
        """
        Ask conscience for the namespace information and flatten it.

        :returns: a list of two tuples: the field names, and the
            corresponding values.
        """
        self.log.debug('take_action(%s)', parsed_args)

        info = self.app.client_manager.conscience.info()
        rows = [
            ('namespace', info['ns']),
            ('chunksize', info['chunksize']),
        ]

        # Each of these sections is a dictionary, whose entries are
        # displayed with a prefix telling where they come from.
        prefixed_sections = (
            ('storage_policy.%s', 'storage_policy'),
            ('data_security.%s', 'data_security'),
            ('service_pool.%s', 'service_pools'),
        )
        for name_format, section in prefixed_sections:
            for key, value in iteritems(info[section]):
                rows.append((name_format % key, value))

        # Options are displayed as is, in alphabetical order.
        rows.extend(sorted(info['options'].items()))

        # Transpose the (name, value) pairs to (names, values).
        return list(zip(*rows))
class ClusterList(Lister):
    """List services of the namespace."""

    log = getLogger(__name__ + '.ClusterList')

    def get_parser(self, prog_name):
        """Declare the service types and the optional extra columns."""
        parser = super(ClusterList, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_type>',
            nargs='*',
            help='Type of services to list.')
        parser.add_argument(
            '--stats', '--full',
            action='store_true',
            help='Display service statistics.')
        parser.add_argument(
            '--tags',
            action='store_true',
            help='Display service tags')
        return parser

    def _list_services(self, parsed_args):
        """
        Generate one row per service known by conscience.

        When no service type has been requested, all the types known by
        conscience are listed (and stored in `parsed_args.srv_types`).
        A failure to list one type is logged, sets `self.success`
        to False, and does not prevent listing the other types.
        """
        conscience = self.app.client_manager.conscience
        if not parsed_args.srv_types:
            parsed_args.srv_types = conscience.service_types()

        for srv_type in parsed_args.srv_types:
            try:
                services = conscience.all_services(srv_type,
                                                   parsed_args.stats)
            except OioException as exc:
                self.success = False
                self.log.error("Failed to list services of type %s: %s",
                               srv_type, exc)
                continue

            for service in services:
                # Well-known tags are removed from the dictionary,
                # hence they won't be repeated in the 'Tags' column.
                tags = service['tags']
                location = tags.pop('tag.loc', 'n/a')
                slots = tags.pop('tag.slots', 'n/a')
                volume = tags.pop('tag.vol', 'n/a')
                service_id = tags.pop('tag.service_id', 'n/a')
                addr = service['addr']
                locked = boolean_value(tags.pop('tag.lock', False), False)
                up = tags.pop('tag.up', 'n/a')
                score = service['score']

                row = [srv_type, addr, service_id, volume, location,
                       slots, up, score, locked]
                if parsed_args.stats:
                    row.append(" ".join(
                        "%s=%s" % (key, val)
                        for key, val in iteritems(tags)
                        if key.startswith('stat.')))
                if parsed_args.tags:
                    row.append(" ".join(
                        "%s=%s" % (key, val)
                        for key, val in iteritems(tags)
                        if key.startswith('tag.')))
                yield row

    def take_action(self, parsed_args):
        """Return the column names and a generator of service rows."""
        self.log.debug('take_action(%s)', parsed_args)

        columns = ['Type', 'Addr', 'Service Id', 'Volume', 'Location',
                   'Slots', 'Up', 'Score', 'Locked']
        # Optional columns, in the same order as in _list_services().
        for enabled, title in ((parsed_args.stats, 'Stats'),
                               (parsed_args.tags, 'Tags')):
            if enabled:
                columns.append(title)
        return columns, self._list_services(parsed_args)
class ClusterLocalList(Lister):
    """List local services."""

    log = getLogger(__name__ + '.ClusterLocalList')

    def get_parser(self, prog_name):
        """Declare the optional list of service types to display."""
        parser = super(ClusterLocalList, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_types>',
            nargs='*',
            help='Service type(s).')
        return parser

    def take_action(self, parsed_args):
        """
        List the services returned by conscience's `local_services()`.

        A warning is logged when 'proxy.quirk.local_scores' is not enabled
        in the local namespace configuration.

        :returns: a tuple with the column names and a generator of rows.
            Only services whose type is in `parsed_args.srv_types` are
            kept (all services are kept if no type has been specified).
        """
        self.log.debug('take_action(%s)', parsed_args)

        results = []
        srv_types = parsed_args.srv_types
        local_scores = boolean_value(
            self.app.client_manager.sds_conf.get('proxy.quirk.local_scores'),
            False)
        if not local_scores:
            # Logger.warn() is a deprecated alias (removed in Python 3.13),
            # Logger.warning() is available in all Python versions.
            self.log.warning("'proxy.quirk.local_scores' not set, "
                             "scores won't be realistic.")

        data = self.app.client_manager.conscience.local_services()
        for srv in data:
            tags = srv['tags']
            location = tags.get('tag.loc', 'n/a')
            slots = tags.get('tag.slots', 'n/a')
            volume = tags.get('tag.vol', 'n/a')
            service_id = tags.get('tag.service_id', 'n/a')
            addr = srv['addr']
            up = tags.get('tag.up', 'n/a')
            score = srv['score']
            locked = boolean_value(tags.get('tag.lock'), False)
            srv_type = srv['type']
            if not srv_types or srv_type in srv_types:
                results.append((srv_type, addr, service_id, volume,
                                location, slots, up, score, locked))

        columns = ('Type', 'Addr', 'Service Id', 'Volume', 'Location',
                   'Slots', 'Up', 'Score', 'Locked')
        result_gen = (r for r in results)
        return columns, result_gen
class ClusterUnlock(Lister):
    """Unlock the score of specific services of the cluster."""

    log = getLogger(__name__ + '.ClusterUnlock')

    def get_parser(self, prog_name):
        """Declare the service type and the IDs of the services."""
        parser = super(ClusterUnlock, self).get_parser(prog_name)
        parser.add_argument(
            'srv_type',
            metavar='<srv_type>',
            help='Service type.')
        parser.add_argument(
            'srv_ids',
            metavar='<srv_ids>',
            nargs='+',
            help='ID(s) of the services.')
        return parser

    def _unlock_services(self, parsed_args):
        """
        Unlock the requested services, batch by batch.

        Generate a (type, address, result) tuple per service. When
        unlocking a batch fails, `self.success` is set to False and all
        services of the batch get the error message as a result.
        """
        conscience = self.app.client_manager.conscience
        srv_definitions = [
            conscience.get_service_definition(parsed_args.srv_type, srv_id)
            for srv_id in parsed_args.srv_ids]

        for batch in _bounded_batches(srv_definitions):
            try:
                conscience.unlock_score(batch)
            except Exception as exc:
                self.success = False
                outcome = str(exc)
            else:
                outcome = "unlocked"
            for definition in batch:
                yield definition['type'], definition['addr'], outcome

    def take_action(self, parsed_args):
        """Return the column names and a generator of result rows."""
        self.log.debug('take_action(%s)', parsed_args)

        columns = ('Type', 'Service', 'Result')
        return (columns, self._unlock_services(parsed_args))
class ClusterUnlockAll(Lister):
    """Unlock all services of the cluster."""

    log = getLogger(__name__ + '.ClusterUnlockAll')

    def get_parser(self, prog_name):
        """Declare the optional list of service types to unlock."""
        parser = super(ClusterUnlockAll, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_types>',
            nargs='*',
            help='Service type(s) (or all if unset).')
        return parser

    def _unlock_all_services(self, parsed_args):
        """
        Unlock all the services of each requested type, batch by batch.

        Generate a (type, address, result) tuple per service. A failure
        to list a service type is logged and the type is skipped.
        A failure to unlock a batch is reported in the result of each
        service of the batch. Both set `self.success` to False.
        """
        conscience = self.app.client_manager.conscience
        # No type specified: unlock the services of every known type.
        srv_types = parsed_args.srv_types or conscience.service_types()

        for srv_type in srv_types:
            try:
                srv_definitions = conscience.all_services(srv_type)
            except OioException as exc:
                self.success = False
                self.log.error("Failed to list services of type %s: %s",
                               srv_type, exc)
                continue

            # Each definition gets its service type set explicitly
            # before being sent back to conscience.
            for definition in srv_definitions:
                definition['type'] = srv_type

            for batch in _bounded_batches(srv_definitions):
                try:
                    conscience.unlock_score(batch)
                except Exception as exc:
                    self.success = False
                    outcome = str(exc)
                else:
                    outcome = "unlocked"
                for definition in batch:
                    yield definition['type'], definition['addr'], outcome

    def take_action(self, parsed_args):
        """Return the column names and a generator of result rows."""
        self.log.debug('take_action(%s)', parsed_args)

        columns = ('Type', 'Service', 'Result')
        return (columns, self._unlock_all_services(parsed_args))
def _sleep_interval(*tab): for v in tab: yield v while True: yield tab[-1]
class ClusterWait(Lister):
    """Wait for services to get a score above specified value."""

    log = getLogger(__name__ + '.ClusterWait')

    def get_parser(self, prog_name):
        """Declare the service types and the waiting conditions."""
        parser = super(ClusterWait, self).get_parser(prog_name)
        parser.add_argument(
            'types',
            metavar='<types>',
            nargs='*',
            help='Service type(s) to wait for (or all if unset).')
        parser.add_argument(
            '-n', '--count',
            metavar='<count>',
            type=int,
            default=0,
            help=('How many services are expected (0 by default).'))
        parser.add_argument(
            '-d', '--delay',
            metavar='<delay>',
            type=float,
            default=15.0,
            help='How long to wait for a score (15s by default).')
        parser.add_argument(
            '-s', '--score',
            metavar='<score>',
            type=int,
            default=1,
            help=('Minimum score value required for the chosen services '
                  '(1 by default).'))
        parser.add_argument(
            '-u', '--unlock',
            action='store_true',
            default=False,
            help='Should the service be unlocked.')
        return parser

    def _wait(self, parsed_args):
        """
        Poll conscience until all services reach the minimum score.

        Generate a (type, address, score) tuple per service once no
        service has a score lower than `parsed_args.score` and, if
        `parsed_args.count` is set, at least that many services have
        reached the score.

        :raises Exception: if the conditions are not met after
            `parsed_args.delay` seconds.
        """
        from time import time as now, sleep

        min_score = parsed_args.score
        delay = parsed_args.delay
        deadline = now() + delay
        # Both variables are read by check_deadline() and rebound by the
        # polling loop below: descr is the last list of services, ko the
        # number of services under the minimum score (-1 when unknown).
        descr = []
        ko = -1
        exc_msg = ("Timeout ({0}s) while waiting for the services to get a "
                   "score >= {1}, {2}")

        def maybe_unlock(allsrv):
            """Unlock the services, if requested and if there are some."""
            if not parsed_args.unlock:
                return
            if not allsrv:
                return
            self.app.client_manager.conscience.unlock_score(allsrv)

        def check_deadline():
            """Raise an exception if the deadline has been exceeded."""
            if now() > deadline:
                # NOTE(review): when the timeout is due to --count not
                # being reached, ko is 0 and the message reads
                # "still 0 are not.".
                if ko < 0:
                    msg = exc_msg.format(delay, min_score,
                                         "proxy and/or conscience not ready")
                else:
                    msg = exc_msg.format(delay, min_score,
                                         "still %d are not." % ko)
                for srv in descr:
                    if srv['score'] < min_score:
                        # Logger.warn() is a deprecated alias
                        # (removed in Python 3.13).
                        self.log.warning(
                            "%s %s %s",
                            srv['type'], srv.get('id', None), srv['score'])
                raise Exception(msg)

        # First phase (only if no type has been specified):
        # wait for conscience to tell which service types exist.
        interval = _sleep_interval(0.0, 1.0, 2.0, 4.0)
        types = parsed_args.types
        if not parsed_args.types:
            while True:
                check_deadline()
                sleep(next(interval))
                try:
                    types = self.app.client_manager.conscience.service_types()
                    break
                except OioNetworkException as exc:
                    self.log.debug("Proxy error: %s", exc)
                except ServiceBusy as exc:
                    self.log.debug("Conscience busy: %s", exc)

        # Second phase: wait for the services to reach the minimum score.
        interval = _sleep_interval(0.0, 1.0, 2.0, 4.0)
        while True:
            check_deadline()
            # Unlock the services found by the previous iteration (if any).
            maybe_unlock(descr)
            sleep(next(interval))
            descr = []
            ko = -1
            try:
                for typ in types:
                    tmp = self.app.client_manager.conscience.all_services(typ)
                    for srv in tmp:
                        srv['type'] = typ
                    descr += tmp
            except OioNetworkException as exc:
                self.log.debug("Proxy error: %s", exc)
                continue
            except ServiceBusy as exc:
                self.log.debug("Conscience busy: %s", exc)
                continue
            ko = sum(1 for s in descr if s['score'] < min_score)
            if ko > 0:
                self.log.debug("Still %d services down", ko)
                continue
            # If a minimum has been specified, let's check we have enough
            # services
            if parsed_args.count:
                ok = sum(1 for s in descr if s['score'] >= min_score)
                if ok < parsed_args.count:
                    self.log.debug("Only %d services up", ok)
                    continue
            # No service down, and enough services, we are done.
            for srv in descr:
                yield srv['type'], srv['addr'], srv['score']
            return

    def take_action(self, parsed_args):
        """Return the column names and a generator of service rows."""
        # Same trace as the other commands of this module.
        self.log.debug('take_action(%s)', parsed_args)

        columns = ('Type', 'Service', 'Score')
        return columns, self._wait(parsed_args)
class ClusterLock(ClusterUnlock):
    """Lock the score of a service."""

    log = getLogger(__name__ + '.ClusterLock')

    def get_parser(self, prog_name):
        """Add the score option to the arguments of the parent class."""
        parser = super(ClusterLock, self).get_parser(prog_name)
        parser.add_argument(
            '-s', '--score',
            metavar='<score>',
            type=int,
            default=0,
            help='Score to set (0 by default).')
        return parser

    def _lock_services(self, parsed_args):
        """
        Lock the requested services to the requested score, batch by batch.

        Generate a (type, address, result) tuple per service. When
        locking a batch fails, `self.success` is set to False and all
        services of the batch get the error message as a result.
        """
        conscience = self.app.client_manager.conscience
        srv_definitions = [
            conscience.get_service_definition(
                parsed_args.srv_type, srv_id, score=parsed_args.score)
            for srv_id in parsed_args.srv_ids]

        for batch in _bounded_batches(srv_definitions):
            outcome = "locked to %d" % int(parsed_args.score)
            try:
                conscience.lock_score(batch)
            except Exception as exc:
                self.success = False
                outcome = str(exc)
            for definition in batch:
                yield definition['type'], definition['addr'], outcome

    def take_action(self, parsed_args):
        """Return the column names and a generator of result rows."""
        self.log.debug('take_action(%s)', parsed_args)

        columns = ('Type', 'Service', 'Result')
        return (columns, self._lock_services(parsed_args))
class ClusterFlush(Lister):
    """Deregister all services of the cluster."""

    log = getLogger(__name__ + '.ClusterFlush')

    def get_parser(self, prog_name):
        """Declare the optional list of service types to flush."""
        parser = super(ClusterFlush, self).get_parser(prog_name)
        parser.add_argument(
            'srv_types',
            metavar='<srv_types>',
            nargs='*',
            help='Service type(s) (or all if unset).')
        return parser

    def _flush_srv_types(self, parsed_args):
        """
        Flush each requested service type (or all types if none is set).

        Generate a (type, result) tuple per service type. When flushing
        a type fails, `self.success` is set to False and the result is
        the error message.
        """
        srv_types = parsed_args.srv_types
        if not parsed_args.srv_types:
            srv_types = self.app.client_manager.conscience.service_types()

        for srv_type in srv_types:
            result = "flushed"
            try:
                self.app.client_manager.conscience.flush(srv_type)
            except Exception as err:
                self.success = False
                # Report the message, not the exception object: this is
                # what the other commands of this module do, and a plain
                # string is safe for every output formatter.
                result = str(err)
            yield (srv_type, result)

    def take_action(self, parsed_args):
        """Return the column names and a generator of result rows."""
        self.log.debug('take_action(%s)', parsed_args)

        res = self._flush_srv_types(parsed_args)
        return (('Type', 'Result'), res)
class ClusterDeregister(Lister):
    """Deregister specific services of the cluster."""

    log = getLogger(__name__ + '.ClusterDeregister')

    def get_parser(self, prog_name):
        """Declare the service type and the IDs of the services."""
        parser = super(ClusterDeregister, self).get_parser(prog_name)
        parser.add_argument(
            'srv_type',
            help='Service type.')
        parser.add_argument(
            'srv_ids',
            metavar='<srv_ids>',
            nargs='+',
            help='ID(s) of the services.')
        return parser

    def _deregister_services(self, parsed_args):
        """
        Deregister the requested services, batch by batch.

        Generate a (type, address, result) tuple per service. When
        deregistering a batch fails, `self.success` is set to False and
        all services of the batch get the error message as a result.
        """
        conscience = self.app.client_manager.conscience
        srv_definitions = [
            conscience.get_service_definition(parsed_args.srv_type, srv_id)
            for srv_id in parsed_args.srv_ids]

        for batch in _bounded_batches(srv_definitions):
            try:
                conscience.deregister(batch)
            except Exception as exc:
                self.success = False
                outcome = str(exc)
            else:
                outcome = "deregistered"
            for definition in batch:
                yield definition['type'], definition['addr'], outcome

    def take_action(self, parsed_args):
        """Return the column names and a generator of result rows."""
        self.log.debug('take_action(%s)', parsed_args)

        columns = ('Type', 'Service', 'Result')
        return (columns, self._deregister_services(parsed_args))
class ClusterResolve(ShowOne):
    """Resolve a service ID to an IP address and port."""

    # The logger was previously named after ClusterFlush (copy-paste).
    log = getLogger(__name__ + '.ClusterResolve')

    def get_parser(self, prog_name):
        """Declare the type and the ID of the service to resolve."""
        parser = super(ClusterResolve, self).get_parser(prog_name)
        parser.add_argument(
            'srv_type',
            help='Service type.')
        parser.add_argument(
            'srv_id',
            help='ID of the service.')
        return parser

    def take_action(self, parsed_args):
        """
        Ask conscience to resolve the service ID.

        :returns: a list of two tuples: the keys of the resolution
            result, and the corresponding values.
        """
        # Same trace as the other commands of this module.
        self.log.debug('take_action(%s)', parsed_args)

        resolved = self.app.client_manager.conscience.resolve(
            parsed_args.srv_type, parsed_args.srv_id)
        # Materialize the pairs, like the other ShowOne commands of this
        # module, instead of returning a one-shot zip iterator.
        return list(zip(*resolved.items()))
class LocalNSConf(ShowOne):
    """Show namespace configuration values locally configured."""

    log = getLogger(__name__ + '.LocalNSConf')

    def get_parser(self, prog_name):
        """Return the parser of the parent class, without extra argument."""
        return super(LocalNSConf, self).get_parser(prog_name)

    def take_action(self, parsed_args):
        """
        Read the local configuration of the namespace.

        :returns: a list of two tuples: the configuration keys (prefixed
            by the name of the namespace), and the corresponding values.
        """
        self.log.debug('take_action(%s)', parsed_args)

        client_manager = self.app.client_manager
        namespace = client_manager.client_conf['namespace']
        sds_conf = client_manager.sds_conf
        pairs = [("%s/%s" % (namespace, key), sds_conf[key])
                 for key in sds_conf]
        # Transpose the (key, value) pairs to (keys, values).
        return list(zip(*pairs))