Source code for oio.xcute.jobs.tester
# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import random
import oio.common.exceptions as exc
from oio.common.easy_value import int_value
from oio.xcute.common.job import XcuteJob, XcuteTask
EXCEPTIONS = [exc.BadRequest,
exc.Forbidden,
exc.NotFound,
exc.MethodNotAllowed,
exc.Conflict,
exc.ClientPreconditionFailed,
exc.TooLarge,
exc.UnsatisfiableRange,
exc.ServiceBusy]
[docs]class TesterTask(XcuteTask):
def __init__(self, conf, job_params, logger=None):
super(TesterTask, self).__init__(
conf, job_params, logger=logger)
self.error_percentage = job_params['error_percentage']
[docs] def process(self, task_id, task_payload, reqid=None):
msg = task_payload['msg']
self.logger.info('[reqid=%s] Hello: %s', reqid, msg)
if self.error_percentage \
and random.randrange(100) < self.error_percentage:
exc_class = random.choice(EXCEPTIONS)
raise exc_class()
return {'counter': len(msg)}
[docs]class TesterJob(XcuteJob):
JOB_TYPE = 'tester'
TASK_CLASS = TesterTask
DEFAULT_START = 0
DEFAULT_END = 5
DEFAULT_ERROR_PERCENTAGE = 0
DEFAULT_LOCK = 'tester_lock'
[docs] @classmethod
def sanitize_params(cls, job_params):
sanitized_job_params, _ = super(
TesterJob, cls).sanitize_params(job_params)
sanitized_job_params['start'] = int_value(
job_params.get('start'), cls.DEFAULT_START)
sanitized_job_params['end'] = int_value(
job_params.get('end'), cls.DEFAULT_END)
sanitized_job_params['error_percentage'] = int_value(
job_params.get('error_percentage'),
cls.DEFAULT_ERROR_PERCENTAGE)
return sanitized_job_params, job_params.get('lock', cls.DEFAULT_LOCK)
[docs] def get_tasks(self, job_params, marker=None):
start = job_params['start']
end = job_params['end']
if marker:
start = int(marker) + 1
for i in range(start, end):
task_id = str(i)
task_payload = {'msg': 'World %d' % i}
yield (task_id, task_payload)
[docs] def get_total_tasks(self, job_params, marker=None):
start = job_params['start']
if marker is not None:
start = int(marker) + 1
end = job_params['end']
yield (str(end - 1), end - start)