Source code for oio.event.consumer

# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
import signal
import os
import sys

import greenlet
import eventlet
from eventlet import Timeout, greenthread
from oio.common.exceptions import OioNetworkException, ClientException

from oio.conscience.client import ConscienceClient
from oio.rdir.client import RdirClient
from oio.event.beanstalk import Beanstalk, ConnectionError
from oio.common.utils import drop_privileges
from oio.common.easy_value import int_value
from oio.common.json import json
from oio.event.evob import is_success, is_error
from oio.event.loader import loadhandlers
from oio.common.exceptions import ExplicitBury


SLEEP_TIME = 1
ACCOUNT_SERVICE_TIMEOUT = 60
ACCOUNT_SERVICE = 'account'
DEFAULT_TUBE = 'oio'

BEANSTALK_RECONNECTION = 2.0
# default release delay (in seconds)
RELEASE_DELAY = 15


def _eventlet_stop(client, server, beanstalk):
    try:
        try:
            client.wait()
        finally:
            beanstalk.close()
    except greenlet.GreenletExit:
        pass
    except Exception:
        greenthread.kill(server, *sys.exc_info())


class StopServe(Exception):
    """Raised inside a consumer greenthread to ask it to stop serving."""
class Worker(object):
    """
    Base class for worker processes.

    Subclasses must implement `run()`, which is called by `init()` once
    privileges have been dropped and signal handlers installed.
    """

    # Signals reset to their default disposition by `init_signals()`.
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM CHLD".split()]

    def __init__(self, ppid, conf, logger):
        """
        :param ppid: PID of the parent process, as known at creation time
        :param conf: configuration mapping (only `.get()` is used here)
        :param logger: logger used to report worker events
        """
        self.ppid = ppid
        self.conf = conf
        self.started = False
        self.aborted = False
        self.alive = True
        self.logger = logger

    @property
    def pid(self):
        """PID of the current process."""
        return os.getpid()

    def run(self):
        """Main loop of the worker; must be overridden by subclasses."""
        raise NotImplementedError()

    def init(self):
        """
        Drop privileges to the configured user (`user` option, default
        "openio"), install the signal handlers, then enter `run()`.
        """
        drop_privileges(self.conf.get("user", "openio"))
        self.init_signals()
        self.started = True
        # main loop
        self.run()

    def init_signals(self):
        """
        Reset every signal of `SIGNALS` to its default handler, then
        install the worker's own handlers for QUIT, TERM and INT.
        """
        for sig in self.SIGNALS:
            signal.signal(sig, signal.SIG_DFL)
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        # Do not interrupt system calls when SIGTERM is received.
        signal.siginterrupt(signal.SIGTERM, False)

    def handle_exit(self, sig, frame):
        """Signal handler: flag the worker as no longer alive."""
        self.alive = False

    def handle_quit(self, sig, frame):
        """
        Signal handler: flag the worker as no longer alive, yield for
        0.1 second, then exit the process with status 0.
        """
        self.alive = False
        eventlet.sleep(0.1)
        sys.exit(0)

    def parent_alive(self):
        """
        Tell whether the parent process is still the one that was
        recorded at creation time.

        :returns: False (after logging a warning) if the parent PID
            changed, True otherwise
        """
        if self.ppid != os.getppid():
            # `Logger.warn` is a deprecated alias (removed in Python 3.13).
            self.logger.warning("parent changed, shutting down")
            return False
        return True
class EventTypes(object):
    """Names of the event types known to this module."""

    # Account events
    ACCOUNT_SERVICES = 'account.services'

    # Chunk events
    CHUNK_NEW = 'storage.chunk.new'
    CHUNK_DELETED = 'storage.chunk.deleted'

    # Container events
    CONTAINER_NEW = 'storage.container.new'
    CONTAINER_DELETED = 'storage.container.deleted'
    CONTAINER_STATE = 'storage.container.state'

    # Content events
    CONTENT_BROKEN = 'storage.content.broken'
    CONTENT_NEW = 'storage.content.new'
    CONTENT_DELETED = 'storage.content.deleted'
def _stop(client, server): try: client.wait() except greenlet.GreenletExit: pass except Exception: greenthread.kill(server, *sys.exc_info())
class EventWorker(Worker):
    """
    Worker reserving jobs from beanstalk tubes, decoding them as JSON
    events and passing them to the handlers loaded from `handlers_conf`.
    """

    def __init__(self, *args, **kwargs):
        super(EventWorker, self).__init__(*args, **kwargs)
        # Environment shared with the handlers (filled in by `init()`).
        self.app_env = dict()

    def init(self):
        """
        Prepare the clients and load the handlers, then delegate to
        `Worker.init()`.

        :raises ValueError: if `handlers_conf` is missing from the
            configuration
        """
        eventlet.monkey_patch(os=False)
        self.tube = self.conf.get("tube", DEFAULT_TUBE)
        self.cs = ConscienceClient(self.conf, logger=self.logger)
        self.rdir = RdirClient(self.conf, logger=self.logger)
        self._acct_addr = None
        self.acct_update = 0
        self.graceful_timeout = 1
        self.acct_refresh_interval = int_value(
            self.conf.get('acct_refresh_interval'), 60
        )
        # The bound method itself is stored (not its result), so that
        # the address is resolved when the entry is called.
        self.app_env['acct_addr'] = self.acct_addr
        if 'handlers_conf' not in self.conf:
            raise ValueError("'handlers_conf' path not defined in conf")
        self.handlers = loadhandlers(self.conf.get('handlers_conf'),
                                     global_conf=self.conf,
                                     app=self)

        for opt in ('acct_update', 'rdir_update', 'retries_per_second',
                    'batch_size'):
            if opt in self.conf:
                self.logger.warning('Deprecated option: %s', opt)

        super(EventWorker, self).init()

    def notify(self):
        """TODO"""
        pass

    def safe_decode_job(self, job_id, data):
        """
        Decode a job body as JSON and tag it with its job ID.

        :param job_id: identifier of the job, stored under the
            `job_id` key of the decoded event
        :param data: raw job body, expected to be a JSON object
        :returns: the decoded event, or None if decoding (or tagging)
            failed
        """
        try:
            env = json.loads(data)
            env['job_id'] = job_id
            return env
        except Exception as exc:
            # Do not use `exc.message`: the attribute does not exist on
            # Python 3, and the resulting AttributeError would escape
            # from this handler.
            self.logger.warning('Failed to decode job %s: "%s"',
                                job_id, str(exc))
            return None

    def run(self):
        """
        Spawn `concurrency` consumer greenthreads per queue URL
        (`queue_url` holds URLs separated by ';'), idle while the worker
        is alive, then stop the consumers, killing them if they do not
        finish within `graceful_timeout` seconds.
        """
        coros = []
        queue_url = self.conf.get('queue_url', 'beanstalk://127.0.0.1:11300')
        concurrency = int_value(self.conf.get('concurrency'), 10)

        server_gt = greenthread.getcurrent()

        for url in queue_url.split(';'):
            for _ in range(concurrency):
                beanstalk = Beanstalk.from_url(url)
                gt = eventlet.spawn(self.handle, beanstalk)
                gt.link(_eventlet_stop, server_gt, beanstalk)
                coros.append(gt)
                # Drop the local references.
                beanstalk, gt = None, None

        while self.alive:
            self.notify()
            try:
                eventlet.sleep(1.0)
            except AssertionError:
                self.alive = False
                break

        self.notify()
        try:
            with Timeout(self.graceful_timeout) as t:
                # Ask every consumer to stop, then wait for all of them.
                for coro in coros:
                    coro.kill(StopServe())
                for coro in coros:
                    coro.wait()
        except Timeout as te:
            if te != t:
                # Not our timeout: let it propagate.
                raise
            for coro in coros:
                coro.kill()

    def handle(self, beanstalk):
        """
        Consumer loop: reserve jobs from `beanstalk` and process them
        until `StopServe` is raised in this greenthread.

        Jobs which cannot be decoded, or whose processing raises an
        exception, are buried.

        :param beanstalk: beanstalk connection to reserve jobs from
        """
        conn_error = False
        try:
            if self.tube:
                beanstalk.use(self.tube)
                beanstalk.watch(self.tube)
            while True:
                try:
                    job_id, data = beanstalk.reserve()
                    if conn_error:
                        self.logger.warning("beanstalk reconnected")
                        conn_error = False
                except ConnectionError:
                    # Log only once per outage, then retry after a pause.
                    if not conn_error:
                        self.logger.warning("beanstalk connection error")
                        conn_error = True
                    eventlet.sleep(BEANSTALK_RECONNECTION)
                    continue
                event = self.safe_decode_job(job_id, data)
                if not event:
                    self.logger.warning("Burying event %s: %s",
                                        job_id, "malformed")
                    beanstalk.bury(job_id)
                else:
                    try:
                        self.process_event(job_id, event, beanstalk)
                    except (ClientException, OioNetworkException) as exc:
                        self.logger.warning("Burying event %s (%s): %s",
                                            job_id, event.get('event'), exc)
                        beanstalk.bury(job_id)
                    except ExplicitBury:
                        self.logger.info("Burying event %s (%s)",
                                         job_id, event.get('event'))
                        beanstalk.bury(job_id)
                    except Exception:
                        self.logger.exception("Burying event %s: %s",
                                              job_id, event)
                        beanstalk.bury(job_id)
        except StopServe:
            pass

    def process_event(self, job_id, event, beanstalk):
        """
        Pass `event` to the handler registered for its type.

        The job is deleted when no handler is found or when the handler
        reports a success, and released with a delay of `RELEASE_DELAY`
        seconds when the handler reports an error. Any other status
        leaves the job untouched here.

        :param job_id: identifier of the beanstalk job
        :param event: decoded event
        :param beanstalk: connection the job was reserved from
        """
        handler = self.get_handler(event)
        if not handler:
            self.logger.warning('no handler found for %r', event)
            beanstalk.delete(job_id)
            return

        def cb(status, msg):
            if is_success(status):
                beanstalk.delete(job_id)
            elif is_error(status):
                self.logger.warning(
                    'event %s handling failure (release with delay): %s',
                    event['job_id'], msg)
                beanstalk.release(job_id, delay=RELEASE_DELAY)

        handler(event, cb)

    def get_handler(self, event):
        """
        :returns: the handler registered for the `event` key of
            `event`, or None if there is none
        """
        return self.handlers.get(event.get('event'), None)

    def acct_addr(self):
        """
        Get the address of an account service, asking conscience for a
        new one when none is known yet or when the known one is older
        than `acct_refresh_interval` seconds.
        """
        if not self._acct_addr or self.acct_refresh():
            acct_instance = self.cs.next_instance(ACCOUNT_SERVICE)
            self._acct_addr = acct_instance.get('addr')
            self.acct_update = time.time()
        return self._acct_addr

    def acct_refresh(self):
        """Tell whether the account service address should be refreshed."""
        return (time.time() - self.acct_update) > self.acct_refresh_interval