Source code for oio.rebuilder.meta1_rebuilder

# Copyright (C) 2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from datetime import datetime

from oio.common.configuration import load_namespace_conf
from oio.common.utils import cid_from_name
from oio.conscience.client import ConscienceClient
from oio.rebuilder.meta_rebuilder import MetaRebuilder, MetaRebuilderWorker


[docs]class Meta1Rebuilder(MetaRebuilder): def __init__(self, conf, logger, **kwargs): super(Meta1Rebuilder, self).__init__(conf, logger, None, **kwargs) self.conscience = ConscienceClient(self.conf, logger=self.logger) sds_conf = load_namespace_conf(self.conf['namespace']) or {} self.meta1_digits = int(sds_conf.get('ns.meta1_digits', sds_conf.get('meta1_digits', 4))) def _create_worker(self, **kwargs): return Meta1RebuilderWorker(self, **kwargs) def _fill_queue(self, queue, **kwargs): if self._fill_queue_from_file(queue, **kwargs): return prefixes = set() rawx_services = self.conscience.all_services('rawx') for rawx in rawx_services: cid = cid_from_name('_RDIR', rawx['addr']) prefix = cid[:self.meta1_digits] if prefix not in prefixes: queue.put(prefix.ljust(64, '0')) prefixes.add(prefix) accounts = self.api.account_list() for account in accounts: containers = self._full_container_list(account) for container in containers: cid = cid_from_name(account, container[0]) prefix = cid[:self.meta1_digits] if prefix not in prefixes: queue.put(prefix.ljust(64, '0')) prefixes.add(prefix) if not self.running: break def _item_to_string(self, prefix, **kwargs): return 'prefix %s' % prefix def _get_report(self, status, end_time, counters, **kwargs): prefixes_processed, errors, total_prefixes_processed, \ total_errors = counters time_since_last_report = (end_time - self.last_report) or 0.00001 total_time = (end_time - self.start_time) or 0.00001 return ('%(status)s volume=%(volume)s ' 'last_report=%(last_report)s %(time_since_last_report).2fs ' 'prefixes=%(prefixes)d %(prefixes_rate).2f/s ' 'errors=%(errors)d %(errors_rate).2f%% ' 'start_time=%(start_time)s %(total_time).2fs ' 'total_prefixes=%(total_prefixes)d ' '%(total_prefixes_rate).2f/s ' 'total_errors=%(total_errors)d %(total_errors_rate).2f%%' % { 'status': status, 'volume': self.volume, 'last_report': datetime.fromtimestamp( int(self.last_report)).isoformat(), 'time_since_last_report': time_since_last_report, 'prefixes': prefixes_processed, 'prefixes_rate': prefixes_processed / time_since_last_report, 'errors': errors, 'errors_rate': 100 * errors / float(prefixes_processed or 1), 'start_time': datetime.fromtimestamp( int(self.start_time)).isoformat(), 'total_time': total_time, 'total_prefixes': total_prefixes_processed, 'total_prefixes_rate': total_prefixes_processed / total_time, 'total_errors': total_errors, 'total_errors_rate': 100 * total_errors / float(total_prefixes_processed or 1) })
[docs]class Meta1RebuilderWorker(MetaRebuilderWorker): def __init__(self, rebuilder, **kwargs): super(Meta1RebuilderWorker, self).__init__(rebuilder, 'meta1', **kwargs)