# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from six import text_type
from six.moves.urllib_parse import urlparse
import redis
import redis.sentinel
from werkzeug.exceptions import NotFound, Conflict, BadRequest
from oio.common.constants import BUCKET_PROP_REPLI_ENABLED
from oio.common.timestamp import Timestamp
from oio.common.easy_value import int_value, boolean_value, float_value, \
debinarize
from oio.common.redis_conn import RedisConnection, catch_service_errors
ACCOUNT_KEY_PREFIX = 'account:'
BUCKET_KEY_PREFIX = 'bucket:'
BUCKET_LIST_PREFIX = 'buckets:'
CONTAINER_LIST_PREFIX = 'containers:'
SEGMENTS_BUCKET_SUFFIX = '+segments'
EXPIRE_TIME = 60  # seconds
account_fields = ['ns', 'name', 'ctime', 'containers', 'objects',
'bytes', 'storage_policy']
container_fields = ['ns', 'account', 'type', 'objects', 'bytes', 'ctime',
'mtime', 'name']
class AccountBackend(RedisConnection):
lua_is_sup = """
        -- Lua floats lose precision on large integers, so we keep
        -- the numbers as strings and compare them ourselves.
local is_sup = function(a,b)
local int_a = string.match(a,"%d+")
local int_b = string.match(b,"%d+")
if string.len(int_a) > string.len(int_b) then
return true;
end;
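            -- Same integer-part length: the timestamps are zero-padded
            -- decimal strings, so plain string comparison matches
            -- numeric comparison.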
return a > b;
end;
"""
lua_update_bucket_func = """
-- Note that bucket is the key name, not just the bucket name
-- (it has a prefix).
local update_bucket_stats = function(
bucket, account, mtime,
inc_objects, inc_bytes, inc_damaged_objects, inc_missing_chunks)
-- Set the bucket owner.
-- FIXME(FVE): do some checks instead of overwriting
redis.call('HSET', bucket, 'account', account)
-- Increment the counters.
redis.call('HINCRBY', bucket, 'objects', inc_objects)
redis.call('HINCRBY', bucket, 'bytes', inc_bytes)
redis.call('HINCRBY', bucket, 'damaged_objects', inc_damaged_objects)
redis.call('HINCRBY', bucket, 'missing_chunks', inc_missing_chunks)
-- Finally update the modification time.
redis.call('HSET', bucket, 'mtime', mtime)
end
"""
lua_update_bucket_list = """
        -- Key to the set of buckets of a specific account
local bucket_set = KEYS[1]
-- Actual name of the bucket
local bucket_name = ARGV[1]
-- True if the bucket has just been deleted
local deleted = ARGV[2]
if deleted ~= 'True' then
redis.call('ZADD', bucket_set, 0, bucket_name);
else
redis.call('ZREM', bucket_set, bucket_name);
end
"""
# FIXME(FVE): declare local variables
lua_update_container = (
lua_is_sup +
lua_update_bucket_func +
"""
-- KEYS[1] account name
-- KEYS[2] key to the container hash
-- KEYS[3] key to the account's container set
-- KEYS[4] key to the account hash
-- KEYS[5] key to the bucket hash
local bkey = KEYS[5]
-- ARGV[1] container name
-- ARGV[2] mtime
-- ARGV[3] dtime
-- ARGV[4] new object count
-- ARGV[5] new total size
-- ARGV[6] damaged objects
-- ARGV[7] missing chunks
-- ARGV[8] autocreate account?
-- ARGV[9] current timestamp
local now = ARGV[9]
-- ARGV[10] container key expiration time
-- ARGV[11] autocreate container?
-- ARGV[12] update bucket object count?
local update_bucket_object_count = ARGV[12]
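        -- Load the account id, creating the account on the fly when
        -- autocreation is allowed.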
local account_id = redis.call('HGET', KEYS[4], 'id');
if not account_id then
if ARGV[8] == 'True' then
redis.call('HSET', 'accounts:', KEYS[1], 1);
redis.call('HMSET', KEYS[4], 'id', KEYS[1],
'bytes', 0, 'objects', 0,
'damaged_objects', 0, 'missing_chunks', 0,
'ctime', now);
account_id = KEYS[1]
else
return redis.error_reply('no_account');
end;
end;
if ARGV[11] == 'False' then
local container_name = redis.call('HGET', KEYS[2], 'name');
if not container_name then
return redis.error_reply('no_container');
end;
end;
local name = ARGV[1];
local mtime = redis.call('HGET', KEYS[2], 'mtime');
local dtime = redis.call('HGET', KEYS[2], 'dtime');
local objects = redis.call('HGET', KEYS[2], 'objects');
local bytes = redis.call('HGET', KEYS[2], 'bytes');
local damaged_objects = redis.call('HGET', KEYS[2],
'damaged_objects');
local missing_chunks = redis.call('HGET', KEYS[2],
'missing_chunks');
        -- When a key does not exist, Redis returns false, not nil
if dtime == false then
dtime = '0'
end
if mtime == false then
mtime = '0'
end
if objects == false then
objects = 0
else
objects = tonumber(objects)
end
if bytes == false then
bytes = 0
else
bytes = tonumber(bytes)
end
if damaged_objects == false then
damaged_objects = 0
else
damaged_objects = tonumber(damaged_objects)
end
if missing_chunks == false then
missing_chunks = 0
else
missing_chunks = tonumber(missing_chunks)
end
if ARGV[11] == 'False' and is_sup(dtime, mtime) then
return redis.error_reply('no_container');
end;
local old_mtime = mtime;
local inc_objects = 0;
local inc_bytes = 0;
local inc_damaged_objects = 0;
local inc_missing_chunks = 0;
if not is_sup(ARGV[3],dtime) and not is_sup(ARGV[2],mtime) then
return redis.error_reply('no_update_needed');
end;
if is_sup(ARGV[2],mtime) then
mtime = ARGV[2];
end;
if is_sup(ARGV[3],dtime) then
dtime = ARGV[3];
end;
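        -- dtime is more recent than mtime: the container was deleted.
        -- Zero its counters and let its key expire after a grace
        -- period, so that late events can still be compared to dtime.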
if is_sup(dtime,mtime) then
-- Protect against "minus zero".
if objects ~= 0 then
inc_objects = -objects;
end
if bytes ~= 0 then
inc_bytes = -bytes;
end
if damaged_objects ~= 0 then
inc_damaged_objects = -damaged_objects
end
if missing_chunks ~= 0 then
inc_missing_chunks = -missing_chunks;
end
redis.call('HMSET', KEYS[2],
'bytes', 0, 'objects', 0,
'damaged_objects', 0, 'missing_chunks', 0);
redis.call('EXPIRE', KEYS[2], tonumber(ARGV[10]));
redis.call('ZREM', KEYS[3], name);
elseif is_sup(mtime,old_mtime) then
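            -- The container was modified: cancel any pending
            -- expiration and compute the deltas for the account
            -- and bucket counters.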
redis.call('PERSIST', KEYS[2]);
inc_objects = tonumber(ARGV[4]) - objects
inc_bytes = tonumber(ARGV[5]) - bytes
inc_damaged_objects = tonumber(ARGV[6]) - damaged_objects
inc_missing_chunks = tonumber(ARGV[7]) - missing_chunks
redis.call('HMSET', KEYS[2],
'objects', tonumber(ARGV[4]),
'bytes', tonumber(ARGV[5]),
'damaged_objects', tonumber(ARGV[6]),
'missing_chunks', tonumber(ARGV[7]));
redis.call('ZADD', KEYS[3], '0', name);
else
return redis.error_reply('no_update_needed');
end;
redis.call('HMSET', KEYS[2], 'mtime', mtime,
'dtime', dtime, 'name', name)
if inc_objects ~= 0 then
redis.call('HINCRBY', KEYS[4], 'objects', inc_objects);
end;
if inc_bytes ~= 0 then
redis.call('HINCRBY', KEYS[4], 'bytes', inc_bytes);
end;
if inc_damaged_objects ~= 0 then
redis.call('HINCRBY', KEYS[4], 'damaged_objects',
inc_damaged_objects);
end;
if inc_missing_chunks ~= 0 then
redis.call('HINCRBY', KEYS[4], 'missing_chunks',
inc_missing_chunks);
end;
if bkey ~= 'False' then
            -- For containers holding MPU segments, we do not want to count
-- each segment as an object. But we still want to consider
-- their size.
if update_bucket_object_count ~= 'True' then
inc_objects = 0
end
update_bucket_stats(bkey, account_id, now,
inc_objects, inc_bytes,
inc_damaged_objects, inc_missing_chunks)
end
""")
lua_refresh_account = """
local account_id = redis.call('HGET', KEYS[1], 'id');
if not account_id then
return redis.error_reply('no_account');
end;
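        -- Recompute the account counters from scratch by summing
        -- those of all its containers.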
local containers = redis.call('ZRANGE', KEYS[2], 0, -1);
local container_key = ''
local bytes_sum = 0;
local objects_sum = 0;
local damaged_objects_sum = 0;
local missing_chunks_sum = 0;
for _,container in ipairs(containers) do
container_key = KEYS[3] .. container;
objects_sum = objects_sum + redis.call('HGET', container_key,
'objects')
bytes_sum = bytes_sum + redis.call('HGET', container_key, 'bytes')
local damaged_objects = redis.call('HGET', container_key,
'damaged_objects')
if damaged_objects == false then
damaged_objects = 0
end
damaged_objects_sum = damaged_objects_sum + damaged_objects
local missing_chunks = redis.call('HGET', container_key,
'missing_chunks')
if missing_chunks == false then
missing_chunks = 0
end
missing_chunks_sum = missing_chunks_sum + missing_chunks
end;
redis.call('HMSET', KEYS[1], 'objects', objects_sum,
'bytes', bytes_sum,
'damaged_objects', damaged_objects_sum,
'missing_chunks', missing_chunks_sum)
"""
lua_flush_account = """
local account_id = redis.call('HGET', KEYS[1], 'id');
if not account_id then
return redis.error_reply('no_account');
end;
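        -- Reset the account counters, then drop the container set
        -- and every container hash.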
redis.call('HMSET', KEYS[1], 'objects', 0, 'bytes', 0,
'damaged_objects', 0, 'missing_chunks', 0)
local containers = redis.call('ZRANGE', KEYS[2], 0, -1);
redis.call('DEL', KEYS[2]);
for _,container in ipairs(containers) do
redis.call('DEL', KEYS[3] .. container);
end;
"""
lua_get_extended_container_info = """
local ckey = KEYS[1]
local bucket_prefix = ARGV[1]
local bname = redis.call('HGET', ckey, 'bucket')
local replication_enabled = 'false'
if bname then
local bkey = bucket_prefix .. bname
local enabled_str = redis.call('HGET', bkey, 'replication_enabled')
        if enabled_str then
-- Do not cast into boolean here
replication_enabled = enabled_str
end
end
local res = redis.call('HGETALL', ckey)
table.insert(res, 'replication_enabled')
table.insert(res, replication_enabled)
return res
"""
# This regex comes from https://stackoverflow.com/a/50484916
#
# The first group looks ahead to ensure that the match
# is between 3 and 63 characters long.
#
# The next group (?!^(\d+\.)+\d+$) looks ahead to forbid matching
# bucket names that look like IP addresses.
#
    # The last group matches zero or more labels followed by a dot,
    # then one final label (lowercase letters, digits and inner
    # hyphens only).
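    #
    # For example, "my-bucket" and "logs.2020" match, whereas "ab"
    # (too short), "192.168.0.1" and "My_Bucket" do not.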
buckets_pattern = re.compile(
r"""(?=^.{3,63}$) # first group
(?!^(\d+\.)+\d+$) # second
(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)* #third
([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)""", re.X)
def __init__(self, conf):
self.conf = conf
redis_conf = {k[6:]: v for k, v in self.conf.items()
if k.startswith("redis_")}
redis_host = redis_conf.pop('host', None)
if redis_host:
parsed = urlparse('http://' + redis_host)
if parsed.port is None:
redis_host = '%s:%s' % (redis_host,
redis_conf.pop('port', '6379'))
redis_sentinel_hosts = redis_conf.pop(
'sentinel_hosts',
            # TODO(adu): delete when no longer used
self.conf.get('sentinel_hosts'))
redis_sentinel_name = redis_conf.pop(
'sentinel_name',
            # TODO(adu): delete when no longer used
self.conf.get('sentinel_master_name'))
super(AccountBackend, self).__init__(
host=redis_host, sentinel_hosts=redis_sentinel_hosts,
sentinel_name=redis_sentinel_name, **redis_conf)
self.autocreate = boolean_value(conf.get('autocreate'), True)
self.script_update_container = self.register_script(
self.lua_update_container)
self.script_update_bucket_list = self.register_script(
self.lua_update_bucket_list)
self.script_refresh_account = self.register_script(
self.lua_refresh_account)
self.script_flush_account = self.register_script(
self.lua_flush_account)
self.script_get_container_info = self.register_script(
self.lua_get_extended_container_info)
self._account_prefix = conf.get('account_prefix', ACCOUNT_KEY_PREFIX)
self._bucket_prefix = conf.get('bucket_prefix', BUCKET_KEY_PREFIX)
self._bucket_list_prefix = conf.get('bucket_list_prefix',
BUCKET_LIST_PREFIX)
self._container_list_prefix = conf.get('container_list_prefix',
CONTAINER_LIST_PREFIX)
    def akey(self, account):
"""Build the key of an account description"""
return self._account_prefix + account
    def bkey(self, bucket):
"""Build the key of a bucket description"""
return self._bucket_prefix + bucket
    def blistkey(self, account):
"""Build the key of an account's bucket list"""
return self._bucket_list_prefix + account
    @staticmethod
def ckey(account, name):
"""Build the key of a container description"""
return 'container:%s:%s' % (account, text_type(name))
    def clistkey(self, account):
"""Build the key of an account's container list"""
return self._container_list_prefix + account
    @catch_service_errors
def create_account(self, account_id):
conn = self.conn
if not account_id:
return None
if conn.hget('accounts:', account_id):
return None
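        # Take a short-lived lock so that two concurrent creations
        # of the same account cannot interleave.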
lock = self.acquire_lock_with_timeout(self.akey(account_id), 1)
if not lock:
return None
pipeline = conn.pipeline(True)
pipeline.hset('accounts:', account_id, 1)
pipeline.hmset(self.akey(account_id), {
'id': account_id,
'objects': 0,
'bytes': 0,
'damaged_objects': 0,
'missing_chunks': 0,
'ctime': Timestamp().normal
})
pipeline.execute()
self.release_lock(self.akey(account_id), lock)
return account_id
    @catch_service_errors
def delete_account(self, req_account_id):
conn = self.conn
if not req_account_id:
return None
account_id = conn.hget(self.akey(req_account_id), 'id')
if not account_id:
return None
account_id = account_id.decode('utf-8')
lock = self.acquire_lock_with_timeout(self.akey(account_id), 1)
if not lock:
return None
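        # Refuse to delete an account that still holds containers.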
num_containers = conn.zcard(self.clistkey(account_id))
if int(num_containers) > 0:
            self.release_lock(self.akey(account_id), lock)
return False
pipeline = conn.pipeline(True)
pipeline.delete('metadata:%s' % account_id)
pipeline.delete(self.clistkey(account_id))
pipeline.delete(self.akey(account_id))
pipeline.hdel('accounts:', account_id)
pipeline.execute()
self.release_lock(self.akey(account_id), lock)
return True
    def cast_fields(self, info):
"""
Cast dict entries to the type they are supposed to be.
"""
for what in (b'bytes', b'objects', b'damaged_objects',
b'missing_chunks'):
try:
info[what] = int_value(info.get(what), 0)
except (TypeError, ValueError):
pass
for what in (BUCKET_PROP_REPLI_ENABLED.encode('utf-8'), ):
try:
decoded = info.get(what, b'').decode('utf-8')
info[what] = boolean_value(decoded)
except (TypeError, ValueError):
pass
    @catch_service_errors
def get_bucket_info(self, bname):
"""
Get all available information about a bucket.
"""
if not bname:
return None
binfo = self.conn_slave.hgetall(self.bkey(bname))
self.cast_fields(binfo)
return binfo
    @catch_service_errors
def get_container_info(self, account_id, cname):
"""
Get all available information about a container, including some
information coming from the bucket it belongs to.
"""
if not cname:
return None
keys = [self.ckey(account_id, cname)]
args = [self._bucket_prefix]
cinfolist = self.script_get_container_info(
keys=keys, args=args, client=self.conn_slave)
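        # The script replies with a flat list alternating keys and
        # values ([key1, value1, key2, value2, ...]): fold it back
        # into a dictionary.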
key = None
cinfo = dict()
for cursor in cinfolist:
if key is not None:
cinfo[key] = cursor
key = None
continue
key = cursor
self.cast_fields(cinfo)
return cinfo
    @catch_service_errors
def info_account(self, req_account_id):
conn = self.conn_slave
if not req_account_id:
return None
account_id = conn.hget(self.akey(req_account_id), 'id')
if not account_id:
return None
account_id = account_id.decode('utf-8')
pipeline = conn.pipeline(False)
pipeline.hgetall(self.akey(account_id))
pipeline.zcard(self.blistkey(account_id))
pipeline.zcard(self.clistkey(account_id))
pipeline.hgetall('metadata:%s' % account_id)
data = pipeline.execute()
info = data[0]
self.cast_fields(info)
info[b'buckets'] = data[1]
info[b'containers'] = data[2]
info[b'metadata'] = data[3]
return debinarize(info)
    @catch_service_errors
def list_accounts(self):
"""
Get the list of all accounts.
"""
conn = self.conn_slave
accounts = conn.hkeys('accounts:')
return debinarize(accounts)
    @catch_service_errors
def update_container(self, account_id, name, mtime, dtime,
object_count, bytes_used,
damaged_objects, missing_chunks,
bucket_name=None,
autocreate_account=None, autocreate_container=True):
if not account_id or not name:
raise BadRequest("Missing account or container")
if autocreate_account is None:
autocreate_account = self.autocreate
if mtime is None:
mtime = '0'
else:
mtime = Timestamp(mtime).normal
if dtime is None:
dtime = '0'
else:
dtime = Timestamp(dtime).normal
deleted = float(dtime) > float(mtime)
if object_count is None:
object_count = 0
if bytes_used is None:
bytes_used = 0
if damaged_objects is None:
damaged_objects = 0
if missing_chunks is None:
missing_chunks = 0
# If no bucket name is provided, set it to 'False'
# (we cannot pass None to the Lua script).
bucket_key = self.bkey(bucket_name) if bucket_name else str(False)
now = Timestamp().normal
# With some sharding middlewares, the suffix may be
# in the middle of the container name.
update_bucket_object_count = SEGMENTS_BUCKET_SUFFIX not in name
ckey = AccountBackend.ckey(account_id, name)
keys = [account_id, ckey,
self.clistkey(account_id),
self.akey(account_id),
bucket_key]
args = [name, mtime, dtime, object_count, bytes_used,
damaged_objects, missing_chunks,
str(autocreate_account), now, EXPIRE_TIME,
str(autocreate_container),
str(update_bucket_object_count)]
pipeline = self.conn.pipeline(True)
try:
self.script_update_container(
keys=keys, args=args, client=pipeline)
if bucket_name and not deleted:
pipeline.hset(ckey, "bucket", bucket_name)
# Only execute when the main shard is created/deleted.
if bucket_name == name:
self.script_update_bucket_list(
keys=[self.blistkey(account_id)],
args=[bucket_name, str(deleted)],
client=pipeline)
pipeline.execute()
except redis.exceptions.ResponseError as exc:
            if text_type(exc).endswith("no_account"):
                raise NotFound("Account %s not found" % account_id)
            elif text_type(exc).endswith("no_container"):
                raise NotFound("Container %s not found" % name)
            elif text_type(exc).endswith("no_update_needed"):
                raise Conflict("No update needed, "
                               "event older than last container update")
            else:
                raise
return name
def _should_be_listed(self, c_id, s3_buckets_only):
return not s3_buckets_only or self.buckets_pattern.match(c_id)
@catch_service_errors
def _raw_listing(self, key, limit, marker=None, end_marker=None,
delimiter=None, prefix=None, s3_buckets_only=False):
"""
        Fetch a list of entries matching the specified options.
        Each entry is a list of the form
        [(container|prefix),
         0 *reserved for the object count*,
         0 *reserved for the size*,
         0 for a container, 1 for a prefix,
         0 *reserved for the mtime*]
:returns: the list of results, and the marker for the next request
(in case the list of results is truncated)
"""
orig_marker = marker
results = list()
beyond_prefix = False
if prefix is None:
prefix = ''
while len(results) < limit and not beyond_prefix:
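            # ZRANGEBYLEX interval syntax: '-' and '+' are the
            # unbounded min and max, '[' makes a bound inclusive
            # and '(' makes it exclusive.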
min_k = '-'
max_k = '+'
if end_marker:
max_k = '(' + end_marker
if marker:
if marker < prefix:
# Start on the prefix
min_k = '[' + prefix
else:
# Start just after the marker
min_k = '(' + marker
elif prefix:
min_k = '[' + prefix
# Ask for one extra element, to be able to tell if the
# list of results is truncated.
cnames = self.conn_slave.zrangebylex(
key, min_k, max_k,
0, limit - len(results) + 1)
if not cnames:
# No more items
marker = None
break
cnames = [c.decode('utf8', errors='ignore') for c in cnames]
for cname in cnames:
if len(results) >= limit:
# Do not reset marker, there are more items
break
elif prefix and not cname.startswith(prefix):
beyond_prefix = True
# No more items
marker = None
break
marker = cname
if delimiter:
end = cname.find(delimiter, len(prefix))
if end > 0:
# Delimiter found after the prefix.
# Build a new marker, and continue listing from there.
# TODO(FVE): we can avoid another request to Redis by
# analyzing the rest of the list ourselves.
dir_name = cname[:end + 1]
marker = dir_name + u'\ufffd'
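                        # u'\ufffd' sorts after almost any character,
                        # so the next iteration skips every entry
                        # sharing this prefix.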
if dir_name != orig_marker:
results.append([dir_name, 0, 0, 1, 0])
break
if self._should_be_listed(cname, s3_buckets_only):
results.append([cname, 0, 0, 0, 0])
return results, marker
    @catch_service_errors
def list_buckets(self, account_id, limit=1000, marker=None,
end_marker=None, prefix=None):
"""
Get the list of buckets of the specified account.
:returns: the list of buckets (with metadata), and the next
marker (in case the list is truncated).
"""
raw_list, next_marker = self._raw_listing(
self.blistkey(account_id),
limit=limit, marker=marker,
end_marker=end_marker, prefix=prefix)
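        # Batch the metadata fetches in a single pipeline, i.e. one
        # network round trip for the whole page of buckets.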
pipeline = self.conn_slave.pipeline(True)
for entry in raw_list:
# For real buckets (not prefixes), fetch metadata.
if not entry[3]:
pipeline.hmget(self.bkey(entry[0]),
'objects', 'bytes', 'mtime')
res = pipeline.execute()
output = list()
i = 0
for bucket in raw_list:
if not bucket[3]:
bdict = {
'name': bucket[0],
'objects': int_value(res[i][0], 0),
'bytes': int_value(res[i][1], 0),
'mtime': float_value(res[i][2], 0.0),
}
i += 1
else:
                bdict = {'prefix': bucket[0]}
output.append(bdict)
return output, next_marker
    @catch_service_errors
def list_containers(self, account_id, limit=1000, marker=None,
end_marker=None, prefix=None, delimiter=None,
s3_buckets_only=False):
raw_list, _next_marker = self._raw_listing(
self.clistkey(account_id),
limit=limit, marker=marker,
end_marker=end_marker, prefix=prefix,
delimiter=delimiter,
s3_buckets_only=s3_buckets_only)
pipeline = self.conn_slave.pipeline(True)
        # Skip prefix entries, they have no metadata to fetch.
for container in [entry for entry in raw_list if not entry[3]]:
pipeline.hmget(AccountBackend.ckey(account_id, container[0]),
'objects', 'bytes', 'mtime')
res = pipeline.execute()
i = 0
for container in raw_list:
if not container[3]:
# FIXME(adu) Convert to dict
container[1] = int_value(res[i][0], 0)
container[2] = int_value(res[i][1], 0)
container[4] = float_value(res[i][2], 0.0)
i += 1
return raw_list
    @catch_service_errors
def status(self):
conn = self.conn_slave
account_count = conn.hlen('accounts:')
status = {'account_count': account_count}
return status
    @catch_service_errors
def refresh_account(self, account_id):
if not account_id:
raise BadRequest("Missing account")
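        # KEYS[3] must be the common prefix of the container hashes
        # (see AccountBackend.ckey()).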
keys = [self.akey(account_id),
self.clistkey(account_id),
"container:%s:" % account_id]
try:
self.script_refresh_account(keys=keys, client=self.conn)
except redis.exceptions.ResponseError as exc:
if str(exc) == "no_account":
raise NotFound(account_id)
else:
raise
    @catch_service_errors
def flush_account(self, account_id):
if not account_id:
raise BadRequest("Missing account")
keys = [self.akey(account_id),
self.clistkey(account_id),
"container:%s:" % account_id]
try:
self.script_flush_account(keys=keys, client=self.conn)
except redis.exceptions.ResponseError as exc:
if str(exc) == "no_account":
raise NotFound(account_id)
else:
raise