Source code for oio.common.tool

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import signal

from oio.common.easy_value import int_value
from oio.common.exceptions import OioException, OioTimeout, RetryLater
from oio.common.green import ContextPool, eventlet, ratelimit, sleep, \
    threading, time
from oio.common.json import json
from oio.common.logger import get_logger
from oio.conscience.client import ConscienceClient
from oio.event.beanstalk import Beanstalk, BeanstalkdListener, \
    BeanstalkdSender


DISTRIBUTED_DISPATCHER_TIMEOUT = 300


class Tool(object):
    """
    Process all found items.

    For the task_res variable, the following format must be respected:
    (item, info, error).
    """

    DEFAULT_BEANSTALKD_WORKER_TUBE = 'oio-process'
    DEFAULT_REPORT_INTERVAL = 3600
    DEFAULT_RETRY_DELAY = 3600
    DEFAULT_ITEM_PER_SECOND = 30
    DEFAULT_CONCURRENCY = 1
    DEFAULT_DISTRIBUTED_BEANSTALKD_WORKER_TUBE = 'oio-process'

    def __init__(self, conf, beanstalkd_addr=None, logger=None):
        self.conf = conf
        self.logger = logger or get_logger(self.conf)
        self.namespace = conf['namespace']
        self.success = True

        # exit gracefully
        self.running = True
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

        # counters
        self.items_processed = 0
        self.total_items_processed = 0
        self.errors = 0
        self.total_errors = 0
        self.total_expected_items = None

        # report
        self.start_time = 0
        self.last_report = 0
        self.report_interval = int_value(
            self.conf.get('report_interval'), self.DEFAULT_REPORT_INTERVAL)

        # dispatcher
        self.dispatcher = None

        # input
        self.beanstalkd = None
        if beanstalkd_addr:
            self.beanstalkd = BeanstalkdListener(
                beanstalkd_addr,
                self.conf.get('beanstalkd_worker_tube')
                or self.DEFAULT_BEANSTALKD_WORKER_TUBE,
                self.logger)

        # retry
        self.retryer = None
        self.retry_queue = None
        if self.beanstalkd:
            self.retryer = BeanstalkdSender(
                self.beanstalkd.addr, self.beanstalkd.tube, self.logger)
            self.retry_queue = eventlet.Queue()
        self.retry_delay = int_value(self.conf.get('retry_delay'),
                                     self.DEFAULT_RETRY_DELAY)
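
    # Configuration sketch (illustrative): only 'namespace' is required;
    # every other key read by this class and the dispatchers falls back
    # to the DEFAULT_* class attributes above.
    #
    #     conf = {
    #         'namespace': 'OPENIO',
    #         'report_interval': 60,    # seconds between 'RUN' reports
    #         'items_per_second': 30,   # rate limit for the dispatchers
    #         'concurrency': 4,         # workers of the local dispatcher
    #         'retry_delay': 3600,      # beanstalkd delay on retried jobs
    #     }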

    @staticmethod
    def items_from_task_event(task_event):
        """
        Convert the task event into a list (generator) of items.
        """
        raise NotImplementedError()

    @staticmethod
    def task_event_from_item(item):
        """
        Convert the item into a task event.
        """
        raise NotImplementedError()

    @staticmethod
    def tasks_res_from_res_event(res_event):
        """
        Convert the result event into a list (generator) of task results.
        """
        raise NotImplementedError()

    @staticmethod
    def res_event_from_task_res(task_res):
        """
        Convert the task result into a result event.
        """
        raise NotImplementedError()

    @staticmethod
    def string_from_item(item):
        """
        Convert the item into a string.
        """
        raise NotImplementedError()
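
    # Sketch of how a subclass might implement the converters above
    # (illustrative; the 'container_id'/'chunk_ids' event fields are
    # assumptions made for the example, not a documented schema):
    #
    #     @staticmethod
    #     def items_from_task_event(task_event):
    #         for chunk_id in task_event['chunk_ids']:
    #             yield (task_event['container_id'], chunk_id)
    #
    #     @staticmethod
    #     def task_event_from_item(item):
    #         container_id, chunk_id = item
    #         return {'container_id': container_id,
    #                 'chunk_ids': [chunk_id]}
    #
    #     @staticmethod
    #     def string_from_item(item):
    #         return '|'.join(item)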

    def exit_gracefully(self, signum, frame):
        self.logger.info(
            'Stop sending and wait for all results already sent')
        self.success = False
        self.running = False
        if self.beanstalkd:
            self.beanstalkd.running = False

    def _item_with_beanstalkd_reply_from_task_event(self, job_id, data):
        task_event = json.loads(data)
        beanstalkd_reply = task_event.get('beanstalkd_reply')
        items = self.items_from_task_event(task_event)
        for item in items:
            yield (item, beanstalkd_reply)

    def _fetch_items_with_beanstalkd_reply_from_beanstalkd(self):
        # Do not block more than 2 seconds
        return self.beanstalkd.fetch_jobs(
            self._item_with_beanstalkd_reply_from_task_event,
            reserve_timeout=2)

    def _fetch_items(self):
        """
        Fetch items from inputs (other than the beanstalkd).
        """
        raise NotImplementedError()

    def _fetch_items_with_beanstalkd_reply(self):
        items = self._fetch_items()
        for item in items:
            yield (item, None)

    def fetch_items_with_beanstalkd_reply(self):
        """
        Fetch items along with their beanstalkd reply address
        (useful when the task is distributed).
        """
        if self.beanstalkd:
            return self._fetch_items_with_beanstalkd_reply_from_beanstalkd()
        return self._fetch_items_with_beanstalkd_reply()

    def update_counters(self, task_res):
        """
        Update all counters of the tool.
        """
        _, _, error = task_res
        self.items_processed += 1
        if error is not None:
            self.errors += 1

    def _update_total_counters(self):
        items_processed = self.items_processed
        self.items_processed = 0
        self.total_items_processed += items_processed
        errors = self.errors
        self.errors = 0
        self.total_errors += errors
        return items_processed, self.total_items_processed, \
            errors, self.total_errors

    def _get_report(self, status, end_time, counters):
        raise NotImplementedError()

    def log_report(self, status, force=False):
        """
        Log a report at a fixed interval.
        """
        end_time = time.time()
        if force or (end_time - self.last_report >= self.report_interval):
            counters = self._update_total_counters()
            self.logger.info(self._get_report(status, end_time, counters))
            self.last_report = end_time

    def create_worker(self, queue_workers, queue_reply):
        """
        Create a worker to process the items.
        """
        raise NotImplementedError()

    def prepare_local_dispatcher(self):
        """
        The tool will dispatch the tasks locally.
        """
        self.dispatcher = _LocalDispatcher(self.conf, self)

    def prepare_distributed_dispatcher(self):
        """
        The tool will dispatch the tasks on the platform.
        """
        self.dispatcher = _DistributedDispatcher(self.conf, self)

    def _load_total_expected_items(self):
        raise NotImplementedError()

    def _read_retry_queue(self):
        if self.retry_queue is None:
            return
        while True:
            # Reschedule jobs we were not able to handle.
            item = self.retry_queue.get()
            if self.retryer:
                sent = False
                while not sent:
                    sent = self.retryer.send_job(
                        json.dumps(self.task_event_from_item(item)),
                        delay=self.retry_delay)
                    if not sent:
                        sleep(1.0)
                self.retryer.job_done()
            self.retry_queue.task_done()

    def run(self):
        """
        Start processing all found items.
        """
        if self.dispatcher is None:
            raise ValueError('No dispatcher')

        eventlet.spawn_n(self._load_total_expected_items)
        # spawn one worker for the retry queue
        eventlet.spawn_n(self._read_retry_queue)
        for task_res in self.dispatcher.run():
            yield task_res
        # block until the retry queue is empty
        if self.retry_queue:
            self.retry_queue.join()

    def is_success(self):
        """
        Check whether the tool completed without errors.
        """
        if not self.success:
            return False
        if self.total_items_processed == 0:
            self.logger.warn('No item to process')
        return self.total_errors == 0
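

# Typical driver loop for a concrete subclass (illustrative sketch;
# 'MyTool' is hypothetical, not part of this module). run() yields task
# results in the (item, info, error) format described above:
#
#     tool = MyTool({'namespace': 'OPENIO', 'concurrency': 4})
#     tool.prepare_local_dispatcher()
#     for item, info, error in tool.run():
#         if error is not None:
#             print('%s: %s' % (tool.string_from_item(item), error))
#     exit_code = 0 if tool.is_success() else 1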


class ToolWorker(object):
    """
    Process all items given by the tool.
    """

    def __init__(self, tool, queue_workers, queue_reply):
        self.tool = tool
        self.conf = self.tool.conf
        self.logger = self.tool.logger
        self.queue_workers = queue_workers
        self.queue_reply = queue_reply

        # reply
        self.beanstalkd_reply = None

    def _process_item(self, item):
        raise NotImplementedError()

    def _reply_task_res(self, beanstalkd_reply, task_res):
        self.queue_reply.put(task_res)
        if beanstalkd_reply is None:
            return

        res_event = self.tool.res_event_from_task_res(task_res)
        if self.tool.beanstalkd is not None:
            res_event['beanstalkd_worker'] = {
                'addr': self.tool.beanstalkd.addr,
                'tube': self.tool.beanstalkd.tube
            }

        try:
            if self.beanstalkd_reply is None \
                    or self.beanstalkd_reply.addr != beanstalkd_reply['addr'] \
                    or self.beanstalkd_reply.tube != beanstalkd_reply['tube']:
                if self.beanstalkd_reply is not None:
                    self.beanstalkd_reply.close()
                self.beanstalkd_reply = BeanstalkdSender(
                    beanstalkd_reply['addr'], beanstalkd_reply['tube'],
                    self.logger)

            sent = False
            event_json = json.dumps(res_event)
            # This will loop forever if there is a connection issue with the
            # beanstalkd server. We chose to let it loop until someone fixes
            # the problem (or the problem resolves by magic).
            while not sent:
                sent = self.beanstalkd_reply.send_job(event_json)
                if not sent:
                    sleep(1.0)
            self.beanstalkd_reply.job_done()
        except Exception as exc:  # pylint: disable=broad-except
            item, info, error = task_res
            self.logger.warn(
                'Beanstalkd reply failed %s (info=%s error=%s): %s',
                self.tool.string_from_item(item), str(info), error, exc)

    def run(self):
        """
        Start processing all items given by the tool.
        """
        while True:
            item_with_beanstalkd_reply = self.queue_workers.get()
            if item_with_beanstalkd_reply is None:  # end signal
                break

            item, beanstalkd_reply = item_with_beanstalkd_reply
            info = None
            error = None
            try:
                info = self._process_item(item)
            except RetryLater as exc:
                # Schedule a retry only if the sender did not set a reply
                # address (rebuild CLIs set a reply address, meta2 does not).
                if self.tool.retry_queue and not beanstalkd_reply:
                    self.logger.warn(
                        "Putting an item (%s) in the retry queue: %s",
                        self.tool.string_from_item(item), exc.args[0])
                    self.tool.retry_queue.put(item)
                else:
                    error = str(exc.args[0])
            except Exception as exc:  # pylint: disable=broad-except
                error = str(exc)

            task_res = (item, info, error)
            self._reply_task_res(beanstalkd_reply, task_res)
            self.queue_workers.task_done()
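

# A concrete worker only has to implement _process_item (illustrative
# sketch; the availability check and the returned info dict are
# assumptions made for the example):
#
#     class MyToolWorker(ToolWorker):
#         def _process_item(self, item):
#             if self._is_temporarily_unavailable(item):  # hypothetical
#                 raise RetryLater('item temporarily unavailable')
#             return {'status': 'processed'}  # becomes the 'info' field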


class _Dispatcher(object):
    """
    Dispatch tasks.
    """

    def __init__(self, conf, tool):
        self.conf = conf
        self.tool = tool
        self.logger = self.tool.logger

    def run(self):
        """
        Start dispatching tasks.

        :returns: the list (generator) of processed tasks
        """
        raise NotImplementedError()


class _LocalDispatcher(_Dispatcher):
    """
    Dispatch tasks locally.
    """

    def __init__(self, conf, tool):
        super(_LocalDispatcher, self).__init__(conf, tool)

        concurrency = int_value(self.conf.get('concurrency'),
                                self.tool.DEFAULT_CONCURRENCY)
        self.max_items_per_second = int_value(
            self.conf.get('items_per_second'),
            self.tool.DEFAULT_ITEM_PER_SECOND)

        if self.max_items_per_second > 0:
            # Max 2 seconds in advance
            queue_size = self.max_items_per_second * 2
        else:
            queue_size = concurrency * 64
        self.queue_workers = eventlet.Queue(queue_size)
        self.queue_reply = eventlet.Queue()

        self.workers = list()
        for _ in range(concurrency):
            worker = self.tool.create_worker(
                self.queue_workers, self.queue_reply)
            self.workers.append(worker)

    def _fill_queue(self):
        """
        Fill the queue.
        """
        items_run_time = 0
        try:
            items_with_beanstalkd_reply = \
                self.tool.fetch_items_with_beanstalkd_reply()
            for item_with_beanstalkd_reply in items_with_beanstalkd_reply:
                items_run_time = ratelimit(items_run_time,
                                           self.max_items_per_second)
                self.queue_workers.put(item_with_beanstalkd_reply)
                if not self.tool.running:
                    break
        except Exception as exc:
            if self.tool.running:
                self.logger.error("Failed to fill queue: %s", exc)
                self.tool.success = False

    def _fill_queue_and_wait_all_items(self):
        """
        Fill the queue and wait for all items to be processed.
        """
        self._fill_queue()
        self.queue_workers.join()
        for _ in self.workers:
            self.queue_workers.put(None)
        self.queue_reply.put(None)

    def run(self):
        self.tool.start_time = self.tool.last_report = time.time()
        self.tool.log_report('START', force=True)

        try:
            with ContextPool(len(self.workers) + 1) as pool:
                # spawn workers
                for worker in self.workers:
                    pool.spawn(worker.run)
                # spawn one worker to fill the queue
                pool.spawn(self._fill_queue_and_wait_all_items)

                # consume the replies in the main thread
                while True:
                    task_res = self.queue_reply.get()
                    if task_res is None:  # end signal
                        break
                    self.tool.update_counters(task_res)
                    yield task_res
                    self.tool.log_report('RUN')
        except Exception:  # pylint: disable=broad-except
            self.logger.exception('ERROR in local dispatcher')
            self.tool.success = False

        self.tool.log_report('DONE', force=True)
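

# Worked example of the queue sizing above, with the default
# items_per_second of 30: the worker queue holds 30 * 2 = 60 items, i.e.
# at most two seconds of work buffered ahead of the rate limiter; with
# items_per_second <= 0 it falls back to concurrency * 64 slots.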


def locate_tube(services, tube):
    """
    Get a list of beanstalkd services hosting the specified tube.

    :param services: known beanstalkd services.
    :type services: iterable of dictionaries
    :param tube: the tube to locate.
    :returns: a list of beanstalkd services hosting the specified tube.
    :rtype: `list` of `dict`
    """
    available = list()
    for bsd in services:
        tubes = Beanstalk.from_url('beanstalk://' + bsd['addr']).tubes()
        if tube in tubes:
            available.append(bsd)
    return available
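

# Usage sketch (illustrative; the address is a placeholder): given the
# beanstalkd services known to conscience, keep only those already
# hosting the worker tube.
#
#     workers = locate_tube([{'addr': '127.0.0.1:6014'}], 'oio-process')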


class _DistributedDispatcher(_Dispatcher):
    """
    Dispatch tasks on the platform.
    """

    def __init__(self, conf, tool):
        super(_DistributedDispatcher, self).__init__(conf, tool)
        self.sending = None

        self.max_items_per_second = int_value(
            self.conf.get('items_per_second'),
            self.tool.DEFAULT_ITEM_PER_SECOND)

        # All available beanstalkd
        conscience_client = ConscienceClient(self.conf)
        all_beanstalkd = conscience_client.all_services('beanstalkd')
        all_available_beanstalkd = dict()
        for beanstalkd in all_beanstalkd:
            if beanstalkd['score'] <= 0:
                continue
            all_available_beanstalkd[beanstalkd['addr']] = beanstalkd
        if not all_available_beanstalkd:
            raise OioException('No beanstalkd available')

        # Beanstalkd workers
        workers_tube = self.conf.get('distributed_beanstalkd_worker_tube') \
            or self.tool.DEFAULT_DISTRIBUTED_BEANSTALKD_WORKER_TUBE
        self.beanstalkd_workers = dict()
        for beanstalkd in locate_tube(all_available_beanstalkd.values(),
                                      workers_tube):
            beanstalkd_worker = BeanstalkdSender(
                beanstalkd['addr'], workers_tube, self.logger)
            self.beanstalkd_workers[beanstalkd['addr']] = beanstalkd_worker
            self.logger.info(
                'Beanstalkd %s using tube %s is selected as a worker',
                beanstalkd_worker.addr, beanstalkd_worker.tube)
        if not self.beanstalkd_workers:
            raise OioException('No beanstalkd worker available')
        nb_workers = len(self.beanstalkd_workers)
        if self.max_items_per_second > 0:
            # Max 2 seconds in advance
            queue_size_per_worker = self.max_items_per_second * 2 / nb_workers
        else:
            queue_size_per_worker = 64
        for _, beanstalkd_worker in self.beanstalkd_workers.items():
            beanstalkd_worker.low_limit = queue_size_per_worker / 2
            beanstalkd_worker.high_limit = queue_size_per_worker

        # Beanstalkd reply
        beanstalkd_reply = dict()
        try:
            local_services = conscience_client.local_services()
            for local_service in local_services:
                if local_service['type'] != 'beanstalkd':
                    continue
                beanstalkd = all_available_beanstalkd.get(
                    local_service['addr'])
                if beanstalkd is None:
                    continue
                if beanstalkd_reply \
                        and beanstalkd_reply['score'] >= beanstalkd['score']:
                    continue
                beanstalkd_reply = beanstalkd
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.warning(
                'ERROR when searching for beanstalkd locally: %s', exc)
        if not beanstalkd_reply:
            self.logger.warn('No beanstalkd available locally')

            try:
                beanstalkd = conscience_client.next_instance('beanstalkd')
                beanstalkd_reply = all_available_beanstalkd[beanstalkd['addr']]
            except Exception as exc:  # pylint: disable=broad-except
                self.logger.warning(
                    'ERROR when searching for beanstalkd: %s', exc)
        beanstalkd_reply_addr = beanstalkd_reply['addr']

        # If the tube exists, another service must have already used this tube
        tube_reply = workers_tube + '.reply.' + str(time.time())
        tubes = Beanstalk.from_url(
            'beanstalk://' + beanstalkd_reply_addr).tubes()
        if tube_reply in tubes:
            raise OioException(
                'Beanstalkd %s using tube %s is already used'
                % (beanstalkd_reply_addr, tube_reply))

        self.beanstalkd_reply = BeanstalkdListener(
            beanstalkd_reply_addr, tube_reply, self.logger)
        self.logger.info(
            'Beanstalkd %s using tube %s is selected for the replies',
            self.beanstalkd_reply.addr, self.beanstalkd_reply.tube)

    def _fetch_tasks_events_to_send(self):
        items_with_beanstalkd_reply = \
            self.tool.fetch_items_with_beanstalkd_reply()
        for item, _ in items_with_beanstalkd_reply:
            yield self.tool.task_event_from_item(item)

    def _tasks_res_from_res_event(self, job_id, data, **kwargs):
        res_event = json.loads(data)
        beanstalkd_worker_addr = res_event['beanstalkd_worker']['addr']
        tasks_res = self.tool.tasks_res_from_res_event(res_event)
        self.beanstalkd_workers[beanstalkd_worker_addr].job_done()
        return tasks_res

    def _all_events_are_processed(self):
        """
        Tell whether all workers have finished processing their events.
        """
        if self.sending:
            return False

        total_events = 0
        for worker in self.beanstalkd_workers.values():
            total_events += worker.nb_jobs
        return total_events <= 0

    def _send_task_event(self, task_event, reply_loc, next_worker):
        """
        Send the event through a non-full sender.
        """
        task_event['beanstalkd_reply'] = reply_loc
        workers = list(self.beanstalkd_workers.values())
        nb_workers = len(workers)
        while True:
            for _ in range(nb_workers):
                success = workers[next_worker].send_job(
                    json.dumps(task_event))
                next_worker = (next_worker + 1) % nb_workers
                if success:
                    return next_worker
            self.logger.warn("All beanstalkd workers are full")
            sleep(5)

    def _distribute_events(self, reply_loc=None):
        next_worker = 0
        items_run_time = 0

        try:
            tasks_events = self._fetch_tasks_events_to_send()
            items_run_time = ratelimit(items_run_time,
                                       self.max_items_per_second)
            next_worker = self._send_task_event(
                next(tasks_events), reply_loc, next_worker)
            self.sending = True
            for task_event in tasks_events:
                items_run_time = ratelimit(items_run_time,
                                           self.max_items_per_second)
                next_worker = self._send_task_event(
                    task_event, reply_loc, next_worker)

                if not self.tool.running:
                    break
        except Exception as exc:
            if not isinstance(exc, StopIteration) and self.tool.running:
                self.logger.error("Failed to distribute events: %s", exc)
                self.tool.success = False
        finally:
            self.sending = False

    def run(self):
        self.tool.start_time = self.tool.last_report = time.time()
        self.tool.log_report('START', force=True)

        reply_loc = {'addr': self.beanstalkd_reply.addr,
                     'tube': self.beanstalkd_reply.tube}
        # pylint: disable=no-member
        thread = threading.Thread(target=self._distribute_events,
                                  args=[reply_loc])
        thread.start()

        # Wait until the thread has started sending events
        while self.sending is None:
            sleep(0.1)

        # Retrieve responses until all events are processed
        try:
            while not self._all_events_are_processed():
                tasks_res = self.beanstalkd_reply.fetch_job(
                    self._tasks_res_from_res_event,
                    timeout=DISTRIBUTED_DISPATCHER_TIMEOUT)
                for task_res in tasks_res:
                    self.tool.update_counters(task_res)
                    yield task_res
                self.tool.log_report('RUN')
        except OioTimeout:
            self.logger.error('No response for %d seconds',
                              DISTRIBUTED_DISPATCHER_TIMEOUT)
            self.tool.success = False
        except Exception:  # pylint: disable=broad-except
            self.logger.exception('ERROR in distributed dispatcher')
            self.tool.success = False

        self.tool.log_report('DONE', force=True)
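

# Distributed mode sketch (illustrative; 'MyTool' is hypothetical): task
# events are round-robined over every beanstalkd hosting the worker tube,
# and results come back on a dedicated '<tube>.reply.<timestamp>' tube.
#
#     tool = MyTool({'namespace': 'OPENIO'})
#     tool.prepare_distributed_dispatcher()
#     for item, info, error in tool.run():
#         ...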