# Copyright (C) 2017-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import random
from oio.api.io import ChunkReader, READ_CHUNK_SIZE
from oio.api.ec import ECChunkDownloadHandler
from oio.common import exceptions as exc
from oio.common.constants import OBJECT_METADATA_PREFIX
from oio.common.http import http_header_from_ranges
from oio.common.decorators import ensure_headers
[docs]def wrand_choice_index(scores):
"""Choose an element from the `scores` sequence and return its index"""
scores = list(scores)
total = sum(scores)
target = random.uniform(0, total)
upto = 0
index = 0
for score in scores:
if upto + score >= target:
return index
upto += score
index += 1
assert False, "Shouldn't get here"
def _sort_chunks(raw_chunks, ec_security, logger=None):
"""
Sort a list a chunk objects. In addition to the sort,
this function adds an "offset" field to each chunk object.
:type raw_chunks: iterable of `dict`
:param ec_security: tells the sort algorithm that chunk positions are
composed (e.g. "0.4").
:type ec_security: `bool`
:returns: a `dict` with metachunk positions as keys,
and `list` of chunk objects as values.
"""
nums_by_position = dict()
chunks = dict()
for chunk in raw_chunks:
raw_position = chunk["pos"].split(".")
position = int(raw_position[0])
if ec_security:
num = int(raw_position[1])
chunk['num'] = int(raw_position[1])
nums = nums_by_position.setdefault(position, set())
if num in nums:
if logger:
logger.warning(
'Duplicated position (%s) for %s', chunk['pos'],
chunk['real_url'])
continue
nums.add(num)
chunks_at_position = chunks.setdefault(position, list())
chunks_at_position.append(chunk)
# for each position, remove incoherent chunks
for pos, local_chunks in chunks.iteritems():
if len(local_chunks) < 2:
continue
byhash = dict()
for chunk in local_chunks:
h = chunk.get('hash')
if h not in byhash:
byhash[h] = list()
byhash[h].append(chunk)
if len(byhash) < 2:
continue
# sort by length
bylength = byhash.values()
bylength.sort(key=len, reverse=True)
chunks[pos] = bylength[0]
# Append the 'offset' attribute
offset = 0
for pos in sorted(chunks.keys()):
clist = chunks[pos]
clist.sort(key=lambda x: x.get("score", 0), reverse=True)
for element in clist:
element['offset'] = offset
if not ec_security and len(clist) > 1:
# When scores are close together (e.g. [95, 94, 94, 93, 50]),
# don't always start with the highest element.
first = wrand_choice_index(x.get("score", 0) for x in clist)
clist[0], clist[first] = clist[first], clist[0]
offset += clist[0]['size']
return chunks
def _make_object_metadata(headers):
meta = {}
props = {}
prefix = OBJECT_METADATA_PREFIX
for k, v in headers.iteritems():
k = k.lower()
if k.startswith(prefix):
key = k.replace(prefix, "")
# TODO temporary workaround
# This is used by properties set through swift
if key.startswith('x-'):
props[key[2:]] = v
else:
meta[key.replace('-', '_')] = v
meta['properties'] = props
return meta
[docs]@ensure_headers
def fetch_stream(chunks, ranges, storage_method, headers=None,
**kwargs):
ranges = ranges or [(None, None)]
meta_range_list = get_meta_ranges(ranges, chunks)
for meta_range_dict in meta_range_list:
for pos in sorted(meta_range_dict.keys()):
meta_start, meta_end = meta_range_dict[pos]
if meta_start is not None and meta_end is not None:
headers['Range'] = http_header_from_ranges(
(meta_range_dict[pos], ))
reader = ChunkReader(
iter(chunks[pos]), READ_CHUNK_SIZE, headers=headers,
**kwargs)
try:
it = reader.get_iter()
except exc.NotFound as err:
raise exc.UnrecoverableContent(
"Cannot download position %d: %s" %
(pos, err))
except Exception as err:
raise exc.ServiceUnavailable(
"Error while downloading position %d: %s" %
(pos, err))
for part in it:
for dat in part['iter']:
yield dat
[docs]@ensure_headers
def fetch_stream_ec(chunks, ranges, storage_method, **kwargs):
ranges = ranges or [(None, None)]
meta_range_list = get_meta_ranges(ranges, chunks)
for meta_range_dict in meta_range_list:
for pos in sorted(meta_range_dict.keys()):
meta_start, meta_end = meta_range_dict[pos]
handler = ECChunkDownloadHandler(
storage_method, chunks[pos],
meta_start, meta_end, **kwargs)
try:
stream = handler.get_stream()
except exc.NotFound as err:
raise exc.UnrecoverableContent(
"Cannot download position %d: %s" %
(pos, err))
except Exception as err:
raise exc.ServiceUnavailable(
"Error while downloading position %d: %s" %
(pos, err))
try:
for part_info in stream:
for dat in part_info['iter']:
yield dat
finally:
# This must be done in a finally block to handle the case
# when the reader does not read until the end of the stream.
stream.close()