# Source code for oio.api.replication
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from oio.common.green import sleep, LightQueue, Timeout, GreenPile
import hashlib
from socket import error as SocketError
from urlparse import urlparse
from oio.api import io
from oio.common.exceptions import OioTimeout, SourceReadError, \
SourceReadTimeout
from oio.common.http import headers_from_object_metadata
from oio.common.utils import monotonic_time
from oio.common.constants import CHUNK_HEADERS
from oio.common import green
from oio.common.logger import get_logger
LOGGER = get_logger({}, __name__)
class FakeChecksum(object):
    """
    Acts as a checksum object but does not compute anything.

    The value returned by `hexdigest()` is the one given to the
    constructor, whatever data is passed to `update()`.
    """

    def __init__(self, actual_checksum):
        """
        :param actual_checksum: the already known checksum, returned
            as-is by `hexdigest()`
        """
        self.checksum = actual_checksum

    def hexdigest(self):
        """Returns the checksum passed as constructor parameter"""
        return self.checksum

    def update(self, *_args, **_kwargs):
        """
        Accept and ignore data, so that this object can be handed to code
        feeding a regular hash object (which calls `update()` on it).
        """
        return None
class ReplicatedMetachunkWriter(io.MetachunkWriter):
    """
    Writes one metachunk by sending the same data to each chunk
    listed in `meta_chunk`, over one HTTP PUT connection per chunk.
    """

    def __init__(self, sysmeta, meta_chunk, checksum, storage_method,
                 quorum=None, connection_timeout=None, write_timeout=None,
                 read_timeout=None, headers=None, **kwargs):
        """
        :param sysmeta: object metadata, converted to request headers
            for each chunk upload
        :param meta_chunk: sized collection of chunk descriptions
            (dict-like objects with at least "url" and "pos" keys)
        :param checksum: object whose `update()` method is fed with
            every block of data read from the source
        :param storage_method: passed to the parent class
        :param quorum: passed to the parent class
        :param connection_timeout: timeout for connection establishment,
            defaults to `io.CONNECTION_TIMEOUT`
        :param write_timeout: timeout for sending a block of data and for
            reading the response, defaults to `io.CHUNK_TIMEOUT`
        :param read_timeout: timeout for reading a block from the source,
            defaults to `io.CLIENT_TIMEOUT`
        :param headers: extra request headers, overriding the ones
            generated from `sysmeta`
        """
        super(ReplicatedMetachunkWriter, self).__init__(
            storage_method=storage_method, quorum=quorum, **kwargs)
        self.sysmeta = sysmeta
        self.meta_chunk = meta_chunk
        self.checksum = checksum
        self.connection_timeout = connection_timeout or io.CONNECTION_TIMEOUT
        self.write_timeout = write_timeout or io.CHUNK_TIMEOUT
        self.read_timeout = read_timeout or io.CLIENT_TIMEOUT
        self.headers = headers or {}
        self.logger = kwargs.get('logger', LOGGER)

    def stream(self, source, size):
        """
        Read data from `source` and send it to all chunks of the metachunk.

        :param source: object with a `read(size)` method
        :param size: maximum number of bytes to read from `source`,
            or None to read until `source` returns no more data
        :returns: a tuple with the number of bytes transferred, the hash
            of the first successful chunk, and the list of successful chunks
        :raises SourceReadTimeout: when reading from `source` times out
        :raises SourceReadError: when `source.read()` raises
            ValueError or IOError
        :raises OioTimeout: when another timeout fires while writing
        """
        meta_chunk = self.meta_chunk
        if self.chunk_checksum_algo:
            meta_checksum = hashlib.new(self.chunk_checksum_algo)
        else:
            meta_checksum = None
        pile = GreenPile(len(meta_chunk))
        failed_chunks = []
        current_conns = []

        # Open all the connections in parallel
        for chunk in meta_chunk:
            pile.spawn(self._connect_put, chunk)

        for conn, chunk in pile:
            if not conn:
                failed_chunks.append(chunk)
            else:
                current_conns.append(conn)

        # NOTE(review): presumably raises when too few connections
        # have been established -- defined in the parent class.
        self.quorum_or_fail([co.chunk for co in current_conns], failed_chunks)

        bytes_transferred = 0
        try:
            with green.ContextPool(len(meta_chunk)) as pool:
                # One sender coroutine per connection, fed through a queue
                for conn in current_conns:
                    conn.failed = False
                    conn.queue = LightQueue(io.PUT_QUEUE_DEPTH)
                    pool.spawn(self._send_data, conn)

                while True:
                    buffer_size = self.buffer_size()
                    if size is None:
                        read_size = buffer_size
                    else:
                        # Never read more than what is left for this chunk
                        read_size = min(buffer_size,
                                        size - bytes_transferred)
                    with green.SourceReadTimeout(self.read_timeout):
                        try:
                            data = source.read(read_size)
                        except (ValueError, IOError) as err:
                            raise SourceReadError(str(err))
                        if len(data) == 0:
                            # End of input: send the terminating (empty)
                            # segment to all connections still alive.
                            for conn in current_conns:
                                if not conn.failed:
                                    conn.queue.put('')
                            break
                    self.checksum.update(data)
                    if meta_checksum:
                        meta_checksum.update(data)
                    bytes_transferred += len(data)
                    # copy current_conns to be able to remove a failed conn
                    for conn in current_conns[:]:
                        if not conn.failed:
                            conn.queue.put(data)
                        else:
                            current_conns.remove(conn)
                            failed_chunks.append(conn.chunk)

                    self.quorum_or_fail([co.chunk for co in current_conns],
                                        failed_chunks)

                # Let the sender coroutines drain their queue
                # before leaving the pool.
                for conn in current_conns:
                    while conn.queue.qsize():
                        sleep(0)

        except green.SourceReadTimeout as err:
            self.logger.warn('Source read timeout (reqid=%s): %s',
                             self.reqid, err)
            raise SourceReadTimeout(err)
        except SourceReadError as err:
            self.logger.warn('Source read error (reqid=%s): %s',
                             self.reqid, err)
            raise
        except Timeout as to:
            self.logger.warn('Timeout writing data (reqid=%s): %s',
                             self.reqid, to)
            raise OioTimeout(to)
        except Exception:
            self.logger.exception('Exception writing data (reqid=%s)',
                                  self.reqid)
            raise

        success_chunks = []

        # Read the responses in parallel, only from connections
        # which did not fail while sending data.
        for conn in current_conns:
            if conn.failed:
                failed_chunks.append(conn.chunk)
                continue
            pile.spawn(self._get_response, conn)

        for (conn, resp) in pile:
            if resp:
                self._handle_resp(
                    conn, resp,
                    meta_checksum.hexdigest() if meta_checksum else None,
                    success_chunks, failed_chunks)
        self.quorum_or_fail(success_chunks, failed_chunks)

        for chunk in success_chunks:
            chunk["size"] = bytes_transferred

        return bytes_transferred, success_chunks[0]['hash'], success_chunks

    def _connect_put(self, chunk):
        """
        Create a connection in order to PUT `chunk`.

        On failure, the error message is saved in `chunk['error']`.

        :returns: a tuple with the connection object and `chunk`
            (the connection object is None if the connection failed)
        """
        raw_url = chunk.get("real_url", chunk["url"])
        parsed = urlparse(raw_url)
        try:
            # The chunk ID is the last element of the URL path
            chunk_path = parsed.path.split('/')[-1]
            hdrs = headers_from_object_metadata(self.sysmeta)
            hdrs[CHUNK_HEADERS["chunk_pos"]] = chunk["pos"]
            hdrs[CHUNK_HEADERS["chunk_id"]] = chunk_path
            hdrs.update(self.headers)

            with green.ConnectionTimeout(self.connection_timeout):
                if self.perfdata is not None:
                    connect_start = monotonic_time()
                conn = io.http_connect(
                    parsed.netloc, 'PUT', parsed.path, hdrs)
                conn.set_cork(True)
                if self.perfdata is not None:
                    # Accumulate the connection time, per chunk URL
                    connect_end = monotonic_time()
                    perfdata_rawx = self.perfdata.setdefault('rawx', dict())
                    perfdata_rawx[chunk['url']] = \
                        perfdata_rawx.get(chunk['url'], 0.0) \
                        + connect_end - connect_start
                conn.chunk = chunk
            return conn, chunk
        except (SocketError, Timeout) as err:
            msg = str(err)
            self.logger.warn("Failed to connect to %s (reqid=%s): %s",
                             chunk, self.reqid, err)
        except Exception as err:
            msg = str(err)
            self.logger.exception("Failed to connect to %s (reqid=%s)",
                                  chunk, self.reqid)
        chunk['error'] = msg
        return None, chunk

    def _send_data(self, conn):
        """
        Send data to an open connection, taking data blocks from `conn.queue`.

        Each block is sent as one segment of a chunked transfer
        (hexadecimal length, data, CRLF). On error, `conn.failed` is set
        and the error message is saved in `conn.chunk['error']`.
        """
        conn.upload_start = None
        # This loop never ends by itself, and keeps consuming the queue
        # even after a failure.
        # NOTE(review): presumably the coroutine is killed when stream()
        # leaves its ContextPool -- confirm in oio.common.green.
        while True:
            data = conn.queue.get()
            if not conn.failed:
                try:
                    with green.ChunkWriteTimeout(self.write_timeout):
                        if self.perfdata is not None \
                                and conn.upload_start is None:
                            conn.upload_start = monotonic_time()
                        conn.send('%x\r\n' % len(data))
                        conn.send(data)
                        conn.send('\r\n')
                    if not data:
                        # Last segment sent, disable TCP_CORK to flush buffers
                        conn.set_cork(False)
                    sleep(0)
                except (Exception, green.ChunkWriteTimeout) as err:
                    conn.failed = True
                    conn.chunk['error'] = str(err)

    def _get_response(self, conn):
        """
        Wait for server response.

        :returns: a tuple with `conn` and the reponse object or an exception.
        """
        try:
            with green.ChunkWriteTimeout(self.write_timeout):
                resp = conn.getresponse()
                if self.perfdata is not None:
                    # Accumulate the upload time, per chunk URL
                    upload_end = monotonic_time()
                    perfdata_rawx = self.perfdata.setdefault('rawx', dict())
                    url_chunk = conn.chunk['url']
                    perfdata_rawx[url_chunk] = \
                        perfdata_rawx.get(url_chunk, 0.0) \
                        + upload_end - conn.upload_start
        except Timeout as err:
            resp = err
            self.logger.warn('Failed to read response from %s (reqid=%s): %s',
                             conn.chunk, self.reqid, err)
        except Exception as err:
            resp = err
            self.logger.exception("Failed to read response from %s (reqid=%s)",
                                  conn.chunk, self.reqid)
        return (conn, resp)

    def _handle_resp(self, conn, resp, checksum, successes, failures):
        """
        If `resp` is an exception or its status is not 201,
        declare `conn` as failed and put `conn.chunk` in
        `failures` list.
        Do the same if the checksum returned by the server
        does not match `checksum`.
        Otherwise put `conn.chunk` in `successes` list.
        And then close `conn`.
        """
        if resp:
            if isinstance(resp, (Exception, Timeout)):
                conn.failed = True
                conn.chunk['error'] = str(resp)
                failures.append(conn.chunk)
            elif resp.status != 201:
                conn.failed = True
                conn.chunk['error'] = 'HTTP %s' % resp.status
                failures.append(conn.chunk)
                self.logger.error(
                    "Unexpected status code from %s (reqid=%s): %s",
                    conn.chunk, self.reqid, resp.status)
            else:
                rawx_checksum = resp.getheader(CHUNK_HEADERS['chunk_hash'])
                # Compare only when both checksums are available
                if rawx_checksum and checksum and \
                        rawx_checksum.lower() != checksum:
                    conn.failed = True
                    conn.chunk['error'] = \
                        "checksum mismatch: %s (local), %s (rawx)" % \
                        (checksum, rawx_checksum.lower())
                    failures.append(conn.chunk)
                    self.logger.error("%s (reqid=%s): %s",
                                      conn.chunk['url'], self.reqid,
                                      conn.chunk['error'])
                else:
                    conn.chunk['hash'] = checksum or rawx_checksum
                    successes.append(conn.chunk)
        conn.close()
class ReplicatedWriteHandler(io.WriteHandler):
    """
    Handles writes to a replicated content.

    For initialization parameters, see oio.api.io.WriteHandler.
    """

    def stream(self):
        """
        Upload the data from `self.source`, one metachunk after the other.

        :returns: a tuple with the list of all the chunks successfully
            written, the total number of bytes transferred, and the
            hexadecimal MD5 checksum of the whole content
        """
        # Shared by all the metachunk writers, which feed it
        # with the data they read from the source.
        global_checksum = hashlib.md5()
        total_bytes_transferred = 0
        content_chunks = []

        kwargs = ReplicatedMetachunkWriter.filter_kwargs(self.extra_kwargs)
        for meta_chunk in self.chunk_prep():
            size = self.sysmeta['chunk_size']
            handler = ReplicatedMetachunkWriter(
                self.sysmeta, meta_chunk, global_checksum, self.storage_method,
                connection_timeout=self.connection_timeout,
                write_timeout=self.write_timeout,
                read_timeout=self.read_timeout,
                headers=self.headers,
                **kwargs)
            bytes_transferred, _h, chunks = handler.stream(self.source, size)
            content_chunks += chunks

            total_bytes_transferred += bytes_transferred
            # Less data than a full chunk: the source is exhausted
            if bytes_transferred < size:
                break
            # The chunk is full, stop only if there is nothing left to read
            if len(self.source.peek()) == 0:
                break

        content_checksum = global_checksum.hexdigest()

        return content_chunks, total_bytes_transferred, content_checksum