Source code for oio.cli.directory.directory

# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from logging import getLogger
from cliff.command import Command
from eventlet import Queue, GreenPile
from oio.common.configuration import load_namespace_conf
from oio.common.exceptions import ClientException
from oio.common import green
from oio.directory.meta0 import generate_prefixes, count_prefixes


class DirectoryCmd(Command):
    """Base class for directory subcommands"""

    log = getLogger(__name__ + '.Directory')

    def get_parser(self, prog_name):
        parser = super(DirectoryCmd, self).get_parser(prog_name)
        parser.add_argument('--replicas', metavar='<N>', dest='replicas',
                            type=int, default=3,
                            help='Set the number of replicas (3 by default)')
        parser.add_argument('--min-dist', type=int, default=1,
                            help="Minimum distance between replicas")
        parser.add_argument(
            '--meta0-timeout', metavar='<SECONDS>', type=float, default=60.0,
            help="Timeout for meta0-related operations (60.0s by default)")
        return parser

    def get_prefix_mapping(self, parsed_args):
        from oio.directory.meta0 import Meta0PrefixMapping

        meta0_client = self.app.client_manager.directory.meta0
        conscience_client = self.app.client_manager.directory.cluster
        digits = self.app.client_manager.meta1_digits
        return Meta0PrefixMapping(meta0_client,
                                  conscience_client=conscience_client,
                                  replicas=parsed_args.replicas,
                                  digits=digits,
                                  min_dist=parsed_args.min_dist,
                                  logger=self.log)

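# For orientation: the classes below back 'openio directory' subcommands.
# The exact command names come from the CLI's entry-point registration,
# which is not part of this module, so the invocations below are
# assumptions, not documentation:
#
#   openio directory bootstrap --replicas 3 --check
#   openio directory rebalance
#   openio directory decommission <ADDR>
#   openio directory warmup --workers 4
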

class DirectoryInit(DirectoryCmd):
    """
    Initialize the service directory.

    Distribute database prefixes among meta1 services and fill the meta0.
    """

    def get_parser(self, prog_name):
        parser = super(DirectoryInit, self).get_parser(prog_name)
        parser.add_argument(
            '--no-rdir', action='store_true', help='Deprecated')
        parser.add_argument(
            '--force', action='store_true',
            help="Do the bootstrap even if already done")
        parser.add_argument(
            '--check', action='store_true',
            help="Check that all prefixes have the right number of replicas")
        return parser

    def _assign_meta1(self, parsed_args):
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load(read_timeout=parsed_args.meta0_timeout)

        if mapping and not parsed_args.force:
            self.log.info("Meta1 prefix mapping already initialized")
            if not parsed_args.check:
                return True
            self.log.info("Checking...")
            return mapping.check_replicas()

        # Bootstrap with the 'random' strategy, then rebalance with the
        # 'less_prefixes' strategy to ensure the same number of prefixes
        # per meta1. This is faster than bootstrapping directly with the
        # 'less_prefixes' strategy.
        checked = False
        for i in range(3):
            self.log.info("Computing meta1 prefix mapping (pass %d)", i)
            mapping.bootstrap()

            self.log.info("Equilibrating...")
            mapping.rebalance()

            if parsed_args.check:
                self.log.info("Checking...")
                checked = mapping.check_replicas()
            else:
                checked = True

            if checked:
                break

        if checked:
            from time import sleep
            self.log.info("Saving...")
            max_attempts = 7
            for i in range(max_attempts):
                try:
                    mapping.apply(connection_timeout=30.0,
                                  read_timeout=parsed_args.meta0_timeout)
                    break
                except ClientException as ex:
                    # Manage several unretriable errors
                    retry = (503, 504)
                    if ex.status >= 400 and ex.status not in retry:
                        raise
                    # Monotonic backoff (retriable and net errors)
                    if i < max_attempts - 1:
                        sleep(i * 1.0)
                        continue
                    # Too many attempts
                    raise
        else:
            raise Exception("Failed to initialize prefix mapping")
        return checked

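    # The save loop above retries on 503/504 with a monotonic backoff: the
    # wait grows linearly with the attempt number. A minimal, self-contained
    # sketch of the same pattern, with generic names that are not part of
    # this module:
    #
    #     from time import sleep
    #
    #     def apply_with_retries(action, max_attempts=7, retriable=(503, 504)):
    #         for attempt in range(max_attempts):
    #             try:
    #                 return action()
    #             except ClientException as exc:
    #                 if exc.status >= 400 and exc.status not in retriable:
    #                     raise  # unretriable client error
    #                 if attempt >= max_attempts - 1:
    #                     raise  # too many attempts
    #                 sleep(attempt * 1.0)  # waits 0s, 1s, 2s, ...
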
    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)', parsed_args)

        if parsed_args.no_rdir:
            self.log.warning('--no-rdir option is deprecated')

        checked = self._assign_meta1(parsed_args)

        if checked:
            self.log.info("Done")
        else:
            self.log.warning("Errors encountered")
            raise Exception("Bad meta1 prefix mapping")


class DirectoryList(DirectoryCmd):
    """
    List the content of the meta0 database as a JSON object.

    WARNING: the output is more than 2MB.
    """

    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)', parsed_args)
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load(connection_timeout=30.0,
                     read_timeout=parsed_args.meta0_timeout)
        print(mapping.to_json())

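# Usage example (the command name and the 'jq' filter are illustrative, not
# part of this module): dump the mapping to a file, then count how many
# prefixes it contains:
#
#   openio directory dump > meta0.json
#   jq 'length' meta0.json
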

class DirectoryRebalance(DirectoryCmd):
    """Rebalance the container prefixes."""

    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)', parsed_args)
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load(read_timeout=parsed_args.meta0_timeout)
        moved = mapping.rebalance()
        mapping.apply(moved, read_timeout=parsed_args.meta0_timeout)
        self.log.info("Moved %s", moved)


class DirectoryDecommission(DirectoryCmd):
    """Decommission a Meta1 service (or only some bases)."""

    def get_parser(self, prog_name):
        parser = super(DirectoryDecommission, self).get_parser(prog_name)
        parser.add_argument('addr', metavar='<ADDR>',
                            help='Address of the service to decommission')
        parser.add_argument('base', metavar='<BASE>', nargs='*',
                            help="Name of the bases to decommission")
        return parser

    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)', parsed_args)
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load(read_timeout=parsed_args.meta0_timeout)
        moved = mapping.decommission(parsed_args.addr,
                                     bases_to_remove=parsed_args.base)
        mapping.apply(moved, read_timeout=parsed_args.meta0_timeout)
        self.log.info("Moved %s", moved)


class DirectoryWarmup(DirectoryCmd):
    """Ping each prefix of a Meta0 hash to prepare each Meta1 base."""

    def get_parser(self, prog_name):
        parser = super(DirectoryWarmup, self).get_parser(prog_name)
        parser.add_argument('--workers', type=int, default=1,
                            help="How many concurrent bases to warm up")
        parser.add_argument('--proxy', type=str, default=None,
                            help="Specific proxy IP:PORT")
        return parser

    def _ping_prefix(self, prefix):
        pass

    def take_action(self, parsed_args):
        self.log.debug('take_action(%s)', parsed_args)
        digits = self.app.client_manager.meta1_digits
        workers_count = parsed_args.workers

        conf = {'namespace': self.app.client_manager.namespace}
        if parsed_args.proxy:
            conf.update({'proxyd_url': parsed_args.proxy})
        else:
            ns_conf = load_namespace_conf(conf['namespace'])
            proxy = ns_conf.get('proxy')
            conf.update({'proxyd_url': proxy})

        workers = list()
        with green.ContextPool(workers_count) as pool:
            pile = GreenPile(pool)
            prefix_queue = Queue(16)

            # Prepare some workers
            for i in range(workers_count):
                w = WarmupWorker(conf, self.log)
                workers.append(w)
                pile.spawn(w.run, prefix_queue)

            # Feed the queue
            trace_increment = 0.01
            trace_next = trace_increment
            sent, total = 0, float(count_prefixes(digits))
            for prefix in generate_prefixes(digits):
                sent += 1
                prefix_queue.put(prefix)
                # Display the progression
                ratio = float(sent) / total
                if ratio >= trace_next:
                    self.log.info("... %d%%", int(ratio * 100.0))
                    trace_next += trace_increment

            self.log.debug("Waiting for the workers to drain the queue")
            prefix_queue.join()

        self.log.info("All the workers are done")

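# Note on the plumbing above: Queue(16) is bounded, so the producer blocks
# whenever the workers fall behind, keeping memory use flat no matter how
# many prefixes are generated. prefix_queue.join() returns once every queued
# prefix has been acknowledged with task_done(); the workers never leave
# their loop by themselves and are killed when the pool context exits.
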

class WarmupWorker(object):
    def __init__(self, conf, log):
        from oio.common.http_urllib3 import get_pool_manager

        self.log = log
        self.pool = get_pool_manager()
        self.url_prefix = 'http://%s/v3.0/%s/admin/status?type=meta1&cid=' % (
            conf['proxyd_url'], conf['namespace'])

    def run(self, prefix_queue):
        while True:
            prefix = prefix_queue.get()
            self.ping(prefix)
            prefix_queue.task_done()

    def ping(self, prefix):
        url = self.url_prefix + prefix.ljust(64, '0')
        max_attempts = 5
        for i in range(max_attempts):
            rep = self.pool.request('POST', url)
            if rep.status == 200:
                return
            self.log.warning("%d %s", rep.status, prefix)
            if rep.status == 503:
                from eventlet import sleep
                sleep(i * 0.5)
            else:
                break
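
# For illustration: with meta1_digits=4 and the prefix '0fd3', the URL built
# by ping() looks like this (the host, port and namespace are assumed
# values, not taken from this module):
#
#   http://127.0.0.1:6006/v3.0/OPENIO/admin/status?type=meta1&cid=0fd30000...0
#
# ljust(64, '0') right-pads the prefix into a full 64-character container
# ID, so the proxy routes the status request to the meta1 base that holds
# that prefix, forcing the base to be opened (and thus warmed up).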