Source code for oio.rebuilder.meta_rebuilder

# Copyright (C) 2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
import sys

from oio import ObjectStorageApi
from oio.common.exceptions import NotFound, OioTimeout, ServiceBusy
from oio.directory.admin import AdminClient
from oio.rebuilder.rebuilder import Rebuilder, RebuilderWorker


[docs]class MetaRebuilder(Rebuilder): """ Abstract class for directory rebuilders. """ def __init__(self, conf, logger, volume, **kwargs): super(MetaRebuilder, self).__init__(conf, logger, volume, **kwargs) self.api = ObjectStorageApi(self.namespace, logger=self.logger, **kwargs) def _fill_queue_from_file(self, queue, **kwargs): if self.input_file is None: return False with open(self.input_file, 'r') as ifile: for line in ifile: stripped = line.strip() if stripped and not stripped.startswith('#'): queue.put(stripped) if not self.running: break return True def _full_container_list(self, account, **kwargs): listing = self.api.container_list(account, **kwargs) for element in listing: yield element while listing: kwargs['marker'] = listing[-1][0] listing = self.api.container_list(account, **kwargs) if listing: for element in listing: yield element
[docs]class MetaRebuilderWorker(RebuilderWorker): def __init__(self, rebuilder, type_, max_attempts=5, **kwargs): super(MetaRebuilderWorker, self).__init__(rebuilder, **kwargs) self.type = type_ self.max_attempts = max_attempts self.admin_client = AdminClient(self.conf, logger=self.logger) def _rebuild_one(self, cid, **kwargs): missing_base = False for attempts in range(self.max_attempts): try: if not missing_base: # Check if the bases exist info = self.admin_client.has_base(self.type, cid=cid) for meta in info.values(): if meta['status']['status'] != 200: missing_base = True break if missing_base: self.admin_client.election_leave(self.type, cid=cid) # TODO(adu): use self.admin_client.election_sync # Setting a property will trigger a database replication properties = {'sys.last_rebuild': str(int(time.time()))} self.admin_client.set_properties(self.type, cid=cid, properties=properties) break except Exception as err: if attempts < self.max_attempts - 1: if isinstance(err, NotFound): self.logger.warn('%s: %s', cid, err) continue if isinstance(err, OioTimeout) \ or isinstance(err, ServiceBusy): self.logger.warn('%s: %s', cid, err) time.sleep(attempts * 0.5) continue self.logger.error('ERROR while rebuilding %s: %s', cid, err) sys.stdout.write(cid+'\n') break