Source code for oio.api.replication
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import hashlib
from oio.common.green import LightQueue, Timeout, GreenPile
from socket import error as SocketError
from six import text_type
from six.moves.urllib_parse import urlparse
from oio.api import io
from oio.common.exceptions import OioTimeout, SourceReadError, \
SourceReadTimeout
from oio.common.http import headers_from_object_metadata
from oio.common.utils import encode, monotonic_time
from oio.common.constants import CHUNK_HEADERS
from oio.common import green
from oio.common.logger import get_logger

LOGGER = get_logger({}, __name__)


class FakeChecksum(object):
    """Acts as a checksum object but does not compute anything."""
def __init__(self, actual_checksum):
self.checksum = actual_checksum

    def hexdigest(self):
"""Returns the checksum passed as constructor parameter"""
return self.checksum


class ReplicatedMetachunkWriter(io.MetachunkWriter):
    """
    Write the same metachunk to several chunk replicas, in parallel.
    """

def __init__(self, sysmeta, meta_chunk, checksum, storage_method,
quorum=None, connection_timeout=None, write_timeout=None,
read_timeout=None, headers=None, **kwargs):
super(ReplicatedMetachunkWriter, self).__init__(
storage_method=storage_method, quorum=quorum, **kwargs)
self.sysmeta = sysmeta
self.meta_chunk = meta_chunk
self.checksum = checksum
self.connection_timeout = connection_timeout or io.CONNECTION_TIMEOUT
self.write_timeout = write_timeout or io.CHUNK_TIMEOUT
self.read_timeout = read_timeout or io.CLIENT_TIMEOUT
self.headers = headers or {}
self.logger = kwargs.get('logger', LOGGER)

    def stream(self, source, size):
        """
        Read up to `size` bytes from `source` and write them to
        all the chunks of the metachunk.

        :returns: a tuple with the number of bytes transferred,
            the checksum of the metachunk, and the list of
            successfully written chunks.
        """
bytes_transferred = 0
meta_chunk = self.meta_chunk
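        # Optionally compute the checksum of the whole metachunk,
        # using the configured chunk checksum algorithm.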
if self.chunk_checksum_algo:
meta_checksum = hashlib.new(self.chunk_checksum_algo)
else:
meta_checksum = None
pile = GreenPile(len(meta_chunk))
failed_chunks = []
current_conns = []
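        # Open one PUT connection per chunk replica, in parallel.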
for chunk in meta_chunk:
pile.spawn(self._connect_put, chunk)
for conn, chunk in pile:
if not conn:
failed_chunks.append(chunk)
else:
current_conns.append(conn)
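        # Fail early if too few connections could be established
        # to reach the quorum.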
self.quorum_or_fail([co.chunk for co in current_conns], failed_chunks)
bytes_transferred = 0
try:
with green.ContextPool(len(meta_chunk)) as pool:
for conn in current_conns:
conn.failed = False
conn.queue = LightQueue(io.PUT_QUEUE_DEPTH)
pool.spawn(self._send_data, conn)
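                # Read the source block by block and feed the queue
                # of each connection that is still alive.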
while True:
buffer_size = self.buffer_size()
if size is not None:
remaining_bytes = size - bytes_transferred
if buffer_size < remaining_bytes:
read_size = buffer_size
else:
read_size = remaining_bytes
else:
read_size = buffer_size
with green.SourceReadTimeout(self.read_timeout):
try:
data = source.read(read_size)
except (ValueError, IOError) as err:
raise SourceReadError(str(err))
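                    # An empty read means the end of the source:
                    # send an empty block to tell each writer
                    # coroutine to finish.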
if len(data) == 0:
for conn in current_conns:
if not conn.failed:
conn.queue.put(b'')
break
self.checksum.update(data)
if meta_checksum:
meta_checksum.update(data)
bytes_transferred += len(data)
# copy current_conns to be able to remove a failed conn
for conn in current_conns[:]:
if not conn.failed:
conn.queue.put(data)
else:
current_conns.remove(conn)
failed_chunks.append(conn.chunk)
self.quorum_or_fail([co.chunk for co in current_conns],
failed_chunks)
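                # Wait for the writer coroutines to drain their
                # queues before fetching the responses.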
for conn in current_conns:
while conn.queue.qsize():
green.eventlet_yield()
except green.SourceReadTimeout as err:
self.logger.warn('Source read timeout (reqid=%s): %s',
self.reqid, err)
raise SourceReadTimeout(err)
except SourceReadError as err:
self.logger.warn('Source read error (reqid=%s): %s',
self.reqid, err)
raise
except Timeout as to:
self.logger.warn('Timeout writing data (reqid=%s): %s',
self.reqid, to)
raise OioTimeout(to)
except Exception:
self.logger.exception('Exception writing data (reqid=%s)',
self.reqid)
raise
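        # Collect the responses, then check the quorum on the
        # chunks that were successfully written.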
success_chunks = []
for conn in current_conns:
if conn.failed:
failed_chunks.append(conn.chunk)
continue
pile.spawn(self._get_response, conn)
for (conn, resp) in pile:
if resp:
self._handle_resp(
conn, resp,
meta_checksum.hexdigest() if meta_checksum else None,
success_chunks, failed_chunks)
self.quorum_or_fail(success_chunks, failed_chunks)
for chunk in success_chunks:
chunk["size"] = bytes_transferred
return bytes_transferred, success_chunks[0]['hash'], success_chunks

    def _connect_put(self, chunk):
        """
        Create a connection in order to PUT `chunk`.

        :returns: a tuple with the connection object and `chunk`
        """
raw_url = chunk.get("real_url", chunk["url"])
parsed = urlparse(raw_url)
try:
chunk_path = parsed.path.split('/')[-1]
hdrs = headers_from_object_metadata(self.sysmeta)
hdrs[CHUNK_HEADERS["chunk_pos"]] = chunk["pos"]
hdrs[CHUNK_HEADERS["chunk_id"]] = chunk_path
hdrs.update(self.headers)
hdrs = encode(hdrs)
with green.ConnectionTimeout(self.connection_timeout):
if self.perfdata is not None:
connect_start = monotonic_time()
conn = io.http_connect(
parsed.netloc, 'PUT', parsed.path, hdrs,
scheme=parsed.scheme)
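                # Cork the socket so the request header and the
                # first data segments are sent in as few packets
                # as possible (see set_cork(False) in _send_data).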
conn.set_cork(True)
if self.perfdata is not None:
connect_end = monotonic_time()
perfdata_rawx = self.perfdata.setdefault('rawx', dict())
perfdata_rawx['connect.' + chunk['url']] = \
connect_end - connect_start
conn.chunk = chunk
return conn, chunk
except (SocketError, Timeout) as err:
msg = str(err)
self.logger.warn("Failed to connect to %s (reqid=%s): %s",
chunk, self.reqid, err)
except Exception as err:
msg = str(err)
self.logger.exception("Failed to connect to %s (reqid=%s)",
chunk, self.reqid)
chunk['error'] = msg
return None, chunk

    def _send_data(self, conn):
"""
Send data to an open connection, taking data blocks from `conn.queue`.
"""
conn.upload_start = None
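        # Consume the blocks pushed by stream(); an empty block
        # signals the end of the upload.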
while True:
data = conn.queue.get()
if isinstance(data, text_type):
data = data.encode('utf-8')
if not conn.failed:
try:
with green.ChunkWriteTimeout(self.write_timeout):
if self.perfdata is not None \
and conn.upload_start is None:
conn.upload_start = monotonic_time()
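                        # Frame each block with HTTP chunked
                        # transfer encoding: hexadecimal size,
                        # CRLF, payload, CRLF.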
conn.send(b'%x\r\n' % len(data))
conn.send(data)
conn.send(b'\r\n')
if not data:
if self.perfdata is not None:
fin_start = monotonic_time()
# Last segment sent, disable TCP_CORK to flush buffers
conn.set_cork(False)
if self.perfdata is not None:
fin_end = monotonic_time()
rawx_perfdata = self.perfdata.setdefault('rawx',
dict())
chunk_url = conn.chunk['url']
rawx_perfdata['upload_finish.' + chunk_url] = \
fin_end - fin_start
green.eventlet_yield()
except (Exception, green.ChunkWriteTimeout) as err:
conn.failed = True
conn.chunk['error'] = str(err)

    def _get_response(self, conn):
        """
        Wait for the server response.

        :returns: a tuple with `conn` and the response object,
            or an exception.
        """
try:
with green.ChunkWriteTimeout(self.write_timeout):
resp = conn.getresponse()
if self.perfdata is not None:
upload_end = monotonic_time()
perfdata_rawx = self.perfdata.setdefault('rawx', dict())
chunk_url = conn.chunk['url']
perfdata_rawx['upload.' + chunk_url] = \
upload_end - conn.upload_start
except Timeout as err:
resp = err
self.logger.warn('Failed to read response from %s (reqid=%s): %s',
conn.chunk, self.reqid, err)
except Exception as err:
resp = err
self.logger.exception("Failed to read response from %s (reqid=%s)",
conn.chunk, self.reqid)
return (conn, resp)

    def _handle_resp(self, conn, resp, checksum, successes, failures):
        """
        If `resp` is an exception or its status is not 201,
        declare `conn` as failed and append `conn.chunk` to the
        `failures` list. Otherwise append `conn.chunk` to the
        `successes` list. Finally, close `conn`.
        """
if resp:
if isinstance(resp, (Exception, Timeout)):
conn.failed = True
conn.chunk['error'] = str(resp)
failures.append(conn.chunk)
elif resp.status != 201:
conn.failed = True
conn.chunk['error'] = 'HTTP %s' % resp.status
failures.append(conn.chunk)
self.logger.error(
"Unexpected status code from %s (reqid=%s): %s",
conn.chunk, self.reqid, resp.status)
else:
rawx_checksum = resp.getheader(CHUNK_HEADERS['chunk_hash'])
if rawx_checksum and checksum and \
rawx_checksum.lower() != checksum:
conn.failed = True
conn.chunk['error'] = \
"checksum mismatch: %s (local), %s (rawx)" % \
(checksum, rawx_checksum.lower())
failures.append(conn.chunk)
self.logger.error("%s (reqid=%s): %s",
conn.chunk['url'], self.reqid,
conn.chunk['error'])
else:
conn.chunk['hash'] = checksum or rawx_checksum
successes.append(conn.chunk)
conn.close()


class ReplicatedWriteHandler(io.WriteHandler):
"""
    Handles writes to a replicated content.

    For initialization parameters, see oio.api.io.WriteHandler.
"""

    def stream(self):
        """
        Upload the source by metachunks.

        :returns: a tuple with the list of chunks, the total number
            of bytes transferred, and the md5 checksum of the content.
        """
global_checksum = hashlib.md5()
total_bytes_transferred = 0
content_chunks = []
kwargs = ReplicatedMetachunkWriter.filter_kwargs(self.extra_kwargs)
for meta_chunk in self.chunk_prep():
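            # Each metachunk holds at most 'chunk_size' bytes of
            # the content.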
size = self.sysmeta['chunk_size']
handler = ReplicatedMetachunkWriter(
self.sysmeta, meta_chunk, global_checksum, self.storage_method,
connection_timeout=self.connection_timeout,
write_timeout=self.write_timeout,
read_timeout=self.read_timeout,
headers=self.headers,
**kwargs)
bytes_transferred, _h, chunks = handler.stream(self.source, size)
content_chunks += chunks
total_bytes_transferred += bytes_transferred
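            # Stop when the source is exhausted: either the last
            # metachunk is short, or nothing remains to be read.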
if bytes_transferred < size:
break
if len(self.source.peek()) == 0:
break
content_checksum = global_checksum.hexdigest()
return content_chunks, total_bytes_transferred, content_checksum