# Source code for oio.xcute.orchestrator

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

import math
import random
from collections import OrderedDict
from redis import ConnectionError as RedisConnectionError, \
    TimeoutError as RedisTimeoutError

from oio.common.easy_value import int_value
from oio.common.exceptions import OioTimeout
from oio.common.logger import get_logger
from oio.common.green import ratelimit, sleep, threading
from oio.common.json import json
from oio.conscience.client import ConscienceClient
from oio.event.beanstalk import Beanstalk, BeanstalkdListener, \
    ConnectionError, DEFAULT_TTR
from oio.event.evob import EventTypes
from oio.xcute.common.backend import XcuteBackend
from oio.xcute.jobs import JOB_TYPES


class XcuteOrchestrator(object):
    """
    Daemon distributing the tasks of xcute jobs to beanstalkd workers
    and keeping the job state up to date in the xcute backend.
    """

    # Seconds to wait for a reply job before considering the fetch timed out.
    DEFAULT_DISPATCHER_TIMEOUT = 2
    # Seconds between two refreshes of the beanstalkd workers list.
    DEFAULT_REFRESH_TIME_BEANSTALKD_WORKERS = 30
    # Above this many ready jobs, a worker's queue is considered full.
    DEFAULT_MAX_JOBS_PER_BEANSTALKD = 1024

    def __init__(self, conf, logger=None):
        """
        :param conf: configuration dictionary; must provide
            'orchestrator_id', 'beanstalkd_workers_tube' and
            'beanstalkd_reply_addr' (see ValueError below).
        :param logger: optional logger, created from conf when missing.
        :raises ValueError: when a mandatory configuration key is missing.
        """
        self.conf = conf
        self.logger = logger or get_logger(self.conf)
        self.backend = XcuteBackend(self.conf, logger=self.logger)
        self.conscience_client = ConscienceClient(self.conf)

        # Identifies this orchestrator's jobs in the backend (mandatory).
        self.orchestrator_id = self.conf.get('orchestrator_id')
        if not self.orchestrator_id:
            raise ValueError('Missing orchestrator ID')
        self.logger.info('Using orchestrator ID: %s', self.orchestrator_id)

        # Tube the workers listen to (mandatory).
        self.beanstalkd_workers_tube = self.conf.get('beanstalkd_workers_tube')
        if not self.beanstalkd_workers_tube:
            raise ValueError('Missing beanstalkd workers tube')
        self.logger.info('Using beanstalkd workers tube: %s',
                         self.beanstalkd_workers_tube)

        # Where workers send their task results back (mandatory address;
        # the reply tube defaults to the workers tube + '.reply').
        self.beanstalkd_reply_addr = self.conf.get('beanstalkd_reply_addr')
        if not self.beanstalkd_reply_addr:
            raise ValueError('Missing beanstalkd reply address')
        self.beanstalkd_reply_tube = self.conf.get(
            'beanstalkd_reply_tube', self.beanstalkd_workers_tube + '.reply')
        self.logger.info('Using beanstalkd reply : %s %s',
                         self.beanstalkd_reply_addr,
                         self.beanstalkd_reply_tube)

        self.refresh_time_beanstalkd_workers = int_value(
            self.conf.get('refresh_time_beanstalkd_workers'),
            self.DEFAULT_REFRESH_TIME_BEANSTALKD_WORKERS)

        self.max_jobs_per_beanstalkd = int_value(
            self.conf.get('max_jobs_per_beanstalkd'),
            self.DEFAULT_MAX_JOBS_PER_BEANSTALKD)

        # Cleared by exit_gracefully(); checked by every loop in this class.
        self.running = True
        # addr -> Beanstalk client, replaced wholesale by the refresh thread
        # (the id() of this dict is used elsewhere to detect replacement).
        self.beanstalkd_workers = dict()

        # Background threads, created by run_forever() and
        # handle_running_job(); joined by safe_run_forever().
        self.refresh_beanstalkd_workers_thread = None
        self.listen_beanstalkd_reply_thread = None
        self.dispatch_tasks_threads = dict()
        self.compute_total_tasks_threads = dict()
[docs] def handle_backend_errors(self, func, *args, **kwargs): while True: try: return func(*args, **kwargs), None except (RedisConnectionError, RedisTimeoutError) as exc: self.logger.warn( 'Fail to communicate with redis: %s', exc) if not self.running: return None, exc sleep(1)
    def safe_run_forever(self):
        """
        Run the orchestrator, then wait for every background thread
        to terminate before returning.
        """
        try:
            self.run_forever()
        except Exception as exc:
            self.logger.exception('Fail to run forever: %s', exc)

        # Make sure self.running is False so all loops terminate.
        self.exit_gracefully()

        if self.refresh_beanstalkd_workers_thread:
            self.refresh_beanstalkd_workers_thread.join()
        if self.listen_beanstalkd_reply_thread:
            self.listen_beanstalkd_reply_thread.join()
        # NOTE(review): the dispatch/compute threads delete themselves from
        # these dicts when they finish; iterating .values() here assumes no
        # thread removes its entry during the iteration — confirm.
        for dispatch_tasks_thread in self.dispatch_tasks_threads.values():
            dispatch_tasks_thread.join()
        for compute_total_tasks_thread \
                in self.compute_total_tasks_threads.values():
            compute_total_tasks_thread.join()
        self.logger.info('Exited running thread')
    def run_forever(self):
        """
        Take jobs from the queue and spawn threads to dispatch them
        """
        # gather beanstalkd info
        self.refresh_beanstalkd_workers_thread = threading.Thread(
            target=self.refresh_beanstalkd_workers_forever)
        self.refresh_beanstalkd_workers_thread.start()

        # start processing replies
        self.listen_beanstalkd_reply_thread = threading.Thread(
            target=self.listen_beanstalkd_reply_forever)
        self.listen_beanstalkd_reply_thread.start()

        if not self.running:
            return

        # restart running jobs
        self.logger.debug('Look for unfinished jobs')
        orchestrator_jobs, exc = self.handle_backend_errors(
            self.backend.list_orchestrator_jobs, self.orchestrator_id)
        if exc is not None:
            self.logger.warn(
                'Unable to list running jobs for this orchestrator: %s', exc)
            return
        for job_info in orchestrator_jobs:
            if not self.running:
                return
            self.safe_handle_running_job(job_info)

        # run next jobs: poll the backend once a second for a job to own
        while self.running:
            sleep(1)
            job_info, exc = self.handle_backend_errors(
                self.backend.run_next, self.orchestrator_id)
            if exc is not None:
                self.logger.warn('Unable to run next job: %s', exc)
                return
            if not job_info:
                continue
            self.safe_handle_running_job(job_info)
[docs] def safe_handle_running_job(self, job_info): try: job_id = job_info['job']['id'] job_type = job_info['job']['type'] self.logger.info('Run job %s: %s', job_id, job_type) self.handle_running_job(job_id, job_type, job_info) except Exception as exc: self.logger.exception('Failed to run job %s: %s', job_id, exc) _, exc = self.handle_backend_errors( self.backend.fail, job_id) if exc is not None: self.logger.warn( '[job_id=%s] Job has not been updated ' 'with the failure: %s', job_id, exc)
    def handle_running_job(self, job_id, job_type, job_info):
        """
        First launch the computation of total number of tasks,
        then launch the dispatching of all tasks across the platform.
        """
        if job_info['tasks']['all_sent']:
            self.logger.info(
                '[job_id=%s] All tasks are already sent', job_id)
            return

        job_class = JOB_TYPES[job_type]
        job = job_class(self.conf, logger=self.logger)

        # A brand-new job (nothing sent, no total computed yet)
        # gets a chance to prepare itself first.
        if job_info['tasks']['total'] == 0 \
                and job_info['tasks']['is_total_temp'] \
                and job_info['tasks']['sent'] == 0 \
                and not job_info['tasks']['all_sent']:
            job.prepare(job_info['config']['params'])

        if job_id in self.compute_total_tasks_threads:
            self.logger.info(
                '[job_id=%s] Already computing the total number of tasks',
                job_id)
        elif job_info['tasks']['is_total_temp']:
            # The total is still an estimate: compute it in the background.
            compute_total_tasks_thread = threading.Thread(
                target=self.safe_compute_total_tasks,
                args=(job_id, job_type, job_info, job))
            compute_total_tasks_thread.start()
            self.compute_total_tasks_threads[job_id] = \
                compute_total_tasks_thread
        else:
            self.logger.info(
                '[job_id=%s] The total number of tasks is already computed',
                job_id)

        if job_id in self.dispatch_tasks_threads:
            self.logger.warning(
                '[job_id=%s] Already dispatching the tasks', job_id)
        else:
            # Dispatching runs concurrently with the total computation.
            dispatch_tasks_thread = threading.Thread(
                target=self.safe_dispatch_tasks,
                args=(job_id, job_type, job_info, job))
            dispatch_tasks_thread.start()
            self.dispatch_tasks_threads[job_id] = dispatch_tasks_thread
    def safe_dispatch_tasks(self, job_id, job_type, job_info, job):
        """
        Dispatch all tasks across the platform
        and update the backend.
        """
        try:
            self.logger.info(
                '[job_id=%s] Start to dispatch tasks', job_id)
            self.dispatch_tasks(job_id, job_type, job_info, job)
            self.logger.info(
                '[job_id=%s] Finish to dispatch tasks', job_id)
        except Exception as exc:
            self.logger.exception(
                '[job_id=%s] Fail to dispatch tasks: %s', job_id, exc)
            _, exc = self.handle_backend_errors(
                self.backend.fail, job_id)
            if exc is not None:
                self.logger.warn(
                    '[job_id=%s] Job has not been updated '
                    'with the failure: %s', job_id, exc)
        finally:
            # Always unregister this worker thread so the job
            # can be handled again later.
            del self.dispatch_tasks_threads[job_id]
    def dispatch_tasks(self, job_id, job_type, job_info, job):
        """
        Send the job's tasks in rate-limited batches to the beanstalkd
        workers, recording each sent batch in the backend.
        """
        job_config = job_info['config']
        job_params = job_config['params']
        tasks_per_second = job_config['tasks_per_second']
        tasks_batch_size = job_config['tasks_batch_size']
        # Resume from the last task recorded as sent.
        last_task_id = job_info['tasks']['last_sent']

        job_tasks = job.get_tasks(job_params, marker=last_task_id)
        beanstalkd_workers = self.get_beanstalkd_workers()

        tasks_run_time = 0
        # The rate limit is applied per batch, not per task.
        batch_per_second = tasks_per_second / float(
            tasks_batch_size)
        # The backend must have the tasks in order
        # to know the last task sent
        tasks = OrderedDict()
        for task_id, task_payload in job_tasks:
            if not self.running:
                break

            tasks[task_id] = task_payload
            if len(tasks) < tasks_batch_size:
                continue

            tasks_run_time = ratelimit(
                tasks_run_time, batch_per_second)

            sent = self.dispatch_tasks_batch(
                beanstalkd_workers,
                job_id, job_type, job_config, tasks)
            if sent:
                job_status, exc = self.handle_backend_errors(
                    self.backend.update_tasks_sent, job_id, tasks.keys())
                tasks.clear()
                if exc is not None:
                    self.logger.warn(
                        '[job_id=%s] Job has not been updated '
                        'with the sent tasks: %s', job_id, exc)
                    break
                if job_status == 'PAUSED':
                    self.logger.info('Job %s is paused', job_id)
                    return

            if not self.running:
                break
        else:
            # for/else: every task was fetched without interruption;
            # flush the trailing partial batch and mark the job complete.
            sent = True
            if tasks:
                sent = self.dispatch_tasks_batch(
                    beanstalkd_workers,
                    job_id, job_type, job_config, tasks)
            if sent:
                job_status, exc = self.handle_backend_errors(
                    self.backend.update_tasks_sent, job_id, tasks.keys(),
                    all_tasks_sent=True)
                if exc is None:
                    if job_status == 'FINISHED':
                        self.logger.info('Job %s is finished', job_id)

                    self.logger.info(
                        'Finished dispatching job (job_id=%s)', job_id)
                    return
                else:
                    self.logger.warn(
                        '[job_id=%s] Job has not been updated '
                        'with the last sent tasks: %s', job_id, exc)

        # Reached only when dispatching was interrupted or failed:
        # release the job so another orchestrator may pick it up.
        _, exc = self.handle_backend_errors(self.backend.free, job_id)
        if exc is not None:
            self.logger.warn(
                '[job_id=%s] Job has not been freed: %s', job_id, exc)
    def dispatch_tasks_batch(self, beanstalkd_workers,
                             job_id, job_type, job_config, tasks):
        """
        Try sending a task until it's ok
        """
        beanstalkd_payload = self.make_beanstalkd_payload(
            job_id, job_type, job_config, tasks)

        # Beanstalkd refuses jobs bigger than its max-job-size
        # (64 KiB by default); fail loudly instead of retrying forever.
        if len(beanstalkd_payload) > 2**16:
            raise ValueError('Task payload is too big (length=%s)' %
                             len(beanstalkd_payload))

        # max 2 minutes per task
        ttr = len(tasks) * DEFAULT_TTR
        while self.running:
            # Pick the next non-None worker from the load-balancing
            # generator (it yields None when no worker is available).
            for beanstalkd_worker in beanstalkd_workers:
                if not self.running:
                    return False
                if beanstalkd_worker is not None:
                    break

            try:
                beanstalkd_worker.put(beanstalkd_payload, ttr=ttr)
                self.logger.debug(
                    '[job_id=%s] Tasks sent to %s: %s', job_id,
                    beanstalkd_worker.addr, str(tasks))
                return True
            except Exception as exc:
                self.logger.warn(
                    '[job_id=%s] Fail to send beanstalkd job: %s',
                    job_id, exc)
                # TODO(adu): We could be more lenient
                # and wait for a few errors in a row
                # to happen before marking it as broken.
                beanstalkd_worker.is_broken = True
        return False
[docs] def make_beanstalkd_payload(self, job_id, job_type, job_config, tasks): return json.dumps( { 'event': EventTypes.XCUTE_TASKS, 'data': { 'job_id': job_id, 'job_type': job_type, 'job_config': job_config, 'tasks': tasks, 'beanstalkd_reply': { 'addr': self.beanstalkd_reply_addr, 'tube': self.beanstalkd_reply_tube, }, } })
[docs] def safe_compute_total_tasks(self, job_id, job_type, job_info, job): """ Compute the total number of tasks and update the backend. """ try: self.logger.info( '[job_id=%s] Start to compute the total number of tasks', job_id) self.compute_total_tasks(job_id, job_type, job_info, job) self.logger.info( '[job_id=%s] Finish to compute the total number of tasks', job_id) except Exception as exc: self.logger.exception( '[job_id=%s] Fail to compute the total number of tasks: %s', job_id, exc) finally: del self.compute_total_tasks_threads[job_id]
    def compute_total_tasks(self, job_id, job_type, job_info, job):
        """
        Iterate over the job's task counter and push each increment
        to the backend, then mark the total as final.
        """
        job_params = job_info['config']['params']
        # Resume counting from where a previous run stopped.
        total_marker = job_info['tasks']['total_marker']

        tasks_counter = job.get_total_tasks(
            job_params, marker=total_marker)
        for total_marker, tasks_incr in tasks_counter:
            stop, exc = self.handle_backend_errors(
                self.backend.incr_total_tasks, job_id,
                total_marker, tasks_incr)
            if exc is not None:
                self.logger.warn(
                    '[job_id=%s] Job has not been updated '
                    'with total tasks: %s', job_id, exc)
                return
            # The backend may ask us to stop (stop is truthy),
            # e.g. when the computation is no longer needed.
            if stop or not self.running:
                return

        total_tasks, exc = self.handle_backend_errors(
            self.backend.total_tasks_done, job_id)
        if exc is not None:
            self.logger.warn(
                '[job_id=%s] Job has not been updated '
                'with last total tasks: %s', job_id, exc)
            return
        self.logger.info(
            '[job_id=%s] %s estimated tasks', job_id, total_tasks)
    def listen_beanstalkd_reply_forever(self):
        """
        Process this orchestrator's job replies
        """
        self.logger.info('Connecting to the reply beanstalkd')

        # Retry the connection every 5 seconds until it succeeds
        # or the orchestrator stops.
        while self.running:
            try:
                listener = BeanstalkdListener(
                    addr=self.beanstalkd_reply_addr,
                    tube=self.beanstalkd_reply_tube,
                    logger=self.logger)
                break
            except ConnectionError:
                self.logger.error('Failed to connect to the reply beanstalkd')
                sleep(5)

        self.logger.info('Listening to replies on %s (tube=%s)',
                         self.beanstalkd_reply_addr,
                         self.beanstalkd_reply_tube)

        # keep the job results in memory
        while self.running:
            connection_error = self.listen_loop(listener)

            # in case of a beanstalkd connection error
            # sleep to avoid spamming
            if connection_error:
                sleep(2)

        self.logger.info('Exited listening thread')
[docs] def listen_loop(self, listener): """ One iteration of the listening loop """ connection_error = False try: replies = listener.fetch_job( self.process_reply, timeout=self.DEFAULT_DISPATCHER_TIMEOUT) # to force the execution of process_reply # if there were no replies, consider it as a connection error connection_error = len(list(replies)) == 0 except OioTimeout: pass return connection_error
    def process_reply(self, beanstalkd_job_id, encoded_reply):
        """
        Decode one worker reply and record the processed tasks in the
        backend. Generator: yields once so the listener can delete the
        beanstalkd job after processing.
        """
        reply = json.loads(encoded_reply)

        job_id = reply['job_id']
        task_ids = reply['task_ids']
        task_results = reply['task_results']
        task_errors = reply['task_errors']

        self.logger.debug('Tasks processed (job_id=%s): %s', job_id, task_ids)

        try:
            finished, exc = self.handle_backend_errors(
                self.backend.update_tasks_processed,
                job_id, task_ids, task_errors, task_results)
            if exc is None:
                if finished:
                    self.logger.info('Job %s is finished', job_id)
            else:
                self.logger.warn(
                    '[job_id=%s] Job has not been updated '
                    'with the processed tasks: %s', job_id, exc)
        except Exception:
            # Never let a reply crash the listening thread.
            self.logger.exception('Error processing reply')

        yield None
    def refresh_beanstalkd_workers_forever(self):
        """
        Refresh beanstalkd workers by looking at the score,
        existing tubes and tube statistics.
        """
        while self.running:
            try:
                beanstalkd_workers = self._find_beanstalkd_workers()
            except Exception as exc:
                self.logger.error(
                    'Fail to find beanstalkd workers: %s', exc)
                # TODO(adu): We could keep trying to send jobs
                # to the beanstalkd we already found.
                # But we need the score to know how to dispatch the tasks...
                beanstalkd_workers = dict()

            old_beanstalkd_workers_addr = set(self.beanstalkd_workers.keys())
            new_beanstalkd_workers_addr = set(beanstalkd_workers.keys())

            added_beanstalkds = new_beanstalkd_workers_addr \
                - old_beanstalkd_workers_addr
            for beanstalkd_addr in added_beanstalkds:
                self.logger.info('Add beanstalkd %s' % beanstalkd_addr)
                beanstalkd = beanstalkd_workers[beanstalkd_addr]
                # New clients must 'use' the workers tube before putting jobs.
                beanstalkd.use(self.beanstalkd_workers_tube)

            removed_beanstalkds = old_beanstalkd_workers_addr \
                - new_beanstalkd_workers_addr
            for beanstalkd_addr in removed_beanstalkds:
                self.logger.info('Remove beanstalkd %s' % beanstalkd_addr)

            self.logger.info('Refresh beanstalkd workers')
            # Replace the dict wholesale: consumers detect the change
            # through id(self.beanstalkd_workers).
            self.beanstalkd_workers = beanstalkd_workers

            # Sleep in 1-second steps so a shutdown is noticed quickly.
            for _ in range(self.refresh_time_beanstalkd_workers):
                if not self.running:
                    break
                sleep(1)

        self.logger.info('Exited beanstalkd workers thread')
    def _find_beanstalkd_workers(self):
        """
        Find beanstalkd workers by looking at the score,
        existing tubes and tube statistics.
        """
        all_beanstalkd = self.conscience_client.all_services(
            'beanstalkd')

        beanstalkd_workers = dict()
        for beanstalkd_info in all_beanstalkd:
            try:
                beanstalkd = self._check_beanstalkd_worker(beanstalkd_info)
                if not beanstalkd:
                    continue
                beanstalkd_workers[beanstalkd.addr] = beanstalkd
            except Exception as exc:
                # One broken beanstalkd must not hide the others.
                self.logger.error('Fail to check beanstalkd: %s', exc)
        return beanstalkd_workers

    def _check_beanstalkd_worker(self, beanstalkd_info):
        """
        Check beanstalkd worker by looking at the score,
        existing tubes and tube statistics.

        :returns: a usable Beanstalk client with an ``occurrence``
            weight attached, or None when the worker must be ignored.
        """
        beanstalkd_addr = 'beanstalk://' + beanstalkd_info['addr']
        beanstalkd_score = beanstalkd_info['score']
        if beanstalkd_score == 0:
            self.logger.debug(
                'Ignore beanstalkd %s: score=0', beanstalkd_addr)
            return None

        # Reuse the existing client when possible to keep its state
        # (e.g. the is_broken flag set by dispatch_tasks_batch).
        beanstalkd = self.beanstalkd_workers.get(beanstalkd_addr)
        if not beanstalkd:
            beanstalkd = Beanstalk.from_url(beanstalkd_addr)
            beanstalkd.addr = beanstalkd_addr

        beanstalkd_tubes = beanstalkd.tubes()
        if self.beanstalkd_workers_tube not in beanstalkd_tubes:
            self.logger.debug(
                'Ignore beanstalkd %s: '
                'No worker has ever listened to the tube %s',
                beanstalkd_addr, self.beanstalkd_workers_tube)
            return None

        current_stats = beanstalkd.stats_tube(
            self.beanstalkd_workers_tube)
        beanstalkd_jobs_ready = current_stats['current-jobs-ready']
        if beanstalkd_jobs_ready > 0:
            beanstalkd_jobs_reserved = current_stats['current-jobs-reserved']
            # Jobs are piling up but none is reserved: no worker is consuming.
            if beanstalkd_jobs_reserved <= 0:
                self.logger.warn(
                    'Ignore beanstalkd %s: The worker doesn\'t process task '
                    '(current-jobs-ready=%d, current-jobs-reserved=%d)',
                    beanstalkd_addr, beanstalkd_jobs_ready,
                    beanstalkd_jobs_reserved)
                return None
            if beanstalkd_jobs_ready >= self.max_jobs_per_beanstalkd:
                self.logger.warn(
                    'Ignore beanstalkd %s: The queue is full '
                    '(current-jobs-ready=%d, current-jobs-reserved=%d)',
                    beanstalkd_addr, beanstalkd_jobs_ready,
                    beanstalkd_jobs_reserved)
                return None

        # A previously broken worker that passed all checks is forgiven.
        if hasattr(beanstalkd, 'is_broken') and beanstalkd.is_broken:
            self.logger.info(
                'Beanstalkd %s was broken, and now it\'s coming back',
                beanstalkd_addr)
            beanstalkd.is_broken = False

        # Favor the workers with a good score
        # 50% -> beanstalkd score
        worker_score = beanstalkd_score * 50. / 100.
        # 50% -> beanstalkd tube size
        worker_score += 50 - (beanstalkd_jobs_ready * 50. /
                              self.max_jobs_per_beanstalkd)
        # The weight (1..10) used by the load-balancing generator.
        beanstalkd.occurrence = int(math.ceil(worker_score / 10.))

        self.logger.debug(
            'Give the green light to beanstalkd %s (worker_score=%d)',
            beanstalkd_addr, worker_score)
        return beanstalkd
    def get_beanstalkd_workers(self):
        """
        Yield beanstalkd workers following a loadbalancing strategy
        """
        beanstalkd_workers_id = None
        beanstalkd_workers = list()
        while True:
            if not self.beanstalkd_workers:
                self.logger.info('No beanstalkd worker available')
                sleep(1)
                # Yield None so the caller can notice and check self.running.
                yield None
                continue

            # The refresh thread replaces the dict wholesale, so a change
            # of id() means the worker list must be rebuilt.
            if id(self.beanstalkd_workers) != beanstalkd_workers_id:
                beanstalkd_workers_id = id(self.beanstalkd_workers)
                beanstalkd_workers = list()
                # Repeat each worker 'occurrence' times to weight it.
                for beanstalkd in self.beanstalkd_workers.values():
                    for _ in range(beanstalkd.occurrence):
                        beanstalkd_workers.append(beanstalkd)

            # Shuffle to not have the same suite for all jobs
            random.shuffle(beanstalkd_workers)

            yielded = False
            for beanstalkd_worker in beanstalkd_workers:
                # Restart as soon as the worker list was refreshed.
                if id(self.beanstalkd_workers) != beanstalkd_workers_id:
                    break
                if beanstalkd_worker.is_broken:
                    continue
                yield beanstalkd_worker
                yielded = True
            else:
                # for/else: every worker was tried and all were broken.
                if not yielded:
                    self.logger.info(
                        'All beanstalkd workers available are broken')
                    sleep(1)
                    yield None
[docs] def exit_gracefully(self, *args, **kwargs): if self.running: self.logger.info('Exiting gracefully') self.running = False else: self.logger.info('Already exiting gracefully')