# Source code for oio.event.beanstalk

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.


import os
import sys
import yaml
from eventlet.green import socket
from eventlet.queue import Empty, LifoQueue
from urlparse import urlparse
from cStringIO import StringIO as BytesIO


# Line terminator of the beanstalkd text protocol; ends every command,
# response line and job body.
SYM_CRLF = '\r\n'

# Priority used when the caller does not supply one (equal to 2 ** 31).
DEFAULT_PRIORITY = 1 << 31

# Default time-to-run for new jobs (presumably seconds, the unit beanstalkd
# uses for TTR -- confirm against the server version in use).
DEFAULT_TTR = 120

# Message attached to errors raised when the peer closes the socket.
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."


class BeanstalkError(Exception):
    """Base class for the errors defined by this module."""


class ConnectionError(BeanstalkError):
    """Connecting to, reading from or writing to the server failed."""


class TimeoutError(BeanstalkError):
    """A socket operation with the server timed out."""


class ResponseError(BeanstalkError):
    """The server replied with a known error status (or an empty body)."""


class InvalidResponse(BeanstalkError):
    """The server replied with a status not expected for the command."""
class Reader(object):
    """Buffer bytes received from a socket and hand them out by size or line.

    ``bytes_written`` and ``bytes_read`` are offsets into an in-memory
    buffer; once every buffered byte has been consumed the buffer is reset
    (see :meth:`purge`).
    """

    def __init__(self, socket, socket_read_size):
        # ``socket`` is the connected socket object data is recv()'d from.
        self._sock = socket
        self.socket_read_size = socket_read_size
        self._buffer = BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    @property
    def length(self):
        """Number of buffered bytes that have not been consumed yet."""
        return self.bytes_written - self.bytes_read

    def _read_from_socket(self, length=None):
        """Append data received from the socket to the buffer.

        Always performs at least one ``recv``; when *length* is given, keeps
        receiving until at least *length* new bytes have been buffered.

        :raises TimeoutError: the socket timed out.
        :raises ConnectionError: the peer closed the connection, or another
            socket error occurred.
        """
        chunk_size = self.socket_read_size
        buf = self._buffer
        buf.seek(self.bytes_written)
        received = 0
        try:
            while True:
                chunk = self._sock.recv(chunk_size)
                # An empty bytes result means the server closed its side.
                if isinstance(chunk, bytes) and not chunk:
                    raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
                buf.write(chunk)
                chunk_len = len(chunk)
                self.bytes_written += chunk_len
                received += chunk_len
                if length is None or received >= length:
                    break
        except socket.timeout:
            raise TimeoutError("Timeout reading from socket")
        except socket.error as err:
            raise ConnectionError(
                "Error while reading from socket: %s" % (err.args,))

    def _consume(self, data):
        """Mark *data* as read and return it minus its 2-byte terminator."""
        self.bytes_read += len(data)
        if self.bytes_read == self.bytes_written:
            # Everything buffered has been handed out: reset the buffer.
            self.purge()
        return data[:-2]

    def read(self, length):
        """Return the next *length* bytes, also consuming the trailing CRLF."""
        # The payload is followed by a 2-byte CRLF terminator.
        wanted = length + 2
        shortfall = wanted - self.length
        if shortfall > 0:
            self._read_from_socket(shortfall)
        self._buffer.seek(self.bytes_read)
        return self._consume(self._buffer.read(wanted))

    def readline(self):
        """Return the next CRLF-terminated line, without its terminator."""
        buf = self._buffer
        while True:
            buf.seek(self.bytes_read)
            line = buf.readline()
            if line.endswith(SYM_CRLF):
                return self._consume(line)
            self._read_from_socket()

    def purge(self):
        """Empty the buffer and reset both offsets."""
        self._buffer.seek(0)
        self._buffer.truncate()
        self.bytes_written = self.bytes_read = 0

    def close(self):
        """Release the buffer and drop the socket reference (best effort)."""
        try:
            self.purge()
            self._buffer.close()
        except Exception:
            # Closing is best-effort; errors here are deliberately ignored.
            pass
        self._buffer = self._sock = None
class BaseParser(object):
    """Parse beanstalkd responses read through a :class:`Reader`."""

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None
        # Set from the connection in on_connect(); initialised here so the
        # attribute always exists.
        self.encoding = None

    def on_connect(self, connection):
        """Attach to *connection*'s open socket and create the read buffer."""
        self._sock = connection._sock
        self._buffer = Reader(self._sock, self.socket_read_size)
        self.encoding = connection.encoding

    def on_disconnect(self):
        """Close the socket and the read buffer, if any."""
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        if self._buffer is not None:
            self._buffer.close()
            self._buffer = None
        self.encoding = None

    def can_read(self):
        """Return True when buffered, not yet consumed data is available."""
        return self._buffer is not None and self._buffer.length > 0

    def read_response(self):
        """Read one response line and split it into ``(status, args)``.

        :raises ConnectionError: nothing could be read from the server.
        """
        line = self._buffer.readline()
        if not line:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        parts = line.split()
        return parts[0], parts[1:]

    def read(self, size):
        """Read a body of *size* bytes (its trailing CRLF is consumed).

        An empty result is only an error when a non-empty body was expected:
        zero-length job bodies are valid.

        :raises ConnectionError: a non-empty body came back empty.
        """
        response = self._buffer.read(size)
        if size > 0 and not response:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        return response
class Connection(object):
    """A single TCP connection to a beanstalkd server.

    Tubes recorded with :meth:`use` and :meth:`watch` are sent to the server
    by :meth:`on_connect`, i.e. every time the socket is (re)established.
    """

    # Fallback port when neither the URL nor the caller provides one
    # (beanstalkd's default listening port).
    DEFAULT_PORT = 11300

    @classmethod
    def from_url(cls, url, **kwargs):
        """Build a connection from a ``scheme://host[:port]`` URL.

        Extra keyword arguments go to the constructor; the host and port
        taken from the URL override any given in *kwargs*.

        :raises ConnectionError: the URL has no network location.
        """
        parsed = urlparse(url)
        if not parsed.netloc:
            raise ConnectionError('Invalid URL')
        port = parsed.port
        kwargs.update({
            'host': parsed.hostname,
            'port': cls.DEFAULT_PORT if port is None else int(port),
        })
        return cls(**kwargs)

    def __init__(self, host=None, port=None, use_tubes=None, watch_tubes=None,
                 socket_timeout=None, socket_connect_timeout=None,
                 socket_keepalive=False, socket_keepalive_options=None,
                 encoding='utf-8', socket_read_size=65536):
        self.pid = os.getpid()
        self.host = host
        self.port = self.DEFAULT_PORT if port is None else int(port)
        self.encoding = encoding
        self.socket_timeout = socket_timeout
        self.socket_connect_timeout = socket_connect_timeout
        self.socket_keepalive = socket_keepalive
        # Mapping of TCP-level option -> value, applied when keepalive is on.
        self.socket_keepalive_options = socket_keepalive_options
        self._sock = None
        self._parser = BaseParser(socket_read_size=socket_read_size)
        # Copy, so use()/watch() never mutate a list owned by the caller.
        self.use_tubes = list(use_tubes or [])
        self.watch_tubes = list(watch_tubes or [])

    def use(self, tube):
        """Record *tube*; it is selected with ``use`` on the next connect."""
        self.use_tubes.append(tube)

    def watch(self, tube):
        """Record *tube*; it is watched on the next connect."""
        self.watch_tubes.append(tube)

    def connect(self):
        """Open the socket if needed, then run :meth:`on_connect`.

        :raises TimeoutError: connecting timed out.
        :raises ConnectionError: the socket could not be connected.
        """
        if self._sock:
            return
        try:
            sock = self._connect()
        except socket.timeout:
            raise TimeoutError("Timeout connecting to server")
        except socket.error as e:
            raise ConnectionError(self._error_message(e))
        self._sock = sock
        try:
            self.on_connect()
        except BeanstalkError:
            self.disconnect()
            raise

    def _connect(self):
        """Try each resolved address in turn; return the first connected socket."""
        err = None
        for res in socket.getaddrinfo(self.host, self.port, 0,
                                      socket.SOCK_STREAM):
            family, socktype, proto, _canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                if self.socket_keepalive:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    # The options default to None: treat that as "no extra
                    # options" instead of crashing.
                    options = self.socket_keepalive_options or {}
                    for k, v in options.items():
                        sock.setsockopt(socket.SOL_TCP, k, v)
                sock.settimeout(self.socket_connect_timeout)
                sock.connect(socket_address)
                sock.settimeout(self.socket_timeout)
                return sock
            except socket.error as exc:
                err = exc
                if sock is not None:
                    sock.close()
        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned empty list")

    def _error_message(self, exception):
        """Format a connection error message from a socket exception."""
        if len(exception.args) == 1:
            return "Error connecting to %s:%s. %s." % \
                (self.host, self.port, exception.args[0])
        else:
            return "Error %s connecting to %s:%s. %s." % \
                (exception.args[0], self.host, self.port, exception.args[1])

    def on_connect(self):
        """Prepare the parser and re-select the recorded tubes."""
        self._parser.on_connect(self)
        for use_tube in self.use_tubes:
            self._use(use_tube)
        for watch_tube in self.watch_tubes:
            self._watch(watch_tube)

    def _use(self, tube):
        self.send_command('use', tube)
        self.read_response()

    def _watch(self, tube):
        self.send_command('watch', tube)
        self.read_response()

    def disconnect(self):
        """Shut down and close the socket, and reset the parser."""
        sock, self._sock = self._sock, None
        if sock is not None:
            try:
                # Shut down while the descriptor is still open: the parser
                # closes the very same socket object just below.
                sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                pass
        self._parser.on_disconnect()
        if sock is not None:
            try:
                # Harmless if the parser already closed it; covers the case
                # where the parser never attached to this socket.
                sock.close()
            except socket.error:
                pass

    def pack_command(self, command, body, *args):
        """Serialize a command line, plus an optional body, for the wire."""
        output = [command]
        # TODO handle encoding
        for arg in args:
            output.append(' ' + str(arg))
        if body is not None:
            # Commands with a body announce its size, then send it on the
            # following line.
            output.append(' ' + str(len(body)))
            output.append(SYM_CRLF)
            output.append(body)
        output.append(SYM_CRLF)
        return ''.join(output)

    def send_command(self, command, *args, **kwargs):
        """Send a command (connecting first if needed).

        :raises TimeoutError: writing to the socket timed out.
        :raises ConnectionError: writing to the socket failed.
        """
        command = self.pack_command(command, kwargs.get('body'), *args)
        if not self._sock:
            self.connect()
        try:
            if isinstance(command, str):
                command = [command]
            for item in command:
                self._sock.sendall(item)
        except socket.timeout:
            self.disconnect()
            raise TimeoutError("Timeout writing to socket")
        except socket.error as e:
            self.disconnect()
            if len(e.args) == 1:
                errno, errmsg = 'UNKNOWN', e.args[0]
            else:
                errno = e.args[0]
                errmsg = e.args[1]
            raise ConnectionError("Error %s while writing to socket. %s." %
                                  (errno, errmsg))
        except Exception:
            self.disconnect()
            raise

    def read_response(self):
        """Read one ``(status, args)`` response; disconnect on any failure."""
        try:
            response = self._parser.read_response()
        except Exception:
            self.disconnect()
            raise
        if isinstance(response, ResponseError):
            raise response
        return response

    def read_body(self, size):
        """Read a body of *size* bytes; disconnect on any failure."""
        try:
            response = self._parser.read(size)
        except Exception:
            self.disconnect()
            raise
        if isinstance(response, ResponseError):
            raise response
        return response
def dict_merge(*dicts):
    """Return a new dict combining *dicts*; later ones win on duplicate keys."""
    result = dict()
    for overrides in dicts:
        result.update(overrides)
    return result
def parse_yaml(connection, response, **kwargs):
    """Read the YAML body announced by *response* and decode it.

    :param connection: connection to read the body from
    :param response: ``(status, args)`` tuple whose first arg is the body
        size in bytes (e.g. the ``OK <bytes>`` reply to ``stats-tube``)
    :returns: the decoded YAML document
    :raises ResponseError: a non-empty body was announced but none was read
    """
    status, results = response
    # The size arrives as a string; compare it as an integer.
    size = int(results[0])
    body = connection.read_body(size)
    if size > 0 and not body:
        raise ResponseError()
    # The body comes from the network: use the safe loader so YAML tags
    # cannot build arbitrary Python objects (a bare yaml.load() without a
    # Loader is also rejected by PyYAML >= 6).
    return yaml.safe_load(body)
def parse_body(connection, response, **kwargs):
    """Read the job body announced by a ``RESERVED <id> <bytes>`` response.

    :param connection: connection to read the body from
    :param response: ``(status, args)`` tuple; args are ``[job_id, size]``
    :returns: ``(job_id, body)``
    :raises ResponseError: a non-empty body was announced but none was read
    """
    status, results = response
    job_id = results[0]
    # The size arrives as a string; compare it as an integer so that an
    # empty job body (size 0) is accepted.
    job_size = int(results[1])
    body = connection.read_body(job_size)
    if job_size > 0 and not body:
        raise ResponseError()
    return job_id, body
class Beanstalk(object):
    """beanstalkd client built on a single :class:`Connection`.

    The connection lives in a LIFO queue: each command takes it out and puts
    it back when done, so concurrent callers block until it is released.
    """

    # Post-processing applied to successful responses, by command name.
    RESPONSE_CALLBACKS = {
        'reserve': parse_body,
        'reserve-with-timeout': parse_body,
        'stats-tube': parse_yaml,
    }
    # Status words meaning success, by command name.
    EXPECTED_OK = {
        'reserve': ['RESERVED'],
        'reserve-with-timeout': ['RESERVED'],
        'delete': ['DELETED'],
        'release': ['RELEASED'],
        'bury': ['BURIED'],
        'put': ['INSERTED'],
        'use': ['USING'],
        'watch': ['WATCHING'],
        'stats-tube': ['OK'],
        'kick': ['KICKED'],
        'kick-job': ['KICKED'],
    }
    # Known error status words, by command name; raised as ResponseError.
    EXPECTED_ERR = {
        'reserve': ['DEADLINE_SOON', 'TIMED_OUT'],
        'reserve-with-timeout': ['DEADLINE_SOON', 'TIMED_OUT'],
        'delete': ['NOT_FOUND'],
        'release': ['BURIED', 'NOT_FOUND', 'OUT_OF_MEMORY'],
        'bury': ['NOT_FOUND', 'OUT_OF_MEMORY'],
        'stats-tube': ['NOT_FOUND'],
        'use': [],
        'watch': [],
        'put': ['JOB_TOO_BIG', 'BURIED', 'DRAINING', 'OUT_OF_MEMORY'],
        'kick': ['OUT_OF_MEMORY'],
        'kick-job': ['NOT_FOUND', 'OUT_OF_MEMORY'],
    }

    @classmethod
    def from_url(cls, url, **kwargs):
        """Create a client from a ``beanstalk://host:port`` URL.

        Other schemes are accepted but emit a DeprecationWarning.

        :raises ConnectionError: *url* is empty or has no network location.
        """
        if not url:
            raise ConnectionError('Empty URL')
        if not url.startswith('beanstalk://'):
            import warnings
            warnings.warn(
                'Invalid URL scheme, expecting beanstalk',
                DeprecationWarning)
        connection = Connection.from_url(url, **kwargs)
        return cls(connection=connection)

    def __init__(self, host=None, port=None, socket_timeout=None,
                 socket_connect_timeout=None, socket_keepalive=None,
                 retry_on_timeout=False, socket_keepalive_options=None,
                 max_connections=None, connection=None):
        # Accepted for API compatibility only: Connection has no such
        # parameters, and forwarding them made construction fail with
        # TypeError.
        self.retry_on_timeout = retry_on_timeout
        self.max_connections = max_connections
        if not connection:
            self.socket_timeout = socket_timeout
            connection = Connection(
                host=host,
                port=None if port is None else int(port),
                socket_connect_timeout=socket_connect_timeout,
                socket_keepalive=socket_keepalive,
                socket_keepalive_options=socket_keepalive_options,
                socket_timeout=socket_timeout)
        self.conn_queue = LifoQueue()
        self.conn_queue.put_nowait(connection)
        self._connection = connection
        self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
        self.expected_ok = self.__class__.EXPECTED_OK.copy()
        self.expected_err = self.__class__.EXPECTED_ERR.copy()

    def _get_connection(self):
        try:
            connection = self.conn_queue.get(block=True, timeout=None)
        except Empty:
            raise ConnectionError("No connection available")
        return connection

    def _release_connection(self, connection):
        self.conn_queue.put_nowait(connection)

    def execute_command(self, *args, **kwargs):
        """Send ``args[0]`` with the remaining args and parse its response.

        On ConnectionError/TimeoutError the connection is dropped (it is
        re-opened by the next command) and the error is re-raised.
        """
        connection = self._get_connection()
        command_name = args[0]
        try:
            connection.send_command(*args, **kwargs)
            return self.parse_response(connection, command_name, **kwargs)
        except (ConnectionError, TimeoutError):
            connection.disconnect()
            raise
        finally:
            self._release_connection(connection)

    def parse_response(self, connection, command_name, **kwargs):
        """Read and validate the response to *command_name*.

        :returns: the callback result if one is registered for the command,
            otherwise the raw ``(status, args)`` tuple
        :raises ResponseError: the status is a known error for the command
        :raises InvalidResponse: the status is not expected for the command
            (including commands missing from the tables)
        """
        response = connection.read_response()
        status, results = response
        if status in self.expected_ok.get(command_name, ()):
            callback = self.response_callbacks.get(command_name)
            if callback is not None:
                return callback(connection, response, **kwargs)
            return response
        if status in self.expected_err.get(command_name, ()):
            raise ResponseError(command_name, status, results)
        raise InvalidResponse(command_name, status, results)

    def put(self, body, priority=DEFAULT_PRIORITY, delay=0, ttr=DEFAULT_TTR):
        """Insert a job into the tube currently in use.

        :returns: the raw ``('INSERTED', [job_id])`` response tuple
        """
        assert isinstance(body, str), 'body must be str'
        job_id = self.execute_command('put', priority, delay, ttr, body=body)
        return job_id

    def use(self, tube):
        """Select *tube* for ``put`` (sent when the connection is opened)."""
        self._connection.use(tube)

    def watch(self, tube):
        """Add *tube* to the watch list (sent when the connection is opened)."""
        self._connection.watch(tube)

    def reserve(self, timeout=None):
        """Reserve a job, optionally waiting at most *timeout*.

        :returns: ``(job_id, body)``
        :raises ResponseError: e.g. ``TIMED_OUT`` or ``DEADLINE_SOON``
        """
        if timeout is not None:
            return self.execute_command('reserve-with-timeout', timeout)
        else:
            return self.execute_command('reserve')

    def bury(self, job_id, priority=DEFAULT_PRIORITY):
        self.execute_command('bury', job_id, priority)

    def release(self, job_id, priority=DEFAULT_PRIORITY, delay=0):
        self.execute_command('release', job_id, priority, delay)

    def delete(self, job_id):
        self.execute_command('delete', job_id)

    def kick_job(self, job_id):
        """
        Variant of `kick` that operates with a single job.

        :param job_id: the job id to kick
        :type job_id: `str`
        """
        self.execute_command('kick-job', job_id)

    def kick(self, bound=1000):
        """
        Move jobs into the ready queue.
        If there are any buried jobs, it will only kick buried jobs.
        Otherwise it will kick delayed jobs.

        :param bound: upper bound on the number of jobs to kick
        :type bound: `int`
        :returns: the number of jobs actually kicked
        """
        kicked = int(self.execute_command('kick', str(bound))[1][0])
        return kicked

    def stats_tube(self, tube):
        """Return the statistics of *tube*, decoded from YAML."""
        return self.execute_command('stats-tube', tube)

    def close(self):
        """Disconnect the underlying connection."""
        if self._connection:
            self._connection.disconnect()