Source code for oio.xcute.jobs.meta2_rebuilder
# Copyright (C) 2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from collections import Counter
from oio.directory.meta2 import Meta2Database
from oio.rdir.client import RdirClient
from oio.xcute.common.job import XcuteTask
from oio.xcute.jobs.common import XcuteRdirJob
[docs]class Meta2RebuildTask(XcuteTask):
def __init__(self, conf, job_params, logger=None):
super(Meta2RebuildTask, self).__init__(
conf, job_params, logger=logger)
self.meta2_id = job_params['service_id']
self.meta2 = Meta2Database(conf, logger=logger)
[docs] def process(self, task_id, task_payload, reqid=None):
container_id = task_payload['container_id']
rebuilt = self.meta2.rebuild(container_id, reqid=reqid)
resp = Counter()
for res in rebuilt:
if res['err'] is not None:
resp['errors'] += 1
continue
resp['rebuilt_seq'] += 1
return resp
[docs]class Meta2RebuildJob(XcuteRdirJob):
JOB_TYPE = 'meta2-rebuild'
TASK_CLASS = Meta2RebuildTask
[docs] @classmethod
def sanitize_params(cls, job_params):
sanitized_job_params, _ = super(
Meta2RebuildJob, cls).sanitize_params(job_params)
# specific configuration
service_id = job_params.get('service_id')
if not service_id:
raise ValueError('Missing service ID')
sanitized_job_params['service_id'] = service_id
return sanitized_job_params, 'meta2/%s' % service_id
def __init__(self, conf, logger=None):
super(Meta2RebuildJob, self).__init__(conf, logger=logger)
self.rdir_client = RdirClient(conf, logger=logger)
[docs] def get_tasks(self, job_params, marker=None):
containers_it = self._containers_from_rdir(job_params, marker)
for url, container_id in containers_it:
yield url, dict(container_id=container_id)
[docs] def get_total_tasks(self, job_params, marker=None):
containers_it = self._containers_from_rdir(job_params, marker)
i = 0
for i, (url, _) in enumerate(containers_it, 1):
if i % 1000 == 0:
yield url, 1000
remaining = i % 1000
if remaining == 0:
return
yield marker, remaining
def _containers_from_rdir(self, job_params, marker):
service_id = job_params['service_id']
rdir_fetch_limit = job_params['rdir_fetch_limit']
rdir_timeout = job_params['rdir_timeout']
containers = self.rdir_client.meta2_index_fetch_all(
service_id, marker=marker, timeout=rdir_timeout,
limit=rdir_fetch_limit)
for container_info in containers:
container_url = container_info['container_url']
container_id = container_info['container_id']
yield container_url, container_id