Source code for oio.xcute.common.job

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from oio.common.easy_value import int_value
from oio.common.logger import get_logger


class XcuteTask(object):
    """
    Base class of the unit of work executed for an xcute job.

    Concrete tasks subclass it and implement :meth:`process`.
    """

    def __init__(self, conf, job_params, logger=None):
        """
        :param conf: configuration, also used to build a default logger.
        :param job_params: parameters of the job owning this task; the base
            class does not keep them (subclasses may).
        :param logger: logger to use; when falsy, one is built from `conf`
            with get_logger().
        """
        self.conf = conf
        if not logger:
            logger = get_logger(self.conf)
        self.logger = logger

    def process(self, task_id, task_payload):
        """
        Execute one task.

        :param task_id: identifier of the task.
        :param task_payload: data describing the task.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()


class XcuteJob(object):
    """
    Base class of an xcute job.

    A job validates its configuration (:meth:`sanitize_config`), may run a
    one-time preparation step (:meth:`prepare`), then generates its tasks
    (:meth:`get_tasks`) and optionally their total (:meth:`get_total_tasks`).
    """

    # Identifier of the job type; None here, presumably set by subclasses.
    JOB_TYPE = None
    # Class processing the tasks of this job; None here, presumably an
    # XcuteTask subclass set by subclasses.
    TASK_CLASS = None

    # Value used when the configuration does not provide 'tasks_per_second'.
    DEFAULT_TASKS_PER_SECOND = 32
    # Largest accepted 'tasks_batch_size' (inclusive bound).
    MAX_TASKS_BATCH_SIZE = 64

    def __init__(self, conf, logger=None):
        """
        :param conf: configuration, also used to build a default logger.
        :param logger: logger to use; when falsy, one is built from `conf`
            with get_logger().
        """
        self.conf = conf
        self.logger = logger or get_logger(self.conf)

    @classmethod
    def sanitize_config(cls, job_config):
        """
        Validate and sanitize the job configuration.

        Ex: cast a string as integer, set a default.
        Also return the lock id if there is one.

        :param job_config: raw job configuration; the keys read are
            'tasks_per_second', 'tasks_batch_size' and 'params'.
        :returns: a tuple (sanitized_job_config, lock). The sanitized
            configuration holds 'tasks_per_second', 'tasks_batch_size' and
            'params'. `lock` is the value returned by :meth:`sanitize_params`.
        :raises ValueError: if 'tasks_batch_size' is lower than 1 or greater
            than MAX_TASKS_BATCH_SIZE. Errors raised by int_value() or by
            :meth:`sanitize_params` are propagated.
        """
        sanitized_job_config = dict()

        tasks_per_second = int_value(
            job_config.get('tasks_per_second'),
            cls.DEFAULT_TASKS_PER_SECOND)
        sanitized_job_config['tasks_per_second'] = tasks_per_second

        tasks_batch_size = int_value(
            job_config.get('tasks_batch_size'), None)
        if tasks_batch_size is None:
            if tasks_per_second > 0:
                # Default to at most one second's worth of tasks,
                # capped by MAX_TASKS_BATCH_SIZE.
                tasks_batch_size = min(
                    tasks_per_second, cls.MAX_TASKS_BATCH_SIZE)
            else:
                # NOTE(review): a non-positive rate presumably means
                # "no rate limit" — confirm with the job orchestrator.
                tasks_batch_size = cls.MAX_TASKS_BATCH_SIZE
        elif tasks_batch_size < 1:
            raise ValueError('Tasks batch size should be positive')
        elif tasks_batch_size > cls.MAX_TASKS_BATCH_SIZE:
            # The maximum itself is accepted, so the message must say
            # "less than or equal to".
            raise ValueError(
                'Tasks batch size should be less than or equal to %d'
                % cls.MAX_TASKS_BATCH_SIZE)
        sanitized_job_config['tasks_batch_size'] = tasks_batch_size

        sanitized_job_params, lock = cls.sanitize_params(
            job_config.get('params') or dict())
        sanitized_job_config['params'] = sanitized_job_params

        return sanitized_job_config, lock

    @classmethod
    def sanitize_params(cls, job_params):
        """
        Validate and sanitize the job parameters.

        Ex: cast a string as integer, set a default.
        Also return the lock id if there is one.

        The base implementation accepts no parameter: it ignores `job_params`
        and returns an empty dict with no lock.

        :param job_params: raw job parameters.
        :returns: a tuple (sanitized_job_params, lock).
        """
        sanitized_job_params = dict()
        return sanitized_job_params, None

    def prepare(self, job_params):
        """
        Execute code only once, when the job is run for the first time.

        This method is executed before the generation of the tasks and of
        the total. The base implementation does nothing.

        :param job_params: sanitized job parameters.
        """
        pass

    def get_tasks(self, job_params, marker=None):
        """
        Yield the job tasks as (task_id, task_payload) tuples.

        task_id must be a string and can be used as a marker.

        :param job_params: sanitized job parameters.
        :param marker: presumably the task_id to resume after — confirm.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def get_total_tasks(self, job_params, marker=None):
        """
        Yield numbers of tasks as (marker, tasks_incr) tuples.

        The sum of all tasks_incr yielded must be the total number of tasks
        in the job. Subclasses need not override this method if they do not
        need it.

        The base implementation returns None instead of an iterable.
        NOTE(review): callers presumably treat None as "total unknown"
        — confirm.

        :param job_params: sanitized job parameters.
        :param marker: marker to start counting from.
        """
        return None