# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from __future__ import absolute_import
from io import BufferedReader, RawIOBase, IOBase
import itertools
import logging
from urlparse import urlparse
from eventlet import sleep, Timeout
from socket import error as SocketError
from oio.common import exceptions as exc
from oio.common.http import parse_content_type,\
parse_content_range, ranges_from_http_header, http_header_from_ranges
from oio.common.http_eventlet import http_connect
from oio.common.utils import GeneratorIO, group_chunk_errors, \
deadline_to_timeout
from oio.common import green
from oio.common.storage_method import STORAGE_METHODS
logger = logging.getLogger(__name__)
WRITE_CHUNK_SIZE = 65536
READ_CHUNK_SIZE = 65536
# RAWX connection timeout
CONNECTION_TIMEOUT = 10.0
# chunk operations timeout
CHUNK_TIMEOUT = 60.0
# client read timeout
CLIENT_TIMEOUT = 60.0
PUT_QUEUE_DEPTH = 10
[docs]def close_source(source):
"""Safely close the connection behind `source`."""
try:
source.conn.close()
except Exception:
logger.exception("Failed to close %s", source)
[docs]class IOBaseWrapper(RawIOBase):
"""
Wrap any object that has a `read` method into an `io.IOBase`.
"""
def __init__(self, wrapped):
"""
:raise AttributeError: if wrapped object has no `read` method
"""
self.__read = getattr(wrapped, "read")
[docs] def readable(self):
return True
[docs] def read(self, n=-1):
return self.__read(n)
[docs] def readinto(self, b): # pylint: disable=invalid-name
read_len = len(b)
read_data = self.read(read_len)
b[0:len(read_data)] = read_data
return len(read_data)
[docs]class WriteHandler(object):
def __init__(self, source, sysmeta, chunk_preparer,
storage_method, headers=None,
connection_timeout=None, write_timeout=None,
read_timeout=None, deadline=None, chunk_checksum_algo='md5',
**_kwargs):
"""
:param connection_timeout: timeout to establish the connection
:param write_timeout: timeout to send a buffer of data
:param read_timeout: timeout to read a buffer of data from source
:param chunk_checksum_algo: algorithm to use to compute chunk
checksums locally. Can be `None` to disable local checksum
computation and let the rawx compute it (will be md5).
"""
if isinstance(source, IOBase):
self.source = BufferedReader(source)
else:
self.source = BufferedReader(IOBaseWrapper(source))
if isinstance(chunk_preparer, dict):
def _sort_and_yield():
for pos in sorted(chunk_preparer.keys()):
yield chunk_preparer[pos]
self.chunk_prep = _sort_and_yield
else:
self.chunk_prep = chunk_preparer
self.sysmeta = sysmeta
self.storage_method = storage_method
self.headers = headers or dict()
self.connection_timeout = connection_timeout or CONNECTION_TIMEOUT
self.deadline = deadline
self._read_timeout = read_timeout or CLIENT_TIMEOUT
self._write_timeout = write_timeout or CHUNK_TIMEOUT
self.chunk_checksum_algo = chunk_checksum_algo
@property
def read_timeout(self):
if self.deadline is None:
return self._read_timeout
dl_to = deadline_to_timeout(self.deadline, True)
return min(dl_to, self._read_timeout)
@property
def write_timeout(self):
if self.deadline is None:
return self._write_timeout
dl_to = deadline_to_timeout(self.deadline, True)
return min(dl_to, self._write_timeout)
[docs] def stream(self):
"""
Uploads a stream of data.
:returns: a tuple of 3 which contains:
* the list of chunks to be saved in the container
* the number of bytes transfered
* the actual checksum of the data that went through the stream.
"""
raise NotImplementedError()
[docs]def consume(it):
for _x in it:
pass
[docs]class Closeable(object):
def __init__(self, *iterables):
self.iterables = iterables
def __iter__(self):
return iter(itertools.chain(*(self.iterables)))
[docs] def close(self):
for iterator in self.iterables:
close_method = getattr(iterator, 'close', None)
if close_method:
close_method()
self.iterables = None
[docs]def chain(iterable):
iterator = iter(iterable)
try:
d = ''
while not d:
d = next(iterator)
return Closeable([d], iterator)
except StopIteration:
return []
[docs]def iters_to_raw_body(parts_iter):
try:
body_iter = next(parts_iter)['iter']
except StopIteration:
return ''
def wrap(it, _j):
for d in it:
yield d
try:
next(_j)
except StopIteration:
pass
return wrap(body_iter, parts_iter)
[docs]def discard_bytes(buf_size, start):
"""
Discard the right amount of bytes so the reader
yields only full records.
"""
return (buf_size - (start % buf_size)) % buf_size
[docs]class ChunkReader(object):
"""
Reads a chunk.
"""
def __init__(self, chunk_iter, buf_size, headers,
connection_timeout=None, read_timeout=None,
align=False, **_kwargs):
"""
:param chunk_iter:
:param buf_size: size of the read buffer
:param headers:
:param connection_timeout: timeout to establish the connection
:param read_timeout: timeout to read a buffer of data
:param align: if True, the reader will skip some bytes to align
on `buf_size`
"""
self.chunk_iter = chunk_iter
self.source = None
# TODO deal with provided headers
self._headers = None
self.request_headers = headers
self.sources = []
self.status = None
# buf size indicates the amount we data we yield
self.buf_size = buf_size
self.discard_bytes = 0
self.align = align
self.connection_timeout = connection_timeout or CONNECTION_TIMEOUT
self.read_timeout = read_timeout or CHUNK_TIMEOUT
self._resp_by_chunk = dict()
@property
def reqid(self):
""":returns: the request ID or None"""
if not self.request_headers:
return None
return self.request_headers.get('X-oio-req-id')
[docs] def recover(self, nb_bytes):
"""
Recover the request.
:param nb_bytes: number of bytes already consumed that we need to
discard if we perform a recovery from another source.
:raises `ValueError`: if range header is not valid
:raises `oio.common.exceptions.UnsatisfiableRange`:
:raises `oio.common.exceptions.EmptyByteRange`:
"""
if 'Range' in self.request_headers:
request_range = ranges_from_http_header(
self.request_headers['Range'])
start, end = request_range[0]
if start is None:
# suffix byte range
end -= nb_bytes
else:
start += nb_bytes
if end is not None:
if start == end + 1:
# no more bytes to serve in the requested byte range
raise exc.EmptyByteRange()
if start > end:
# invalid range
raise exc.UnsatisfiableRange()
if end and start:
# full byte range
request_range = [(start, end)] + request_range[1:]
else:
# suffix byte range
request_range = [(None, end)] + request_range[1:]
else:
# prefix byte range
request_range = [(start, None)] + request_range[1:]
self.request_headers['Range'] = http_header_from_ranges(
request_range)
else:
# just add an offset to the request
self.request_headers['Range'] = 'bytes=%d-' % nb_bytes
def _get_request(self, chunk):
"""
Connect to a chunk, fetch headers but don't read data.
Save the response object in `self.sources` list.
"""
try:
with green.ConnectionTimeout(self.connection_timeout):
raw_url = chunk.get("real_url", chunk["url"])
parsed = urlparse(raw_url)
conn = http_connect(parsed.netloc, 'GET', parsed.path,
self.request_headers)
with green.OioTimeout(self.read_timeout):
source = conn.getresponse()
source.conn = conn
except (SocketError, Timeout) as err:
logger.error('Connection failed to %s (reqid=%s): %s',
chunk, self.reqid, err)
self._resp_by_chunk[chunk["url"]] = (0, str(err))
return False
except Exception as err:
logger.exception('Connection failed to %s (reqid=%s)',
chunk, self.reqid)
self._resp_by_chunk[chunk["url"]] = (0, str(err))
return False
if source.status in (200, 206):
self.status = source.status
self._headers = source.getheaders()
self.sources.append((source, chunk))
return True
else:
logger.warn("Invalid response from %s (reqid=%s): %d %s",
chunk, self.reqid, source.status, source.reason)
self._resp_by_chunk[chunk["url"]] = (source.status,
str(source.reason))
close_source(source)
return False
def _get_source(self):
"""
Iterate on chunks until one answers,
and return the response object.
"""
for chunk in self.chunk_iter:
# continue to iterate until we find a valid source
if self._get_request(chunk):
break
if self.sources:
source, chunk = self.sources.pop()
return source, chunk
return None, None
[docs] def get_iter(self):
source, chunk = self._get_source()
if source:
return self._get_iter(chunk, source)
errors = group_chunk_errors(self._resp_by_chunk.items())
if len(errors) == 1:
# All errors are of the same type, group them
status, chunks = errors.popitem()
raise exc.from_status(status[0], "%s %s" % (status[1], chunks))
raise exc.ServiceUnavailable("unavailable chunks: %s" %
self._resp_by_chunk)
[docs] def stream(self):
"""
Get a generator over chunk data.
After calling this method, the `headers` field will be available
(even if no data is read from the generator).
"""
parts_iter = self.get_iter()
def _iter():
for part in parts_iter:
for data in part['iter']:
yield data
raise StopIteration
return GeneratorIO(_iter())
[docs] def fill_ranges(self, start, end, length):
"""
Fill the request ranges.
"""
if length == 0:
return
if self.align and self.buf_size:
# discard bytes
# so we only yield complete EC segments
self.discard_bytes = discard_bytes(self.buf_size, start)
# change headers for efficient recovery
if 'Range' in self.request_headers:
try:
orig_ranges = ranges_from_http_header(
self.request_headers['Range'])
new_ranges = [(start, end)] + orig_ranges[1:]
except ValueError:
new_ranges = [(start, end)]
else:
new_ranges = [(start, end)]
self.request_headers['Range'] = http_header_from_ranges(
new_ranges)
[docs] @staticmethod
def get_next_part(parts_iter):
"""
Gets next part of the body
NOTE: for the moment only return one part
(single range only)
"""
while True:
try:
with green.ChunkReadTimeout(CHUNK_TIMEOUT):
start, end, length, headers, part = next(
parts_iter[0])
return (start, end, length, headers, part)
except green.ChunkReadTimeout:
# TODO recover
raise StopIteration
[docs] def iter_from_resp(self, source, parts_iter, part, chunk):
bytes_consumed = 0
count = 0
buf = ''
while True:
try:
with green.ChunkReadTimeout(self.read_timeout):
data = part.read(READ_CHUNK_SIZE)
count += 1
buf += data
except green.ChunkReadTimeout as crto:
try:
self.recover(bytes_consumed)
except (exc.UnsatisfiableRange, ValueError):
raise
except exc.EmptyByteRange:
# we are done already
break
buf = ''
# find a new source to perform recovery
new_source, new_chunk = self._get_source()
if new_source:
logger.warn(
"Failed to read from %s (%s), "
"retrying from %s (reqid=%s)",
chunk, crto, new_chunk, self.reqid)
close_source(source[0])
# switch source
source[0] = new_source
chunk = new_chunk
parts_iter[0] = make_iter_from_resp(source[0])
try:
_j, _j, _j, _j, part = \
self.get_next_part(parts_iter)
except StopIteration:
# failed to recover
# we did our best
return
else:
logger.warn("Failed to read from %s (%s, reqid=%s)",
chunk, crto, self.reqid)
# no valid source found to recover
raise
else:
# discard bytes
if buf and self.discard_bytes:
if self.discard_bytes < len(buf):
buf = buf[self.discard_bytes:]
bytes_consumed += self.discard_bytes
self.discard_bytes = 0
else:
self.discard_bytes -= len(buf)
bytes_consumed += len(buf)
buf = ''
# no data returned
# flush out buffer
if not data:
if buf:
bytes_consumed += len(buf)
yield buf
buf = ''
break
# If buf_size is defined, yield bounded data buffers
if self.buf_size is not None:
while len(buf) >= self.buf_size:
read_d = buf[:self.buf_size]
buf = buf[self.buf_size:]
yield read_d
bytes_consumed += len(read_d)
else:
yield buf
bytes_consumed += len(buf)
buf = ''
# avoid starvation by forcing sleep()
# every once in a while
if count % 10 == 0:
sleep()
def _get_iter(self, chunk, source):
source = [source]
try:
parts_iter = [make_iter_from_resp(source[0])]
body_iter = None
try:
while True:
start, end, length, headers, part = \
self.get_next_part(parts_iter)
self.fill_ranges(start, end, length)
body_iter = self.iter_from_resp(
source, parts_iter, part, chunk)
result = {'start': start, 'end': end, 'length': length,
'iter': body_iter, 'headers': headers}
yield result
except StopIteration:
pass
except green.ChunkReadTimeout:
logger.exception("Failure during chunk read (reqid=%s)",
self.reqid)
raise
except Exception:
logger.exception("Failure during read")
raise
finally:
close_source(source[0])
@property
def headers(self):
if not self._headers:
return dict()
return dict(self._headers)
def _create_iter(self, chunk, source):
parts_iter = self._get_iter(chunk, source)
for part in parts_iter:
for d in part['iter']:
yield d
def __iter__(self):
parts_iter = self.get_iter()
if not parts_iter:
raise exc.ChunkException()
for part in parts_iter:
for data in part['iter']:
yield data
raise StopIteration
[docs]def make_iter_from_resp(resp):
"""
Makes a part iterator from a HTTP response
iterator return tuples:
(start, end, length, headers, body_file)
"""
if resp.status == 200:
content_length = int(resp.getheader('Content-Length'))
return iter([(0, content_length - 1, content_length,
resp.getheaders(), resp)])
content_type, params = parse_content_type(resp.getheader('Content-Type'))
if content_type != 'multipart/byteranges':
start, end, _ = parse_content_range(
resp.getheader('Content-Range'))
return iter([(start, end, end-start+1, resp.getheaders(), resp)])
else:
raise ValueError("Invalid response with code %d and content-type %s" %
resp.status, content_type)