# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import time
import redis
import redis.sentinel
from werkzeug.exceptions import NotFound, Conflict, BadRequest
from oio.common.timestamp import Timestamp
from oio.common.easy_value import int_value, true_value, float_value
from oio.common.redis_conn import RedisConn
EXPIRE_TIME = 60 # seconds
account_fields = ['ns', 'name', 'ctime', 'containers', 'objects',
'bytes', 'storage_policy']
container_fields = ['ns', 'account', 'type', 'objects', 'bytes', 'ctime',
'mtime', 'name']
[docs]class AccountBackend(RedisConn):
lua_is_sup = """
-- With lua float we are losing precision for this reason
-- we keep the number as a string
local is_sup = function(a,b)
local int_a = string.match(a,"%d+")
local int_b = string.match(b,"%d+")
if string.len(int_a) > string.len(int_b) then
return true;
end;
return a > b;
end;
"""
lua_update_container = (lua_is_sup + """
local account_id = redis.call('HGET', KEYS[4], 'id');
if not account_id then
if ARGV[6] == 'True' then
redis.call('HSET', 'accounts:', KEYS[1], 1);
redis.call('HMSET', KEYS[4], 'id', KEYS[1],
'bytes', 0, 'objects', 0, 'ctime', ARGV[7]);
else
return redis.error_reply('no_account');
end;
end;
if ARGV[9] == 'False' then
local container_name = redis.call('HGET', KEYS[2], 'name');
if not container_name then
return redis.error_reply('no_container');
end;
end;
local objects = redis.call('HGET', KEYS[2], 'objects');
local name = ARGV[1];
local mtime = redis.call('HGET', KEYS[2], 'mtime');
local dtime = redis.call('HGET', KEYS[2], 'dtime');
local bytes = redis.call('HGET', KEYS[2], 'bytes');
-- When the keys do not exist redis return false and not nil
if objects == false then
objects = 0
end
if dtime == false then
dtime = '0'
end
if mtime == false then
mtime = '0'
end
if bytes == false then
bytes = 0
end
if ARGV[9] == 'False' and is_sup(dtime, mtime) then
return redis.error_reply('no_container');
end;
local old_mtime = mtime;
local inc_objects;
local inc_bytes;
if not is_sup(ARGV[3],dtime) and
not is_sup(ARGV[2],mtime) then
return redis.error_reply('no_update_needed');
end;
if is_sup(ARGV[2],mtime) then
mtime = ARGV[2];
end;
if is_sup(ARGV[3],dtime) then
dtime = ARGV[3];
end;
if is_sup(dtime,mtime) then
inc_objects = -objects;
inc_bytes = -bytes;
redis.call('HMSET', KEYS[2], 'bytes', 0, 'objects', 0);
redis.call('EXPIRE', KEYS[2], tonumber(ARGV[8]));
redis.call('ZREM', KEYS[3], name);
elseif is_sup(mtime,old_mtime) then
redis.call('PERSIST', KEYS[2]);
inc_objects = tonumber(ARGV[4]) - objects
inc_bytes = tonumber(ARGV[5]) - bytes
redis.call('HMSET', KEYS[2], 'bytes', tonumber(ARGV[5]),
'objects', tonumber(ARGV[4]));
redis.call('ZADD', KEYS[3], '0', name);
else
return redis.error_reply('no_update_needed');
end;
redis.call('HMSET', KEYS[2], 'mtime', mtime,
'dtime', dtime, 'name', name)
if inc_objects ~= 0 then
redis.call('HINCRBY', KEYS[4], 'objects', inc_objects);
end;
if inc_bytes ~= 0 then
redis.call('HINCRBY', KEYS[4], 'bytes', inc_bytes);
end;
""")
lua_refresh_account = """
local account_id = redis.call('HGET', KEYS[1], 'id');
if not account_id then
return redis.error_reply('no_account');
end;
local containers = redis.call('ZRANGE', KEYS[2], 0, -1);
local container_key = ''
local bytes_sum = 0;
local objects_sum = 0;
for _,container in ipairs(containers) do
container_key = KEYS[3] .. container;
bytes_sum = bytes_sum + redis.call('HGET', container_key, 'bytes')
objects_sum = objects_sum + redis.call('HGET', container_key,
'objects')
end;
redis.call('HMSET', KEYS[1], 'bytes', bytes_sum,
'objects', objects_sum)
"""
lua_flush_account = """
local account_id = redis.call('HGET', KEYS[1], 'id');
if not account_id then
return redis.error_reply('no_account');
end;
redis.call('HMSET', KEYS[1], 'bytes', 0, 'objects', 0)
local containers = redis.call('ZRANGE', KEYS[2], 0, -1);
redis.call('DEL', KEYS[2]);
for _,container in ipairs(containers) do
redis.call('DEL', KEYS[3] .. container);
end;
"""
def __init__(self, conf, connection=None):
self.conf = conf
self.autocreate = true_value(conf.get('autocreate', 'true'))
super(AccountBackend, self).__init__(conf, connection)
self.script_update_container = self.register_script(
self.lua_update_container)
self.script_refresh_account = self.register_script(
self.lua_refresh_account)
self.script_flush_account = self.register_script(
self.lua_flush_account)
[docs] @staticmethod
def ckey(account, name):
"""Build the key of a container description"""
return 'container:%s:%s' % (account, unicode(name))
[docs] def create_account(self, account_id):
conn = self.conn
if not account_id:
return None
if conn.hget('accounts:', account_id):
return None
lock = self.acquire_lock_with_timeout('account:%s' % account_id, 1)
if not lock:
return None
pipeline = conn.pipeline(True)
pipeline.hset('accounts:', account_id, 1)
pipeline.hmset('account:%s' % account_id, {
'id': account_id,
'bytes': 0,
'objects': 0,
'ctime': Timestamp(time()).normal
})
pipeline.execute()
self.release_lock('account:%s' % account_id, lock)
return account_id
[docs] def delete_account(self, account_id):
conn = self.conn
if not account_id:
return None
account_id = conn.hget('account:%s' % account_id, 'id')
if not account_id:
return None
lock = self.acquire_lock_with_timeout('account:%s' % account_id, 1)
if not lock:
return None
num_containers = conn.zcard('containers:%s' % account_id)
if int(num_containers) > 0:
return False
pipeline = conn.pipeline(True)
pipeline.delete('metadata:%s' % account_id)
pipeline.delete('containers:%s' % account_id)
pipeline.delete('account:%s' % account_id)
pipeline.hdel('accounts:', account_id)
pipeline.execute()
self.release_lock('account:%s' % account_id, lock)
return True
[docs] def info_account(self, account_id):
conn = self.conn
if not account_id:
return None
account_id = conn.hget('account:%s' % account_id, 'id')
if not account_id:
return None
pipeline = conn.pipeline(False)
pipeline.hgetall('account:%s' % account_id)
pipeline.zcard('containers:%s' % account_id)
pipeline.hgetall('metadata:%s' % account_id)
data = pipeline.execute()
info = data[0]
for r in ['bytes', 'objects']:
info[r] = int_value(info[r], 0)
info['containers'] = data[1]
info['metadata'] = data[2]
return info
[docs] def list_account(self):
conn = self.conn
accounts = conn.hkeys('accounts:')
return accounts
[docs] def update_container(self, account_id, name, mtime, dtime, object_count,
bytes_used, autocreate_account=None,
autocreate_container=True):
conn = self.conn
if not account_id or not name:
raise BadRequest("Missing account or container")
if autocreate_account is None:
autocreate_account = self.autocreate
if mtime is None:
mtime = '0'
else:
mtime = Timestamp(float(mtime)).normal
if dtime is None:
dtime = '0'
else:
dtime = Timestamp(float(dtime)).normal
if object_count is None:
object_count = 0
if bytes_used is None:
bytes_used = 0
keys = [account_id, AccountBackend.ckey(account_id, name),
("containers:%s" % (account_id)),
("account:%s" % (account_id))]
args = [name, mtime, dtime, object_count, bytes_used,
autocreate_account, Timestamp(time()).normal, EXPIRE_TIME,
autocreate_container]
try:
self.script_update_container(keys=keys, args=args, client=conn)
except redis.exceptions.ResponseError as exc:
if str(exc) == "no_account":
raise NotFound("Account %s not found" % account_id)
if str(exc) == "no_container":
raise NotFound("Container %s not found" % name)
elif str(exc) == "no_update_needed":
raise Conflict("No update needed, "
"event older than last container update")
else:
raise
return name
def _raw_listing(self, account_id, limit, marker, end_marker, delimiter,
prefix):
"""Fetch tuple list of containers matching options.
Tuple is [(container|prefix),
0 *reserved for objects*,
0 *reserved for size*,
0 for container, 1 for prefix]"""
conn = self.conn
if delimiter and not prefix:
prefix = ''
orig_marker = marker
results = []
while len(results) < limit:
min = '-'
max = '+'
if end_marker:
max = '(' + end_marker
if marker and marker >= prefix:
min = '(' + marker
elif prefix:
min = '[' + prefix
offset = 0
container_ids = conn.zrangebylex('containers:%s' % account_id, min,
max, offset, limit - len(results))
container_ids = [cid.decode('utf8', errors='ignore')
for cid in container_ids]
if prefix is None:
containers = [[c_id, 0, 0, 0, 0] for c_id in container_ids]
return containers
if not delimiter:
if not prefix:
containers = [[c_id, 0, 0, 0, 0] for c_id in container_ids]
return containers
else:
containers = [[c_id, 0, 0, 0, 0] for c_id in container_ids
if c_id.startswith(prefix)]
return containers
count = 0
for container_id in container_ids:
count += 1
marker = container_id
if len(results) >= limit\
or not container_id.startswith(prefix):
return results
end = container_id.find(delimiter, len(prefix))
if end > 0:
marker = container_id[:end] + chr(ord(delimiter) + 1)
dir_name = container_id[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, 0, 0, 1, 0])
break
results.append([container_id, 0, 0, 0, 0])
if not count:
break
return results
[docs] def list_containers(self, account_id, limit=1000, marker=None,
end_marker=None, prefix=None, delimiter=None):
raw_list = self._raw_listing(account_id, limit=limit, marker=marker,
end_marker=end_marker, prefix=prefix,
delimiter=delimiter)
pipeline = self.conn.pipeline(True)
# skip prefix
for container in [entry for entry in raw_list if not entry[3]]:
pipeline.hmget(AccountBackend.ckey(account_id, container[0]),
'objects', 'bytes', 'mtime')
res = pipeline.execute()
i = 0
for container in raw_list:
if not container[3]:
container[1] = int_value(res[i][0], 0)
container[2] = int_value(res[i][1], 0)
container[4] = float_value(res[i][2], 0.0)
i += 1
return raw_list
[docs] def status(self):
conn = self.conn
account_count = conn.hlen('accounts:')
status = {'account_count': account_count}
return status
[docs] def refresh_account(self, account_id):
if not account_id:
raise BadRequest("Missing account")
keys = ["account:%s" % account_id,
"containers:%s" % account_id,
"container:%s:" % account_id]
try:
self.script_refresh_account(keys=keys, client=self.conn)
except redis.exceptions.ResponseError as exc:
if str(exc) == "no_account":
raise NotFound(account_id)
else:
raise
[docs] def flush_account(self, account_id):
if not account_id:
raise BadRequest("Missing account")
keys = ["account:%s" % account_id,
"containers:%s" % account_id,
"container:%s:" % account_id]
try:
self.script_flush_account(keys=keys, client=self.conn)
except redis.exceptions.ResponseError as exc:
if str(exc) == "no_account":
raise NotFound(account_id)
else:
raise