Source code for oio.event.beanstalk

# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from import socket, Empty, LifoQueue, threading, time

import os
import sys
from six import BytesIO, iteritems
from six.moves.urllib_parse import urlparse
import yaml

from oio.common import exceptions

SYM_CRLF = '\r\n'
SYM_CRLF_BYTES = b'\r\n'



SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."

[docs]class BeanstalkError(Exception): pass
[docs]class ConnectionError(BeanstalkError): pass
[docs]class TimeoutError(BeanstalkError): pass
[docs]class ResponseError(BeanstalkError): pass
[docs]class InvalidResponse(BeanstalkError): pass
[docs]class Reader(object): def __init__(self, socket, socket_read_size): self._sock = socket self.socket_read_size = socket_read_size self._buffer = BytesIO() self.bytes_written = 0 self.bytes_read = 0 @property def length(self): return self.bytes_written - self.bytes_read def _read_from_socket(self, length=None): # pylint: disable=no-member socket_read_size = self.socket_read_size buf = self._buffer marker = 0 try: while True: data = self._sock.recv(socket_read_size) if isinstance(data, bytes) and len(data) == 0: raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) buf.write(data) data_length = len(data) self.bytes_written += data_length marker += data_length if length is not None and length > marker: continue break except socket.timeout: raise TimeoutError("Timeout reading from socket") except socket.error: e = sys.exc_info()[1] raise ConnectionError("Error while reading from socket: %s" % (e.args,))
[docs] def read(self, length): length = length + 2 if length > self.length: self._read_from_socket(length - self.length) data = self.bytes_read += len(data) if self.bytes_read == self.bytes_written: self.purge() return data[:-2].decode('utf-8')
[docs] def readline(self): buf = self._buffer data = buf.readline() while not data.endswith(SYM_CRLF_BYTES): self._read_from_socket() data = buf.readline() self.bytes_read += len(data) if self.bytes_read == self.bytes_written: self.purge() return data[:-2].decode('utf-8')
[docs] def purge(self): self._buffer.truncate() self.bytes_written = 0 self.bytes_read = 0
[docs] def close(self): try: self.purge() self._buffer.close() except Exception: pass self._buffer = None self._sock = None
[docs]class BaseParser(object): def __init__(self, socket_read_size): self.socket_read_size = socket_read_size self._sock = None self._buffer = None
[docs] def on_connect(self, connection): self._sock = connection._sock self._buffer = Reader(self._sock, self.socket_read_size) self.encoding = connection.encoding
[docs] def on_disconnect(self): if self._sock is not None: self._sock.close() self._sock = None if self._buffer is not None: self._buffer.close() self._buffer = None self.encoding = None
[docs] def can_read(self): # pylint: disable=no-member return self._buffer and bool(self._buffer_length)
[docs] def read_response(self): response = self._buffer.readline() if not response: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) response = response.split() return response[0], response[1:]
[docs] def read(self, size): response = if not response: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) return response
[docs]class Connection(object): # pylint: disable=no-member
[docs] @classmethod def from_url(cls, url, **kwargs): url = urlparse(url) if not url.netloc: raise ConnectionError('Invalid URL') url_options = {} url_options.update({ 'host': url.hostname, 'port': int(url.port)}) kwargs.update(url_options) return cls(**kwargs)
def __init__(self, host=None, port=None, use_tubes=None, watch_tubes=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, encoding='utf-8', socket_read_size=65536): = os.getpid() = host self.port = int(port) self.encoding = encoding self.socket_timeout = socket_timeout self.socket_connect_timeout = socket_connect_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options self._sock = None self._parser = BaseParser(socket_read_size=socket_read_size) self.use_tubes = use_tubes or [] self.watch_tubes = watch_tubes or []
[docs] def use(self, tube): self.use_tubes.append(tube) if self._sock: self._use(tube)
[docs] def watch(self, tube): self.watch_tubes.append(tube) if self._sock: self._watch(tube)
[docs] def connect(self): if self._sock: return try: sock = self._connect() except socket.timeout: raise TimeoutError("Timeout connecting to server") except socket.error: e = sys.exc_info()[1] raise ConnectionError(self._error_message(e)) self._sock = sock try: self.on_connect() except BeanstalkError: self.disconnect() raise
def _connect(self): err = None for res in socket.getaddrinfo(, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.socket_keepalive: sock.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) sock.settimeout(self.socket_connect_timeout) sock.connect(socket_address) sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err # pylint: disable=raising-bad-type raise socket.error("socket.getaddrinfo returned empty list") def _error_message(self, exception): if len(exception.args) == 1: return "Error connecting to %s:%s. %s." % \ (, self.port, exception.args[0]) else: return "Error %s connecting to %s:%s. %s." % \ (exception.args[0],, self.port, exception.args[1])
[docs] def on_connect(self): self._parser.on_connect(self) for use_tube in self.use_tubes: self._use(use_tube) for watch_tube in self.watch_tubes: self._watch(watch_tube)
def _use(self, tube): self.send_command('use', tube) self.read_response() def _watch(self, tube): self.send_command('watch', tube) self.read_response()
[docs] def disconnect(self): self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass self._sock = None
[docs] def pack_command(self, command, body, *args): output = [] # TODO handle encoding output.append(command) for arg in args: output.append(' ' + str(arg)) if body is not None: output.append(' ' + str(len(body))) output.append(SYM_CRLF) output.append(body) output.append(SYM_CRLF) return ''.join(output).encode('utf-8')
[docs] def send_command(self, command, *args, **kwargs): encoded = self.pack_command(command, kwargs.get('body'), *args) if not self._sock: self.connect() try: self._sock.sendall(encoded) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") except socket.error: err = sys.exc_info()[1] self.disconnect() if len(err.args) == 1: errno, errmsg = 'UNKNOWN', err.args[0] else: errno = err.args[0] errmsg = err.args[1] raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg)) except Exception: self.disconnect() raise
[docs] def read_response(self): try: response = self._parser.read_response() except Exception: self.disconnect() raise if isinstance(response, ResponseError): raise response return response
[docs] def read_body(self, size): try: response = except Exception: self.disconnect() raise if isinstance(response, ResponseError): raise response return response
[docs]def dict_merge(*dicts): merged = {} for d in dicts: merged.update(d) return merged
[docs]def parse_yaml(connection, response, **kwargs): results = response[1] size = int(results[0]) body = connection.read_body(size) if size > 0 and not body: raise ResponseError() return yaml.load(body, Loader=yaml.Loader)
[docs]def parse_body(connection, response, **kwargs): results = response[1] job_id = results[0] job_size = int(results[1]) body = connection.read_body(job_size) if job_size > 0 and not body: raise ResponseError() return job_id, body
[docs]class Beanstalk(object): RESPONSE_CALLBACKS = dict_merge({ 'list-tubes': parse_yaml, 'peek': parse_body, 'peek-buried': parse_body, 'peek-delayed': parse_body, 'peek-ready': parse_body, 'reserve': parse_body, 'reserve-with-timeout': parse_body, 'stats': parse_yaml, 'stats-tube': parse_yaml }) EXPECTED_OK = dict_merge({ 'bury': ['BURIED'], 'delete': ['DELETED'], 'list-tubes': ['OK'], 'kick': ['KICKED'], 'kick-job': ['KICKED'], 'peek': ['FOUND'], 'peek-buried': ['FOUND'], 'peek-delayed': ['FOUND'], 'peek-ready': ['FOUND'], 'put': ['INSERTED'], 'release': ['RELEASED'], 'reserve': ['RESERVED'], 'reserve-with-timeout': ['RESERVED'], 'stats': ['OK'], 'stats-tube': ['OK'], 'use': ['USING'], 'watch': ['WATCHING'], }) EXPECTED_ERR = dict_merge({ 'bury': ['NOT_FOUND', 'OUT_OF_MEMORY'], 'delete': ['NOT_FOUND'], 'list-tubes': [], 'kick': ['OUT_OF_MEMORY'], 'kick-job': ['NOT_FOUND', 'OUT_OF_MEMORY'], 'peek': ['NOT_FOUND'], 'peek-buried': ['NOT_FOUND'], 'peek-delayed': ['NOT_FOUND'], 'peek-ready': ['NOT_FOUND'], 'put': ['JOB_TOO_BIG', 'BURIED', 'DRAINING', 'OUT_OF_MEMORY'], 'reserve': ['DEADLINE_SOON', 'TIMED_OUT'], 'reserve-with-timeout': ['DEADLINE_SOON', 'TIMED_OUT'], 'release': ['BURIED', 'NOT_FOUND', 'OUT_OF_MEMORY'], 'stats': [], 'stats-tube': ['NOT_FOUND'], 'use': [], 'watch': [], })
[docs] @classmethod def from_url(cls, url, **kwargs): if url is None or not url: raise ConnectionError('Empty URL') if not url.startswith('beanstalk://'): import warnings warnings.warn( 'Invalid URL scheme, expecting beanstalk', DeprecationWarning) connection = Connection.from_url(url, **kwargs) return cls(connection=connection)
def __init__(self, host=None, port=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection=None, **kwargs): if not connection: self.socket_timeout = socket_timeout kwargs2 = { 'host': host, 'port': int(port), 'socket_connect_timeout': socket_connect_timeout, 'socket_keepalive': socket_keepalive, 'socket_keepalive_options': socket_keepalive_options, 'socket_timeout': socket_timeout, } connection = Connection(**kwargs2) self.conn_queue = LifoQueue() self.conn_queue.put_nowait(connection) self._connection = connection self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() self.expected_ok = self.__class__.EXPECTED_OK.copy() self.expected_err = self.__class__.EXPECTED_ERR.copy() def _get_connection(self): try: connection = self.conn_queue.get(block=True, timeout=None) except Empty: raise ConnectionError("No connection available") return connection def _release_connection(self, connection): self.conn_queue.put_nowait(connection)
[docs] def execute_command(self, *args, **kwargs): connection = self._get_connection() command_name = args[0] try: connection.send_command(*args, **kwargs) return self.parse_response(connection, command_name, **kwargs) except (ConnectionError, TimeoutError): connection.disconnect() raise finally: self._release_connection(connection)
[docs] def parse_response(self, connection, command_name, **kwargs): response = connection.read_response() status, results = response if status in self.expected_ok[command_name]: if command_name in self.response_callbacks: return self.response_callbacks[command_name]( connection, response, **kwargs) return response elif status in self.expected_err[command_name]: raise ResponseError(command_name, status, results) else: raise InvalidResponse(command_name, status, results) return response
[docs] def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR): assert isinstance(body, str), 'body must be str' job_id = self.execute_command('put', priority, delay, ttr, body=body) return job_id
[docs] def use(self, tube): self._connection.use(tube)
[docs] def watch(self, tube):
[docs] def reserve(self, timeout=None): if timeout is not None: return self.execute_command('reserve-with-timeout', timeout) else: return self.execute_command('reserve')
[docs] def bury(self, job_id, priority=DEFAULT_PRIORITY): self.execute_command('bury', job_id, priority)
[docs] def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0): self.execute_command('release', job_id, priority, delay)
[docs] def delete(self, job_id): self.execute_command('delete', job_id)
def _drain(self, fetch_func): try: job_id = True while job_id is not None: job_id, _ = fetch_func() self.delete(job_id) except ResponseError: pass
[docs] def drain_buried(self, tube): self.use(tube) return self._drain(self.peek_buried)
[docs] def drain_tube(self, tube, timeout=0.0): """Delete all jobs from the specified tube.""" from functools import partial return self._drain(partial(self.reserve, timeout=timeout))
[docs] def kick_job(self, job_id): """ Variant of` kick` that operates with a single job. :param job_id: the job id to kick :type job_id: `str` """ self.execute_command('kick-job', job_id)
[docs] def kick(self, bound=1000): """ Move jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs. :param bound: upper bound on the number of jobs to kick :type bound: `int` """ kicked = int(self.execute_command('kick', str(bound))[1][0]) return kicked
def _peek_generic(self, command_suffix=''): command = 'peek' + command_suffix try: return self.execute_command(command) except ResponseError as err: if err.args[0] == command and err.args[1] == 'NOT_FOUND': return None, None else: raise
[docs] def peek_buried(self): """ Read the next buried job without kicking it. """ return self._peek_generic('-buried')
[docs] def peek_ready(self): """ read the next ready job without reserving it. """ return self._peek_generic('-ready')
[docs] def wait_until_empty(self, tube, timeout=float('inf'), poll_interval=0.2, initial_delay=0.0): """ Wait until the the specified tube is empty, or the timeout expires. """ # TODO(FVE): check tube stats to ensure some jobs have passed through # and then get rid of the initial_delay # peek-ready requires "use", not "watch" self.use(tube) if initial_delay > 0.0: time.sleep(initial_delay) job_id, _ = self.peek_ready() deadline = time.time() + timeout while job_id is not None and time.time() < deadline: time.sleep(poll_interval) job_id, _ = self.peek_ready()
[docs] def wait_for_ready_job(self, tube, timeout=float('inf'), poll_interval=0.2): """ Wait until the the specified tube has a ready job, or the timeout expires. """ self.use(tube) job_id, data = self.peek_ready() deadline = time.time() + timeout while job_id is None and time.time() < deadline: time.sleep(poll_interval) job_id, data = self.peek_ready() return job_id, data
[docs] def stats(self): return self.execute_command('stats')
[docs] def stats_tube(self, tube): return self.execute_command('stats-tube', tube)
[docs] def tubes(self): return self.execute_command('list-tubes')
[docs] def close(self): if self._connection: self._connection.disconnect() self._connection = None
[docs]class TubedBeanstalkd(object): """ Beanstalkd wrapper that will talk to a single tube. """ def __init__(self, addr, tube, logger, **kwargs): addr = addr.strip() if addr.startswith('beanstalk://'): addr = addr[12:] self.addr = addr = tube self.logger = logger self.beanstalkd = None self.connected = False self._connect() # Check the connection self.beanstalkd.stats_tube( def _connect(self, **kwargs): if self.connected: return self.logger.debug('Connecting to %s using tube %s', self.addr, self.beanstalkd = Beanstalk.from_url('beanstalk://' + self.addr) self.beanstalkd.use( self.connected = True
[docs] def close(self): """Disconnect the wrapped Beanstalkd client.""" if not self.connected: return try: self.beanstalkd.close() except BeanstalkError: pass self.connected = False
[docs]class BeanstalkdListener(TubedBeanstalkd): def __init__(self, addr, tube, logger, **kwargs): # pylint: disable=no-member super(BeanstalkdListener, self).__init__(addr, tube, logger, **kwargs) self.running = True
[docs] def fetch_job(self, on_job, timeout=None, **kwargs): job_id = None try: self._connect(**kwargs) job_id, data = self.beanstalkd.reserve(timeout=timeout) try: for job_info in on_job(job_id, data, **kwargs): yield job_info except GeneratorExit: # If the reader finishes to handle the job, but does not want # any new job, it will break the generator. This does not mean # the current job has failed, thus we must delete it. self.beanstalkd.delete(job_id) raise except Exception as err: try: self.beanstalkd.bury(job_id) except BeanstalkError as exc: self.logger.error("Could not bury job %s: %s", job_id, exc) exceptions.reraise(err.__class__, err) else: self.beanstalkd.delete(job_id) return except ConnectionError as exc: self.close() self.logger.warn( 'Disconnected from %s using tube %s (job=%s): %s', self.addr,, job_id, exc) if 'Invalid URL' in str(exc): raise time.sleep(1.0) except exceptions.ExplicitBury as exc: self.logger.warn("Job bury on %s using tube %s (job=%s): %s", self.addr,, job_id, exc) except BeanstalkError as exc: if isinstance(exc, ResponseError) and 'TIMED_OUT' in str(exc): raise exceptions.OioTimeout() self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id) except Exception: self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id)
[docs] def fetch_jobs(self, on_job, reserve_timeout=None, **kwargs): while self.running: try: for job_info in self.fetch_job(on_job, timeout=reserve_timeout, **kwargs): yield job_info except exceptions.OioTimeout: pass
[docs]class BeanstalkdSender(TubedBeanstalkd): """ Send jobs to the specified beanstalkd tube, until the specified high_limit is reached. """ def __init__(self, addr, tube, logger, low_limit=512, high_limit=1024, **kwargs): # pylint: disable=no-member super(BeanstalkdSender, self).__init__(addr, tube, logger, **kwargs) self.low_limit = low_limit self.high_limit = high_limit self.accepts_jobs = True self.nb_jobs = 0 self.nb_jobs_lock = threading.Lock()
[docs] def send_job(self, job, priority=DEFAULT_PRIORITY, delay=0, **kwargs): """ Send a job, if the queue has not reached its size limit. :returns: True if the job has been sent, False otherwise. """ if self.nb_jobs <= self.low_limit: self.accepts_jobs = True elif not self.accepts_jobs or self.nb_jobs > self.high_limit: return False job_id = None try: self._connect(**kwargs) with self.nb_jobs_lock: job_id = self.beanstalkd.put( job, priority=priority, delay=delay) self.nb_jobs += 1 if self.nb_jobs >= self.high_limit: self.accepts_jobs = False return True except ConnectionError as exc: self.close() self.logger.warn( 'Disconnected from %s using tube %s (job=%s): %s', self.addr,, job_id, exc) if 'Invalid URL' in str(exc): raise except Exception: self.logger.exception("ERROR on %s using tube %s (job=%s)", self.addr,, job_id) return False
[docs] def send_event(self, event, **kwargs): """Deprecated""" return self.send_job(job=event, **kwargs)
[docs] def job_done(self): """ Declare that a job previously sent by this sender has been fully processed (the sender received a response, or does not expect one). """ with self.nb_jobs_lock: self.nb_jobs -= 1