# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from six import iteritems
from cliff import lister
from oio.crawler.integrity import Target
from oio.cli.admin.item_check import ItemCheckCommand
from oio.cli.admin.common import MultipleServicesCommandMixin
# Expected number of container-ID prefixes in a complete meta0 dump.
# NOTE(review): 16 ** 4 presumably means every 4-hex-digit prefix
# ("0000".."FFFF") — confirm against the meta0 prefix scheme.
CID_PREFIX_COUNT = 16 ** 4
class BaseCheckCommand(lister.Lister):
    """
    Base class for all check commands.

    Subclasses implement `_take_action()`. It is called by
    `take_action()` after `self.live` (services reported by the
    conscience) and `self.catalog` (services expected to exist) have
    been loaded. It must return an iterable of rows matching `columns`.
    """

    columns = ('Status', 'Errors')
    # Service type used to look up a type-specific zookeeper connection
    # string ("zookeeper.<SRV>") in the namespace configuration.
    SRV = None

    def __init__(self, *args, **kwargs):
        super(BaseCheckCommand, self).__init__(*args, **kwargs)
        # Cached zookeeper connection string, see zookeeper().
        self._zkcnxstr = None
        # Both become tuples of (type, host, port, score) in load_catalog().
        self.catalog = None
        self.live = None

    @property
    def logger(self):
        return self.app.client_manager.logger

    def get_parser(self, prog_name):
        parser = super(BaseCheckCommand, self).get_parser(prog_name)
        parser.add_argument(
            '--catalog',
            type=str,
            help="Load service catalog from file."
        )
        return parser

    def load_catalog(self, parsed_args):
        """
        Load the live services and the catalog of expected services.

        Both are stored as tuples of (type, host, port, score) in
        `self.live` and `self.catalog`. When no `--catalog` file is
        given, the live services are used as the catalog.
        """
        # Load the live services
        self.live = tuple(self.load_live_services())
        self.logger.info("Live: loaded %d services", len(self.live))
        for type_, host, port, score in self.live:
            self.logger.debug("live> %s %s %d score=%d",
                              type_, host, port, score)

        # Load a catalog of expected services
        if parsed_args.catalog:
            self.catalog = tuple(
                self.load_catalog_from_file(parsed_args.catalog))
        else:
            # No explicit catalog: expect exactly what is currently alive.
            self.catalog = self.live
        self.logger.info("Catalog: loaded %d services", len(self.catalog))
        for type_, host, port, _score in self.catalog:
            self.logger.debug("catalog> %s %s %d", type_, host, port)

    @staticmethod
    def filter_services(srv, srvtype):
        """
        Yield the (type, host, port, score) tuples of `srv` whose type
        equals `srvtype`.
        """
        for type_, host, port, score in srv:
            if type_ == srvtype:
                yield type_, host, port, score

    def load_live_services(self):
        """
        Yield (type, host, port, score) for every service the conscience
        reports, for each service type it knows about.
        """
        client = self.app.client_manager.conscience
        for srvtype in client.service_types():
            for srv in client.all_services(srvtype):
                # Split on the last colon only: a host part that itself
                # contains colons (e.g. an IPv6 address) would otherwise
                # break the unpacking.
                ip, port = srv['addr'].rsplit(':', 1)
                yield str(srvtype), str(ip), int(port), int(srv['score'])

    def load_catalog_from_file(self, path):
        """
        Yield (type, host, port, 0) for each service described in `path`.

        Empty lines and lines starting with '#' are ignored. Each other
        line must hold exactly three whitespace-separated fields: service
        type, host and port. Malformed lines are logged and skipped.
        """
        with open(path, "r") as fin:
            for line in fin:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                try:
                    type_, host, port = line.split()
                    port = int(port)
                except ValueError as ex:
                    # Wrong field count or non-numeric port.
                    self.logger.exception(
                        "Failed to decode line %r: %s", line, ex)
                    continue
                yield str(type_), str(host), port, 0

    def zookeeper(self):
        """
        Return the zookeeper connection string for this service type.

        Prefer "zookeeper.<SRV>" from the namespace configuration and fall
        back to "zookeeper". The value is cached once it is non-empty.
        """
        if self._zkcnxstr:
            return self._zkcnxstr
        conf = self.app.client_manager.sds_conf
        self._zkcnxstr = conf.get('zookeeper.%s' % self.SRV,
                                  conf.get('zookeeper'))
        return self._zkcnxstr

    def _take_action(self, parsed_args):
        """Perform the actual check; implemented by subclasses."""
        raise NotImplementedError()

    def take_action(self, parsed_args):
        """Load live services and catalog, then run the subclass check."""
        self.logger.debug('take_action(%s)', parsed_args)
        self.load_catalog(parsed_args)
        return self.columns, self._take_action(parsed_args)
class DirectoryCheck(BaseCheckCommand):
    """
    Check the directory has been fully bootstrapped.

    Check all container prefixes are set, from oioproxy and meta0.
    Check all meta0 show the same information.
    Check all meta1 are assigned.
    """

    def _take_action(self, parsed_args):
        """
        Run the directory checks and yield a single ('OK', None) row.

        :raises ValueError: when a check fails, or when the catalog holds
            no meta0 service to check.
        :raises subprocess.CalledProcessError: when an
            `oio-meta0-client` invocation exits with a non-zero status.
        """
        import subprocess
        from oio.directory.meta0 import Meta0Client
        from oio.common.json import json

        self.logger.debug("Checking the directory bootstrap.")

        # Get an official dump from the proxy, check its size
        m0 = Meta0Client({"namespace": self.app.options.ns})
        prefixes = m0.list()
        if len(prefixes) != CID_PREFIX_COUNT:
            raise ValueError('Found %d entries in meta0, expected %d' % (
                len(prefixes), CID_PREFIX_COUNT))
        self.logger.info("The proxy serves a full meta0 dump.")

        meta0_urls = ['%s:%d' % (host, port)
                      for _, host, port, _ in
                      self.filter_services(self.catalog, 'meta0')]
        if not meta0_urls:
            # Without any meta0 the loops below would be skipped.
            # Success would be reported, then json.loads(None) would crash.
            raise ValueError('No meta0 service found in the catalog')

        # contact each M0 to perform a check: any "get" command will
        # fail if the meta0 is not complete. Unfortunately we just have
        # oio-meta0-client to target a specific service.
        for url in meta0_urls:
            res = subprocess.check_output(
                ['oio-meta0-client', url, 'get', '0000'])
            self.logger.info(res)
        self.logger.info("All meta0 services are complete.")

        # contact each meta0 to check that all the dumps are identical
        first = meta0_urls[0]
        dump0 = subprocess.check_output(['oio-meta0-client', first, 'list'])
        for url in meta0_urls[1:]:
            dump = subprocess.check_output(['oio-meta0-client', url, 'list'])
            if dump != dump0:
                raise ValueError('The dump returned by meta0 %s differs from '
                                 'the dump returned by %s' % (url, first))
        self.logger.info("All meta0 services serve the same base.")

        # Check all the meta1 are concerned. Collect every "host:port"
        # referenced by the meta0 dump values, then compare them with
        # the meta1 services of the catalog.
        reverse_dump = set()
        for _, urls in iteritems(json.loads(dump0)):
            reverse_dump.update(urls)
        m1 = {'%s:%d' % (host, port)
              for _, host, port, _ in
              self.filter_services(self.catalog, 'meta1')}
        if m1 != reverse_dump:
            raise ValueError(
                'Meta1 used but not visible: %s, '
                'meta1 visible but not used: %s' % (
                    reverse_dump - m1, m1 - reverse_dump))
        self.logger.info("All meta1 services have been assigned.")
        yield ('OK', None)
class RdirCheck(BaseCheckCommand):
    """
    Check rdir services.

    Verify that all rawx and meta2 services are assigned.
    Check registered rdir services against deployed rdir services.
    """

    def _check_assignments(self, client, srvtype):
        """
        Ensure every `srvtype` service has an rdir service assigned.

        :returns: the second value returned by
            `client.get_assignments(srvtype)` (the rdir services list).
        :raises ValueError: when at least one service has no rdir.
        """
        services, all_rdir = client.get_assignments(srvtype)
        unassigned = [srv for srv in services if srv['rdir'] is None]
        if unassigned:
            raise ValueError(
                '%d %s service(s) have no rdir service assigned: %s' % (
                    len(unassigned), srvtype, unassigned))
        self.logger.info(
            "All %s services have an rdir service assigned.", srvtype)
        return all_rdir

    def _take_action(self, parsed_args):
        """
        Run the rdir checks and yield a single ('OK', None) row.

        Explicit exceptions replace the former `assert` statements. Those
        were stripped when Python runs with -O and gave no diagnostic.

        :raises ValueError: when a rawx or meta2 service has no rdir
            assigned, or when the number of live rdir services differs
            from the catalog or from what the dispatcher reports.
        """
        from oio.rdir.client import RdirDispatcher

        self.logger.debug("Checking rdir services.")

        # Load the assigned rdir services
        client = RdirDispatcher({"namespace": self.app.options.ns})
        self._check_assignments(client, 'rawx')
        all_rdir = self._check_assignments(client, 'meta2')

        # Compare with the number of expected services
        live_rdir = list(self.filter_services(self.live, 'rdir'))
        expected_rdir = list(self.filter_services(self.catalog, 'rdir'))
        if len(live_rdir) != len(expected_rdir):
            raise ValueError(
                'Found %d live rdir services, expected %d from the catalog'
                % (len(live_rdir), len(expected_rdir)))
        if len(live_rdir) != len(all_rdir):
            raise ValueError(
                'Found %d live rdir services, but the rdir dispatcher '
                'reports %d' % (len(live_rdir), len(all_rdir)))
        self.logger.info("All rdir services are alive.")
        yield ('OK', None)
class RawxCheck(MultipleServicesCommandMixin, ItemCheckCommand):
    """
    Check all rawx chunks.
    Every chunk will also have his account, container and object quickly
    checked. This is similar to 'openio-admin chunk check' but for every
    chunk hosted by the service.
    Default output format is 'value'.
    """

    service_type = 'rawx'
    columns = ('Chunk', 'Status', 'Errors')
    reqid_prefix = 'ACLI-RC-'

    @property
    def formatter_default(self):
        # Print bare values unless another formatter is requested.
        return 'value'

    def _format_results(self, checker):
        """
        Run `checker` and turn each chunk-level result into a
        (chunk, status, errors) row. Any chunk with errors clears
        `self.success`. Results for other target types are skipped.
        """
        prefix_len = len('chunk=')
        for result in checker.run():
            if result.target.type != 'chunk':
                continue
            failed = bool(result.errors)
            if failed:
                self.success = False
            yield (repr(result.target)[prefix_len:],
                   'error' if failed else 'OK',
                   result.errors_to_str())

    def get_parser(self, prog_name):
        parser = super(RawxCheck, self).get_parser(prog_name)
        # Let the mixin add its service-selection arguments.
        MultipleServicesCommandMixin.patch_parser(self, parser)
        return parser

    def take_action(self, parsed_args):
        # Let the mixin process the service-related arguments first.
        MultipleServicesCommandMixin.check_and_load_parsed_args(
            self, self.app, parsed_args)
        return super(RawxCheck, self).take_action(parsed_args)

    def _take_action(self, parsed_args):
        """Check every chunk that rdir lists for each selected service."""
        for service in parsed_args.services:
            reqid = self.app.request_id(self.reqid_prefix)
            rdir_client = self.app.client_manager.rdir
            chunks = rdir_client.chunk_fetch(service, reqid=reqid)
            for row in self.check_chunks(service, chunks, self.checker):
                yield row

    def check_chunks(self, service, chunks, checker):
        """
        Queue one chunk Target per entry of `chunks`, then yield the
        formatted results. Each entry is read as (cid, content_id,
        chunk_id, ...) by position.
        """
        base_url = 'http://' + service
        for chunk in chunks:
            target = Target(self.app.options.account,
                            chunk=base_url + '/' + chunk[2],
                            content_id=chunk[1],
                            cid=chunk[0])
            checker.check(target)
        for row in self._format_results(checker):
            yield row