oio.xcute.common package
Submodules
oio.xcute.common.backend module
-
class
oio.xcute.common.backend.
XcuteBackend
(conf, logger=None)[source] Bases:
oio.common.redis_conn.RedisConnection
-
DEFAULT_LIMIT
= 1000
-
create
(*args, **kwargs)[source]
-
delete
(*args, **kwargs)[source]
-
fail
(*args, **kwargs)[source]
-
free
(*args, **kwargs)[source]
-
get_job_info
(*args, **kwargs)[source]
-
get_lock_info
(*args, **kwargs)[source]
-
incr_total_tasks
(*args, **kwargs)[source]
-
key_job_ids
= 'xcute:job:ids'
-
key_job_info
= 'xcute:job:info:%s'
-
key_locks
= 'xcute:locks'
-
key_orchestrator_jobs
= 'xcute:orchestrator:jobs:%s'
-
key_tasks_running
= 'xcute:tasks:running:%s'
-
key_waiting_jobs
= 'xcute:waiting:jobs'
-
list_jobs
(marker=None, limit=1000)[source]
-
list_locks
(*args, **kwargs)[source]
-
list_orchestrator_jobs
(orchestrator_id)[source]
-
lua_create
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local job_type = KEYS[3];\n local job_config = KEYS[4];\n local lock = KEYS[5];\n\n local job_exists = redis.call('EXISTS', 'xcute:job:info:' .. job_id);\n if job_exists == 1 then\n return redis.error_reply('job_exists');\n end;\n\n local lock_exists = redis.call('HEXISTS', 'xcute:locks', lock);\n if lock_exists ~= 0 then\n return redis.error_reply('lock_exists:' .. lock);\n end;\n\n redis.call('HSET', 'xcute:locks', lock, job_id);\n\n redis.call('ZADD', 'xcute:job:ids', 0, job_id);\n redis.call(\n 'HMSET', 'xcute:job:info:' .. job_id,\n 'job.id', job_id,\n 'job.type', job_type,\n 'job.status', 'WAITING',\n 'job.request_pause', 'False',\n 'job.lock', lock,\n 'tasks.all_sent', 'False',\n 'tasks.sent', '0',\n 'tasks.processed', '0',\n 'tasks.total', '0',\n 'tasks.is_total_temp', 'True',\n 'errors.total', '0',\n 'config', job_config);\n\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.ctime', mtime);\n "
-
lua_delete
= "\n local job_id = KEYS[1];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'RUNNING' then\n return redis.error_reply('job_running');\n end;\n\n if status == 'WAITING' then\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n end;\n\n redis.call('ZREM', 'xcute:job:ids', job_id);\n redis.call('DEL', 'xcute:job:info:' .. job_id);\n redis.call('DEL', 'xcute:tasks:running:' .. job_id);\n\n local lock_job_id = redis.call('HGET', 'xcute:locks', lock);\n\n if lock_job_id == job_id then\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n "
-
lua_fail
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FAILED');\n redis.call('HDEL', 'xcute:locks', lock);\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
-
lua_free
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status ~= 'RUNNING' then\n return redis.error_reply('job_must_be_running');\n end;\n\n local orchestrator_id = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('LPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n "
-
lua_incr_total
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local marker = KEYS[3];\n local incr_by = KEYS[4];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call(\n 'HMGET', info_key,\n 'job.status', 'tasks.all_sent', 'tasks.is_total_temp');\n local status = info[1];\n local all_sent = info[2];\n local is_total_temp = info[3];\n\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local stop = false;\n if all_sent == 'True' then\n stop = true\n elseif is_total_temp == 'False' then\n stop = true\n else\n redis.call('HINCRBY', info_key, 'tasks.total', incr_by);\n redis.call('HSET', info_key, 'tasks.total_marker', marker);\n\n if status == 'PAUSED' or status == 'FAILED' then\n stop = true;\n end;\n end;\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return stop;\n "
-
lua_request_pause
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'PAUSED');\n redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING' then\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' then\n return redis.error_reply(\n 'job_cannot_be_paused_all_tasks_sent');\n else\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'True');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n end;\n\n if status == 'PAUSED' then\n return redis.error_reply('job_already_paused');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
-
lua_resume
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n\n local status = redis.call('HGET', 'xcute:job:info:' .. job_id,\n 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n if status == 'WAITING' then\n return redis.error_reply('job_already_waiting');\n end;\n\n if status == 'RUNNING' then\n local request_pause = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'job.request_pause');\n if request_pause == 'True' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n else\n return redis.error_reply('job_already_running');\n end;\n end;\n\n if status == 'PAUSED' then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.request_pause', 'False');\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'WAITING');\n redis.call('RPUSH', 'xcute:waiting:jobs', job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return;\n end;\n\n if status == 'RUNNING_LAST_TASKS' then\n return redis.error_reply('job_already_running');\n end;\n\n if status == 'FINISHED' then\n return redis.error_reply('job_finished');\n end;\n "
-
lua_run_next
= "\n local mtime = KEYS[1];\n local orchestrator_id = KEYS[2];\n\n local job_id = redis.call('LPOP', 'xcute:waiting:jobs');\n if job_id == nil or job_id == false then\n return nil;\n end;\n\n redis.call('HMSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'RUNNING',\n 'orchestrator.id', orchestrator_id);\n redis.call('SADD', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n local job_info = redis.call('HGETALL', 'xcute:job:info:' .. job_id);\n return job_info;\n "
-
lua_total_tasks_done
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local info_key = 'xcute:job:info:' .. job_id;\n\n local status = redis.call('HGET', info_key, 'job.status');\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n redis.call('HSET', info_key, 'tasks.is_total_temp', 'False');\n local total_tasks = redis.call('HGET', info_key, 'tasks.total');\n\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return tonumber(total_tasks);\n "
-
lua_update_tasks_processed
= "\n local function get_counters(tbl, first, last)\n local sliced = {}\n for i = first or 1, last or #tbl, 2 do\n sliced[tbl[i]] = tbl[i+1];\n end;\n return sliced;\n end;\n\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local counters = get_counters(KEYS, 3, nil);\n local tasks_processed = ARGV;\n\n local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,\n 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_processed = redis.call(\n 'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_processed));\n local total_tasks_processed = redis.call(\n 'HINCRBY', 'xcute:job:info:' .. job_id,\n 'tasks.processed', nb_tasks_processed);\n\n for key, value in pairs(counters) do\n redis.call('HINCRBY', 'xcute:job:info:' .. job_id,\n key, value);\n end;\n\n local finished = false;\n local all_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');\n if all_tasks_sent == 'True' and status ~= 'FINISHED' then\n local total_tasks_sent = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.sent');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n finished = true;\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return finished;\n "
-
lua_update_tasks_sent
= "\n local mtime = KEYS[1];\n local job_id = KEYS[2];\n local all_tasks_sent = KEYS[3];\n local tasks_sent = ARGV;\n local tasks_sent_length = #tasks_sent;\n local info_key = 'xcute:job:info:' .. job_id;\n\n local info = redis.call('HMGET', info_key, 'job.status', 'job.lock');\n local status = info[1];\n local lock = info[2];\n if status == nil or status == false then\n return redis.error_reply('no_job');\n end;\n\n local nb_tasks_sent = 0;\n if tasks_sent_length > 0 then\n nb_tasks_sent = redis.call(\n 'SADD', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));\n redis.call('HSET', info_key,\n 'tasks.last_sent', tasks_sent[tasks_sent_length]);\n end;\n local total_tasks_sent = redis.call(\n 'HINCRBY', info_key,\n 'tasks.sent', nb_tasks_sent);\n\n if all_tasks_sent == 'True' then\n redis.call('HSET', info_key,\n 'tasks.all_sent', 'True');\n -- remove the job of the orchestrator\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n\n local total_tasks_processed = redis.call(\n 'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');\n if tonumber(total_tasks_processed) >= tonumber(\n total_tasks_sent) then\n redis.call('HSET', 'xcute:job:info:' .. job_id,\n 'job.status', 'FINISHED');\n redis.call('HDEL', 'xcute:locks', lock);\n end;\n else\n local request_pause = redis.call(\n 'HGET', info_key, 'job.request_pause');\n if request_pause == 'True' then\n -- if waiting pause, pause the job\n redis.call('HMSET', info_key,\n 'job.status', 'PAUSED',\n 'job.request_pause', 'False');\n local orchestrator_id = redis.call(\n 'HGET', info_key, 'orchestrator.id');\n redis.call(\n 'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,\n job_id);\n end;\n end;\n \n redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);\n \n return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};\n "
-
request_pause
(*args, **kwargs)[source]
-
resume
(*args, **kwargs)[source]
-
run_next
(*args, **kwargs)[source]
-
status
()[source]
-
total_tasks_done
(*args, **kwargs)[source]
-
update_tasks_processed
(*args, **kwargs)[source]
-
update_tasks_sent
(*args, **kwargs)[source]
-
-
oio.xcute.common.backend.
handle_redis_exceptions
(func)[source]
oio.xcute.common.job module
-
class
oio.xcute.common.job.
XcuteJob
(conf, logger=None)[source] Bases:
object
-
DEFAULT_TASKS_PER_SECOND
= 32
-
JOB_TYPE
= None
-
MAX_TASKS_BATCH_SIZE
= 64
-
TASK_CLASS
= None
-
get_tasks
(job_params, marker=None)[source] Yields the job tasks as (task_id, task_payload) task_id must be a string and can be used as a marker
-
get_total_tasks
(job_params, marker=None)[source] Yields numbers of tasks as (marker, tasks_incr) The sum of all tasks_incr yielded must be the total of tasks in the job NB: do not define if not needed
-
prepare
(job_params)[source] Allow to execute code only once when the job is run for the first time. This method is executed before the generation of the tasks and the total.
-
classmethod
sanitize_config
(job_config)[source] Validate and sanitize the job configuration Ex: cast a string as integer, set a default Also return the lock id if there is one
-
classmethod
sanitize_params
(job_params)[source] Validate and sanitize the job parameters Ex: cast a string as integer, set a default Also return the lock id if there is one
-