# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from __future__ import absolute_import
from oio.common.green import sleep, Timeout
from io import BufferedReader, RawIOBase, IOBase
import itertools
from urlparse import urlparse
from socket import error as SocketError
from oio.common import exceptions as exc
from oio.common.constants import REQID_HEADER
from oio.common.http import parse_content_type,\
parse_content_range, ranges_from_http_header, http_header_from_ranges
from oio.common.http_eventlet import http_connect
from oio.common.utils import GeneratorIO, group_chunk_errors, \
deadline_to_timeout, monotonic_time, set_deadline_from_read_timeout
from oio.common import green
from oio.common.storage_method import STORAGE_METHODS
from oio.common.logger import get_logger
LOGGER = get_logger({}, __name__)
WRITE_CHUNK_SIZE = 65536
READ_CHUNK_SIZE = 65536
# RAWX connection timeout
CONNECTION_TIMEOUT = 10.0
# chunk operations timeout
CHUNK_TIMEOUT = 60.0
# client read timeout
CLIENT_TIMEOUT = 60.0
PUT_QUEUE_DEPTH = 10
def close_source(source, logger=None):
"""Safely close the connection behind `source`."""
try:
source.conn.close()
except Exception:
logger = logger or LOGGER
logger.exception("Failed to close %s", source)
class IOBaseWrapper(RawIOBase):
"""
Wrap any object that has a `read` method into an `io.IOBase`.
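
    For instance, a plain object exposing only ``read(size)`` can be
    wrapped this way and then fed to ``io.BufferedReader``, which is what
    `WriteHandler` does with its data source.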
"""
def __init__(self, wrapped):
"""
        :raise AttributeError: if the wrapped object has no `read` method
"""
self.__read = getattr(wrapped, "read")
    def readable(self):
return True
    def read(self, n=-1):
return self.__read(n)
    def readinto(self, b):  # pylint: disable=invalid-name
read_len = len(b)
read_data = self.read(read_len)
b[0:len(read_data)] = read_data
return len(read_data)
class _WriteHandler(object):
def __init__(self, chunk_preparer, storage_method, headers=None,
**kwargs):
self.chunk_prep = None
self._load_chunk_prep(chunk_preparer)
self.storage_method = storage_method
self.headers = headers or dict()
self.connection_timeout = kwargs.get('connection_timeout',
CONNECTION_TIMEOUT)
self.deadline = kwargs.get('deadline')
self.logger = kwargs.get('logger', LOGGER)
self.extra_kwargs = kwargs
def _load_chunk_prep(self, chunk_preparer):
if isinstance(chunk_preparer, dict):
def _sort_and_yield():
for pos in sorted(chunk_preparer.keys()):
yield chunk_preparer[pos]
self.chunk_prep = _sort_and_yield
else:
self.chunk_prep = chunk_preparer
@property
def write_timeout(self):
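        """
        Timeout (in seconds) to send a buffer of data: the explicit
        ``write_timeout`` if provided, otherwise a value derived from the
        deadline, otherwise ``CHUNK_TIMEOUT``.
        """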
if 'write_timeout' in self.extra_kwargs:
return self.extra_kwargs['write_timeout']
elif self.deadline is not None:
return deadline_to_timeout(self.deadline, True)
return CHUNK_TIMEOUT
class LinkHandler(_WriteHandler):
def __init__(self, fullpath, chunk_preparer, storage_method, blob_client,
headers=None, **kwargs):
super(LinkHandler, self).__init__(
chunk_preparer, storage_method, headers=headers, **kwargs)
self.fullpath = fullpath
self.blob_client = blob_client
    def link(self):
content_chunks = list()
kwargs = MetachunkLinker.filter_kwargs(self.extra_kwargs)
for meta_chunk in self.chunk_prep():
try:
handler = MetachunkLinker(
meta_chunk, self.fullpath, self.blob_client,
storage_method=self.storage_method,
reqid=self.headers.get(REQID_HEADER),
connection_timeout=self.connection_timeout,
write_timeout=self.write_timeout, **kwargs)
chunks = handler.link()
except Exception as ex:
if isinstance(ex, exc.UnfinishedUploadException):
content_chunks = content_chunks + \
ex.chunks_already_uploaded
ex = ex.exception
raise exc.UnfinishedUploadException(ex, content_chunks)
for chunk in chunks:
if not chunk.get('error'):
content_chunks.append(chunk)
return content_chunks
class WriteHandler(_WriteHandler):
def __init__(self, source, sysmeta, chunk_preparer,
storage_method, headers=None,
**kwargs):
"""
:param connection_timeout: timeout to establish the connection
:param write_timeout: timeout to send a buffer of data
:param read_timeout: timeout to read a buffer of data from source
:param chunk_checksum_algo: algorithm to use to compute chunk
checksums locally. Can be `None` to disable local checksum
computation and let the rawx compute it (will be md5).
"""
super(WriteHandler, self).__init__(
chunk_preparer, storage_method, headers=headers, **kwargs)
if isinstance(source, IOBase):
self.source = BufferedReader(source)
else:
self.source = BufferedReader(IOBaseWrapper(source))
self.sysmeta = sysmeta
@property
def read_timeout(self):
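        """
        Timeout (in seconds) to read a buffer of data from the source:
        the explicit ``read_timeout`` if provided, otherwise a value derived
        from the deadline, otherwise ``CLIENT_TIMEOUT``.
        """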
if 'read_timeout' in self.extra_kwargs:
return self.extra_kwargs['read_timeout']
elif self.deadline is not None:
return deadline_to_timeout(self.deadline, True)
return CLIENT_TIMEOUT
    def stream(self):
"""
Uploads a stream of data.
        :returns: a 3-tuple containing:
           * the list of chunks to be saved in the container
           * the number of bytes transferred
* the actual checksum of the data that went through the stream.
"""
raise NotImplementedError()
def consume(it):
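    """Consume an iterator, discarding every item it yields."""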
for _x in it:
pass
class Closeable(object):
def __init__(self, *iterables):
self.iterables = iterables
def __iter__(self):
return iter(itertools.chain(*(self.iterables)))
    def close(self):
for iterator in self.iterables:
close_method = getattr(iterator, 'close', None)
if close_method:
close_method()
self.iterables = None
def chain(iterable):
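    """
    Return a `Closeable` over `iterable`, with its first non-empty element
    already fetched, or an empty list if `iterable` gets exhausted before
    yielding a non-empty element.
    """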
iterator = iter(iterable)
try:
d = ''
while not d:
d = next(iterator)
return Closeable([d], iterator)
except StopIteration:
return []
def iters_to_raw_body(parts_iter):
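    """
    Return an iterator over the body of the first part yielded by
    `parts_iter` (or an empty string if there is none), then advance
    `parts_iter` once more so it can run its cleanup code.
    """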
try:
body_iter = next(parts_iter)['iter']
except StopIteration:
return ''
def wrap(it, _j):
for d in it:
yield d
try:
next(_j)
except StopIteration:
pass
return wrap(body_iter, parts_iter)
def discard_bytes(buf_size, start):
"""
Discard the right amount of bytes so the reader
yields only full records.
"""
return (buf_size - (start % buf_size)) % buf_size
class ChunkReader(object):
"""
Reads a chunk.
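
    Example (illustrative; ``chunks`` is assumed to be an iterable of chunk
    descriptions, i.e. dictionaries holding at least a 'url' key)::

        reader = ChunkReader(iter(chunks), READ_CHUNK_SIZE, {})
        for data in reader:
            pass  # consume the chunk data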
"""
def __init__(self, chunk_iter, buf_size, headers,
connection_timeout=None, read_timeout=None,
align=False, perfdata=None, resp_by_chunk=None,
**_kwargs):
"""
        :param chunk_iter: an iterable of chunk descriptions (dictionaries
            holding at least a 'url' key)
        :param buf_size: size of the read buffer
        :param headers: HTTP headers to send along with each chunk request
:param connection_timeout: timeout to establish the connection
:param read_timeout: timeout to read a buffer of data
:param align: if True, the reader will skip some bytes to align
on `buf_size`
"""
self.chunk_iter = chunk_iter
self.source = None
# TODO deal with provided headers
self._headers = None
self.request_headers = headers
self.sources = []
self.status = None
        # buf_size indicates the amount of data we yield
self.buf_size = buf_size
self.discard_bytes = 0
self.align = align
self.connection_timeout = connection_timeout or CONNECTION_TIMEOUT
self.read_timeout = read_timeout or CHUNK_TIMEOUT
if resp_by_chunk is not None:
self._resp_by_chunk = resp_by_chunk
else:
self._resp_by_chunk = dict()
self.perfdata = perfdata
self.logger = _kwargs.get('logger', LOGGER)
@property
def reqid(self):
""":returns: the request ID or None"""
if not self.request_headers:
return None
return self.request_headers.get(REQID_HEADER)
    def recover(self, nb_bytes):
"""
Recover the request.
:param nb_bytes: number of bytes already consumed that we need to
discard if we perform a recovery from another source.
:raises `ValueError`: if range header is not valid
:raises `oio.common.exceptions.UnsatisfiableRange`:
:raises `oio.common.exceptions.EmptyByteRange`:
"""
if 'Range' in self.request_headers:
request_range = ranges_from_http_header(
self.request_headers['Range'])
start, end = request_range[0]
if start is None:
# suffix byte range
end -= nb_bytes
else:
start += nb_bytes
if end is not None:
if start == end + 1:
# no more bytes to serve in the requested byte range
raise exc.EmptyByteRange()
if start > end:
# invalid range
raise exc.UnsatisfiableRange()
if end and start:
# full byte range
request_range = [(start, end)] + request_range[1:]
else:
# suffix byte range
request_range = [(None, end)] + request_range[1:]
else:
# prefix byte range
request_range = [(start, None)] + request_range[1:]
self.request_headers['Range'] = http_header_from_ranges(
request_range)
else:
# just add an offset to the request
self.request_headers['Range'] = 'bytes=%d-' % nb_bytes
def _get_request(self, chunk):
"""
Connect to a chunk, fetch headers but don't read data.
Save the response object in `self.sources` list.
"""
try:
with green.ConnectionTimeout(self.connection_timeout):
raw_url = chunk.get("real_url", chunk["url"])
parsed = urlparse(raw_url)
if self.perfdata is not None:
connect_start = monotonic_time()
conn = http_connect(parsed.netloc, 'GET', parsed.path,
self.request_headers)
if self.perfdata is not None:
connect_end = monotonic_time()
perfdata_rawx = self.perfdata.setdefault('rawx', dict())
perfdata_rawx[chunk['url']] = \
perfdata_rawx.get(chunk['url'], 0.0) \
+ connect_end - connect_start
with green.OioTimeout(self.read_timeout):
source = conn.getresponse()
source.conn = conn
except (SocketError, Timeout) as err:
self.logger.error('Connection failed to %s (reqid=%s): %s',
chunk, self.reqid, err)
self._resp_by_chunk[chunk["url"]] = (0, str(err))
return False
except Exception as err:
self.logger.exception('Connection failed to %s (reqid=%s)',
chunk, self.reqid)
self._resp_by_chunk[chunk["url"]] = (0, str(err))
return False
if source.status in (200, 206):
self.status = source.status
self._headers = source.getheaders()
self.sources.append((source, chunk))
return True
else:
self.logger.warn("Invalid response from %s (reqid=%s): %d %s",
chunk, self.reqid, source.status, source.reason)
self._resp_by_chunk[chunk["url"]] = (source.status,
str(source.reason))
close_source(source, self.logger)
return False
def _get_source(self):
"""
Iterate on chunks until one answers,
and return the response object.
"""
for chunk in self.chunk_iter:
# continue to iterate until we find a valid source
if self._get_request(chunk):
break
if self.sources:
source, chunk = self.sources.pop()
return source, chunk
return None, None
    def get_iter(self):
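        """
        Connect to one of the chunks and return an iterator over the parts
        of its response, or raise an exception aggregating the per-chunk
        errors if no chunk could be reached.
        """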
source, chunk = self._get_source()
if source:
return self._get_iter(chunk, source)
errors = group_chunk_errors(self._resp_by_chunk.items())
if len(errors) == 1:
# All errors are of the same type, group them
status, chunks = errors.popitem()
raise exc.from_status(status[0], "%s %s" % (status[1], chunks))
raise exc.ServiceUnavailable("unavailable chunks: %s" %
self._resp_by_chunk)
    def stream(self):
"""
Get a generator over chunk data.
After calling this method, the `headers` field will be available
(even if no data is read from the generator).
"""
parts_iter = self.get_iter()
def _iter():
for part in parts_iter:
for data in part['iter']:
yield data
            return
return GeneratorIO(_iter())
    def fill_ranges(self, start, end, length):
"""
Fill the request ranges.
"""
if length == 0:
return
if self.align and self.buf_size:
# discard bytes
# so we only yield complete EC segments
self.discard_bytes = discard_bytes(self.buf_size, start)
# change headers for efficient recovery
if 'Range' in self.request_headers:
try:
orig_ranges = ranges_from_http_header(
self.request_headers['Range'])
new_ranges = [(start, end)] + orig_ranges[1:]
except ValueError:
new_ranges = [(start, end)]
else:
new_ranges = [(start, end)]
self.request_headers['Range'] = http_header_from_ranges(
new_ranges)
    @staticmethod
def get_next_part(parts_iter):
"""
        Get the next part of the body.

        NOTE: for the moment, only one part is returned
        (single range only).
"""
while True:
try:
with green.ChunkReadTimeout(CHUNK_TIMEOUT):
start, end, length, headers, part = next(
parts_iter[0])
return (start, end, length, headers, part)
except green.ChunkReadTimeout:
# TODO recover
raise StopIteration
    def iter_from_resp(self, source, parts_iter, part, chunk):
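        """
        Yield buffers of data read from `part`. Upon a read timeout or an
        I/O error, look for another chunk to read from, and resume the
        download from where it stopped.
        """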
bytes_consumed = 0
count = 0
buf = ''
if self.perfdata is not None:
perfdata_rawx = self.perfdata.setdefault('rawx', dict())
url_chunk = chunk['url']
while True:
try:
with green.ChunkReadTimeout(self.read_timeout):
if self.perfdata is not None:
download_start = monotonic_time()
data = part.read(READ_CHUNK_SIZE)
if self.perfdata is not None:
download_end = monotonic_time()
perfdata_rawx[url_chunk] = \
perfdata_rawx.get(url_chunk, 0.0) \
+ download_end - download_start
count += 1
buf += data
except (green.ChunkReadTimeout, IOError) as crto:
try:
self.recover(bytes_consumed)
except (exc.UnsatisfiableRange, ValueError):
raise
except exc.EmptyByteRange:
# we are done already
break
buf = ''
# find a new source to perform recovery
new_source, new_chunk = self._get_source()
if new_source:
self.logger.warn(
"Failed to read from %s (%s), "
"retrying from %s (reqid=%s)",
chunk, crto, new_chunk, self.reqid)
close_source(source[0], self.logger)
# switch source
source[0] = new_source
chunk = new_chunk
parts_iter[0] = make_iter_from_resp(source[0])
try:
_j, _j, _j, _j, part = \
self.get_next_part(parts_iter)
except StopIteration:
# failed to recover
# we did our best
return
else:
self.logger.warn("Failed to read from %s (%s, reqid=%s)",
chunk, crto, self.reqid)
# no valid source found to recover
raise
else:
# discard bytes
if buf and self.discard_bytes:
if self.discard_bytes < len(buf):
buf = buf[self.discard_bytes:]
bytes_consumed += self.discard_bytes
self.discard_bytes = 0
else:
self.discard_bytes -= len(buf)
bytes_consumed += len(buf)
buf = ''
# no data returned
# flush out buffer
if not data:
if buf:
bytes_consumed += len(buf)
yield buf
buf = ''
break
# If buf_size is defined, yield bounded data buffers
if self.buf_size is not None:
while len(buf) >= self.buf_size:
read_d = buf[:self.buf_size]
buf = buf[self.buf_size:]
yield read_d
bytes_consumed += len(read_d)
else:
yield buf
bytes_consumed += len(buf)
buf = ''
# avoid starvation by forcing sleep()
# every once in a while
if count % 10 == 0:
sleep()
def _get_iter(self, chunk, source):
source = [source]
try:
parts_iter = [make_iter_from_resp(source[0])]
body_iter = None
try:
while True:
start, end, length, headers, part = \
self.get_next_part(parts_iter)
self.fill_ranges(start, end, length)
body_iter = self.iter_from_resp(
source, parts_iter, part, chunk)
result = {'start': start, 'end': end, 'length': length,
'iter': body_iter, 'headers': headers}
yield result
except StopIteration:
pass
except green.ChunkReadTimeout:
self.logger.exception("Failure during chunk read (reqid=%s)",
self.reqid)
raise
except Exception:
self.logger.exception("Failure during read (reqid=%s)", self.reqid)
raise
finally:
close_source(source[0], self.logger)
@property
def headers(self):
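        """Headers of the chunk response (empty if no chunk answered)."""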
if not self._headers:
return dict()
return dict(self._headers)
def _create_iter(self, chunk, source):
parts_iter = self._get_iter(chunk, source)
for part in parts_iter:
for d in part['iter']:
yield d
def __iter__(self):
parts_iter = self.get_iter()
if not parts_iter:
raise exc.ChunkException()
for part in parts_iter:
for data in part['iter']:
yield data
        return
def exp_ramp_gen(start, maximum):
"""
Yield exponentially increasing numbers.
Multiply the yielded number by 2 in each iteration
after the second one, until maximum is reached.
:param start: the first number to be yielded.
:param maximum: the maximum number to yield.
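
    Example (illustrative): ``exp_ramp_gen(1, 8)`` yields 1, 1, 2, 4, 8,
    then 8 forever.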
"""
# Yield the minimum twice in order to keep things aligned
yield start
current = start
while True:
yield current
current = min(current * 2, maximum)
class _MetachunkWriter(object):
def __init__(self, storage_method=None, quorum=None,
reqid=None, perfdata=None, **kwargs):
self.storage_method = storage_method
self._quorum = quorum
if storage_method is None and quorum is None:
raise ValueError('Missing storage_method or quorum')
self.perfdata = perfdata
self.reqid = reqid
@property
def quorum(self):
"""Minimum number of chunks required to validate an upload"""
if self._quorum is None:
return self.storage_method.quorum
return self._quorum
def quorum_or_fail(self, successes, failures):
"""
Compare the number of uploads against the quorum.
        :param successes: a list of chunk objects whose upload succeeded
:type successes: `list` or `tuple`
:param failures: a list of chunk objects whose upload failed
:type failures: `list` or `tuple`
:raises `exc.SourceReadError`: if there is an error while reading
data from the client
:raises `exc.SourceReadTimeout`: if there is a timeout while reading
data from the client
:raises `exc.OioTimeout`: if there is a timeout among the errors
:raises `exc.ServiceBusy`: if quorum has not been reached
for any other reason
"""
if len(successes) < self.quorum:
errors = group_chunk_errors(
((chunk["url"], chunk.get("error", "success"))
for chunk in successes + failures))
new_exc = exc.ServiceBusy(
message=("RAWX write failure, quorum not reached (%d/%d): %s" %
(len(successes), self.quorum, errors)))
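            # Translate the per-chunk errors into the most specific
            # exception type: source read errors and timeouts take
            # precedence over the generic ServiceBusy.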
for err in [x.get('error') for x in failures]:
if isinstance(err, exc.SourceReadError):
raise exc.SourceReadError(new_exc)
elif isinstance(err, green.SourceReadTimeout):
# Never raise 'green' timeouts out of our API
raise exc.SourceReadTimeout(new_exc)
elif isinstance(err, (exc.OioTimeout, green.OioTimeout)):
raise exc.OioTimeout(new_exc)
raise new_exc
def make_iter_from_resp(resp):
    """
    Make a part iterator from an HTTP response.

    The iterator returns tuples:
    (start, end, length, headers, body_file)
"""
if resp.status == 200:
content_length = int(resp.getheader('Content-Length'))
return iter([(0, content_length - 1, content_length,
resp.getheaders(), resp)])
content_type, params = parse_content_type(resp.getheader('Content-Type'))
if content_type != 'multipart/byteranges':
start, end, _ = parse_content_range(
resp.getheader('Content-Range'))
return iter([(start, end, end-start+1, resp.getheaders(), resp)])
else:
raise ValueError("Invalid response with code %d and content-type %s" %
resp.status, content_type)