Source code for oio.rebuilder.blob_rebuilder

# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import time
from datetime import datetime
from socket import gethostname

from oio.common.easy_value import int_value, true_value
from oio.common.exceptions import ContentNotFound, NotFound, OrphanChunk
from oio.content.factory import ContentFactory
from oio.event.beanstalk import Beanstalk, ConnectionError
from oio.rdir.client import RdirClient
from oio.rebuilder.rebuilder import Rebuilder, RebuilderWorker


DEFAULT_REBUILDER_TUBE = 'oio-rebuild'


class BlobRebuilder(Rebuilder):

    def __init__(self, conf, logger, volume, try_chunk_delete=False,
                 beanstalkd_addr=None, **kwargs):
        super(BlobRebuilder, self).__init__(conf, logger, **kwargs)
        self.volume = volume
        self.rdir_client = RdirClient(conf, logger=self.logger)
        self.try_chunk_delete = try_chunk_delete
        self.beanstalkd_addr = beanstalkd_addr
        self.beanstalkd_tube = conf.get('beanstalkd_tube',
                                        DEFAULT_REBUILDER_TUBE)
        self.beanstalk = None
        self.rdir_fetch_limit = int_value(conf.get('rdir_fetch_limit'), 100)

    def _fetch_chunks_from_event(self, job_id, data):
        env = json.loads(data)
        for chunk_pos in env['data']['missing_chunks']:
            yield [env['url']['id'], env['url']['content'],
                   str(chunk_pos), None]

    def _connect_to_beanstalk(self):
        self.logger.debug('Connecting to %s', self.beanstalkd_addr)
        self.beanstalk = Beanstalk.from_url(self.beanstalkd_addr)
        self.logger.debug('Using tube %s', self.beanstalkd_tube)
        self.beanstalk.use(self.beanstalkd_tube)
        self.beanstalk.watch(self.beanstalkd_tube)

    def _handle_beanstalk_event(self, conn_error):
        try:
            job_id, data = self.beanstalk.reserve()
            if conn_error:
                self.logger.warn("beanstalk reconnected")
        except ConnectionError:
            if not conn_error:
                self.logger.warn("beanstalk connection error")
            raise
        try:
            for chunk in self._fetch_chunks_from_event(job_id, data):
                yield chunk
            self.beanstalk.delete(job_id)
        except Exception:
            # Keep the job around for later inspection instead of losing it.
            self.logger.exception("handling event %s (bury)", job_id)
            self.beanstalk.bury(job_id)

    def _fetch_chunks_from_beanstalk(self):
        conn_error = False
        while True:
            try:
                self._connect_to_beanstalk()
                for chunk in self._handle_beanstalk_event(conn_error):
                    conn_error = False
                    yield chunk
            except ConnectionError as exc:
                self.logger.warn('Disconnected: %s', exc)
                if 'Invalid URL' in str(exc):
                    raise
                conn_error = True
                # Wait a bit before trying to reconnect.
                time.sleep(1.0)

    def _fetch_chunks_from_file(self):
        with open(self.input_file, 'r') as ifile:
            for line in ifile:
                stripped = line.strip()
                if stripped and not stripped.startswith('#'):
                    yield stripped.split('|', 3)[:3] + [None]

    def _fetch_chunks(self):
        if self.input_file:
            return self._fetch_chunks_from_file()
        elif self.beanstalkd_addr:
            return self._fetch_chunks_from_beanstalk()
        else:
            return self.rdir_client.chunk_fetch(
                self.volume, limit=self.rdir_fetch_limit, rebuild=True)

    def rebuilder_pass_with_lock(self):
        success = False
        self.rdir_client.admin_lock(self.volume,
                                    "rebuilder on %s" % gethostname())
        try:
            success = self.rebuilder_pass()
        finally:
            self.rdir_client.admin_unlock(self.volume)
        return success

    def _create_worker(self, **kwargs):
        return BlobRebuilderWorker(
            self.conf, self.logger, self.volume, self.try_chunk_delete)

    def _fill_queue(self, queue, **kwargs):
        chunks = self._fetch_chunks()
        for chunk in chunks:
            queue.put(chunk)

    def _init_info(self, **kwargs):
        return 0

    def _compute_info(self, worker, total_bytes_processed, **kwargs):
        total_bytes_processed += worker.total_bytes_processed
        return total_bytes_processed

    def _get_report(self, start_time, end_time, passes, errors,
                    waiting_time, rebuilder_time, elapsed,
                    total_chunks_processed, total_bytes_processed,
                    **kwargs):
        return ('DONE %(volume)s '
                'started=%(start_time)s '
                'ended=%(end_time)s '
                'elapsed=%(elapsed).2f '
                'passes=%(passes)d '
                'errors=%(errors)d '
                'chunks=%(nb_chunks)d %(c_rate).2f/s '
                'bytes=%(nb_bytes)d %(b_rate).2fB/s '
                'waiting_time=%(waiting_time).2f '
                'rebuilder_time=%(rebuilder_time).2f '
                '(rebuilder: %(success_rate).2f%%)' % {
                    'volume': self.volume,
                    'start_time': datetime.fromtimestamp(
                        int(start_time)).isoformat(),
                    'end_time': datetime.fromtimestamp(
                        int(end_time)).isoformat(),
                    'elapsed': elapsed,
                    'passes': passes,
                    'errors': errors,
                    'nb_chunks': total_chunks_processed,
                    'nb_bytes': total_bytes_processed,
                    'c_rate': total_chunks_processed / elapsed,
                    'b_rate': total_bytes_processed / elapsed,
                    'rebuilder_time': rebuilder_time,
                    'waiting_time': waiting_time,
                    'success_rate':
                        100 * ((total_chunks_processed - errors) /
                               float(total_chunks_processed or 1))
                })
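
# Illustrative sketch, not part of the module: the input shapes the
# fetchers above consume. All identifier values below are hypothetical.
#
# A beanstalkd job body, as decoded by _fetch_chunks_from_event():
#
#     {"url": {"id": "<container_id>", "content": "<content_id>"},
#      "data": {"missing_chunks": [0, "1.2"]}}
#
# An input file line, as parsed by _fetch_chunks_from_file()
# (lines starting with '#' and blank lines are skipped):
#
#     <container_id>|<content_id>|<chunk_id_or_pos>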


class BlobRebuilderWorker(RebuilderWorker):

    def __init__(self, conf, logger, volume, try_chunk_delete=False,
                 **kwargs):
        super(BlobRebuilderWorker, self).__init__(conf, logger, **kwargs)
        self.volume = volume
        self.bytes_processed = 0
        self.total_bytes_processed = 0
        self.dry_run = true_value(conf.get('dry_run', False))
        self.allow_same_rawx = true_value(conf.get('allow_same_rawx'))
        self.rdir_client = RdirClient(conf, logger=self.logger)
        self.content_factory = ContentFactory(conf, logger=self.logger)
        self.try_chunk_delete = try_chunk_delete

    def _rebuild_one(self, chunk, **kwargs):
        cid, content_id, chunk_id_or_pos, _ = chunk
        if self.dry_run:
            self.dryrun_chunk_rebuild(cid, content_id, chunk_id_or_pos)
        else:
            self.safe_chunk_rebuild(cid, content_id, chunk_id_or_pos)

    def _get_report(self, num, start_time, end_time, total_time,
                    report_time, **kwargs):
        return ('RUN %(volume)s '
                'worker=%(num)d '
                'started=%(start_time)s '
                'passes=%(passes)d '
                'errors=%(errors)d '
                'chunks=%(nb_chunks)d %(c_rate).2f/s '
                'bytes=%(nb_bytes)d %(b_rate).2fB/s '
                'waiting_time=%(waiting_time).2f '
                'rebuilder_time=%(rebuilder_time).2f '
                'total_time=%(total_time).2f '
                '(rebuilder: %(success_rate).2f%%)' % {
                    'volume': self.volume,
                    'num': num,
                    'start_time': datetime.fromtimestamp(
                        int(report_time)).isoformat(),
                    'passes': self.passes,
                    'errors': self.errors,
                    'nb_chunks': self.total_items_processed,
                    'nb_bytes': self.total_bytes_processed,
                    'c_rate': self.passes / (end_time - report_time),
                    'b_rate': self.bytes_processed /
                        (end_time - report_time),
                    'waiting_time': self.waiting_time,
                    'rebuilder_time': self.rebuilder_time,
                    'total_time': (end_time - start_time),
                    # Guard against division by zero when nothing has
                    # been processed yet (consistent with the report above).
                    'success_rate':
                        100 * ((self.total_items_processed - self.errors) /
                               float(self.total_items_processed or 1))
                })

    def dryrun_chunk_rebuild(self, container_id, content_id,
                             chunk_id_or_pos):
        self.logger.info("[dryrun] Rebuilding "
                         "container %s, content %s, chunk %s",
                         container_id, content_id, chunk_id_or_pos)
        self.passes += 1

    def safe_chunk_rebuild(self, container_id, content_id, chunk_id_or_pos):
        try:
            self.chunk_rebuild(container_id, content_id, chunk_id_or_pos)
        except Exception as e:
            self.errors += 1
            self.logger.error('ERROR while rebuilding chunk %s|%s|%s: %s',
                              container_id, content_id, chunk_id_or_pos, e)
        self.passes += 1

    def chunk_rebuild(self, container_id, content_id, chunk_id_or_pos):
        self.logger.info('Rebuilding (container %s, content %s, chunk %s)',
                         container_id, content_id, chunk_id_or_pos)
        try:
            content = self.content_factory.get(container_id, content_id)
        except ContentNotFound:
            raise OrphanChunk('Content not found: possible orphan chunk')

        chunk_size = 0
        chunk_pos = None
        if len(chunk_id_or_pos) < 32:
            # Shorter than a chunk id: treat it as a chunk position.
            chunk_pos = chunk_id_or_pos
            chunk_id = None
            metapos = int(chunk_pos.split('.', 1)[0])
            chunk_size = content.chunks.filter(metapos=metapos).all()[0].size
        else:
            if '/' in chunk_id_or_pos:
                chunk_id = chunk_id_or_pos.rsplit('/', 1)[-1]
            else:
                chunk_id = chunk_id_or_pos
            chunk = content.chunks.filter(id=chunk_id).one()
            if chunk is None:
                raise OrphanChunk("Chunk not found in content: "
                                  "possible orphan chunk")
            elif self.volume and chunk.host != self.volume:
                raise ValueError("Chunk does not belong to this volume")
            chunk_size = chunk.size

        content.rebuild_chunk(chunk_id, allow_same_rawx=self.allow_same_rawx,
                              chunk_pos=chunk_pos)

        # 'chunk' is only resolved when an id was given; a rebuild by
        # position has no old chunk to delete.
        if self.try_chunk_delete and chunk_id is not None:
            try:
                content.blob_client.chunk_delete(chunk.url)
                self.logger.info("Chunk %s deleted", chunk.url)
            except NotFound as exc:
                self.logger.debug("Chunk %s: %s", chunk.url, exc)

        # This call does not raise an exception if the chunk
        # is not referenced.
        if chunk_id is not None:
            self.rdir_client.chunk_delete(chunk.host, container_id,
                                          content_id, chunk_id)

        self.bytes_processed += chunk_size
        self.total_bytes_processed += chunk_size
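

# Minimal usage sketch, not part of the upstream module. It assumes a
# reachable OpenIO namespace; the namespace name, the rawx volume address
# and the get_logger() import path are assumptions to adapt to your
# deployment.
if __name__ == '__main__':
    from oio.common.logger import get_logger

    conf = {'namespace': 'OPENIO', 'rdir_fetch_limit': 100}
    logger = get_logger(conf)
    # Rebuild the chunks that rdir reports as broken on this rawx volume,
    # holding the rdir admin lock for the duration of the pass.
    rebuilder = BlobRebuilder(conf, logger, '127.0.0.1:6201')
    rebuilder.rebuilder_pass_with_lock()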