Source code for oio.content.repairer

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from datetime import datetime
from six import iteritems

from oio.api.object_storage import _sort_chunks
from oio.blob.operator import ChunkOperator
from oio.blob.client import BlobClient
from oio.common.exceptions import NotFound, ClientPreconditionFailed
from oio.common.storage_method import STORAGE_METHODS
from oio.common.tool import Tool, ToolWorker
from oio.common.utils import cid_from_name
from oio.container.client import ContainerClient


[docs]class ContentRepairer(Tool): """ Repair objects. """ def __init__(self, conf, objects=None, **kwargs): super(ContentRepairer, self).__init__(conf, **kwargs) # input self.objects = objects
[docs] @staticmethod def string_from_item(item): namespace, account, container, obj_name, version = item return '%s|%s|%s|%s|%s' % ( namespace, account, container, obj_name, version)
def _fetch_items_from_objects(self): for obj in self.objects: namespace = obj['namespace'] account = obj['account'] container = obj['container'] obj_name = obj['name'] version = obj['version'] yield namespace, account, container, obj_name, version def _fetch_items(self): if self.objects: return self._fetch_items_from_objects() def _empty_generator(): return yield # pylint: disable=unreachable return _empty_generator() def _get_report(self, status, end_time, counters): objects_processed, total_objects_processed, \ errors, total_errors = counters time_since_last_report = (end_time - self.last_report) or 0.00001 total_time = (end_time - self.start_time) or 0.00001 report = ( '%(status)s ' 'last_report=%(last_report)s %(time_since_last_report).2fs ' 'objects=%(objects)d %(objects_rate).2f/s ' 'errors=%(errors)d %(errors_rate).2f%% ' 'start_time=%(start_time)s %(total_time).2fs ' 'total_objects=%(total_objects)d %(total_objects_rate).2f/s ' 'total_errors=%(total_errors)d %(total_errors_rate).2f%%' % { 'status': status, 'last_report': datetime.fromtimestamp( int(self.last_report)).isoformat(), 'time_since_last_report': time_since_last_report, 'objects': objects_processed, 'objects_rate': objects_processed / time_since_last_report, 'errors': errors, 'errors_rate': 100 * errors / float(objects_processed or 1), 'start_time': datetime.fromtimestamp( int(self.start_time)).isoformat(), 'total_time': total_time, 'total_objects': total_objects_processed, 'total_objects_rate': total_objects_processed / total_time, 'total_errors': total_errors, 'total_errors_rate': 100 * total_errors / float(total_objects_processed or 1) }) if self.total_expected_items is not None: progress = 100 * total_objects_processed / \ float(self.total_expected_items or 1) report += ' progress=%d/%d %.2f%%' % \ (total_objects_processed, self.total_expected_items, progress) return report
[docs] def create_worker(self, queue_workers, queue_reply): return ContentRepairerWorker(self, queue_workers, queue_reply)
def _load_total_expected_items(self): if self.objects and isinstance(self.objects, list): self.total_expected_items = len(self.objects)
[docs]class ContentRepairerWorker(ToolWorker): def __init__(self, tool, queue_workers, queue_reply): super(ContentRepairerWorker, self).__init__( tool, queue_workers, queue_reply) self.chunk_operator = ChunkOperator(self.conf, logger=self.logger) self.blob_client = BlobClient(self.conf) self.container_client = ContainerClient(self.conf, logger=self.logger) def _safe_chunk_rebuild(self, item, content_id, chunk_id_or_pos, **kwargs): _, account, container, _, _ = item try: container_id = cid_from_name(account, container) self.chunk_operator.rebuild( container_id, content_id, chunk_id_or_pos, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( 'ERROR when rebuilding chunk %s (%s): %s', self.tool.string_from_item(item), chunk_id_or_pos, exc) return exc def _repair_metachunk(self, item, content_id, stg_met, pos, chunks): """ Check that a metachunk has the right number of chunks. :returns: the list (generator) of missing chunks """ exceptions = list() required = stg_met.expected_chunks if len(chunks) < required: if stg_met.ec: subs = {x['num'] for x in chunks} for sub in range(required): if sub not in subs: exc = self._safe_chunk_rebuild( item, content_id, "%d.%d" % (pos, sub)) if exc: exceptions.append(exc) else: missing_chunks = required - len(chunks) for _ in range(missing_chunks): exc = self._safe_chunk_rebuild( item, content_id, pos) if exc: exceptions.append(exc) for chunk in chunks: try: self.blob_client.chunk_head( chunk['url'], xattr=True, check_hash=True) except (NotFound, ClientPreconditionFailed) as e: kwargs = {'try_chunk_delete': isinstance(e, ClientPreconditionFailed)} exc = self._safe_chunk_rebuild( item, content_id, chunk['url'], **kwargs) if exc: exceptions.append(exc) except Exception as exc: # pylint: disable=broad-except self.logger.error( 'ERROR when checking chunk %s (%s): %s', self.tool.string_from_item(item), chunk['url'], exc) exceptions.append(exc) return exceptions def _process_item(self, item): namespace, account, container, obj_name, version = item if namespace != self.tool.namespace: raise ValueError('Invalid namespace (actual=%s, expected=%s)' % ( namespace, self.tool.namespace)) obj_meta, chunks = self.container_client.content_locate( account=account, reference=container, path=obj_name, version=version, properties=False) content_id = obj_meta['id'] exceptions = list() stg_met = STORAGE_METHODS.load(obj_meta['chunk_method']) chunks_by_pos = _sort_chunks(chunks, stg_met.ec) for pos, chunks in iteritems(chunks_by_pos): try: exceptions += self._repair_metachunk( item, content_id, stg_met, pos, chunks) except Exception as exc: # pylint: disable=broad-except self.logger.error('ERROR when repair metachunk %s (%d): %s', self.tool.string_from_item(item), pos, exc) exceptions.append(exc) if exceptions: raise Exception(exceptions) self.container_client.content_touch( account=account, reference=container, path=obj_name, version=version)