# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from oio.common.green import Queue, GreenPile, sleep
from logging import getLogger, INFO
from oio.cli import Command
from oio.common.configuration import load_namespace_conf
from oio.common.exceptions import \
ClientException, ConfigurationException, PreconditionFailed
from oio.common import green
from oio.directory.meta0 import generate_prefixes, count_prefixes
# Connection timeout handed to the meta0 mapping calls of this module
# (presumably in seconds, like the read timeout below -- TODO confirm).
M0_CONN_TIMEOUT = 30.0
# Default read timeout, in seconds, for meta0-related operations. It is
# also the default value of the --meta0-timeout command line option.
M0_READ_TIMEOUT = 60.0
class DirectoryCmd(Command):
    """Base class for directory subcommands"""

    log = getLogger(__name__ + '.Directory')

    def get_parser(self, prog_name):
        """
        Build the argument parser shared by all directory subcommands.

        Extends the parser of the parent class with --no-rdir (deprecated),
        --replicas, --min-dist and --meta0-timeout.
        """
        parser = super(DirectoryCmd, self).get_parser(prog_name)
        parser.add_argument(
            '--no-rdir', action='store_true', help='Deprecated')
        parser.add_argument(
            '--replicas', metavar='<N>', dest='replicas',
            type=int, default=3,
            help='Set the number of replicas (3 by default)')
        parser.add_argument(
            '--min-dist',
            type=int, default=1,
            help="Minimum distance between replicas")
        parser.add_argument(
            '--meta0-timeout', metavar='<SECONDS>',
            type=float, default=M0_READ_TIMEOUT,
            help=("Timeout for meta0-related operations (%.3fs by default)" %
                  M0_READ_TIMEOUT))
        return parser

    def get_prefix_mapping(self, parsed_args):
        """
        Create a meta0 prefix mapping with the parsed parameters.

        :param parsed_args: parsed command line, providing `replicas`
            and `min_dist`.
        :returns: a new Meta0PrefixMapping bound to the meta0 and
            conscience clients of the application.
        """
        # Imported here rather than at module level, as in the original.
        from oio.directory.meta0 import Meta0PrefixMapping
        meta0_client = self.app.client_manager.directory.meta0
        conscience_client = self.app.client_manager.directory.cluster
        digits = self.app.client_manager.meta1_digits
        return Meta0PrefixMapping(meta0_client,
                                  conscience_client=conscience_client,
                                  replicas=parsed_args.replicas,
                                  digits=digits,
                                  min_dist=parsed_args.min_dist,
                                  logger=self.log)

    def _apply(self, mapping, moved=None,
               max_attempts=7, read_timeout=M0_READ_TIMEOUT):
        """
        Upload the specified mapping to the meta0 service,
        retry in case of error.

        :param mapping: the prefix mapping to save (its `apply` method
            is called).
        :param moved: forwarded as-is to `mapping.apply`.
        :param max_attempts: maximum number of calls to `mapping.apply`.
        :param read_timeout: read timeout forwarded to `mapping.apply`.
        :raises ClientException: immediately when the error status is
            400 or more and is neither 503 nor 504, or when the last
            attempt failed.
        """
        # NOTE(review): this local import shadows the `sleep` imported from
        # oio.common.green at module level; kept as in the original,
        # confirm that the plain `time.sleep` is really intended here.
        from time import sleep

        retriable = (503, 504)
        self.log.info("Saving...")
        for i in range(max_attempts):
            try:
                mapping.apply(moved=moved,
                              connection_timeout=M0_CONN_TIMEOUT,
                              read_timeout=read_timeout)
                break
            except ClientException as ex:
                status = ex.status
                # Manage several unretriable errors. The status is compared
                # only when it is set: `None >= 400` raises TypeError on
                # Python 3, while on Python 2 it is False (hence retried).
                # Presumably network-level errors carry no status -- verify
                # against ClientException.
                if (status is not None and status >= 400
                        and status not in retriable):
                    raise
                # Too many attempts
                if i >= max_attempts - 1:
                    raise
                # Monotonic backoff (retriable and net errors); the first
                # retry is immediate since i == 0.
                sleep(i * 1.0)
class DirectoryInit(DirectoryCmd):
    """
    Initialize the service directory.

    Distribute database prefixes among meta1 services and fill the meta0.
    """

    def get_parser(self, prog_name):
        """
        Extend the common parser with --level, --degradation, --force
        and --check.
        """
        parser = super(DirectoryInit, self).get_parser(prog_name)
        parser.add_argument(
            '--level', metavar='<LEVEL>', dest='level',
            choices=('site', 'rack', 'host', 'volume'), default='volume',
            help='Which location level should be perfectly balanced')
        parser.add_argument(
            '--degradation', metavar='<DEGRADATION>', dest='degradation',
            type=int, default=None,
            help='How many location levels we accept to lose to keep the '
                 'quorums valid. Not set by default, it is then autodetected '
                 'to the replication set minus the quorum')
        parser.add_argument(
            '--force',
            action='store_true',
            help="Do the bootstrap even if already done")
        parser.add_argument(
            '--check',
            action='store_true',
            help="Check that all prefixes have the right number of replicas")
        return parser

    def _assign_meta1(self, parsed_args):
        """
        Bootstrap the meta1 prefix mapping and save it in meta0.

        :returns: True when the mapping already exists (and --check is not
            set) or has just been bootstrapped and saved; the result of
            `check_replicas()` when the mapping already exists and --check
            is set.
        :raises ConfigurationException: re-raised from the bootstrap.
        :raises PreconditionFailed: re-raised from the bootstrap.
        :raises Exception: when the bootstrapped mapping does not pass
            `check_replicas()`.
        """
        # Pre-check
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load_meta0(read_timeout=parsed_args.meta0_timeout)
        if mapping and not parsed_args.force:
            self.log.info("Meta1 prefix mapping already initialized")
            if not parsed_args.check:
                return True
            self.log.info("Checking...")
            return mapping.check_replicas()

        if parsed_args.degradation is None:
            # Floor division keeps the quorum (and thus the degradation)
            # an integer on Python 3 as well; plain `/` would yield 2.5
            # for 3 replicas there.
            quorum = parsed_args.replicas // 2 + 1
            parsed_args.degradation = parsed_args.replicas - quorum

        # Reset and bootstrap
        mapping = self.get_prefix_mapping(parsed_args)
        try:
            mapping.bootstrap(level=parsed_args.level,
                              degradation=parsed_args.degradation)
        except ConfigurationException:
            self.log.warning(
                "Namespace poorly configured, some meta1 services "
                "carry no location or an invalid one.")
            raise
        except PreconditionFailed:
            self.log.warning(
                "Namespace too constrained, please consider a "
                "less constrained setup, using either --level or "
                "--degradation with different values.")
            raise

        if not mapping.check_replicas():
            raise Exception("Failed to initialize prefix mapping")
        self._apply(mapping, read_timeout=parsed_args.meta0_timeout)
        return True

    def take_action(self, parsed_args):
        """
        Run the initialization.

        :raises Exception: when the resulting mapping is reported as bad.
        """
        self.log.debug('take_action(%s)', parsed_args)

        if parsed_args.no_rdir:
            self.log.warning('--no-rdir option is deprecated')

        checked = self._assign_meta1(parsed_args)
        if checked:
            self.log.info("Done")
        else:
            self.log.warning("Errors encountered")
            raise Exception("Bad meta1 prefix mapping")
class DirectoryList(DirectoryCmd):
    """
    List the content of meta0 database as a JSON object.

    The output can be used later to restore the database.
    WARNING: output is >2MB.
    """

    def take_action(self, parsed_args):
        """Load the prefix mapping from meta0 and print it as JSON."""
        self.log.debug('take_action(%s)', parsed_args)
        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load_meta0(connection_timeout=M0_CONN_TIMEOUT,
                           read_timeout=parsed_args.meta0_timeout)
        # Parenthesized single-argument form: it prints the same text
        # whether `print` is the Python 2 statement or the Python 3
        # function, whereas `print x` is a syntax error on Python 3.
        print(mapping.to_json())
class DirectoryRebalance(DirectoryCmd):
    """
    Rebalance the container prefixes.

    WARNING: A maximum of 1 service per prefixe is modified
    """

    def take_action(self, parsed_args):
        """
        Load the mapping from meta0, rebalance it, save it back and log
        what has been moved.
        """
        self.log.debug('take_action(%s)', parsed_args)
        prefix_mapping = self.get_prefix_mapping(parsed_args)
        timeout = parsed_args.meta0_timeout
        prefix_mapping.load_meta0(read_timeout=timeout)
        relocated = prefix_mapping.rebalance()
        self._apply(prefix_mapping, moved=relocated, read_timeout=timeout)
        self.log.info("Moved %s", relocated)
class DirectoryRestore(DirectoryCmd):
    """
    Restore the content of meta0 database from a JSON object.

    Use with caution.
    """

    def get_parser(self, prog_name):
        """
        Extend the common parser with the `backup` positional argument
        and the --I-know-what-I-am-doing confirmation flag.
        """
        parser = super(DirectoryRestore, self).get_parser(prog_name)
        parser.add_argument(
            'backup',
            help='Path to the JSON-formatted backup file, or "-".')
        parser.add_argument(
            '--I-know-what-I-am-doing',
            action='store_true',
            help='Confirm that you know what you are doing.')
        return parser

    def _read_backup(self, path):
        """Return the backup text read from `path`, or from stdin for "-"."""
        if path != '-':
            with open(path, 'r') as source:
                self.log.info('Loading from %s...', path)
                return source.read()
        self.log.info('Loading from stdin...')
        from sys import stdin
        return stdin.read()

    def take_action(self, parsed_args):
        """
        Load the backup, check it, and upload it to meta0 if the operator
        passed the confirmation flag.

        :raises Exception: when the mapping does not pass the replica check
            and the confirmation flag is not set.
        """
        self.log.debug('take_action(%s)', parsed_args)
        raw_backup = self._read_backup(parsed_args.backup)

        restored = self.get_prefix_mapping(parsed_args)
        restored.load_json(raw_backup)

        self.log.info('Checking...')
        if restored.check_replicas():
            self.log.info('OK')
        else:
            # A bad mapping is only tolerated with the explicit flag.
            if not parsed_args.I_know_what_I_am_doing:
                raise Exception('Bad meta1 prefix mapping')
            self.log.info(
                'Errors encountered, but "I know what I am doing".')

        # Nothing is uploaded without the confirmation flag.
        if not parsed_args.I_know_what_I_am_doing:
            self.log.info('Please tell me that you know what you are doing.')
            return
        self.log.info('Applying...')
        self._apply(restored, read_timeout=parsed_args.meta0_timeout)
class DirectoryDecommission(DirectoryCmd):
    """
    Decommission a Meta1 service (or only some bases).
    """

    def get_parser(self, prog_name):
        """
        Extend the common parser with the service address, the optional
        list of bases and --ignore-replicas-number-errors.
        """
        parser = super(DirectoryDecommission, self).get_parser(prog_name)
        parser.add_argument('addr', metavar='<ADDR>',
                            help='Address of service to decommission')
        parser.add_argument('base', metavar='<BASE>', nargs='*',
                            help="Name of bases to decommission")
        parser.add_argument('--ignore-replicas-number-errors',
                            action='store_true',
                            help=("Continue even if the number of replicas "
                                  "is not as expected. Dangerous."))
        return parser

    def take_action(self, parsed_args):
        """
        Decommission the requested service (or bases) and save the new
        mapping in meta0.

        :returns: None when the new mapping has been saved, 1 when nothing
            was done because the replica check failed (and
            --ignore-replicas-number-errors was not set).
        """
        self.log.debug('take_action(%s)', parsed_args)
        # Ensure we see 'info' logs.
        if self.log.getEffectiveLevel() > INFO:
            self.log.setLevel(INFO)

        mapping = self.get_prefix_mapping(parsed_args)
        mapping.load_meta0(read_timeout=parsed_args.meta0_timeout)
        self.log.info("meta1_digits=%d", mapping.digits)
        moved = mapping.decommission(parsed_args.addr,
                                     bases_to_remove=parsed_args.base)

        if (mapping.check_replicas() or
                parsed_args.ignore_replicas_number_errors):
            self._apply(mapping, moved=moved,
                        read_timeout=parsed_args.meta0_timeout)
            self.log.info("Moved %s", sorted(moved))
        else:
            # `Logger.warning` replaces the deprecated `Logger.warn` alias.
            self.log.warning("Did nothing due to errors.")
            self.log.warning("If the errors are not related to the bases "
                             "you want to decommission, try to rebalance.")
            return 1
class DirectoryWarmup(DirectoryCmd):
    """Ping each prefix of a Meta0 hash to prepare each Meta1 base"""

    def get_parser(self, prog_name):
        """Extend the common parser with --workers and --proxy."""
        parser = super(DirectoryWarmup, self).get_parser(prog_name)
        parser.add_argument('--workers', type=int, default=1,
                            help="How many concurrent bases to warm up")
        parser.add_argument('--proxy', type=str, default=None,
                            help="Specific proxy IP:PORT")
        return parser

    def _ping_prefix(self, prefix):
        # No-op kept so the class interface does not change; the requests
        # are issued by the WarmupWorker instances spawned in take_action().
        pass

    def take_action(self, parsed_args):
        """
        Feed every meta1 prefix to a pool of WarmupWorker and wait until
        all of them have been processed.
        """
        self.log.debug('take_action(%s)', parsed_args)

        digits = self.app.client_manager.meta1_digits
        workers_count = parsed_args.workers
        if workers_count < 1:
            # Without any consumer, feeding the bounded queue below would
            # block as soon as it is full.
            self.log.warning(
                "Invalid number of workers (%d), using 1", workers_count)
            workers_count = 1

        conf = {'namespace': self.app.client_manager.namespace}
        if parsed_args.proxy:
            conf['proxyd_url'] = parsed_args.proxy
        else:
            ns_conf = load_namespace_conf(conf['namespace'])
            conf['proxyd_url'] = ns_conf.get('proxy')

        with green.ContextPool(workers_count) as pool:
            pile = GreenPile(pool)
            # Bounded queue: put() blocks while the workers lag behind.
            prefix_queue = Queue(16)

            # Prepare some workers
            for _ in range(workers_count):
                worker = WarmupWorker(conf, self.log)
                pile.spawn(worker.run, prefix_queue)

            # Feed the queue
            trace_increment = 0.01
            trace_next = trace_increment
            sent, total = 0, float(count_prefixes(digits))
            for prefix in generate_prefixes(digits):
                sent += 1
                prefix_queue.put(prefix)
                # Display the progression
                ratio = float(sent) / total
                if ratio >= trace_next:
                    self.log.info("... %d%%", int(ratio * 100.0))
                    trace_next += trace_increment

            # Despite the message, no marker is put in the queue: join()
            # waits until the workers have acknowledged every prefix with
            # task_done(). The workers loop forever; presumably leaving the
            # pool context terminates them -- TODO confirm.
            self.log.debug("Send the termination marker")
            prefix_queue.join()

        self.log.info("All the workers are done")
class WarmupWorker(object):
    """Consume meta1 prefixes from a queue and ping each of them."""

    def __init__(self, conf, log):
        """
        :param conf: dict providing 'proxyd_url' and 'namespace', used to
            build the URL of the status request.
        :param log: logger used to report failed requests.
        """
        from oio.common.http_urllib3 import get_pool_manager
        self.log = log
        self.pool = get_pool_manager()
        self.url_prefix = 'http://%s/v3.0/%s/admin/status?type=meta1&cid=' % (
            conf['proxyd_url'], conf['namespace'])

    def run(self, prefix_queue):
        """
        Endlessly take prefixes from `prefix_queue` and ping them.

        Each item is always acknowledged with `task_done()`, even when the
        ping raises: otherwise a `join()` on the queue would never return.
        A failed ping is logged and the worker moves on to the next prefix.
        """
        while True:
            prefix = prefix_queue.get()
            try:
                self.ping(prefix)
            except Exception as exc:
                # Best effort: one unreachable base must not stop the
                # whole warmup.
                self.log.warning("Failed to ping %s: %s", prefix, exc)
            finally:
                prefix_queue.task_done()

    def ping(self, prefix):
        """
        POST a status request for the base designated by `prefix`.

        The prefix is right-padded with '0' up to 64 characters. The
        request is attempted at most 5 times, and only retried (after an
        increasing delay) when the reply status is 503; any other non-200
        status is logged and ends the attempts.
        """
        url = self.url_prefix + prefix.ljust(64, '0')
        max_attempts = 5
        for i in range(max_attempts):
            rep = self.pool.request('POST', url)
            if rep.status == 200:
                return
            self.log.warning("%d %s", rep.status, prefix)
            if rep.status != 503:
                break
            # No need to wait when there is no attempt left.
            if i < max_attempts - 1:
                sleep(i * 0.5)