# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from functools import wraps
import redis
import random
from oio.common.easy_value import true_value
from oio.common.exceptions import Forbidden, NotFound
from oio.common.green import datetime
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.redis_conn import RedisConnection
from oio.common.timestamp import Timestamp
def handle_redis_exceptions(func):
    """Decorator translating Lua script error replies into OpenIO exceptions.

    The Lua scripts below report failures with
    ``redis.error_reply('<token>[:<param>]')``. This decorator parses the
    token out of the resulting ``ResponseError``, looks it up in the
    instance's ``_lua_errors`` mapping and re-raises the matching exception
    class with its message template formatted with the optional parameter.
    Unknown tokens are re-raised unchanged.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except redis.exceptions.ResponseError as exc:
            # Error replies look like 'token' or 'token:param'.
            error_parts = str(exc).split(':', 1)
            error_type = error_parts[0]
            error_param = error_parts[1:]
            error = self._lua_errors.get(error_type)
            if error is None:
                # Not one of our scripted errors: propagate as-is.
                raise
            error_cls, error_msg = error
            raise error_cls(message=error_msg.format(*error_param))
    return wrapper
class XcuteBackend(RedisConnection):
    """Redis-backed state store for xcute jobs.

    Job state transitions are implemented as server-side Lua scripts so
    that each transition is atomic. Scripts signal expected failures with
    ``redis.error_reply`` tokens that :func:`handle_redis_exceptions`
    translates into OpenIO exceptions via ``_lua_errors``.
    """

    # Default page size for list_jobs() when the caller passes no limit.
    DEFAULT_LIMIT = 1000

    # Map of Lua error tokens to (exception class, message template).
    # Templates may contain one '{}' placeholder filled with the text
    # after the first ':' in the error reply.
    _lua_errors = {
        'job_exists': (
            Forbidden,
            'The job already exists'),
        'lock_exists': (
            Forbidden,
            'A job with the same lock ({}) is already in progress'),
        'no_job': (
            NotFound,
            # Fixed typo: was "The job does'nt exist".
            "The job doesn't exist"),
        'job_must_be_running': (
            Forbidden,
            'The job must be running'),
        'job_cannot_be_paused_all_tasks_sent': (
            Forbidden,
            # Fixed wording: the Lua flag checked is tasks.all_sent,
            # message previously said "all jobs have been sent".
            'The job cannot be paused anymore, all tasks have been sent'),
        'job_already_paused': (
            Forbidden,
            'The job is already paused'),
        'job_finished': (
            Forbidden,
            'The job is finished'),
        'job_already_waiting': (
            Forbidden,
            'The job is already waiting'),
        'job_already_running': (
            Forbidden,
            'The job is already running'),
        'job_running': (
            Forbidden,
            # Fixed grammar: was "The job running".
            'The job is running')
    }

    # Redis key templates. NOTE(review): the Lua scripts below hard-code
    # the same strings; keep both in sync when changing a key.
    key_job_ids = 'xcute:job:ids'
    key_job_info = 'xcute:job:info:%s'
    key_waiting_jobs = 'xcute:waiting:jobs'
    key_tasks_running = 'xcute:tasks:running:%s'
    key_orchestrator_jobs = 'xcute:orchestrator:jobs:%s'
    key_locks = 'xcute:locks'
_lua_update_mtime = """
redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);
"""
lua_create = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local job_type = KEYS[3];
local job_config = KEYS[4];
local lock = KEYS[5];
local job_exists = redis.call('EXISTS', 'xcute:job:info:' .. job_id);
if job_exists == 1 then
return redis.error_reply('job_exists');
end;
local lock_exists = redis.call('HEXISTS', 'xcute:locks', lock);
if lock_exists ~= 0 then
return redis.error_reply('lock_exists:' .. lock);
end;
redis.call('HSET', 'xcute:locks', lock, job_id);
redis.call('ZADD', 'xcute:job:ids', 0, job_id);
redis.call(
'HMSET', 'xcute:job:info:' .. job_id,
'job.id', job_id,
'job.type', job_type,
'job.status', 'WAITING',
'job.request_pause', 'False',
'job.lock', lock,
'tasks.all_sent', 'False',
'tasks.sent', '0',
'tasks.processed', '0',
'tasks.total', '0',
'tasks.is_total_temp', 'True',
'errors.total', '0',
'config', job_config);
redis.call('RPUSH', 'xcute:waiting:jobs', job_id);
""" + _lua_update_mtime + """
redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.ctime', mtime);
"""
lua_run_next = """
local mtime = KEYS[1];
local orchestrator_id = KEYS[2];
local job_id = redis.call('LPOP', 'xcute:waiting:jobs');
if job_id == nil or job_id == false then
return nil;
end;
redis.call('HMSET', 'xcute:job:info:' .. job_id,
'job.status', 'RUNNING',
'orchestrator.id', orchestrator_id);
redis.call('SADD', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
""" + _lua_update_mtime + """
local job_info = redis.call('HGETALL', 'xcute:job:info:' .. job_id);
return job_info;
"""
lua_free = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
'job.status');
if status ~= 'RUNNING' then
return redis.error_reply('job_must_be_running');
end;
local orchestrator_id = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');
redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'WAITING');
redis.call('LPUSH', 'xcute:waiting:jobs', job_id);
""" + _lua_update_mtime
lua_fail = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
'job.status', 'job.lock');
local status = info[1];
local lock = info[2];
if status == nil or status == false then
return redis.error_reply('no_job');
end;
if status ~= 'RUNNING' then
return redis.error_reply('job_must_be_running');
end;
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'FAILED');
redis.call('HDEL', 'xcute:locks', lock);
-- remove the job of the orchestrator
local orchestrator_id = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');
redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
""" + _lua_update_mtime
lua_request_pause = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
'job.status');
if status == nil or status == false then
return redis.error_reply('no_job');
end;
if status == 'WAITING' then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'PAUSED');
redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);
""" + _lua_update_mtime + """
return;
end;
if status == 'RUNNING' then
local all_tasks_sent = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');
if all_tasks_sent == 'True' then
return redis.error_reply(
'job_cannot_be_paused_all_tasks_sent');
else
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.request_pause', 'True');
""" + _lua_update_mtime + """
return;
end;
end;
if status == 'PAUSED' then
return redis.error_reply('job_already_paused');
end;
if status == 'FINISHED' then
return redis.error_reply('job_finished');
end;
"""
lua_resume = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
'job.status');
if status == nil or status == false then
return redis.error_reply('no_job');
end;
if status == 'WAITING' then
return redis.error_reply('job_already_waiting');
end;
if status == 'RUNNING' then
local request_pause = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'job.request_pause');
if request_pause == 'True' then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.request_pause', 'False');
""" + _lua_update_mtime + """
return;
else
return redis.error_reply('job_already_running');
end;
end;
if status == 'PAUSED' then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.request_pause', 'False');
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'WAITING');
redis.call('RPUSH', 'xcute:waiting:jobs', job_id);
""" + _lua_update_mtime + """
return;
end;
if status == 'RUNNING_LAST_TASKS' then
return redis.error_reply('job_already_running');
end;
if status == 'FINISHED' then
return redis.error_reply('job_finished');
end;
"""
lua_update_tasks_sent = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local all_tasks_sent = KEYS[3];
local tasks_sent = ARGV;
local tasks_sent_length = #tasks_sent;
local info_key = 'xcute:job:info:' .. job_id;
local info = redis.call('HMGET', info_key, 'job.status', 'job.lock');
local status = info[1];
local lock = info[2];
if status == nil or status == false then
return redis.error_reply('no_job');
end;
local nb_tasks_sent = 0;
if tasks_sent_length > 0 then
nb_tasks_sent = redis.call(
'SADD', 'xcute:tasks:running:' .. job_id, unpack(tasks_sent));
redis.call('HSET', info_key,
'tasks.last_sent', tasks_sent[tasks_sent_length]);
end;
local total_tasks_sent = redis.call(
'HINCRBY', info_key,
'tasks.sent', nb_tasks_sent);
if all_tasks_sent == 'True' then
redis.call('HSET', info_key,
'tasks.all_sent', 'True');
-- remove the job of the orchestrator
local orchestrator_id = redis.call(
'HGET', info_key, 'orchestrator.id');
redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
local total_tasks_processed = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');
if tonumber(total_tasks_processed) >= tonumber(
total_tasks_sent) then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'FINISHED');
redis.call('HDEL', 'xcute:locks', lock);
end;
else
local request_pause = redis.call(
'HGET', info_key, 'job.request_pause');
if request_pause == 'True' then
-- if waiting pause, pause the job
redis.call('HMSET', info_key,
'job.status', 'PAUSED',
'job.request_pause', 'False');
local orchestrator_id = redis.call(
'HGET', info_key, 'orchestrator.id');
redis.call(
'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
job_id);
end;
end;
""" + _lua_update_mtime + """
return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};
"""
lua_update_tasks_processed = """
local function get_counters(tbl, first, last)
local sliced = {}
for i = first or 1, last or #tbl, 2 do
sliced[tbl[i]] = tbl[i+1];
end;
return sliced;
end;
local mtime = KEYS[1];
local job_id = KEYS[2];
local counters = get_counters(KEYS, 3, nil);
local tasks_processed = ARGV;
local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
'job.status', 'job.lock');
local status = info[1];
local lock = info[2];
if status == nil or status == false then
return redis.error_reply('no_job');
end;
local nb_tasks_processed = redis.call(
'SREM', 'xcute:tasks:running:' .. job_id, unpack(tasks_processed));
local total_tasks_processed = redis.call(
'HINCRBY', 'xcute:job:info:' .. job_id,
'tasks.processed', nb_tasks_processed);
for key, value in pairs(counters) do
redis.call('HINCRBY', 'xcute:job:info:' .. job_id,
key, value);
end;
local finished = false;
local all_tasks_sent = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');
if all_tasks_sent == 'True' and status ~= 'FINISHED' then
local total_tasks_sent = redis.call(
'HGET', 'xcute:job:info:' .. job_id, 'tasks.sent');
if tonumber(total_tasks_processed) >= tonumber(
total_tasks_sent) then
redis.call('HSET', 'xcute:job:info:' .. job_id,
'job.status', 'FINISHED');
redis.call('HDEL', 'xcute:locks', lock);
finished = true;
end;
end;
""" + _lua_update_mtime + """
return finished;
"""
lua_incr_total = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local marker = KEYS[3];
local incr_by = KEYS[4];
local info_key = 'xcute:job:info:' .. job_id;
local info = redis.call(
'HMGET', info_key,
'job.status', 'tasks.all_sent', 'tasks.is_total_temp');
local status = info[1];
local all_sent = info[2];
local is_total_temp = info[3];
if status == nil or status == false then
return redis.error_reply('no_job');
end;
local stop = false;
if all_sent == 'True' then
stop = true
elseif is_total_temp == 'False' then
stop = true
else
redis.call('HINCRBY', info_key, 'tasks.total', incr_by);
redis.call('HSET', info_key, 'tasks.total_marker', marker);
if status == 'PAUSED' or status == 'FAILED' then
stop = true;
end;
end;
""" + _lua_update_mtime + """
return stop;
"""
lua_total_tasks_done = """
local mtime = KEYS[1];
local job_id = KEYS[2];
local info_key = 'xcute:job:info:' .. job_id;
local status = redis.call('HGET', info_key, 'job.status');
if status == nil or status == false then
return redis.error_reply('no_job');
end;
redis.call('HSET', info_key, 'tasks.is_total_temp', 'False');
local total_tasks = redis.call('HGET', info_key, 'tasks.total');
""" + _lua_update_mtime + """
return tonumber(total_tasks);
"""
lua_delete = """
local job_id = KEYS[1];
local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
'job.status', 'job.lock');
local status = info[1];
local lock = info[2];
if status == nil or status == false then
return redis.error_reply('no_job');
end;
if status == 'RUNNING' then
return redis.error_reply('job_running');
end;
if status == 'WAITING' then
redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);
end;
redis.call('ZREM', 'xcute:job:ids', job_id);
redis.call('DEL', 'xcute:job:info:' .. job_id);
redis.call('DEL', 'xcute:tasks:running:' .. job_id);
local lock_job_id = redis.call('HGET', 'xcute:locks', lock);
if lock_job_id == job_id then
redis.call('HDEL', 'xcute:locks', lock);
end;
"""
def __init__(self, conf, logger=None):
self.conf = conf
self.logger = logger or get_logger(self.conf)
redis_conf = {k[6:]: v for k, v in self.conf.items()
if k.startswith('redis_')}
super(XcuteBackend, self).__init__(**redis_conf)
self.script_create = self.register_script(
self.lua_create)
self.script_run_next = self.register_script(
self.lua_run_next)
self.script_free = self.register_script(
self.lua_free)
self.script_fail = self.register_script(
self.lua_fail)
self.script_request_pause = self.register_script(
self.lua_request_pause)
self.script_resume = self.register_script(
self.lua_resume)
self.script_update_tasks_sent = self.register_script(
self.lua_update_tasks_sent)
self.script_update_tasks_processed = self.register_script(
self.lua_update_tasks_processed)
self.script_incr_total = self.register_script(
self.lua_incr_total)
self.script_total_tasks_done = self.register_script(
self.lua_total_tasks_done)
self.script_delete = self.register_script(
self.lua_delete)
[docs] def status(self):
job_count = self.conn.zcard(self.key_job_ids)
status = {'job_count': job_count}
return status
[docs] def list_jobs(self, marker=None, limit=1000):
limit = limit or self.DEFAULT_LIMIT
jobs = list()
while True:
limit_ = limit - len(jobs)
if limit_ <= 0:
break
range_min = '-'
if marker:
range_max = '(' + marker
else:
range_max = '+'
job_ids = self.conn.zrevrangebylex(
self.key_job_ids, range_max, range_min, 0, limit_)
pipeline = self.conn.pipeline()
for job_id in job_ids:
self._get_job_info(job_id, client=pipeline)
job_infos = pipeline.execute()
for job_info in job_infos:
if not job_info:
# The job can be deleted between two requests
continue
jobs.append(self._unmarshal_job_info(job_info))
if len(job_ids) < limit_:
break
marker = job_id
return jobs
def _get_timestamp(self):
return Timestamp().normal
[docs] @handle_redis_exceptions
def create(self, job_type, job_config, lock):
job_id = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') \
+ '-%011x' % random.randrange(16**11)
job_config = json.dumps(job_config)
self.script_create(
keys=[self._get_timestamp(), job_id, job_type, job_config, lock],
client=self.conn)
return job_id
[docs] def list_orchestrator_jobs(self, orchestrator_id):
orchestrator_jobs_key = self.key_orchestrator_jobs % orchestrator_id
job_ids = self.conn.smembers(orchestrator_jobs_key)
pipeline = self.conn.pipeline()
for job_id in job_ids:
self._get_job_info(job_id, client=pipeline)
job_infos = pipeline.execute()
jobs = list()
for job_info in job_infos:
if not job_info:
# The job can be deleted between two requests
continue
jobs.append(self._unmarshal_job_info(job_info))
return jobs
[docs] @handle_redis_exceptions
def run_next(self, orchestrator_id):
job_info = self.script_run_next(
keys=[self._get_timestamp(), orchestrator_id],
client=self.conn)
if not job_info:
return None
job_info = self._unmarshal_job_info(
self._lua_array_to_dict(job_info))
return job_info
[docs] @handle_redis_exceptions
def free(self, job_id):
self.script_free(
keys=[self._get_timestamp(), job_id],
client=self.conn)
[docs] @handle_redis_exceptions
def fail(self, job_id):
self.script_fail(
keys=[self._get_timestamp(), job_id],
client=self.conn)
[docs] @handle_redis_exceptions
def request_pause(self, job_id):
self.script_request_pause(
keys=[self._get_timestamp(), job_id],
client=self.conn)
[docs] @handle_redis_exceptions
def resume(self, job_id):
self.script_resume(
keys=[self._get_timestamp(), job_id],
client=self.conn)
[docs] @handle_redis_exceptions
def update_tasks_sent(self, job_id, task_ids, all_tasks_sent=False):
nb_tasks_sent, status = self.script_update_tasks_sent(
keys=[self._get_timestamp(), job_id, str(all_tasks_sent)],
args=task_ids, client=self.conn)
if nb_tasks_sent != len(task_ids):
self.logger.warn('%s tasks were sent several times',
len(task_ids) - nb_tasks_sent)
return status
[docs] @handle_redis_exceptions
def update_tasks_processed(self, job_id, task_ids,
task_errors, task_results):
counters = dict()
if task_errors:
total_errors = 0
for key, value in task_errors.items():
total_errors += value
counters['errors.' + key] = value
counters['errors.total'] = total_errors
if task_results:
for key, value in task_results.items():
counters['results.' + key] = value
return self.script_update_tasks_processed(
keys=[self._get_timestamp(),
job_id] + self._dict_to_lua_array(counters),
args=task_ids,
client=self.conn)
[docs] @handle_redis_exceptions
def incr_total_tasks(self, job_id, total_marker, tasks_incr):
return self.script_incr_total(
keys=[self._get_timestamp(), job_id, total_marker, tasks_incr])
[docs] @handle_redis_exceptions
def total_tasks_done(self, job_id):
return self.script_total_tasks_done(
keys=[self._get_timestamp(), job_id])
[docs] @handle_redis_exceptions
def delete(self, job_id):
self.script_delete(keys=[job_id])
[docs] @handle_redis_exceptions
def get_job_info(self, job_id):
job_info = self._get_job_info(job_id, client=self.conn)
if not job_info:
raise redis.exceptions.ResponseError('no_job')
return self._unmarshal_job_info(job_info)
def _get_job_info(self, job_id, client):
return client.hgetall(self.key_job_info % job_id)
[docs] @handle_redis_exceptions
def list_locks(self):
locks = self.conn.hgetall(self.key_locks)
return [
dict(lock=lock[0], job_id=lock[1])
for lock in sorted(locks.items())
]
[docs] @handle_redis_exceptions
def get_lock_info(self, lock):
job_id = self.conn.hget(self.key_locks, lock)
return dict(lock=lock, job_id=job_id)
    @staticmethod
    def _unmarshal_job_info(marshalled_job_info):
        """Convert a raw Redis info hash into a nested, typed dict.

        Hash fields are named '<section>.<key>' (bytes); they are decoded
        and grouped into the 'job', 'orchestrator', 'tasks', 'errors',
        'results' and 'config' sections, then coerced to their Python
        types (floats, ints, booleans, JSON config).
        """
        job_info = dict(
            job=dict(),
            orchestrator=dict(),
            tasks=dict(),
            errors=dict(),
            results=dict(),
            config=dict())
        # Split each field on the first dot; undotted fields (e.g.
        # 'config') land at the top level. Redis replies are bytes here.
        for key, value in marshalled_job_info.items():
            split_key = key.decode('utf-8').split('.', 1)
            value = value.decode('utf-8')
            if len(split_key) == 1:
                job_info[split_key[0]] = value
            else:
                job_info[split_key[0]][split_key[1]] = value
        job_main_info = job_info['job']
        job_main_info['ctime'] = float(job_main_info['ctime'])
        job_main_info['mtime'] = float(job_main_info['mtime'])
        job_main_info['request_pause'] = true_value(
            job_main_info['request_pause'])
        job_tasks = job_info['tasks']
        job_tasks['sent'] = int(job_tasks['sent'])
        # 'last_sent' / 'total_marker' may be absent: default them to None.
        job_tasks.setdefault('last_sent')
        job_tasks['all_sent'] = true_value(job_tasks['all_sent'])
        job_tasks['processed'] = int(job_tasks['processed'])
        # To have a coherent total if the estimate was not correct
        if job_tasks['all_sent']:
            job_tasks['total'] = job_tasks['sent']
        else:
            job_tasks['total'] = max(job_tasks['sent'],
                                     int(job_tasks['total']))
        job_tasks['is_total_temp'] = true_value(
            job_tasks['is_total_temp'])
        job_tasks.setdefault('total_marker')
        job_errors = job_info['errors']
        for key, value in job_errors.items():
            job_errors[key] = int(value)
        job_results = job_info.get('results', dict())
        for key, value in job_results.items():
            job_results[key] = int(value)
        job_info['config'] = json.loads(job_info['config'])
        return job_info
@staticmethod
def _lua_array_to_dict(array):
it = iter(array)
return dict(zip(*([it] * 2)))
@staticmethod
def _dict_to_lua_array(dict_):
array = list()
for key, value in dict_.items():
array.append(key)
array.append(value)
return array