Source code for oio.event.filters.xcute

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from oio.event.evob import Event, EventTypes
from oio.event.beanstalk import BeanstalkError
from oio.event.filters.base import Filter
from oio.common.easy_value import int_value
from oio.common.json import json
from oio.xcute.common.worker import XcuteWorker


[docs]class XcuteFilter(Filter): DEFAULT_RETRY_DELAY_TO_REPLY = 60
[docs] def init(self): self.retry_delay_to_reply = int_value( self.conf.get('retry_delay_to_reply'), self.DEFAULT_RETRY_DELAY_TO_REPLY) self.worker = XcuteWorker(self.conf, logger=self.logger)
[docs] def process(self, env, beanstalkd, cb): event = Event(env) if event.data.get('processed'): job_id = event.data['job_id'] task_ids = event.data['task_ids'] task_results = event.data['task_results'] task_errors = event.data['task_errors'] beanstalkd_reply_info = event.data['beanstalkd_reply'] else: job_id, task_ids, task_results, task_errors, \ beanstalkd_reply_info = self.worker.process(event.data) try: self.worker.reply( job_id, task_ids, task_results, task_errors, beanstalkd_reply_info) except BeanstalkError as exc: self.logger.warn( '[job_id=%s] Fail to reply, retry later (%d): %s', job_id, self.retry_delay_to_reply, exc) tasks_processed_event = json.dumps({ 'event': EventTypes.XCUTE_TASKS, 'data': { 'job_id': job_id, 'task_ids': task_ids, 'task_results': task_results, 'task_errors': task_errors, 'beanstalkd_reply': beanstalkd_reply_info, 'processed': True } }) beanstalkd.put( tasks_processed_event, delay=self.retry_delay_to_reply) return self.app(env, beanstalkd, cb)
[docs]def filter_factory(global_conf, **local_conf): conf = global_conf.copy() conf.update(local_conf) def account_filter(app): return XcuteFilter(app, conf) return account_filter