# Source code for oio.xcute.common.backend

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from functools import wraps

import redis
import random

from oio.common.easy_value import true_value
from oio.common.exceptions import Forbidden, NotFound
from oio.common.green import datetime
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.redis_conn import RedisConnection
from oio.common.timestamp import Timestamp


def handle_redis_exceptions(func):
    """Decorator for XcuteBackend methods.

    Translate ``redis.error_reply('<type>[:<param>]')`` payloads raised
    by the Lua scripts into the exception classes declared in
    ``self._lua_errors``; unknown error types are re-raised untouched.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except redis.exceptions.ResponseError as exc:
            # Lua errors look like "<type>" or "<type>:<param>".
            parts = str(exc).split(':', 1)
            mapped = self._lua_errors.get(parts[0])
            if mapped is None:
                raise
            exc_cls, msg_template = mapped
            # parts[1:] is empty when no parameter was attached.
            raise exc_cls(message=msg_template.format(*parts[1:]))
    return wrapper
class XcuteBackend(RedisConnection):
    """Redis-backed store for xcute jobs.

    Job state lives in one Redis hash per job (``xcute:job:info:<id>``),
    with flat dotted keys (``job.*``, ``tasks.*``, ``errors.*``, ...).
    A sorted set indexes job IDs, a list holds WAITING jobs, a set per
    orchestrator tracks the jobs it runs, and a hash maps locks to the
    job holding them.  All state transitions are done in Lua scripts so
    each one is atomic server-side.
    """

    # Default page size for list_jobs() when the caller passes a falsy limit.
    DEFAULT_LIMIT = 1000

    # Maps the error tokens returned by the Lua scripts
    # (redis.error_reply) to (exception class, message template);
    # consumed by the handle_redis_exceptions decorator.
    _lua_errors = {
        'job_exists': (
            Forbidden,
            'The job already exists'),
        'lock_exists': (
            Forbidden,
            'A job with the same lock ({}) is already in progress'),
        'no_job': (
            NotFound,
            # NOTE(review): typo, should read "doesn't exist" (runtime
            # string, left unchanged here).
            'The job does\'nt exist'),
        'job_must_be_running': (
            Forbidden,
            'The job must be running'),
        'job_cannot_be_paused_all_tasks_sent': (
            Forbidden,
            'The job cannot be paused anymore, all jobs have been sent'),
        'job_already_paused': (
            Forbidden,
            'The job is already paused'),
        'job_finished': (
            Forbidden,
            'The job is finished'),
        'job_already_waiting': (
            Forbidden,
            'The job is already waiting'),
        'job_already_running': (
            Forbidden,
            'The job is already running'),
        'job_running': (
            Forbidden,
            # NOTE(review): missing "is" -- "The job is running" (runtime
            # string, left unchanged here).
            'The job running')
        }

    # Redis key layout (the %s placeholders take a job or orchestrator ID).
    key_job_ids = 'xcute:job:ids'
    key_job_info = 'xcute:job:info:%s'
    key_waiting_jobs = 'xcute:waiting:jobs'
    key_tasks_running = 'xcute:tasks:running:%s'
    key_orchestrator_jobs = 'xcute:orchestrator:jobs:%s'
    key_locks = 'xcute:locks'

    # Shared Lua fragment: refresh job.mtime; expects the locals
    # `job_id` and `mtime` to be defined by the including script.
    _lua_update_mtime = """
        redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.mtime', mtime);
    """

    # Create a WAITING job, take its lock, index it and enqueue it.
    lua_create = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];
        local job_type = KEYS[3];
        local job_config = KEYS[4];
        local lock = KEYS[5];

        local job_exists = redis.call('EXISTS', 'xcute:job:info:' .. job_id);
        if job_exists == 1 then
            return redis.error_reply('job_exists');
        end;

        local lock_exists = redis.call('HEXISTS', 'xcute:locks', lock);
        if lock_exists ~= 0 then
            return redis.error_reply('lock_exists:' .. lock);
        end;

        redis.call('HSET', 'xcute:locks', lock, job_id);

        redis.call('ZADD', 'xcute:job:ids', 0, job_id);
        redis.call(
            'HMSET', 'xcute:job:info:' .. job_id,
            'job.id', job_id,
            'job.type', job_type,
            'job.status', 'WAITING',
            'job.request_pause', 'False',
            'job.lock', lock,
            'tasks.all_sent', 'False',
            'tasks.sent', '0',
            'tasks.processed', '0',
            'tasks.total', '0',
            'tasks.is_total_temp', 'True',
            'errors.total', '0',
            'config', job_config);

        redis.call('RPUSH', 'xcute:waiting:jobs', job_id);
    """ + _lua_update_mtime + """
        redis.call('HSET', 'xcute:job:info:' .. job_id, 'job.ctime', mtime);
    """

    # Pop the next WAITING job, mark it RUNNING for an orchestrator and
    # return its whole info hash (or nil when the queue is empty).
    lua_run_next = """
        local mtime = KEYS[1];
        local orchestrator_id = KEYS[2];

        local job_id = redis.call('LPOP', 'xcute:waiting:jobs');
        if job_id == nil or job_id == false then
            return nil;
        end;

        redis.call('HMSET', 'xcute:job:info:' .. job_id,
                   'job.status', 'RUNNING',
                   'orchestrator.id', orchestrator_id);
        redis.call('SADD', 'xcute:orchestrator:jobs:' .. orchestrator_id,
                   job_id);
    """ + _lua_update_mtime + """
        local job_info = redis.call('HGETALL', 'xcute:job:info:' .. job_id);
        return job_info;
    """

    # Give a RUNNING job back to the waiting queue, detaching it from
    # its orchestrator.
    lua_free = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];

        local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
                                  'job.status');
        if status ~= 'RUNNING' then
            return redis.error_reply('job_must_be_running');
        end;

        local orchestrator_id = redis.call(
            'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');
        redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
                   job_id);

        redis.call('HSET', 'xcute:job:info:' .. job_id,
                   'job.status', 'WAITING');
        redis.call('LPUSH', 'xcute:waiting:jobs', job_id);
    """ + _lua_update_mtime

    # Mark a RUNNING job as FAILED, release its lock and detach it from
    # its orchestrator.
    lua_fail = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];

        local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
                                'job.status', 'job.lock');
        local status = info[1];
        local lock = info[2];
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;
        if status ~= 'RUNNING' then
            return redis.error_reply('job_must_be_running');
        end;

        redis.call('HSET', 'xcute:job:info:' .. job_id,
                   'job.status', 'FAILED');
        redis.call('HDEL', 'xcute:locks', lock);

        -- remove the job of the orchestrator
        local orchestrator_id = redis.call(
            'HGET', 'xcute:job:info:' .. job_id, 'orchestrator.id');
        redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
                   job_id);
    """ + _lua_update_mtime

    # WAITING jobs pause immediately; RUNNING jobs only get the
    # request_pause flag set (they pause later, unless everything has
    # already been sent); other states are rejected.
    lua_request_pause = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];

        local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
                                  'job.status');
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        if status == 'WAITING' then
            redis.call('HSET', 'xcute:job:info:' .. job_id,
                       'job.status', 'PAUSED');
            redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);
    """ + _lua_update_mtime + """
            return;
        end;

        if status == 'RUNNING' then
            local all_tasks_sent = redis.call(
                'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');
            if all_tasks_sent == 'True' then
                return redis.error_reply(
                    'job_cannot_be_paused_all_tasks_sent');
            else
                redis.call('HSET', 'xcute:job:info:' .. job_id,
                           'job.request_pause', 'True');
    """ + _lua_update_mtime + """
                return;
            end;
        end;

        if status == 'PAUSED' then
            return redis.error_reply('job_already_paused');
        end;

        if status == 'FINISHED' then
            return redis.error_reply('job_finished');
        end;
    """

    # Resume a PAUSED job (re-enqueue it) or cancel a pending pause
    # request on a RUNNING job; other states are rejected.
    lua_resume = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];

        local status = redis.call('HGET', 'xcute:job:info:' .. job_id,
                                  'job.status');
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        if status == 'WAITING' then
            return redis.error_reply('job_already_waiting');
        end;

        if status == 'RUNNING' then
            local request_pause = redis.call(
                'HGET', 'xcute:job:info:' .. job_id, 'job.request_pause');
            if request_pause == 'True' then
                redis.call('HSET', 'xcute:job:info:' .. job_id,
                           'job.request_pause', 'False');
    """ + _lua_update_mtime + """
                return;
            else
                return redis.error_reply('job_already_running');
            end;
        end;

        if status == 'PAUSED' then
            redis.call('HSET', 'xcute:job:info:' .. job_id,
                       'job.request_pause', 'False');
            redis.call('HSET', 'xcute:job:info:' .. job_id,
                       'job.status', 'WAITING');
            redis.call('RPUSH', 'xcute:waiting:jobs', job_id);
    """ + _lua_update_mtime + """
            return;
        end;

        if status == 'RUNNING_LAST_TASKS' then
            return redis.error_reply('job_already_running');
        end;

        if status == 'FINISHED' then
            return redis.error_reply('job_finished');
        end;
    """

    # Record a batch of sent task IDs (ARGV) in the running set, update
    # counters, and handle the all-sent / pause-requested transitions.
    # Returns {number actually added, resulting job status}.
    lua_update_tasks_sent = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];
        local all_tasks_sent = KEYS[3];
        local tasks_sent = ARGV;
        local tasks_sent_length = #tasks_sent;

        local info_key = 'xcute:job:info:' .. job_id;

        local info = redis.call('HMGET', info_key, 'job.status', 'job.lock');
        local status = info[1];
        local lock = info[2];
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        local nb_tasks_sent = 0;
        if tasks_sent_length > 0 then
            nb_tasks_sent = redis.call(
                'SADD', 'xcute:tasks:running:' .. job_id,
                unpack(tasks_sent));
            redis.call('HSET', info_key,
                       'tasks.last_sent', tasks_sent[tasks_sent_length]);
        end;
        local total_tasks_sent = redis.call(
            'HINCRBY', info_key, 'tasks.sent', nb_tasks_sent);

        if all_tasks_sent == 'True' then
            redis.call('HSET', info_key, 'tasks.all_sent', 'True');

            -- remove the job of the orchestrator
            local orchestrator_id = redis.call(
                'HGET', info_key, 'orchestrator.id');
            redis.call('SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
                       job_id);

            local total_tasks_processed = redis.call(
                'HGET', 'xcute:job:info:' .. job_id, 'tasks.processed');
            if tonumber(total_tasks_processed) >= tonumber(
                    total_tasks_sent) then
                redis.call('HSET', 'xcute:job:info:' .. job_id,
                           'job.status', 'FINISHED');
                redis.call('HDEL', 'xcute:locks', lock);
            end;
        else
            local request_pause = redis.call(
                'HGET', info_key, 'job.request_pause');
            if request_pause == 'True' then
                -- if waiting pause, pause the job
                redis.call('HMSET', info_key,
                           'job.status', 'PAUSED',
                           'job.request_pause', 'False');
                local orchestrator_id = redis.call(
                    'HGET', info_key, 'orchestrator.id');
                redis.call(
                    'SREM', 'xcute:orchestrator:jobs:' .. orchestrator_id,
                    job_id);
            end;
        end;
    """ + _lua_update_mtime + """
        return {nb_tasks_sent, redis.call('HGET', info_key, 'job.status')};
    """

    # Record processed task IDs (ARGV), bump the extra counters passed
    # as KEYS[3..] key/value pairs, and finish the job when everything
    # sent has been processed.  Returns true when the job just finished.
    lua_update_tasks_processed = """
        local function get_counters(tbl, first, last)
            local sliced = {}
            for i = first or 1, last or #tbl, 2 do
                sliced[tbl[i]] = tbl[i+1];
            end;
            return sliced;
        end;

        local mtime = KEYS[1];
        local job_id = KEYS[2];
        local counters = get_counters(KEYS, 3, nil);
        local tasks_processed = ARGV;

        local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
                                'job.status', 'job.lock');
        local status = info[1];
        local lock = info[2];
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        local nb_tasks_processed = redis.call(
            'SREM', 'xcute:tasks:running:' .. job_id,
            unpack(tasks_processed));
        local total_tasks_processed = redis.call(
            'HINCRBY', 'xcute:job:info:' .. job_id,
            'tasks.processed', nb_tasks_processed);

        for key, value in pairs(counters) do
            redis.call('HINCRBY', 'xcute:job:info:' .. job_id, key, value);
        end;

        local finished = false;
        local all_tasks_sent = redis.call(
            'HGET', 'xcute:job:info:' .. job_id, 'tasks.all_sent');
        if all_tasks_sent == 'True' and status ~= 'FINISHED' then
            local total_tasks_sent = redis.call(
                'HGET', 'xcute:job:info:' .. job_id, 'tasks.sent');
            if tonumber(total_tasks_processed) >= tonumber(
                    total_tasks_sent) then
                redis.call('HSET', 'xcute:job:info:' .. job_id,
                           'job.status', 'FINISHED');
                redis.call('HDEL', 'xcute:locks', lock);
                finished = true;
            end;
        end;
    """ + _lua_update_mtime + """
        return finished;
    """

    # Grow the estimated task total while it is still temporary; returns
    # true when the caller should stop counting (all sent, total frozen,
    # or job paused/failed).
    lua_incr_total = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];
        local marker = KEYS[3];
        local incr_by = KEYS[4];

        local info_key = 'xcute:job:info:' .. job_id;

        local info = redis.call(
            'HMGET', info_key, 'job.status', 'tasks.all_sent',
            'tasks.is_total_temp');
        local status = info[1];
        local all_sent = info[2];
        local is_total_temp = info[3];
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        local stop = false;
        if all_sent == 'True' then
            stop = true
        elseif is_total_temp == 'False' then
            stop = true
        else
            redis.call('HINCRBY', info_key, 'tasks.total', incr_by);
            redis.call('HSET', info_key, 'tasks.total_marker', marker);
            if status == 'PAUSED' or status == 'FAILED' then
                stop = true;
            end;
        end;
    """ + _lua_update_mtime + """
        return stop;
    """

    # Freeze the task total (is_total_temp=False) and return it.
    lua_total_tasks_done = """
        local mtime = KEYS[1];
        local job_id = KEYS[2];

        local info_key = 'xcute:job:info:' .. job_id;

        local status = redis.call('HGET', info_key, 'job.status');
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;

        redis.call('HSET', info_key, 'tasks.is_total_temp', 'False');
        local total_tasks = redis.call('HGET', info_key, 'tasks.total');
    """ + _lua_update_mtime + """
        return tonumber(total_tasks);
    """

    # Delete every trace of a non-RUNNING job; the lock is only removed
    # if this job still holds it.
    lua_delete = """
        local job_id = KEYS[1];

        local info = redis.call('HMGET', 'xcute:job:info:' .. job_id,
                                'job.status', 'job.lock');
        local status = info[1];
        local lock = info[2];
        if status == nil or status == false then
            return redis.error_reply('no_job');
        end;
        if status == 'RUNNING' then
            return redis.error_reply('job_running');
        end;

        if status == 'WAITING' then
            redis.call('LREM', 'xcute:waiting:jobs', 1, job_id);
        end;

        redis.call('ZREM', 'xcute:job:ids', job_id);
        redis.call('DEL', 'xcute:job:info:' .. job_id);
        redis.call('DEL', 'xcute:tasks:running:' .. job_id);

        local lock_job_id = redis.call('HGET', 'xcute:locks', lock);
        if lock_job_id == job_id then
            redis.call('HDEL', 'xcute:locks', lock);
        end;
    """

    def __init__(self, conf, logger=None):
        """
        :param conf: configuration dict; keys prefixed with ``redis_``
            are stripped of the prefix and forwarded to RedisConnection.
        :param logger: optional logger (one is created from conf if None).
        """
        self.conf = conf
        self.logger = logger or get_logger(self.conf)
        redis_conf = {k[6:]: v for k, v in self.conf.items()
                      if k.startswith('redis_')}
        super(XcuteBackend, self).__init__(**redis_conf)

        # Pre-register every Lua script so calls only send the SHA.
        self.script_create = self.register_script(
            self.lua_create)
        self.script_run_next = self.register_script(
            self.lua_run_next)
        self.script_free = self.register_script(
            self.lua_free)
        self.script_fail = self.register_script(
            self.lua_fail)
        self.script_request_pause = self.register_script(
            self.lua_request_pause)
        self.script_resume = self.register_script(
            self.lua_resume)
        self.script_update_tasks_sent = self.register_script(
            self.lua_update_tasks_sent)
        self.script_update_tasks_processed = self.register_script(
            self.lua_update_tasks_processed)
        self.script_incr_total = self.register_script(
            self.lua_incr_total)
        self.script_total_tasks_done = self.register_script(
            self.lua_total_tasks_done)
        self.script_delete = self.register_script(
            self.lua_delete)
[docs] def status(self): job_count = self.conn.zcard(self.key_job_ids) status = {'job_count': job_count} return status
[docs] def list_jobs(self, marker=None, limit=1000): limit = limit or self.DEFAULT_LIMIT jobs = list() while True: limit_ = limit - len(jobs) if limit_ <= 0: break range_min = '-' if marker: range_max = '(' + marker else: range_max = '+' job_ids = self.conn.zrevrangebylex( self.key_job_ids, range_max, range_min, 0, limit_) pipeline = self.conn.pipeline() for job_id in job_ids: self._get_job_info(job_id, client=pipeline) job_infos = pipeline.execute() for job_info in job_infos: if not job_info: # The job can be deleted between two requests continue jobs.append(self._unmarshal_job_info(job_info)) if len(job_ids) < limit_: break marker = job_id return jobs
def _get_timestamp(self): return Timestamp().normal
[docs] @handle_redis_exceptions def create(self, job_type, job_config, lock): job_id = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') \ + '-%011x' % random.randrange(16**11) job_config = json.dumps(job_config) self.script_create( keys=[self._get_timestamp(), job_id, job_type, job_config, lock], client=self.conn) return job_id
[docs] def list_orchestrator_jobs(self, orchestrator_id): orchestrator_jobs_key = self.key_orchestrator_jobs % orchestrator_id job_ids = self.conn.smembers(orchestrator_jobs_key) pipeline = self.conn.pipeline() for job_id in job_ids: self._get_job_info(job_id, client=pipeline) job_infos = pipeline.execute() jobs = list() for job_info in job_infos: if not job_info: # The job can be deleted between two requests continue jobs.append(self._unmarshal_job_info(job_info)) return jobs
[docs] @handle_redis_exceptions def run_next(self, orchestrator_id): job_info = self.script_run_next( keys=[self._get_timestamp(), orchestrator_id], client=self.conn) if not job_info: return None job_info = self._unmarshal_job_info( self._lua_array_to_dict(job_info)) return job_info
[docs] @handle_redis_exceptions def free(self, job_id): self.script_free( keys=[self._get_timestamp(), job_id], client=self.conn)
[docs] @handle_redis_exceptions def fail(self, job_id): self.script_fail( keys=[self._get_timestamp(), job_id], client=self.conn)
[docs] @handle_redis_exceptions def request_pause(self, job_id): self.script_request_pause( keys=[self._get_timestamp(), job_id], client=self.conn)
[docs] @handle_redis_exceptions def resume(self, job_id): self.script_resume( keys=[self._get_timestamp(), job_id], client=self.conn)
[docs] @handle_redis_exceptions def update_tasks_sent(self, job_id, task_ids, all_tasks_sent=False): nb_tasks_sent, status = self.script_update_tasks_sent( keys=[self._get_timestamp(), job_id, str(all_tasks_sent)], args=task_ids, client=self.conn) if nb_tasks_sent != len(task_ids): self.logger.warn('%s tasks were sent several times', len(task_ids) - nb_tasks_sent) return status
[docs] @handle_redis_exceptions def update_tasks_processed(self, job_id, task_ids, task_errors, task_results): counters = dict() if task_errors: total_errors = 0 for key, value in task_errors.items(): total_errors += value counters['errors.' + key] = value counters['errors.total'] = total_errors if task_results: for key, value in task_results.items(): counters['results.' + key] = value return self.script_update_tasks_processed( keys=[self._get_timestamp(), job_id] + self._dict_to_lua_array(counters), args=task_ids, client=self.conn)
[docs] @handle_redis_exceptions def incr_total_tasks(self, job_id, total_marker, tasks_incr): return self.script_incr_total( keys=[self._get_timestamp(), job_id, total_marker, tasks_incr])
[docs] @handle_redis_exceptions def total_tasks_done(self, job_id): return self.script_total_tasks_done( keys=[self._get_timestamp(), job_id])
[docs] @handle_redis_exceptions def delete(self, job_id): self.script_delete(keys=[job_id])
[docs] @handle_redis_exceptions def get_job_info(self, job_id): job_info = self._get_job_info(job_id, client=self.conn) if not job_info: raise redis.exceptions.ResponseError('no_job') return self._unmarshal_job_info(job_info)
def _get_job_info(self, job_id, client): return client.hgetall(self.key_job_info % job_id)
[docs] @handle_redis_exceptions def list_locks(self): locks = self.conn.hgetall(self.key_locks) return [ dict(lock=lock[0], job_id=lock[1]) for lock in sorted(locks.items()) ]
[docs] @handle_redis_exceptions def get_lock_info(self, lock): job_id = self.conn.hget(self.key_locks, lock) return dict(lock=lock, job_id=job_id)
    @staticmethod
    def _unmarshal_job_info(marshalled_job_info):
        """Turn the flat Redis hash of a job (bytes keys/values with
        dotted names like ``job.status`` or ``tasks.sent``) into a
        nested dict with typed values.
        """
        job_info = dict(
            job=dict(),
            orchestrator=dict(),
            tasks=dict(),
            errors=dict(),
            results=dict(),
            config=dict())

        # Split 'section.field' keys into job_info[section][field];
        # keys without a dot (e.g. 'config') stay at the top level.
        for key, value in marshalled_job_info.items():
            split_key = key.decode('utf-8').split('.', 1)
            value = value.decode('utf-8')
            if len(split_key) == 1:
                job_info[split_key[0]] = value
            else:
                job_info[split_key[0]][split_key[1]] = value

        job_main_info = job_info['job']
        job_main_info['ctime'] = float(job_main_info['ctime'])
        job_main_info['mtime'] = float(job_main_info['mtime'])
        job_main_info['request_pause'] = true_value(
            job_main_info['request_pause'])

        job_tasks = job_info['tasks']
        job_tasks['sent'] = int(job_tasks['sent'])
        # May be absent until the first batch is sent; default to None.
        job_tasks.setdefault('last_sent')
        job_tasks['all_sent'] = true_value(job_tasks['all_sent'])
        job_tasks['processed'] = int(job_tasks['processed'])
        # To have a coherent total if the estimate was not correct
        if job_tasks['all_sent']:
            job_tasks['total'] = job_tasks['sent']
        else:
            job_tasks['total'] = max(job_tasks['sent'],
                                     int(job_tasks['total']))
        job_tasks['is_total_temp'] = true_value(
            job_tasks['is_total_temp'])
        # May be absent until incr_total_tasks ran; default to None.
        job_tasks.setdefault('total_marker')

        job_errors = job_info['errors']
        for key, value in job_errors.items():
            job_errors[key] = int(value)
        job_results = job_info.get('results', dict())
        for key, value in job_results.items():
            job_results[key] = int(value)

        job_info['config'] = json.loads(job_info['config'])
        return job_info

    @staticmethod
    def _lua_array_to_dict(array):
        """Convert a flat [k1, v1, k2, v2, ...] reply (e.g. HGETALL run
        from Lua) into a dict."""
        it = iter(array)
        return dict(zip(*([it] * 2)))

    @staticmethod
    def _dict_to_lua_array(dict_):
        """Flatten a dict into a [k1, v1, k2, v2, ...] list suitable
        for passing as extra script KEYS."""
        array = list()
        for key, value in dict_.items():
            array.append(key)
            array.append(value)
        return array