# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from oio.common.green import eventlet
import logging
import hashlib
from tempfile import TemporaryFile
from urlparse import urlparse
from oio.api import io
from oio.common.exceptions import SourceReadError, OioException
from oio.api.backblaze_http import Backblaze, BackblazeException
logger = logging.getLogger(__name__)
WORD_LENGTH = 10
TRY_REQUEST_NUMBER = 3
def _get_name(chunk):
raw_url = chunk["url"]
parsed = urlparse(raw_url)
return parsed.path.split('/')[-1]
def _connect_put(chunk, sysmeta, backblaze_info):
    """Build a connection descriptor for one meta-chunk.

    Returns a dict holding the chunk list, an authenticated Backblaze
    client, and the system metadata (with 'name' set to the chunk id).
    Note: `sysmeta` is mutated in place, not copied.
    """
    chunk_name = _get_name(chunk[0])
    client = Backblaze(backblaze_info['backblaze.account_id'],
                       backblaze_info['backblaze.application_key'],
                       backblaze_info['authorization'],
                       backblaze_info['upload_token'])
    sysmeta['name'] = chunk_name
    return {'chunk': chunk, 'backblaze': client, 'sysmeta': sysmeta}
def _read_to_temp(size, source, checksum, temp, first_byte=None):
    """Copy at most `size` bytes from `source` into `temp`, hashing as we go.

    :param checksum: a running md5 over the whole content, updated in place
    :param first_byte: an optional byte already consumed by the caller;
        it is written first and counted in the total
    :returns: (bytes_transferred, sha1 hexdigest, checksum hexdigest);
        `temp` is rewound to offset 0 before returning
    :raises SourceReadError: if reading from `source` fails
    """
    sha1 = hashlib.sha1()
    bytes_transferred = 0
    if first_byte:
        sha1.update(first_byte)
        checksum.update(first_byte)
        temp.write(first_byte)
        bytes_transferred = 1
    while True:
        to_read = min(io.WRITE_CHUNK_SIZE, size - bytes_transferred)
        try:
            data = source.read(to_read)
        except (ValueError, IOError) as err:
            raise SourceReadError(str(err))
        if not data:
            break
        sha1.update(data)
        checksum.update(data)
        temp.write(data)
        bytes_transferred += len(data)
    temp.seek(0)
    return bytes_transferred, sha1.hexdigest(), checksum.hexdigest()
class BackblazeChunkWriteHandler(object):
    """Upload one meta-chunk to a Backblaze bucket.

    Payloads up to BACKBLAZE_MAX_CHUNK_SIZE are sent in a single request;
    larger ones use the B2 multi-part upload protocol.  Every request is
    retried with exponential backoff (honoring a Retry-After header when
    the server provides one) before giving up with an OioException.
    """

    def __init__(self, sysmeta, meta_chunk, checksum,
                 storage_method, backblaze_info):
        self.sysmeta = sysmeta
        self.meta_chunk = meta_chunk
        # Running checksum over the whole content, shared across chunks.
        self.checksum = checksum
        self.storage_method = storage_method
        self.backblaze_info = backblaze_info

    def _upload_chunks(self, conn, size, sha1, md5, temp):
        """Upload the buffered chunk in a single B2 request, with retries.

        :returns: (uploaded size, meta_chunk) after recording size and hash
            in the first chunk descriptor
        :raises OioException: when all retries are exhausted
        """
        try_number = TRY_REQUEST_NUMBER
        while True:
            self.meta_chunk[0]['size'] = size
            try:
                conn['backblaze'].upload(self.backblaze_info['bucket_name'],
                                         self.sysmeta, temp, sha1)
                break
            except BackblazeException as b2e:
                # Rewind so a retry re-reads the buffered data from the start.
                temp.seek(0)
                if try_number == 0:
                    logger.debug('headers sent: %s'
                                 % str(b2e.headers_send))
                    raise OioException('backblaze upload error: %s' % str(b2e))
                else:
                    sleep_time_default = pow(2,
                                             TRY_REQUEST_NUMBER - try_number)
                    sleep = b2e.headers_received.get("Retry-After",
                                                     sleep_time_default)
                    eventlet.sleep(sleep)
                    try_number -= 1
        self.meta_chunk[0]['hash'] = md5
        return self.meta_chunk[0]["size"], self.meta_chunk

    def _stream_small_chunks(self, source, conn, temp):
        """Buffer the whole source (small case) then upload it at once."""
        size, sha1, md5 = _read_to_temp(
            conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE,
            source, self.checksum, temp)
        if size <= 0:
            return 0, list()
        return self._upload_chunks(conn, size, sha1, md5, temp)

    def _stream_big_chunks(self, source, conn, temp):
        """Upload the source through the B2 multi-part protocol."""
        max_chunk_size = conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE
        sha1_array = list()
        res = None
        size, sha1, md5 = _read_to_temp(max_chunk_size, source,
                                        self.checksum, temp)
        if size <= 0:
            return 0, list()
        # obligated to read max_chunk_size + 1 bytes
        # if the size of the file is max_chunk_size
        # backblaze will not take it because
        # the upload part must have at least 2 parts
        first_byte = source.read(1)
        if not first_byte:
            # The source fits in one part after all: fall back to the
            # single-request upload.
            return self._upload_chunks(conn, size, sha1, md5, temp)
        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_begin(
                    self.backblaze_info['bucket_name'], self.sysmeta)
                break
            except BackblazeException as b2e:
                tries -= 1
                if tries == 0:
                    logger.debug('headers sent: %s'
                                 % str(b2e.headers_send))
                    raise OioException('Error at the beginning of upload: %s'
                                       % str(b2e))
                else:
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))
        file_id = res['fileId']
        part_num = 1
        # The probe byte read above is already part of the transferred total.
        bytes_read = size + 1
        tries = TRY_REQUEST_NUMBER
        while True:
            while True:
                # NOTE(review): reads meta_chunk[0]['size'], but stream()
                # routes here precisely when 'size' is absent — confirm
                # upstream always sets it before big uploads.
                if bytes_read + max_chunk_size > self.meta_chunk[0]['size']:
                    to_read = self.meta_chunk[0]['size'] - bytes_read
                else:
                    to_read = max_chunk_size
                try:
                    res, sha1 = conn['backblaze'].upload_part(file_id, temp,
                                                              part_num, sha1)
                    break
                except BackblazeException as b2e:
                    temp.seek(0)
                    tries = tries - 1
                    if tries == 0:
                        logger.debug("headers sent: %s"
                                     % str(b2e.headers_send))
                        raise OioException('Error during upload: %s'
                                           % str(b2e))
                    else:
                        val_tmp = pow(2, TRY_REQUEST_NUMBER - tries)
                        eventlet.sleep(b2e.headers_received.get('Retry-After',
                                                                val_tmp))
            part_num += 1
            sha1_array.append(sha1)
            # Reuse the same temp file for the next part.
            temp.seek(0)
            temp.truncate(0)
            size, sha1, md5 = _read_to_temp(to_read, source,
                                            self.checksum, temp,
                                            first_byte)
            # The probe byte has been consumed by the first refill only.
            first_byte = None
            bytes_read = bytes_read + size
            if size == 0:
                break
        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_end(file_id,
                                                        sha1_array)
                break
            except BackblazeException as b2e:
                tries = tries - 1
                if tries == 0:
                    logger.warn('headers sent: %s'
                                % str(b2e.headers_send))
                    raise OioException('Error at the end of upload: %s'
                                       % str(b2e))
                else:
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))
        self.meta_chunk[0]['hash'] = md5
        return bytes_read, self.meta_chunk

    def stream(self, source):
        """Upload `source`, choosing single or multi-part mode by size.

        :returns: (bytes transferred, meta_chunk)
        """
        conn = _connect_put(self.meta_chunk, self.sysmeta,
                            self.backblaze_info)
        with TemporaryFile() as temp:
            if "size" not in self.meta_chunk[0]:
                return self._stream_big_chunks(source, conn, temp)
            if self.meta_chunk[0]["size"] > \
                    conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE:
                return self._stream_big_chunks(source, conn, temp)
            return self._stream_small_chunks(source, conn, temp)
class BackblazeWriteHandler(io.WriteHandler):
    """WriteHandler that stores each meta-chunk of a content on Backblaze."""

    def __init__(self, source, sysmeta, chunk_prep,
                 storage_method, backblaze_info, **kwargs):
        super(BackblazeWriteHandler, self).__init__(
            source, sysmeta, chunk_prep, storage_method, **kwargs)
        self.backblaze_info = backblaze_info

    def stream(self):
        """Only works with files, for the moment, because we need a file size
        to know when to stop.

        :returns: (content_chunks, total bytes transferred, content md5)
        """
        global_checksum = hashlib.md5()
        total_bytes_transferred = 0
        content_chunks = list()
        for meta_chunk in self.chunk_prep():
            handler = BackblazeChunkWriteHandler(
                self.sysmeta, meta_chunk, global_checksum, self.storage_method,
                self.backblaze_info)
            bytes_transferred, chunks = handler.stream(self.source)
            # A chunk that transferred nothing means the source is exhausted.
            if bytes_transferred <= 0:
                break
            content_chunks += chunks
            total_bytes_transferred += bytes_transferred
        content_checksum = global_checksum.hexdigest()
        return content_chunks, total_bytes_transferred, content_checksum
class BackblazeDeleteHandler(object):
    """Delete the chunks of a content from a Backblaze bucket, with retries."""

    def __init__(self, meta, chunks, backblaze_info):
        self.meta = meta
        self.chunks = chunks
        self.backblaze_info = backblaze_info

    def _delete(self, conn):
        """Delete one chunk, retrying with exponential backoff.

        :raises OioException: when all retries are exhausted
        """
        sysmeta = conn['sysmeta']
        try_number = TRY_REQUEST_NUMBER
        while True:
            try:
                conn['backblaze'].delete(self.backblaze_info['bucket_name'],
                                         sysmeta)
                break
            except BackblazeException as b2e:
                if try_number == 0:
                    raise OioException('backblaze delete error: %s'
                                       % str(b2e))
                else:
                    # Exponential backoff: 1, 2, 4... seconds.
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - try_number))
                    try_number -= 1

    def delete(self):
        """Delete every chunk of this content."""
        for chunk in self.chunks:
            conn = _connect_put(chunk, self.meta, self.backblaze_info)
            self._delete(conn)
[docs]class BackblazeChunkDownloadHandler(object):
def __init__(self, meta, chunks, offset, size,
headers=None, backblaze_info=None):
self.failed_chunks = list()
self.chunks = chunks
headers = headers or {}
end = None
if size > 0 or offset:
if offset < 0:
h_range = "bytes=%d" % offset
elif size is not None:
h_range = "bytes=%d-%d" % (offset,
size + offset - 1)
else:
h_range = "bytes=%d-" % offset
headers["Range"] = h_range
self.headers = headers
self.begin = offset
self.end = end
self.meta = meta
self.backblaze_info = backblaze_info
[docs] def get_stream(self):
source = self._get_chunk_source()
stream = None
if source:
stream = self._make_stream(source)
return stream
def _get_chunk_source(self):
return Backblaze(self.backblaze_info['backblaze.account_id'],
self.backblaze_info['backblaze.application_key'],
self.backblaze_info['authorization'])
def _make_stream(self, source):
result = None
data = None
for chunk in self.chunks:
self.meta['name'] = _get_name(chunk)
try_number = TRY_REQUEST_NUMBER
while True:
try:
data = source.download(self.backblaze_info['bucket_name'],
self.meta, self.headers)
break
except BackblazeException as b2e:
if try_number == 0:
raise OioException('backblaze download error: %s'
% str(b2e))
else:
eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - try_number))
try_number -= 1
if data:
result = data
return result
class BackblazeDownloadHandler(object):
    """Download a whole content: one stream per meta-chunk.

    NOTE(review): `headers`, `range_start` and `range_end` are accepted
    but never used by the download path — confirm whether ranged
    downloads are supposed to be wired through.
    """

    def __init__(self, sysmeta, meta_chunks, backblaze_info, headers,
                 range_start=None, range_end=None):
        self.meta_chunks = meta_chunks
        self.backblaze_info = backblaze_info
        self.headers = headers
        self.sysmeta = sysmeta

    def _get_streams(self):
        """Yield one downloaded stream per meta-chunk.

        :raises OioException: if any meta-chunk cannot be downloaded
        """
        for pos in range(len(self.meta_chunks)):
            handler = BackblazeChunkDownloadHandler(self.sysmeta,
                                                    self.meta_chunks[pos],
                                                    0, 0, None,
                                                    self.backblaze_info)
            stream = handler.get_stream()
            if not stream:
                raise OioException("Error while downloading")
            yield stream

    def get_iter(self):
        """Yield the stream generator (one level of nesting for callers)."""
        yield self._get_streams()