# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from oio.common.green import ChunkReadTimeout, ChunkWriteTimeout, \
ContextPool, ConnectionTimeout, Empty, GreenPile, LightQueue, \
SourceReadTimeout, Timeout, Queue, eventlet_yield
import collections
import math
import hashlib
from urlparse import urlparse
from socket import error as SocketError
from greenlet import GreenletExit
from oio.api import io
from oio.common import exceptions
from oio.common.constants import CHUNK_HEADERS, REQID_HEADER
from oio.common.exceptions import SourceReadError
from oio.common.http import HeadersDict, parse_content_range, \
ranges_from_http_header, headers_from_object_metadata
from oio.common.logger import get_logger
from oio.common.utils import fix_ranges, monotonic_time

LOGGER = get_logger({}, __name__)


def segment_range_to_fragment_range(segment_start, segment_end, segment_size,
fragment_size):
"""
Converts a segment range into a fragment range.
:returns: a tuple (fragment_start, fragment_end)
* fragment_start is the first byte of the first fragment,
or None if this is a suffix byte range
* fragment_end is the last byte of the last fragment,
or None if this is a prefix byte range
"""
fragment_start = ((segment_start / segment_size * fragment_size)
if segment_start is not None else None)
fragment_end = (None if segment_end is None else
((segment_end + 1) / segment_size * fragment_size)
if segment_start is None else
((segment_end + 1) / segment_size * fragment_size) - 1)
return (fragment_start, fragment_end)
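
# Illustrative sketch (hypothetical sizes): with a 1 MiB EC segment size and
# 350 KiB fragments, a range covering the first two whole segments maps to
# the first two whole fragments of each chunk:
#
#   >>> segment_range_to_fragment_range(0, 2 * 1048576 - 1, 1048576, 358400)
#   (0, 716799)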


class ECChunkDownloadHandler(object):
"""
Handles the download of an EC meta chunk
"""

    def __init__(self, storage_method, chunks, meta_start, meta_end, headers,
connection_timeout=None, read_timeout=None, reqid=None,
perfdata=None, **_kwargs):
"""
:param connection_timeout: timeout to establish the connections
:param read_timeout: timeout to read a buffer of data
"""
self.storage_method = storage_method
self.chunks = chunks
self.meta_start = meta_start
self.meta_end = meta_end
# the meta chunk length
# (the amount of actual data stored into the meta chunk)
self.meta_length = self.chunks[0]['size']
self.headers = headers
self.connection_timeout = connection_timeout
self.read_timeout = read_timeout
self.reqid = reqid
self.perfdata = perfdata
self.logger = _kwargs.get('logger', LOGGER)
self._resp_by_chunk = dict()

    def _get_range_infos(self):
"""
        Converts the requested Range on the meta chunk
        to a Range on the actual chunks.

        :returns: a list of dicts with details about all the requested Ranges
"""
segment_size = self.storage_method.ec_segment_size
fragment_size = self.storage_method.ec_fragment_size
range_infos = []
        # no Range requested: read the whole meta chunk
if self.meta_start is None and self.meta_end is None:
return range_infos
if self.meta_start is not None and self.meta_start < 0:
self.meta_start = self.meta_length + self.meta_start
segment_start, segment_end = meta_chunk_range_to_segment_range(
self.meta_start, self.meta_end, segment_size)
fragment_start, fragment_end = segment_range_to_fragment_range(
segment_start, segment_end, segment_size, fragment_size)
range_infos.append({
'req_meta_start': self.meta_start,
'req_meta_end': self.meta_end,
'req_segment_start': segment_start,
'req_segment_end': segment_end,
'req_fragment_start': fragment_start,
'req_fragment_end': fragment_end})
return range_infos
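
    # Illustrative sketch (hypothetical sizes, and assuming
    # meta_chunk_range_to_segment_range(), defined elsewhere in this module,
    # aligns the range on segment boundaries): with a 1 MiB segment size and
    # 350 KiB fragments, requesting meta chunk bytes 100-2000 yields:
    #   {'req_meta_start': 100, 'req_meta_end': 2000,
    #    'req_segment_start': 0, 'req_segment_end': 1048575,
    #    'req_fragment_start': 0, 'req_fragment_end': 358399}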

    def _get_fragment(self, chunk_iter, range_infos, storage_method):
headers = dict()
headers.update(self.headers)
if range_infos:
# only handle one range
range_info = range_infos[0]
headers['Range'] = 'bytes=%s-%s' % (
range_info['req_fragment_start'],
range_info['req_fragment_end'])
reader = io.ChunkReader(chunk_iter, storage_method.ec_fragment_size,
headers, self.connection_timeout,
self.read_timeout, perfdata=self.perfdata,
align=True, logger=self.logger,
resp_by_chunk=self._resp_by_chunk)
return (reader, reader.get_iter())

    def get_stream(self):
range_infos = self._get_range_infos()
chunk_iter = iter(self.chunks)
# we use eventlet GreenPool to manage readers
with ContextPool(self.storage_method.ec_nb_data) as pool:
pile = GreenPile(pool)
# we use eventlet GreenPile to spawn readers
for _j in range(self.storage_method.ec_nb_data):
pile.spawn(self._get_fragment, chunk_iter, range_infos,
self.storage_method)
readers = []
for reader, parts_iter in pile:
if reader.status in (200, 206):
readers.append((reader, parts_iter))
# TODO log failures?
# with EC we need at least ec_nb_data valid readers
if len(readers) >= self.storage_method.ec_nb_data:
# all readers should return the same Content-Length
# so just take the headers from one of them
resp_headers = HeadersDict(readers[0][0].headers)
fragment_length = int(resp_headers.get('Content-Length'))
read_iterators = [it for _, it in readers]
stream = ECStream(self.storage_method, read_iterators, range_infos,
self.meta_length, fragment_length,
reqid=self.reqid, perfdata=self.perfdata,
logger=self.logger)
# start the stream
stream.start()
return stream
else:
raise exceptions.ServiceUnavailable(
'Not enough valid sources to read (%d/%d)' % (
len(readers), self.storage_method.ec_nb_data))
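
# Minimal usage sketch (hypothetical chunk descriptions; 'storage_method'
# comes from the content's storage policy). Each part yielded by the stream
# carries 'start', 'end' and an 'iter' over the decoded bytes, as built in
# ECStream._stream():
#
#   handler = ECChunkDownloadHandler(storage_method, chunks,
#                                    meta_start=0, meta_end=1023,
#                                    headers={}, reqid='example-reqid')
#   stream = handler.get_stream()
#   for part in stream:
#       for data in part['iter']:
#           pass  # consume decoded bytes
#   stream.close()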


class ECStream(object):
"""
Reads an EC meta chunk.
Handles the different readers.
"""

    def __init__(self, storage_method, readers, range_infos, meta_length,
fragment_length, reqid=None, perfdata=None, logger=None):
self.storage_method = storage_method
self.readers = readers
self.range_infos = range_infos
self.meta_length = meta_length
self.fragment_length = fragment_length
self._iter = None
self.reqid = reqid
self.perfdata = perfdata
self.logger = logger or LOGGER

    def start(self):
self._iter = io.chain(self._stream())

    def close(self):
if self._iter:
self._iter.close()
self._iter = None
if self.readers:
for reader in self.readers:
reader.close()
self.readers = None

    def _next(self):
fragment_iterators = []
for iterator in self.readers:
part_info = next(iterator)
fragment_iterators.append(part_info['iter'])
headers = HeadersDict(part_info['headers'])
return headers, fragment_iterators

    def _iter_range(self, range_info, segment_iter):
meta_start = range_info['resp_meta_start']
meta_end = range_info['resp_meta_end']
segment_start = range_info['resp_segment_start']
segment_end = range_info['resp_segment_end']
segment_end = (min(segment_end, self.meta_length - 1)
if segment_end is not None
else self.meta_length - 1)
meta_end = (min(meta_end, self.meta_length - 1)
if meta_end is not None
else self.meta_length - 1)
num_segments = int(
math.ceil(float(segment_end + 1 - segment_start) /
self.storage_method.ec_segment_size))
        # We read whole segments from the chunks, but the requested byte
        # range may not be aligned with the segment boundaries, so we trim
        # the extra bytes from the first and last segments.
start_over = meta_start - segment_start
end_over = segment_end - meta_end
for i, segment in enumerate(segment_iter):
if start_over > 0:
segment_len = len(segment)
if segment_len <= start_over:
start_over -= segment_len
continue
else:
segment = segment[start_over:]
start_over = 0
if i == (num_segments - 1) and end_over:
segment = segment[:-end_over]
yield segment
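
    # Trimming sketch (hypothetical numbers): with a 1 MiB segment size, a
    # request for meta chunk bytes 100-2000 decodes the whole first segment,
    # then drops start_over=100 bytes from its head and
    # end_over=(1048575 - 2000) bytes from its tail before yielding.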

    def _decode_segments(self, fragment_iterators):
        """
        Reads from the fragment iterators and yields whole segments.
        """
# we use eventlet Queue to read fragments
queues = []
        # each iterator has its own queue
for _j in range(len(fragment_iterators)):
queues.append(LightQueue(1))

        def put_in_queue(fragment_iterator, queue):
"""
Coroutine to read the fragments from the iterator
"""
try:
for fragment in fragment_iterator:
# put the read fragment in the queue
queue.put(fragment)
# the queues are of size 1 so this coroutine blocks
# until we decode a full segment
except GreenletExit:
# ignore
pass
except ChunkReadTimeout as err:
self.logger.error('%s (reqid=%s)', err, self.reqid)
except Exception:
self.logger.exception("Exception on reading (reqid=%s)",
self.reqid)
finally:
queue.resize(2)
                # put None to signal the decoding loop
                # that this is over
queue.put(None)
# close the iterator
fragment_iterator.close()
# we use eventlet GreenPool to manage the read of fragments
with ContextPool(len(fragment_iterators)) as pool:
# spawn coroutines to read the fragments
for fragment_iterator, queue in zip(fragment_iterators, queues):
pool.spawn(put_in_queue, fragment_iterator, queue)
# main decoding loop
while True:
data = []
# get the fragments from the queues
for queue in queues:
fragment = queue.get()
data.append(fragment)
if not all(data):
# one of the readers returned None
# impossible to read segment
break
# actually decode the fragments into a segment
try:
segment = self.storage_method.driver.decode(data)
except exceptions.ECError:
# something terrible happened
self.logger.exception(
"ERROR decoding fragments (reqid=%s)", self.reqid)
raise
yield segment
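
    # Decoding sketch (assuming a liberasurecode-backed EC driver): with
    # ec_nb_data=6, each loop iteration collects one fragment from each of
    # the 6 queues, and driver.decode([frag0, ..., frag5]) returns the
    # original segment bytes.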

    def _convert_range(self, req_start, req_end, length):
try:
ranges = ranges_from_http_header("bytes=%s-%s" % (
req_start if req_start is not None else '',
req_end if req_end is not None else ''))
except ValueError:
return (None, None)
result = fix_ranges(ranges, length)
if not result:
return (None, None)
else:
return (result[0][0], result[0][1])
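
    # Range resolution sketch: on a 1000-byte meta chunk, the suffix range
    # (req_start=None, req_end=500), i.e. "bytes=-500", is expected to
    # resolve to (500, 999); an unsatisfiable range resolves to (None, None).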

    def _add_ranges(self, range_infos):
for range_info in range_infos:
meta_start, meta_end = self._convert_range(
range_info['req_meta_start'], range_info['req_meta_end'],
self.meta_length)
range_info['resp_meta_start'] = meta_start
range_info['resp_meta_end'] = meta_end
range_info['satisfiable'] = \
(meta_start is not None and meta_end is not None)
segment_start, segment_end = self._convert_range(
range_info['req_segment_start'], range_info['req_segment_end'],
self.meta_length)
segment_size = self.storage_method.ec_segment_size
if range_info['req_segment_start'] is None and \
segment_start % segment_size != 0:
segment_start += segment_start - (segment_start % segment_size)
range_info['resp_segment_start'] = segment_start
range_info['resp_segment_end'] = segment_end

    def _add_ranges_for_fragment(self, fragment_length, range_infos):
for range_info in range_infos:
fragment_start, fragment_end = self._convert_range(
range_info['req_fragment_start'],
range_info['req_fragment_end'],
fragment_length)
range_info['resp_fragment_start'] = fragment_start
range_info['resp_fragment_end'] = fragment_end

    def _stream(self):
if not self.range_infos:
range_infos = [{
'req_meta_start': 0,
'req_meta_end': self.meta_length - 1,
'resp_meta_start': 0,
'resp_meta_end': self.meta_length - 1,
'req_segment_start': 0,
'req_segment_end': self.meta_length - 1,
'req_fragment_start': 0,
'req_fragment_end': self.fragment_length - 1,
'resp_fragment_start': 0,
'resp_fragment_end': self.fragment_length - 1,
'satisfiable': self.meta_length > 0
}]
else:
range_infos = self.range_infos
self._add_ranges(range_infos)

        def range_iter():
results = {}
while True:
next_range = self._next()
headers, fragment_iters = next_range
content_range = headers.get('Content-Range')
if content_range is not None:
fragment_start, fragment_end, fragment_length = \
parse_content_range(content_range)
elif self.fragment_length <= 0:
fragment_start = None
fragment_end = None
fragment_length = 0
else:
fragment_start = 0
fragment_end = self.fragment_length - 1
fragment_length = self.fragment_length
self._add_ranges_for_fragment(fragment_length, range_infos)
satisfiable = False
for range_info in range_infos:
satisfiable |= range_info['satisfiable']
k = (range_info['resp_fragment_start'],
range_info['resp_fragment_end'])
results.setdefault(k, []).append(range_info)
try:
range_info = results[(fragment_start, fragment_end)].pop(0)
except KeyError:
self.logger.error(
"Invalid range: %s, available: %s (reqid=%s)",
repr((fragment_start, fragment_end)),
results.keys(), self.reqid)
raise
if self.perfdata is not None:
ec_start = monotonic_time()
segment_iter = self._decode_segments(fragment_iters)
if self.perfdata is not None:
ec_end = monotonic_time()
rawx_perfdata = self.perfdata.setdefault('rawx', dict())
rawx_perfdata['ec'] = rawx_perfdata.get('ec', 0.0) \
+ ec_end - ec_start
if not range_info['satisfiable']:
io.consume(segment_iter)
continue
byterange_iter = self._iter_range(range_info, segment_iter)
result = {'start': range_info['resp_meta_start'],
'end': range_info['resp_meta_end'],
'iter': byterange_iter}
yield result
return range_iter()

    def __iter__(self):
return iter(self._iter)

    def get_iter(self):
return self


def ec_encode(storage_method, n):
"""
Encode EC segments
"""
segment_size = storage_method.ec_segment_size
buf = collections.deque()
total_len = 0
data = yield
while data:
buf.append(data)
total_len += len(data)
if total_len >= segment_size:
encode_result = []
while total_len >= segment_size:
# take data from buf
amount = segment_size
# the goal here is to encode a full segment
parts = []
while amount > 0:
part = buf.popleft()
if len(part) > amount:
# too much data taken
# put the extra data back into the buf
buf.appendleft(part[amount:])
part = part[:amount]
parts.append(part)
amount -= len(part)
total_len -= len(part)
# let's encode!
encode_result.append(
storage_method.driver.encode(''.join(parts)))
                # transform the result
                #
                # from:
                #   [[fragment_0_0, fragment_1_0, fragment_2_0, ...],
                #    [fragment_0_1, fragment_1_1, fragment_2_1, ...], ...]
                #
                # to:
                #   [(fragment_0_0 + fragment_0_1 + ...),  # write to chunk 0
                #    (fragment_1_0 + fragment_1_1 + ...),  # write to chunk 1
                #    (fragment_2_0 + fragment_2_1 + ...),  # write to chunk 2
                #    ...]
result = [''.join(p) for p in zip(*encode_result)]
data = yield result
else:
# not enough data to encode
data = yield None
# empty input data
# which means end of stream
# encode what is left in the buf
whats_left = ''.join(buf)
if whats_left:
last_fragments = storage_method.driver.encode(whats_left)
else:
last_fragments = [''] * n
yield last_fragments
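
# Usage sketch of the ec_encode coroutine protocol (hypothetical names):
#
#   encoder = ec_encode(storage_method, nb_chunks)
#   encoder.send(None)                   # prime the coroutine
#   fragments = encoder.send(data)       # None until a full segment is ready
#   last_fragments = encoder.send('')    # flush the remaining buffer
#
# Each non-None result is a list of strings, one per chunk, to append to the
# corresponding chunk's upload.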


class EcChunkWriter(object):
"""
Writes an EC chunk
"""

    def __init__(self, chunk, conn, write_timeout=None,
chunk_checksum_algo='md5', perfdata=None, **kwargs):
self._chunk = chunk
self._conn = conn
self.failed = False
self.bytes_transferred = 0
if chunk_checksum_algo:
self.checksum = hashlib.new(chunk_checksum_algo)
else:
self.checksum = None
self.write_timeout = write_timeout or io.CHUNK_TIMEOUT
# we use eventlet Queue to pass data to the send coroutine
self.queue = Queue(io.PUT_QUEUE_DEPTH)
self.reqid = kwargs.get('reqid')
self.perfdata = perfdata
self.logger = kwargs.get('logger', LOGGER)

    @property
def chunk(self):
return self._chunk

    @property
def conn(self):
return self._conn

    @classmethod
def connect(cls, chunk, sysmeta, reqid=None,
connection_timeout=None, write_timeout=None, **kwargs):
raw_url = chunk.get("real_url", chunk["url"])
parsed = urlparse(raw_url)
chunk_path = parsed.path.split('/')[-1]
hdrs = headers_from_object_metadata(sysmeta)
if reqid:
hdrs[REQID_HEADER] = reqid
hdrs[CHUNK_HEADERS["chunk_pos"]] = chunk["pos"]
hdrs[CHUNK_HEADERS["chunk_id"]] = chunk_path
        # Announce that metachunk_size and metachunk_hash
        # will be sent in the trailers
trailers = (CHUNK_HEADERS["metachunk_size"],
CHUNK_HEADERS["metachunk_hash"])
if kwargs.get('chunk_checksum_algo'):
trailers = trailers + (CHUNK_HEADERS["chunk_hash"], )
hdrs["Trailer"] = ', '.join(trailers)
with ConnectionTimeout(
connection_timeout or io.CONNECTION_TIMEOUT):
perfdata = kwargs.get('perfdata', None)
if perfdata is not None:
connect_start = monotonic_time()
conn = io.http_connect(
parsed.netloc, 'PUT', parsed.path, hdrs)
conn.set_cork(True)
if perfdata is not None:
connect_end = monotonic_time()
perfdata_rawx = perfdata.setdefault('rawx', dict())
perfdata_rawx[chunk['url']] = \
perfdata_rawx.get(chunk['url'], 0.0) \
+ connect_end - connect_start
conn.chunk = chunk
return cls(chunk, conn, write_timeout=write_timeout,
reqid=reqid, **kwargs)

    def start(self, pool):
"""Spawn the send coroutine"""
pool.spawn(self._send)

    def _send(self):
"""Send coroutine loop"""
self.conn.upload_start = None
while not self.failed:
# fetch input data from the queue
data = self.queue.get()
            # write the data to the rawx service,
            # using HTTP chunked transfer encoding
try:
with ChunkWriteTimeout(self.write_timeout):
if self.perfdata is not None \
and self.conn.upload_start is None:
self.conn.upload_start = monotonic_time()
self.conn.send("%x\r\n" % len(data))
self.conn.send(data)
self.conn.send("\r\n")
self.bytes_transferred += len(data)
eventlet_yield()
except (Exception, ChunkWriteTimeout) as exc:
self.failed = True
msg = str(exc)
self.logger.warn("Failed to write to %s (%s, reqid=%s)",
self.chunk, msg, self.reqid)
self.chunk['error'] = 'write: %s' % msg
# Indicate that the data is completely sent
self.queue.task_done()
# Drain the queue before quitting
while True:
try:
self.queue.get_nowait()
self.queue.task_done()
except Empty:
break
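
    # Wire-format sketch: each queued buffer goes out as one HTTP chunk,
    # e.g. a 16-byte buffer is sent as "10\r\n<16 bytes>\r\n".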

    def wait(self):
"""
Wait until all data in the queue
has been processed by the send coroutine
"""
self.logger.debug("Waiting for %s to receive data", self.chunk['url'])
# Wait until the data is completely sent to continue
self.queue.join()

    def send(self, data):
# do not send empty data because
# this will end the chunked body
if not data:
return
# put the data to send into the queue
# it will be processed by the send coroutine
self.queue.put(data)

    def finish(self, metachunk_size, metachunk_hash):
"""
        Send metachunk_size and metachunk_hash as trailers.

        :returns: the chunk object if the upload has failed, else None
"""
self.wait()
if self.failed:
self.logger.debug("NOT sending end marker and trailers to %s, "
"because upload has failed", self.chunk['url'])
return self.chunk
self.logger.debug("Sending end marker and trailers to %s",
self.chunk['url'])
parts = [
'0\r\n',
'%s: %s\r\n' % (CHUNK_HEADERS['metachunk_size'],
metachunk_size),
'%s: %s\r\n' % (CHUNK_HEADERS['metachunk_hash'],
metachunk_hash)
]
if self.checksum:
parts.append('%s: %s\r\n' % (CHUNK_HEADERS['chunk_hash'],
self.checksum.hexdigest()))
parts.append('\r\n')
to_send = "".join(parts)
try:
with ChunkWriteTimeout(self.write_timeout):
self.conn.send(to_send)
# Last segment sent, disable TCP_CORK to flush buffers
self.conn.set_cork(False)
except (Exception, ChunkWriteTimeout) as exc:
self.failed = True
msg = str(exc)
self.logger.warn("Failed to finish %s (%s, reqid=%s)",
self.chunk, msg, self.reqid)
self.chunk['error'] = 'finish: %s' % msg
return self.chunk
return None
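
    # Trailer wire-format sketch: the chunked body ends with a zero-sized
    # chunk followed by the trailers, e.g.
    #
    #   0\r\n
    #   <CHUNK_HEADERS['metachunk_size']>: <metachunk_size>\r\n
    #   <CHUNK_HEADERS['metachunk_hash']>: <metachunk_hash>\r\n
    #   <CHUNK_HEADERS['chunk_hash']>: <hexdigest>\r\n
    #   \r\n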

    def getresponse(self):
"""Read the HTTP response from the connection"""
try:
# As the server may buffer data before writing it to non-volatile
# storage, we don't know if we have to wait while sending data or
# while reading response, thus we apply the same timeout to both.
with ChunkWriteTimeout(self.write_timeout):
resp = self.conn.getresponse()
return resp
finally:
if self.perfdata is not None:
perfdata_rawx = self.perfdata.setdefault('rawx', dict())
url_chunk = self.conn.chunk['url']
upload_end = monotonic_time()
perfdata_rawx[url_chunk] = \
perfdata_rawx.get(url_chunk, 0.0) \
+ upload_end - self.conn.upload_start


class ECWriteHandler(io.WriteHandler):
"""
    Handles writes to an EC content.

    For initialization parameters, see oio.api.io.WriteHandler.
"""

    def stream(self):
# the checksum context for the content
global_checksum = hashlib.md5()
total_bytes_transferred = 0
content_chunks = []
# the platform chunk size
chunk_size = self.sysmeta['chunk_size']
# this gives us an upper bound
max_size = self.storage_method.ec_nb_data * chunk_size
if max_size > self.storage_method.ec_segment_size:
# align metachunk size on EC segment size
max_size = \
max_size - max_size % self.storage_method.ec_segment_size
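        # e.g. (hypothetical policy) with ec_nb_data=6, a 100 MB chunk_size
        # and a 1 MiB segment size: max_size = 600000000 is trimmed down to
        # 599785472 (572 whole segments)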
# meta chunks:
#
# {0: [{"url": "http://...", "pos": "0.0"},
# {"url": "http://...", "pos": "0.1"}, ...],
# 1: [{"url": "http://...", "pos": "1.0"},
# {"url": "http://...", "pos": "1.1"}, ...],
        # ...}
#
# iterate through the meta chunks
bytes_transferred = -1
kwargs = EcMetachunkWriter.filter_kwargs(self.extra_kwargs)
for meta_chunk in self.chunk_prep():
handler = EcMetachunkWriter(
self.sysmeta, meta_chunk,
global_checksum, self.storage_method,
reqid=self.headers.get(REQID_HEADER),
connection_timeout=self.connection_timeout,
write_timeout=self.write_timeout,
read_timeout=self.read_timeout,
**kwargs)
bytes_transferred, checksum, chunks = handler.stream(self.source,
max_size)
            # the hash of each chunk is the metachunk hash,
            # and the size of each chunk is the metachunk size
for chunk in chunks:
chunk['hash'] = checksum
chunk['size'] = bytes_transferred
# add the chunks whose upload succeeded
# to the content chunk list
if not chunk.get('error'):
content_chunks.append(chunk)
total_bytes_transferred += bytes_transferred
if bytes_transferred < max_size:
break
if len(self.source.peek()) == 0:
break
# compute the final content checksum
content_checksum = global_checksum.hexdigest()
return content_chunks, total_bytes_transferred, content_checksum


class ECRebuildHandler(object):

    def __init__(self, meta_chunk, missing, storage_method,
connection_timeout=None, read_timeout=None,
**_kwargs):
self.meta_chunk = meta_chunk
self.missing = missing
self.storage_method = storage_method
self.connection_timeout = connection_timeout or io.CONNECTION_TIMEOUT
self.read_timeout = read_timeout or io.CHUNK_TIMEOUT
self.logger = _kwargs.get('logger', LOGGER)

    def _get_response(self, chunk, headers):
resp = None
parsed = urlparse(chunk.get('real_url', chunk['url']))
try:
with ConnectionTimeout(self.connection_timeout):
conn = io.http_connect(
parsed.netloc, 'GET', parsed.path, headers)
with ChunkReadTimeout(self.read_timeout):
resp = conn.getresponse()
if resp.status != 200:
self.logger.warning('Invalid GET response from %s: %s %s',
chunk, resp.status, resp.reason)
resp = None
except (SocketError, Timeout) as err:
self.logger.error('ERROR fetching %s: %s', chunk, err)
except Exception:
self.logger.exception('ERROR fetching %s', chunk)
return resp

    def rebuild(self):
pile = GreenPile(len(self.meta_chunk))
nb_data = self.storage_method.ec_nb_data
headers = {}
for chunk in self.meta_chunk:
pile.spawn(self._get_response, chunk, headers)
resps = []
for resp in pile:
if not resp:
continue
resps.append(resp)
if len(resps) >= self.storage_method.ec_nb_data:
break
else:
self.logger.error('Unable to read enough valid sources to rebuild')
raise exceptions.UnrecoverableContent(
'Not enough valid sources to rebuild')
rebuild_iter = self._make_rebuild_iter(resps[:nb_data])
return rebuild_iter

    def _make_rebuild_iter(self, resps):

        def _get_frag(resp):
buf = ''
remaining = self.storage_method.ec_fragment_size
while remaining:
data = resp.read(remaining)
if not data:
break
remaining -= len(data)
buf += data
return buf

        def frag_iter():
pile = GreenPile(len(resps))
while True:
for resp in resps:
pile.spawn(_get_frag, resp)
try:
with Timeout(self.read_timeout):
frag = [frag for frag in pile]
                except Timeout as to:
                    self.logger.error('ERROR while rebuilding: %s', to)
                    # stop instead of reusing a stale or unbound 'frag'
                    break
                except Exception:
                    self.logger.exception('ERROR while rebuilding')
                    break
if not all(frag):
break
rebuilt_frag = self._reconstruct(frag)
yield rebuilt_frag
return frag_iter()

    def _reconstruct(self, frag):
return self.storage_method.driver.reconstruct(frag, [self.missing])[0]
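
# Rebuild usage sketch (hypothetical inputs): 'meta_chunk' lists the chunks
# still available and 'missing' is the fragment index to regenerate.
#
#   handler = ECRebuildHandler(meta_chunk, missing, storage_method)
#   for data in handler.rebuild():
#       pass  # stream of the rebuilt fragment's bytes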