# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from oio.common.easy_value import true_value
from oio.common.tool import Tool, ToolWorker
from oio.common.utils import cid_from_name
from oio.container.client import ContainerClient
from oio.directory.admin import AdminClient
from oio.directory.meta2 import Meta2Database
[docs]class ContainerRepairer(Tool):
"""
Repair containers.
"""
DEFAULT_REBUILD_BASES = True
DEFAULT_SYNC_BASES = True
DEFAULT_UPDATE_ACCOUNT = True
def __init__(self, conf, containers=None, **kwargs):
super(ContainerRepairer, self).__init__(conf, **kwargs)
# input
self.containers = containers
[docs] @staticmethod
def string_from_item(item):
namespace, account, container = item
return '%s|%s|%s' % (
namespace, account, container)
def _fetch_items_from_containers(self):
for obj in self.containers:
namespace = obj['namespace']
account = obj['account']
container = obj['container']
yield namespace, account, container
def _fetch_items(self):
if self.containers:
return self._fetch_items_from_containers()
def _empty_generator():
return
yield # pylint: disable=unreachable
return _empty_generator()
def _get_report(self, status, end_time, counters):
containers_processed, total_containers_processed, \
errors, total_errors = counters
time_since_last_report = (end_time - self.last_report) or 0.00001
total_time = (end_time - self.start_time) or 0.00001
report = (
'%(status)s '
'last_report=%(last_report)s %(time_since_last_report).2fs '
'containers=%(containers)d %(containers_rate).2f/s '
'errors=%(errors)d %(errors_rate).2f%% '
'start_time=%(start_time)s %(total_time).2fs '
'total_containers='
'%(total_containers)d %(total_containers_rate).2f/s '
'total_errors=%(total_errors)d %(total_errors_rate).2f%%' % {
'status': status,
'last_report': datetime.fromtimestamp(
int(self.last_report)).isoformat(),
'time_since_last_report': time_since_last_report,
'containers': containers_processed,
'containers_rate':
containers_processed / time_since_last_report,
'errors': errors,
'errors_rate': 100 * errors / float(containers_processed or 1),
'start_time': datetime.fromtimestamp(
int(self.start_time)).isoformat(),
'total_time': total_time,
'total_containers': total_containers_processed,
'total_containers_rate':
total_containers_processed / total_time,
'total_errors': total_errors,
'total_errors_rate':
100 * total_errors / float(total_containers_processed or 1)
})
if self.total_expected_items is not None:
progress = 100 * total_containers_processed / \
float(self.total_expected_items or 1)
report += ' progress=%d/%d %.2f%%' % \
(total_containers_processed, self.total_expected_items,
progress)
return report
[docs] def create_worker(self, queue_workers, queue_reply):
return ContainerRepairerWorker(self, queue_workers, queue_reply)
def _load_total_expected_items(self):
if self.containers and isinstance(self.containers, list):
self.total_expected_items = len(self.containers)
[docs]class ContainerRepairerWorker(ToolWorker):
def __init__(self, tool, queue_workers, queue_reply):
super(ContainerRepairerWorker, self).__init__(
tool, queue_workers, queue_reply)
self.rebuild_bases = true_value(self.tool.conf.get(
'rebuild_bases', self.tool.DEFAULT_REBUILD_BASES))
self.sync_bases = true_value(self.tool.conf.get(
'sync_bases', self.tool.DEFAULT_SYNC_BASES))
self.update_account = true_value(self.tool.conf.get(
'update_account', self.tool.DEFAULT_UPDATE_ACCOUNT))
self.admin_client = AdminClient(self.conf, logger=self.logger)
self.container_client = ContainerClient(self.conf, logger=self.logger)
self.meta2_database = Meta2Database(self.conf, logger=self.logger)
def _process_item(self, item):
namespace, account, container = item
if namespace != self.tool.namespace:
raise ValueError('Invalid namespace (actual=%s, expected=%s)' % (
namespace, self.tool.namespace))
errors = list()
if self.rebuild_bases:
cid = cid_from_name(account, container)
for res in self.meta2_database.rebuild(cid):
if res['err']:
errors.append('%s: %s' % (res['base'], res['err']))
if errors:
raise Exception(errors)
if self.sync_bases:
data = self.admin_client.election_sync(
service_type='meta2', account=account, reference=container)
for host, info in data.items():
if info['status']['status'] not in (200, 301):
errors.append('%s (%d): %s' % (
host, info['status']['status'],
info['status']['message']))
if errors:
raise Exception(errors)
if self.update_account:
self.container_client.container_touch(
account=account, reference=container)