Source code for oio.event.beanstalk

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
from import socket, Empty, LifoQueue, threading, time

import os
import sys
import yaml
from urlparse import urlparse
from cStringIO import StringIO as BytesIO

from oio.common import exceptions

SYM_CRLF = '\r\n'



SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."

[docs]class BeanstalkError(Exception): pass
[docs]class ConnectionError(BeanstalkError): pass
[docs]class TimeoutError(BeanstalkError): pass
[docs]class ResponseError(BeanstalkError): pass
[docs]class InvalidResponse(BeanstalkError): pass
[docs]class Reader(object): def __init__(self, socket, socket_read_size): self._sock = socket self.socket_read_size = socket_read_size self._buffer = BytesIO() self.bytes_written = 0 self.bytes_read = 0 @property def length(self): return self.bytes_written - self.bytes_read def _read_from_socket(self, length=None): # pylint: disable=no-member socket_read_size = self.socket_read_size buf = self._buffer marker = 0 try: while True: data = self._sock.recv(socket_read_size) if isinstance(data, bytes) and len(data) == 0: raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) buf.write(data) data_length = len(data) self.bytes_written += data_length marker += data_length if length is not None and length > marker: continue break except socket.timeout: raise TimeoutError("Timeout reading from socket") except socket.error: e = sys.exc_info()[1] raise ConnectionError("Error while reading from socket: %s" % (e.args,))
[docs] def read(self, length): length = length + 2 if length > self.length: self._read_from_socket(length - self.length) data = self.bytes_read += len(data) if self.bytes_read == self.bytes_written: self.purge() return data[:-2]
[docs] def readline(self): buf = self._buffer data = buf.readline() while not data.endswith(SYM_CRLF): self._read_from_socket() data = buf.readline() self.bytes_read += len(data) if self.bytes_read == self.bytes_written: self.purge() return data[:-2]
[docs] def purge(self): self._buffer.truncate() self.bytes_written = 0 self.bytes_read = 0
[docs] def close(self): try: self.purge() self._buffer.close() except Exception: pass self._buffer = None self._sock = None
[docs]class BaseParser(object): def __init__(self, socket_read_size): self.socket_read_size = socket_read_size self._sock = None self._buffer = None
[docs] def on_connect(self, connection): self._sock = connection._sock self._buffer = Reader(self._sock, self.socket_read_size) self.encoding = connection.encoding
[docs] def on_disconnect(self): if self._sock is not None: self._sock.close() self._sock = None if self._buffer is not None: self._buffer.close() self._buffer = None self.encoding = None
[docs] def can_read(self): # pylint: disable=no-member return self._buffer and bool(self._buffer_length)
[docs] def read_response(self): response = self._buffer.readline() if not response: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) response = response.split() return response[0], response[1:]
[docs] def read(self, size): response = if not response: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) return response
[docs]class Connection(object): # pylint: disable=no-member
[docs] @classmethod def from_url(cls, url, **kwargs): url = urlparse(url) if not url.netloc: raise ConnectionError('Invalid URL') url_options = {} url_options.update({ 'host': url.hostname, 'port': int(url.port)}) kwargs.update(url_options) return cls(**kwargs)
def __init__(self, host=None, port=None, use_tubes=None, watch_tubes=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, encoding='utf-8', socket_read_size=65536): = os.getpid() = host self.port = int(port) self.encoding = encoding self.socket_timeout = socket_timeout self.socket_connect_timeout = socket_connect_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options self._sock = None self._parser = BaseParser(socket_read_size=socket_read_size) self.use_tubes = use_tubes or [] self.watch_tubes = watch_tubes or []
[docs] def use(self, tube): self.use_tubes.append(tube) if self._sock: self._use(tube)
[docs] def watch(self, tube): self.watch_tubes.append(tube) if self._sock: self._watch(tube)
[docs] def connect(self): if self._sock: return try: sock = self._connect() except socket.timeout: raise TimeoutError("Timeout connecting to server") except socket.error: e = sys.exc_info()[1] raise ConnectionError(self._error_message(e)) self._sock = sock try: self.on_connect() except BeanstalkError: self.disconnect() raise
def _connect(self): err = None for res in socket.getaddrinfo(, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.socket_keepalive: sock.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in self.socket_keepalive_options.iteritems(): sock.setsockopt(socket.SOL_TCP, k, v) sock.settimeout(self.socket_connect_timeout) sock.connect(socket_address) sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err # pylint: disable=raising-bad-type raise socket.error("socket.getaddrinfo returned empty list") def _error_message(self, exception): if len(exception.args) == 1: return "Error connecting to %s:%s. %s." % \ (, self.port, exception.args[0]) else: return "Error %s connecting to %s:%s. %s." % \ (exception.args[0],, self.port, exception.args[1])
[docs] def on_connect(self): self._parser.on_connect(self) for use_tube in self.use_tubes: self._use(use_tube) for watch_tube in self.watch_tubes: self._watch(watch_tube)
def _use(self, tube): self.send_command('use', tube) self.read_response() def _watch(self, tube): self.send_command('watch', tube) self.read_response()
[docs] def disconnect(self): self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass self._sock = None
[docs] def pack_command(self, command, body, *args): output = [] # TODO handle encoding output.append(command) for arg in args: output.append(' ' + str(arg)) if body is not None: output.append(' ' + str(len(body))) output.append(SYM_CRLF) output.append(body) output.append(SYM_CRLF) return ''.join(output)
[docs] def send_command(self, command, *args, **kwargs): command = self.pack_command(command, kwargs.get('body'), *args) if not self._sock: self.connect() try: self._sock.sendall(command) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") except socket.error: err = sys.exc_info()[1] self.disconnect() if len(err.args) == 1: errno, errmsg = 'UNKNOWN', err.args[0] else: errno = err.args[0] errmsg = err.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) except Exception: self.disconnect() raise
[docs] def read_response(self): try: response = self._parser.read_response() except Exception: self.disconnect() raise if isinstance(response, ResponseError): raise response return response
[docs] def read_body(self, size): try: response = except Exception: self.disconnect() raise if isinstance(response, ResponseError): raise response return response
[docs]def dict_merge(*dicts): merged = {} for d in dicts: merged.update(d) return merged
[docs]def parse_yaml(connection, response, **kwargs): (status, results) = response size = results[0] body = connection.read_body(int(size)) if size > 0 and not body: raise ResponseError() return yaml.load(body)
[docs]def parse_body(connection, response, **kwargs): (status, results) = response job_id = results[0] job_size = results[1] body = connection.read_body(int(job_size)) if job_size > 0 and not body: raise ResponseError() return job_id, body
[docs]class Beanstalk(object): RESPONSE_CALLBACKS = dict_merge({ 'peek': parse_body, 'peek-buried': parse_body, 'peek-delayed': parse_body, 'peek-ready': parse_body, 'reserve': parse_body, 'reserve-with-timeout': parse_body, 'stats-tube': parse_yaml }) EXPECTED_OK = dict_merge({ 'bury': ['BURIED'], 'delete': ['DELETED'], 'kick': ['KICKED'], 'kick-job': ['KICKED'], 'peek': ['FOUND'], 'peek-buried': ['FOUND'], 'peek-delayed': ['FOUND'], 'peek-ready': ['FOUND'], 'put': ['INSERTED'], 'release': ['RELEASED'], 'reserve': ['RESERVED'], 'reserve-with-timeout': ['RESERVED'], 'stats-tube': ['OK'], 'use': ['USING'], 'watch': ['WATCHING'], }) EXPECTED_ERR = dict_merge({ 'bury': ['NOT_FOUND', 'OUT_OF_MEMORY'], 'delete': ['NOT_FOUND'], 'kick': ['OUT_OF_MEMORY'], 'kick-job': ['NOT_FOUND', 'OUT_OF_MEMORY'], 'peek': ['NOT_FOUND'], 'peek-buried': ['NOT_FOUND'], 'peek-delayed': ['NOT_FOUND'], 'peek-ready': ['NOT_FOUND'], 'put': ['JOB_TOO_BIG', 'BURIED', 'DRAINING', 'OUT_OF_MEMORY'], 'reserve': ['DEADLINE_SOON', 'TIMED_OUT'], 'reserve-with-timeout': ['DEADLINE_SOON', 'TIMED_OUT'], 'release': ['BURIED', 'NOT_FOUND', 'OUT_OF_MEMORY'], 'stats-tube': ['NOT_FOUND'], 'use': [], 'watch': [], })
[docs] @classmethod def from_url(cls, url, **kwargs): if url is None or not url: raise ConnectionError('Empty URL') if not url.startswith('beanstalk://'): import warnings warnings.warn( 'Invalid URL scheme, expecting beanstalk', DeprecationWarning) connection = Connection.from_url(url, **kwargs) return cls(connection=connection)
def __init__(self, host=None, port=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection=None, **kwargs): if not connection: self.socket_timeout = socket_timeout kwargs2 = { 'host': host, 'port': int(port), 'socket_connect_timeout': socket_connect_timeout, 'socket_keepalive': socket_keepalive, 'socket_keepalive_options': socket_keepalive_options, 'socket_timeout': socket_timeout, } connection = Connection(**kwargs2) self.conn_queue = LifoQueue() self.conn_queue.put_nowait(connection) self._connection = connection self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() self.expected_ok = self.__class__.EXPECTED_OK.copy() self.expected_err = self.__class__.EXPECTED_ERR.copy() def _get_connection(self): try: connection = self.conn_queue.get(block=True, timeout=None) except Empty: raise ConnectionError("No connection available") return connection def _release_connection(self, connection): self.conn_queue.put_nowait(connection)
[docs] def execute_command(self, *args, **kwargs): connection = self._get_connection() command_name = args[0] try: connection.send_command(*args, **kwargs) return self.parse_response(connection, command_name, **kwargs) except (ConnectionError, TimeoutError): connection.disconnect() raise finally: self._release_connection(connection)
[docs] def parse_response(self, connection, command_name, **kwargs): response = connection.read_response() status, results = response if status in self.expected_ok[command_name]: if command_name in self.response_callbacks: return self.response_callbacks[command_name]( connection, response, **kwargs) return response elif status in self.expected_err[command_name]: raise ResponseError(command_name, status, results) else: raise InvalidResponse(command_name, status, results) return response
[docs] def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR): assert isinstance(body, str), 'body must be str' job_id = self.execute_command('put', priority, delay, ttr, body=body) return job_id
[docs] def use(self, tube): self._connection.use(tube)
[docs] def watch(self, tube):
[docs] def reserve(self, timeout=None): if timeout is not None: return self.execute_command('reserve-with-timeout', timeout) else: return self.execute_command('reserve')
[docs] def bury(self, job_id, priority=DEFAULT_PRIORITY): self.execute_command('bury', job_id, priority)
[docs] def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0): self.execute_command('release', job_id, priority, delay)
[docs] def delete(self, job_id): self.execute_command('delete', job_id)
def _drain(self, fetch_func): try: job_id = True while job_id is not None: job_id, _ = fetch_func() self.delete(job_id) except ResponseError: pass
[docs] def drain_buried(self, tube): self.use(tube) return self._drain(self.peek_buried)
[docs] def drain_tube(self, tube): """Delete all jobs from the specified tube.""" from functools import partial return self._drain(partial(self.reserve, timeout=0))
[docs] def kick_job(self, job_id): """ Variant of` kick` that operates with a single job. :param job_id: the job id to kick :type job_id: `str` """ self.execute_command('kick-job', job_id)
[docs] def kick(self, bound=1000): """ Move jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs. :param bound: upper bound on the number of jobs to kick :type bound: `int` """ kicked = int(self.execute_command('kick', str(bound))[1][0]) return kicked
def _peek_generic(self, command_suffix=''): command = 'peek' + command_suffix try: return self.execute_command(command) except ResponseError as err: if err.args[0] == command and err.args[1] == 'NOT_FOUND': return None, None else: raise
[docs] def peek_buried(self): """ Read the next buried job without kicking it. """ return self._peek_generic('-buried')
[docs] def peek_ready(self): """ read the next ready job without reserving it. """ return self._peek_generic('-ready')
[docs] def wait_until_empty(self, tube, timeout=float('inf'), poll_interval=0.2, initial_delay=0.0): """ Wait until the the specified tube is empty, or the timeout expires. """ # TODO(FVE): check tube stats to ensure some jobs have passed through # and then get rid of the initial_delay # peek-ready requires "use", not "watch" self.use(tube) if initial_delay > 0.0: time.sleep(initial_delay) job_id, _ = self.peek_ready() deadline = time.time() + timeout while job_id is not None and time.time() < deadline: time.sleep(poll_interval) job_id, _ = self.peek_ready()
[docs] def wait_for_ready_job(self, tube, timeout=float('inf'), poll_interval=0.2): """ Wait until the the specified tube has a ready job, or the timeout expires. """ self.use(tube) job_id, _ = self.peek_ready() deadline = time.time() + timeout while job_id is None and time.time() < deadline: time.sleep(poll_interval) job_id, _ = self.peek_ready() return job_id
[docs] def stats_tube(self, tube): return self.execute_command('stats-tube', tube)
[docs] def close(self): if self._connection: self._connection.disconnect() self._connection = None
[docs]class TubedBeanstalkd(object): """ Beanstalkd wrapper that will talk to a single tube. """ def __init__(self, addr, tube, logger, **kwargs): addr = addr.strip() if addr.startswith('beanstalk://'): addr = addr[12:] self.addr = addr = tube self.logger = logger self.beanstalkd = None self.connected = False self._connect() # Check the connection self.beanstalkd.stats_tube( def _connect(self, **kwargs): if self.connected: return self.logger.debug('Connecting to %s using tube %s', self.addr, self.beanstalkd = Beanstalk.from_url('beanstalk://' + self.addr) self.beanstalkd.use( self.connected = True
[docs] def close(self): """Disconnect the wrapped Beanstalkd client.""" if not self.connected: return try: self.beanstalkd.close() except BeanstalkError: pass self.connected = False
[docs]class BeanstalkdListener(TubedBeanstalkd): def __init__(self, addr, tube, logger, **kwargs): # pylint: disable=no-member super(BeanstalkdListener, self).__init__(addr, tube, logger, **kwargs) self.running = True
[docs] def fetch_job(self, on_job, timeout=None, **kwargs): job_id = None try: self._connect(**kwargs) job_id, data = self.beanstalkd.reserve(timeout=timeout) try: for job_info in on_job(job_id, data, **kwargs): yield job_info except GeneratorExit: # If the reader finishes to handle the job, but does not want # any new job, it will break the generator. This does not mean # the current job has failed, thus we must delete it. self.beanstalkd.delete(job_id) raise except Exception as err: try: self.beanstalkd.bury(job_id) except BeanstalkError as exc: self.logger.error("Could not bury job %s: %s", job_id, exc) exceptions.reraise(err.__class__, err) else: self.beanstalkd.delete(job_id) return except ConnectionError as exc: self.close() self.logger.warn( 'Disconnected from %s using tube %s (job=%s): %s', self.addr,, job_id, exc) if 'Invalid URL' in str(exc): raise time.sleep(1.0) except exceptions.ExplicitBury as exc: self.logger.warn("Job bury on %s using tube %s (job=%s): %s", self.addr,, job_id, exc) except BeanstalkError as exc: if isinstance(exc, ResponseError) and 'TIMED_OUT' in str(exc): raise exceptions.OioTimeout() self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id) except Exception: self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id)
[docs] def fetch_jobs(self, on_job, reserve_timeout=None, **kwargs): while self.running: try: for job_info in self.fetch_job(on_job, timeout=reserve_timeout, **kwargs): yield job_info except exceptions.OioTimeout: pass
[docs]class BeanstalkdSender(TubedBeanstalkd): """ Send jobs to the specified beanstalkd tube, until the specified high_limit is reached. """ def __init__(self, addr, tube, logger, low_limit=512, high_limit=1024, **kwargs): # pylint: disable=no-member super(BeanstalkdSender, self).__init__(addr, tube, logger, **kwargs) self.low_limit = low_limit self.high_limit = high_limit self.accepts_jobs = True self.nb_jobs = 0 self.nb_jobs_lock = threading.Lock()
[docs] def send_job(self, job, priority=DEFAULT_PRIORITY, delay=0, **kwargs): """ Send a job, if the queue has not reached its size limit. :returns: True if the job has been sent, False otherwise. """ if self.nb_jobs <= self.low_limit: self.accepts_jobs = True elif not self.accepts_jobs or self.nb_jobs > self.high_limit: return False job_id = None try: if not self.connected: self.logger.debug('Connecting to %s using tube %s', self.addr, self._connect(**kwargs) with self.nb_jobs_lock: job_id = self.beanstalkd.put( job, priority=priority, delay=delay) self.nb_jobs += 1 if self.nb_jobs >= self.high_limit: self.accepts_jobs = False return True except ConnectionError as exc: self.close() self.logger.warn( 'Disconnected from %s using tube %s (job=%s): %s', self.addr,, job_id, exc) if 'Invalid URL' in str(exc): raise except Exception: self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id) return False
[docs] def send_event(self, event, **kwargs): """Deprecated""" return self.send_job(job=event, **kwargs)
[docs] def job_done(self): """ Declare that a job previously sent by this sender has been fully processed. """ with self.nb_jobs_lock: self.nb_jobs -= 1