Source code for oio.common.redis_conn

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from time import time, sleep
import uuid
import math

import redis
import redis.sentinel


[docs]class RedisConn(object): def __init__(self, conf, connection=None, **kwargs): self.conf = conf self._conn = connection self._sentinel = None self._sentinel_hosts = conf.get('sentinel_hosts', None) self._sentinel_name = conf.get('sentinel_master_name', 'oio') # Do not use Sentinel if a connection object is provided if self._sentinel_hosts and not self._conn: self._sentinel = redis.sentinel.Sentinel( [(h, int(p)) for h, p, in (hp.split(':', 2) for hp in self._sentinel_hosts.split(','))])
[docs] def register_script(self, script): """Register a LUA script and return Script object.""" return self.conn.register_script(script)
@property def conn(self): """Retrieve Redis connection (normal or sentinel)""" if self._sentinel: return self._sentinel.master_for(self._sentinel_name) if not self._conn: redis_host = self.conf.get('redis_host', '127.0.0.1') redis_port = int(self.conf.get('redis_port', '6379')) self._conn = redis.StrictRedis(host=redis_host, port=redis_port) return self._conn
[docs] def acquire_lock_with_timeout(self, lockname, acquire_timeout=10, lock_timeout=10): """Acquire a lock :lockname:""" conn = self.conn identifier = str(uuid.uuid4()) lockname = 'lock:' + lockname lock_timeout = int(math.ceil(lock_timeout)) end = time() + acquire_timeout while time() < end: if conn.setnx(lockname, identifier): conn.expire(lockname, lock_timeout) return identifier elif not conn.ttl(lockname): conn.expire(lockname, lock_timeout) sleep(.001) return False
[docs] def release_lock(self, lockname, identifier): """Release a previously acquired Lock""" conn = self.conn pipe = conn.pipeline(True) lockname = 'lock:' + lockname while True: try: pipe.watch(lockname) if pipe.get(lockname) == identifier: pipe.multi() pipe.delete(lockname) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False