# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import time
from oio.blob.operator import ChunkOperator
from oio.common.easy_value import boolean_value, float_value, int_value
from oio.common.exceptions import ContentNotFound, OrphanChunk
from oio.rdir.client import RdirClient
from oio.xcute.common.job import XcuteTask
from oio.xcute.jobs.common import XcuteRdirJob
class RawxRebuildTask(XcuteTask):
    """Xcute task rebuilding one chunk through a ``ChunkOperator``.

    All options are taken from ``job_params``; each key read in
    ``__init__`` is mandatory (a missing key raises ``KeyError``).
    """

    def __init__(self, conf, job_params, logger=None):
        """Keep the job options and create the chunk operator.

        :param conf: configuration, forwarded to the parent class.
        :param job_params: dict providing 'service_id', 'rawx_timeout',
            'allow_frozen_container', 'allow_same_rawx',
            'try_chunk_delete' and 'dry_run'.
        :param logger: optional logger, forwarded to the parent class.
        """
        super(RawxRebuildTask, self).__init__(
            conf, job_params, logger=logger)

        params = job_params
        self.service_id = params['service_id']
        # Stored only: nothing else in this class reads rawx_timeout.
        self.rawx_timeout = params['rawx_timeout']
        self.allow_frozen_container = params['allow_frozen_container']
        self.allow_same_rawx = params['allow_same_rawx']
        self.try_chunk_delete = params['try_chunk_delete']
        self.dry_run = params['dry_run']

        # NOTE(review): self.conf and self.logger are presumably set by
        # XcuteTask.__init__ -- confirm against the parent class.
        self.chunk_operator = ChunkOperator(self.conf, logger=self.logger)

    def process(self, task_id, task_payload, reqid=None):
        """Rebuild the chunk described by ``task_payload``.

        :param task_id: task identifier (not used by this method).
        :param task_payload: dict providing 'container_id', 'content_id'
            and 'chunk_id'.
        :param reqid: request ID, only inserted in the debug messages.
        :returns: a dict being one of ``{'skipped_chunks': 1}`` (dry run),
            ``{'orphan_chunks': 1}`` (``ContentNotFound`` or ``OrphanChunk``
            raised by the rebuild) or
            ``{'rebuilt_chunks': 1, 'rebuilt_bytes': <rebuild() result>}``.
        """
        cid = task_payload['container_id']
        content = task_payload['content_id']
        chunk = task_payload['chunk_id']

        if self.dry_run:
            # Dry run: log the intent, but never call the chunk operator.
            self.logger.debug(
                '[reqid=%s] [dryrun] Rebuilding %s', reqid, chunk)
            return {'skipped_chunks': 1}

        self.logger.debug('[reqid=%s] Rebuilding %s', reqid, chunk)
        rebuild_options = {
            'rawx_id': self.service_id,
            'try_chunk_delete': self.try_chunk_delete,
            'allow_frozen_container': self.allow_frozen_container,
            'allow_same_rawx': self.allow_same_rawx,
        }
        try:
            rebuilt_bytes = self.chunk_operator.rebuild(
                cid, content, chunk, **rebuild_options)
        except (ContentNotFound, OrphanChunk):
            # These two errors are reported as a counter, not propagated.
            return {'orphan_chunks': 1}
        else:
            return {'rebuilt_chunks': 1, 'rebuilt_bytes': rebuilt_bytes}
class RawxRebuildJob(XcuteRdirJob):
    """Xcute job of type 'rawx-rebuild', running ``RawxRebuildTask``.

    The chunks to process are fetched from rdir (``chunk_fetch`` called
    with ``rebuild=True``) for the rawx identified by 'service_id'.
    """

    JOB_TYPE = 'rawx-rebuild'
    TASK_CLASS = RawxRebuildTask

    # Fallback values applied by sanitize_params().
    DEFAULT_RAWX_TIMEOUT = 60.0
    DEFAULT_DRY_RUN = False
    DEFAULT_ALLOW_SAME_RAWX = True
    DEFAULT_TRY_CHUNK_DELETE = False
    DEFAULT_ALLOW_FROZEN_CT = False
    DEFAULT_DECLARE_INCIDENT_DATE = False

    @classmethod
    def sanitize_params(cls, job_params):
        """Validate the raw job parameters and apply the defaults.

        The parent class sanitizes its own parameters first, then the
        options specific to this job are added to the result.

        When ``int_value`` yields a value for
        'set_specific_incident_date', 'set_incident_date' is forced to
        True. Otherwise 'set_incident_date' is parsed as a boolean and,
        when true, the current time (``int(time.time())``) becomes the
        incident date.

        :param job_params: dict of raw parameters.
        :returns: a tuple made of the sanitized parameters dict and the
            string ``'rawx/<service_id>'``.
        :raises ValueError: if 'service_id' is missing or empty.
        """
        parent = super(RawxRebuildJob, cls)
        sanitized, _ = parent.sanitize_params(job_params)

        # Options specific to this job
        service_id = job_params.get('service_id')
        if not service_id:
            raise ValueError('Missing service ID')
        sanitized['service_id'] = service_id

        sanitized['rawx_timeout'] = float_value(
            job_params.get('rawx_timeout'), cls.DEFAULT_RAWX_TIMEOUT)

        boolean_options = (
            ('dry_run', cls.DEFAULT_DRY_RUN),
            ('allow_same_rawx', cls.DEFAULT_ALLOW_SAME_RAWX),
            ('try_chunk_delete', cls.DEFAULT_TRY_CHUNK_DELETE),
            ('allow_frozen_container', cls.DEFAULT_ALLOW_FROZEN_CT),
        )
        for option, fallback in boolean_options:
            sanitized[option] = boolean_value(
                job_params.get(option), fallback)

        incident_date = int_value(
            job_params.get('set_specific_incident_date'), None)
        if incident_date is not None:
            # An explicit date implies that it must be set.
            declare_incident = True
        else:
            declare_incident = boolean_value(
                job_params.get('set_incident_date'),
                cls.DEFAULT_DECLARE_INCIDENT_DATE)
            if declare_incident:
                incident_date = int(time.time())
        sanitized['set_incident_date'] = declare_incident
        sanitized['set_specific_incident_date'] = incident_date

        return sanitized, 'rawx/%s' % service_id

    def __init__(self, conf, logger=None):
        """Initialize the parent job and create the rdir client.

        :param conf: configuration, forwarded to the parent class.
        :param logger: optional logger, forwarded to the parent class.
        """
        super(RawxRebuildJob, self).__init__(conf, logger=logger)
        # NOTE(review): self.conf and self.logger are presumably set by
        # the parent constructor -- confirm against XcuteRdirJob.
        self.rdir_client = RdirClient(self.conf, logger=self.logger)

    def prepare(self, job_params):
        """Call ``admin_incident_set`` on rdir if the job requires it.

        Nothing is sent when 'set_incident_date' is false.

        :param job_params: sanitized parameters; 'service_id',
            'rdir_timeout', 'set_incident_date' and
            'set_specific_incident_date' are read.
        """
        rawx_id = job_params['service_id']
        timeout = job_params['rdir_timeout']
        must_declare = job_params['set_incident_date']
        incident_date = job_params['set_specific_incident_date']

        if must_declare:
            self.rdir_client.admin_incident_set(
                rawx_id, incident_date, timeout=timeout)

    def get_tasks(self, job_params, marker=None):
        """Generate one ``(task_id, payload)`` pair per fetched chunk.

        The task ID is ``'<container_id>|<content_id>|<chunk_id>'`` and
        the payload is a dict holding these three values.

        :param job_params: sanitized parameters, see get_chunk_infos().
        :param marker: forwarded to get_chunk_infos().
        """
        for entry in self.get_chunk_infos(job_params, marker=marker):
            container_id, content_id, chunk_id, _ = entry
            payload = {'container_id': container_id,
                       'content_id': content_id,
                       'chunk_id': chunk_id}
            yield '|'.join((container_id, content_id, chunk_id)), payload

    def get_total_tasks(self, job_params, marker=None):
        """Count the fetched chunks, reporting them by groups of 1000.

        Yield ``(marker, 1000)`` after every 1000th chunk, then
        ``(marker, remainder)`` if some chunks are left; the marker is
        the ``'<container_id>|<content_id>|<chunk_id>'`` string of the
        last chunk counted. Nothing is yielded when no chunk is fetched.

        :param job_params: sanitized parameters, see get_chunk_infos().
        :param marker: forwarded to get_chunk_infos().
        """
        group_size = 1000
        pending = 0
        for entry in self.get_chunk_infos(job_params, marker=marker):
            container_id, content_id, chunk_id, _ = entry
            pending += 1
            if pending == group_size:
                yield ('|'.join((container_id, content_id, chunk_id)),
                       group_size)
                pending = 0

        # A positive count means the loop ran, so the IDs are bound.
        if pending > 0:
            yield '|'.join((container_id, content_id, chunk_id)), pending

    def get_chunk_infos(self, job_params, marker=None):
        """Return the result of ``rdir_client.chunk_fetch`` for the rawx.

        The fetch is done with ``rebuild=True``.

        :param job_params: sanitized parameters; 'service_id',
            'rdir_fetch_limit' and 'rdir_timeout' are read.
        :param marker: passed as ``start_after`` to ``chunk_fetch``.
        :returns: whatever ``chunk_fetch`` returns; the callers in this
            class iterate it and unpack each item as a 4-element
            sequence.
        """
        rawx_id = job_params['service_id']
        fetch_limit = job_params['rdir_fetch_limit']
        timeout = job_params['rdir_timeout']

        return self.rdir_client.chunk_fetch(
            rawx_id, rebuild=True, timeout=timeout,
            limit=fetch_limit, start_after=marker)