oio.xcute.common package


oio.xcute.common.backend module

class oio.xcute.common.backend.XcuteBackend(conf, logger=None)[source]

Bases: oio.common.redis_conn.RedisConnection

create(*args, **kwargs)[source]
delete(*args, **kwargs)[source]
fail(*args, **kwargs)[source]
free(*args, **kwargs)[source]
get_job_info(*args, **kwargs)[source]
get_lock_info(*args, **kwargs)[source]
incr_total_tasks(*args, **kwargs)[source]
key_job_ids = 'xcute:job:ids'
key_job_info = 'xcute:job:info:%s'
key_locks = 'xcute:locks'
key_orchestrator_jobs = 'xcute:orchestrator:jobs:%s'
key_tasks_running = 'xcute:tasks:running:%s'
key_waiting_jobs = 'xcute:waiting:jobs'
list_jobs(marker=None, limit=1000)[source]
list_locks(*args, **kwargs)[source]
lua_create = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local job_type = KEYS[3];\n local job_config = KEYS[4];\n local lock = KEYS[5];\n\n local job_exists = redis.call('EXISTS', 'xcute:job:info:' .. job_id);\n if job_exists == 1 then\n return redis.error_reply('job_exists');\n end;\n\n local lock_exists = redis.call('HEXISTS', 'xcute:locks', lock);\n if lock_exists ~= 0 then\n return redis.error_reply('lock_exists:' .. lock);\n end;\n\n redis.call('HSET', 'xcute:locks', lock, job_id);\n\n redis.call('ZADD', 'xcute:job:ids', 0, job_id);\n redis.call(\n 'HMSET', 'xcute:job:info:' .. job_id,\n 'job.id', job_id,\n 'job.type', job_type,\n 'job.status', 'WAITING',\n 'job.request_pause', 'False',\n 'job.lock', lock,\n 'tasks.all_sent', 'False',\n 'tasks.sent', '0',\n 'tasks.processed', '0',\n 'tasks.total', '0',\n 'tasks.is_total_temp', 'True',\n 'errors.total', '0',\n 'config', job_config);\n\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.ctime', mtime);\n "
lua_delete = "\n local job_id = KEYS[1];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'RUNNING' then\n return redis.error_reply('job_running');\n end;\n\n if status == 'WAITING' then\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n end;\n\n redis.call('ZREM', 'xcute:job:ids', job_id);\n redis.call('DEL', 'xcute:job:info:' .. job_id);\n redis.call('DEL', 'xcute:tasks:running:' .. job_id);\n\n local lock_job_id = redis.call('HGET', 'xcute:locks', lock);\n\n if lock_job_id == job_id then\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n "
lua_fail = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FAILED');\n redis.call('HDEL', 'xcute:locks', lock);\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
lua_free = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('LPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
lua_incr_total = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local marker = KEYS[3];\n local incr_by = KEYS[4];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call(\n 'HMGET', info_key,\n 'job.status', 'tasks.all_sent', 'tasks.is_total_temp');\n local status = info[1];\n local all_sent = info[2];\n local is_total_temp = info[3];\n\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local stop = false;\n if all_sent == 'True' then\n stop = true\n elseif is_total_temp == 'False' then\n stop = true\n else\n redis.call('HINCRBY', info_key, 'tasks.total', incr_by);\n redis.call('HSET', info_key, 'tasks.total_marker', marker);\n\n if status == 'PAUSED' or status == 'FAILED' then\n stop = true;\n end;\n end;\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return stop;\n "
lua_request_pause = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'PAUSED');\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING' then\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' then\n return redis.error_reply(\n 'job_cannot_be_paused_all_tasks_sent');\n else\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'True');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n end;\n\n if status == 'PAUSED' then\n return redis.error_reply('job_already_paused');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
lua_resume = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n return redis.error_reply('job_already_waiting');\n end;\n\n if status == 'RUNNING' then\n local request_pause = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'job.request_pause');\n if request_pause == 'True' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n else\n return redis.error_reply('job_already_running');\n end;\n end;\n\n if status == 'PAUSED' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING_LAST_TASKS' then\n return redis.error_reply('job_already_running');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
lua_run_next = "\n local mtime = KEYS[1];\n local orchestrator_id = KEYS[2];\n\n local job_id = redis.call('LPOP', 'xcute:waiting:jobs');\n if job_id == nil or job_id == false then\n return nil;\n end;\n\n redis.call('HMSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'RUNNING',\n 'orchestrator.id', orchestrator_id);\n redis.call('SADD', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n local job_info = redis.call('HGETALL', 'xcute:job:info:' .. job_id);\n return job_info;\n "
lua_total_tasks_done = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local status = redis.call('HGET', info_key, 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n redis.call('HSET', info_key, 'tasks.is_total_temp', 'False');\n local total_tasks = redis.call('HGET', info_key, 'tasks.total');\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return tonumber(total_tasks);\n "
lua_update_tasks_processed = "\n local function get_counters(tbl, first, last)\n local sliced = {}\n for i = first or 1, last or #tbl, 2 do\n sliced[tbl[i]] = tbl[i+1];\n end;\n return sliced;\n end;\n\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local counters = get_counters(KEYS, 3, nil);\n local tasks_processed = ARGV;\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_processed = redis.call(\n 'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_processed));\n local total_tasks_processed = redis.call(\n 'HINCRBY', 'xcute:job:info:' .. job_id,\n 'tasks.processed', nb_tasks_processed);\n\n for key, value in pairs(counters) do\n redis.call('HINCRBY', 'xcute:job:info:' .. job_id,\n key, value);\n end;\n\n local finished = false;\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' and status ~= 'FINISHED' then\n local total_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.sent');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n finished = true;\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return finished;\n "
lua_update_tasks_sent = "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local all_tasks_sent = KEYS[3];\n local tasks_sent = ARGV;\n local tasks_sent_length = #tasks_sent;\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call('HMGET', info_key, 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_sent = 0;\n if tasks_sent_length > 0 then\n nb_tasks_sent = redis.call(\n 'SADD', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));\n redis.call('HSET', info_key,\n 'tasks.last_sent', tasks_sent[tasks_sent_length]);\n end;\n local total_tasks_sent = redis.call(\n 'HINCRBY', info_key,\n 'tasks.sent', nb_tasks_sent);\n\n if all_tasks_sent == 'True' then\n redis.call('HSET', info_key,\n 'tasks.all_sent', 'True');\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n local total_tasks_processed = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n else\n local request_pause = redis.call(\n 'HGET', info_key, 'job.request_pause');\n if request_pause == 'True' then\n -- if waiting pause, pause the job\n redis.call('HMSET', info_key,\n 'job.status', 'PAUSED',\n 'job.request_pause', 'False');\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call(\n 'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};\n "
request_pause(*args, **kwargs)[source]
resume(*args, **kwargs)[source]
run_next(*args, **kwargs)[source]
total_tasks_done(*args, **kwargs)[source]
update_tasks_processed(*args, **kwargs)[source]
update_tasks_sent(*args, **kwargs)[source]

oio.xcute.common.job module

class oio.xcute.common.job.XcuteJob(conf, logger=None)[source]

Bases: object

get_tasks(job_params, marker=None)[source]

Yields the job tasks as (task_id, task_payload) task_id must be a string and can be used as a marker

get_total_tasks(job_params, marker=None)[source]

Yields numbers of tasks as (marker, tasks_incr) The sum of all tasks_incr yielded must be the total of tasks in the job NB: do not define if not needed


Allow to execute code only once when the job is run for the first time. This method is executed before the generation of the tasks and the total.

classmethod sanitize_config(job_config)[source]

Validate and sanitize the job configuration Ex: cast a string as integer, set a default Also return the lock id if there is one

classmethod sanitize_params(job_params)[source]

Validate and sanitize the job parameters Ex: cast a string as integer, set a default Also return the lock id if there is one

class oio.xcute.common.job.XcuteTask(conf, job_params, logger=None)[source]

Bases: object

process(task_id, task_payload)[source]

oio.xcute.common.worker module

class oio.xcute.common.worker.XcuteWorker(conf, logger=None)[source]

Bases: object

reply(job_id, task_ids, task_results, task_errors, beanstalkd_reply_info)[source]

Module contents