Source code for oio.xcute.common.worker

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from collections import Counter
from six import iteritems

from oio.common.constants import STRLEN_REQID
from oio.common.green import ratelimit
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.utils import CacheDict, request_id
from oio.event.beanstalk import Beanstalk
from oio.xcute.jobs import JOB_TYPES


class XcuteWorker(object):

    def __init__(self, conf, logger=None):
        self.conf = conf
        self.logger = logger or get_logger(self.conf)
        self.beanstalkd_replies = dict()
        self.tasks = CacheDict(size=10)

    def process(self, beanstalkd_job):
        job_id = beanstalkd_job['job_id']
        job_config = beanstalkd_job['job_config']

        task = self.tasks.get(job_id)
        if task is None:
            job_type = beanstalkd_job['job_type']
            task_class = JOB_TYPES[job_type].TASK_CLASS
            job_params = job_config['params']
            task = task_class(self.conf, job_params, logger=self.logger)
            self.tasks[job_id] = task

        tasks_per_second = job_config['tasks_per_second']
        tasks = beanstalkd_job['tasks']

        task_errors = Counter()
        task_results = Counter()

        tasks_run_time = 0
        for task_id, task_payload in iteritems(tasks):
            tasks_run_time = ratelimit(
                tasks_run_time, tasks_per_second)

            reqid = job_id + request_id('-')
            reqid = reqid[:STRLEN_REQID]
            try:
                task_result = task.process(task_id, task_payload,
                                           reqid=reqid)
                task_results.update(task_result)
            except Exception as exc:
                self.logger.warn('[job_id=%s] Fail to process task %s: %s',
                                 job_id, task_id, exc)
                task_errors[type(exc).__name__] += 1

        return job_id, list(tasks.keys()), task_results, task_errors, \
            beanstalkd_job['beanstalkd_reply']

    def reply(self, job_id, task_ids, task_results, task_errors,
              beanstalkd_reply_info):
        beanstalkd_reply_addr = beanstalkd_reply_info['addr']
        beanstalkd_reply_tube = beanstalkd_reply_info['tube']

        beanstalkd_reply_info = (beanstalkd_reply_addr, beanstalkd_reply_tube)
        beanstalkd_reply = self.beanstalkd_replies.get(beanstalkd_reply_info)
        if not beanstalkd_reply:
            beanstalkd_reply = Beanstalk.from_url(beanstalkd_reply_addr)
            beanstalkd_reply.use(beanstalkd_reply_tube)
            beanstalkd_reply.watch(beanstalkd_reply_tube)

            self.beanstalkd_replies[beanstalkd_reply_info] = beanstalkd_reply

        reply_payload = json.dumps({
            'job_id': job_id,
            'task_ids': task_ids,
            'task_results': task_results,
            'task_errors': task_errors,
        })
        beanstalkd_reply.put(reply_payload)
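Usage sketch (not part of this module): the snippet below shows how a decoded beanstalkd job could be fed to the worker above, then acknowledged through reply(). The job type, addresses, parameters and payload values are illustrative assumptions; only the field names mirror what process() and reply() actually read.

    # Hypothetical driver for XcuteWorker; values below are made up.
    conf = {'namespace': 'OPENIO'}          # assumed namespace name
    worker = XcuteWorker(conf)

    beanstalkd_job = {
        'job_id': 'example-job-id',          # hypothetical job ID
        'job_type': 'tester',                # must be a key of JOB_TYPES
        'job_config': {
            'params': {},                    # job-type specific parameters
            'tasks_per_second': 32,
        },
        # Mapping of task ID to task payload, as expected by process().
        'tasks': {'task-0': {}},
        # Where reply() should send the result; address/tube are assumed.
        'beanstalkd_reply': {'addr': 'beanstalk://127.0.0.1:6014',
                             'tube': 'oio-xcute-reply'},
    }

    job_id, task_ids, task_results, task_errors, reply_info = \
        worker.process(beanstalkd_job)
    # Note: reply() opens a connection to the given beanstalkd address.
    worker.reply(job_id, task_ids, task_results, task_errors, reply_info)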