oio.xcute.common package

Submodules

oio.xcute.common.backend module

class oio.xcute.common.backend.XcuteBackend(conf, logger=None)[source]

Bases: oio.common.redis_conn.RedisConnection

DEFAULT_LIMIT = 1000
create(*args, **kwargs)[source]
delete(*args, **kwargs)[source]
fail(*args, **kwargs)[source]
free(*args, **kwargs)[source]
get_job_info(*args, **kwargs)[source]
get_lock_info(*args, **kwargs)[source]
incr_total_tasks(*args, **kwargs)[source]
key_job_ids = 'xcute:job:ids'
key_job_info = 'xcute:job:info:%s'
key_locks = 'xcute:locks'
key_orchestrator_jobs = 'xcute:orchestrator:jobs:%s'
key_tasks_running = 'xcute:tasks:running:%s'
key_waiting_jobs = 'xcute:waiting:jobs'
list_jobs(marker=None, limit=1000)[source]
list_locks(*args, **kwargs)[source]
list_orchestrator_jobs(orchestrator_id)[source]
lua_create = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local job_type = KEYS[3];\n local job_config = KEYS[4];\n local lock = KEYS[5];\n\n local job_exists = redis.call('EXISTS', 'xcute:job:info:' .. job_id);\n if job_exists == 1 then\n return redis.error_reply('job_exists');\n end;\n\n local lock_exists = redis.call('HEXISTS', 'xcute:locks', lock);\n if lock_exists ~= 0 then\n return redis.error_reply('lock_exists:' .. lock);\n end;\n\n redis.call('HSET', 'xcute:locks', lock, job_id);\n\n redis.call('ZADD', 'xcute:job:ids', 0, job_id);\n redis.call(\n 'HMSET', 'xcute:job:info:' .. job_id,\n 'job.id', job_id,\n 'job.type', job_type,\n 'job.status', 'WAITING',\n 'job.request_pause', 'False',\n 'job.lock', lock,\n 'tasks.all_sent', 'False',\n 'tasks.sent', '0',\n 'tasks.processed', '0',\n 'tasks.total', '0',\n 'tasks.is_total_temp', 'True',\n 'errors.total', '0',\n 'config', job_config);\n\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.ctime', mtime);\n "
lua_delete = "\n local job_id = KEYS[1];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'RUNNING' then\n return redis.error_reply('job_running');\n end;\n\n if status == 'WAITING' then\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n end;\n\n redis.call('ZREM', 'xcute:job:ids', job_id);\n redis.call('DEL', 'xcute:job:info:' .. job_id);\n redis.call('DEL', 'xcute:tasks:running:' .. job_id);\n\n local lock_job_id = redis.call('HGET', 'xcute:locks', lock);\n\n if lock_job_id == job_id then\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n "
lua_fail = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FAILED');\n redis.call('HDEL', 'xcute:locks', lock);\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
lua_free = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('LPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
lua_incr_total = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local marker = KEYS[3];\n local incr_by = KEYS[4];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call(\n 'HMGET', info_key,\n 'job.status', 'tasks.all_sent', 'tasks.is_total_temp');\n local status = info[1];\n local all_sent = info[2];\n local is_total_temp = info[3];\n\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local stop = false;\n if all_sent == 'True' then\n stop = true\n elseif is_total_temp == 'False' then\n stop = true\n else\n redis.call('HINCRBY', info_key, 'tasks.total', incr_by);\n redis.call('HSET', info_key, 'tasks.total_marker', marker);\n\n if status == 'PAUSED' or status == 'FAILED' then\n stop = true;\n end;\n end;\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return stop;\n "
lua_request_pause = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'PAUSED');\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING' then\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' then\n return redis.error_reply(\n 'job_cannot_be_paused_all_tasks_sent');\n else\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'True');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n end;\n\n if status == 'PAUSED' then\n return redis.error_reply('job_already_paused');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
lua_resume = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n return redis.error_reply('job_already_waiting');\n end;\n\n if status == 'RUNNING' then\n local request_pause = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'job.request_pause');\n if request_pause == 'True' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n else\n return redis.error_reply('job_already_running');\n end;\n end;\n\n if status == 'PAUSED' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING_LAST_TASKS' then\n return redis.error_reply('job_already_running');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
lua_run_next = "\n local mtime = KEYS[1];\n local orchestrator_id = KEYS[2];\n\n local job_id = redis.call('LPOP', 'xcute:waiting:jobs');\n if job_id == nil or job_id == false then\n return nil;\n end;\n\n redis.call('HMSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'RUNNING',\n 'orchestrator.id', orchestrator_id);\n redis.call('SADD', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n local job_info = redis.call('HGETALL', 'xcute:job:info:' .. job_id);\n return job_info;\n "
lua_total_tasks_done = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local status = redis.call('HGET', info_key, 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n redis.call('HSET', info_key, 'tasks.is_total_temp', 'False');\n local total_tasks = redis.call('HGET', info_key, 'tasks.total');\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return tonumber(total_tasks);\n "
lua_update_tasks_processed = "\n local function get_counters(tbl, first, last)\n local sliced = {}\n for i = first or 1, last or #tbl, 2 do\n sliced[tbl[i]] = tbl[i+1];\n end;\n return sliced;\n end;\n\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local counters = get_counters(KEYS, 3, nil);\n local tasks_processed = ARGV;\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_processed = redis.call(\n 'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_processed));\n local total_tasks_processed = redis.call(\n 'HINCRBY', 'xcute:job:info:' .. job_id,\n 'tasks.processed', nb_tasks_processed);\n\n for key, value in pairs(counters) do\n redis.call('HINCRBY', 'xcute:job:info:' .. job_id,\n key, value);\n end;\n\n local finished = false;\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' and status ~= 'FINISHED' then\n local total_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.sent');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n finished = true;\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return finished;\n "
lua_update_tasks_sent = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local all_tasks_sent = KEYS[3];\n local tasks_sent = ARGV;\n local tasks_sent_length = #tasks_sent;\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call('HMGET', info_key, 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_sent = 0;\n if tasks_sent_length > 0 then\n nb_tasks_sent = redis.call(\n 'SADD', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));\n redis.call('HSET', info_key,\n 'tasks.last_sent', tasks_sent[tasks_sent_length]);\n end;\n local total_tasks_sent = redis.call(\n 'HINCRBY', info_key,\n 'tasks.sent', nb_tasks_sent);\n\n if all_tasks_sent == 'True' then\n redis.call('HSET', info_key,\n 'tasks.all_sent', 'True');\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n local total_tasks_processed = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n else\n local request_pause = redis.call(\n 'HGET', info_key, 'job.request_pause');\n if request_pause == 'True' then\n -- if waiting pause, pause the job\n redis.call('HMSET', info_key,\n 'job.status', 'PAUSED',\n 'job.request_pause', 'False');\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call(\n 'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};\n "
request_pause(*args, **kwargs)[source]
resume(*args, **kwargs)[source]
run_next(*args, **kwargs)[source]
status()[source]
total_tasks_done(*args, **kwargs)[source]
update_tasks_processed(*args, **kwargs)[source]
update_tasks_sent(*args, **kwargs)[source]
oio.xcute.common.backend.handle_redis_exceptions(func)[source]

oio.xcute.common.job module

class oio.xcute.common.job.XcuteJob(conf, logger=None)[source]

Bases: object

DEFAULT_TASKS_PER_SECOND = 32
JOB_TYPE = None
MAX_TASKS_BATCH_SIZE = 64
TASK_CLASS = None
get_tasks(job_params, marker=None)[source]

Yields the job tasks as (task_id, task_payload) tuples. task_id must be a string and can be used as a marker.

get_total_tasks(job_params, marker=None)[source]

Yields the number of tasks as (marker, tasks_incr) tuples. The sum of all tasks_incr yielded must be the total number of tasks in the job. NB: do not define if not needed.

prepare(job_params)[source]

Allows code to be executed only once, when the job is run for the first time. This method is executed before the tasks and the total are generated.

classmethod sanitize_config(job_config)[source]

Validate and sanitize the job configuration. Ex: cast a string to an integer, set a default. Also return the lock id if there is one.

classmethod sanitize_params(job_params)[source]

Validate and sanitize the job parameters. Ex: cast a string to an integer, set a default. Also return the lock id if there is one.

class oio.xcute.common.job.XcuteTask(conf, job_params, logger=None)[source]

Bases: object

process(task_id, task_payload)[source]

oio.xcute.common.worker module

class oio.xcute.common.worker.XcuteWorker(conf, logger=None)[source]

Bases: object

process(beanstalkd_job)[source]
reply(job_id, task_ids, task_results, task_errors, beanstalkd_reply_info)[source]

Module contents