# Source code for oio.api.backblaze

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.


from oio.common.green import eventlet

import logging
import hashlib
from tempfile import TemporaryFile
from urlparse import urlparse
from oio.api import io
from oio.common.exceptions import SourceReadError, OioException
from oio.api.backblaze_http import Backblaze, BackblazeException


# Module-level logger, named after this module for log filtering.
logger = logging.getLogger(__name__)
# NOTE(review): WORD_LENGTH is not referenced anywhere in this file —
# presumably used by callers elsewhere; confirm before removing.
WORD_LENGTH = 10
# Number of attempts for each Backblaze HTTP request before giving up.
TRY_REQUEST_NUMBER = 3


def _get_name(chunk):
    raw_url = chunk["url"]
    parsed = urlparse(raw_url)
    return parsed.path.split('/')[-1]


def _connect_put(chunk, sysmeta, backblaze_info):
    """
    Build a pseudo-connection dict holding a Backblaze client,
    the chunk description and the system metadata for an upload.

    :param chunk: list of chunk description dicts (indexed at [0])
    :param sysmeta: dict of system metadata (mutated: 'name' is set)
    :param backblaze_info: dict with credentials and tokens
    :returns: dict with keys 'chunk', 'backblaze' and 'sysmeta'
    """
    # NOTE(review): indexes chunk[0], so callers must pass a list of
    # chunk dicts; BackblazeDeleteHandler.delete() passes each chunk
    # individually — confirm those are also lists.
    chunk_path = _get_name(chunk[0])
    conn = {}
    conn['chunk'] = chunk
    conn['backblaze'] = Backblaze(backblaze_info['backblaze.account_id'],
                                  backblaze_info['backblaze.application_key'],
                                  backblaze_info['authorization'],
                                  backblaze_info['upload_token'])
    # NOTE(review): 'meta' aliases the caller's sysmeta dict (no copy),
    # so setting 'name' below mutates the caller's dict as well.
    meta = sysmeta
    meta['name'] = chunk_path
    conn['sysmeta'] = meta
    return conn


def _read_to_temp(size, source, checksum, temp, first_byte=None):
    sha1 = hashlib.sha1()
    if first_byte:
        bytes_transferred = 1
        sha1.update(first_byte)
        checksum.update(first_byte)
        temp.write(first_byte)
    else:
        bytes_transferred = 0
    while True:
        remaining_bytes = size - bytes_transferred
        if io.WRITE_CHUNK_SIZE < remaining_bytes:
            read_size = io.WRITE_CHUNK_SIZE
        else:
            read_size = remaining_bytes
        try:
            data = source.read(read_size)
        except (ValueError, IOError) as err:
            raise SourceReadError((str(err)))
        if len(data) == 0:
            break
        sha1.update(data)
        checksum.update(data)
        temp.write(data)
        bytes_transferred += len(data)
    temp.seek(0)
    return bytes_transferred, sha1.hexdigest(), checksum.hexdigest()


class BackblazeChunkWriteHandler(object):
    """
    Upload one metachunk to Backblaze, either as a single upload
    (small chunks) or as a B2 multi-part upload (big chunks).
    """

    def __init__(self, sysmeta, meta_chunk, checksum, storage_method,
                 backblaze_info):
        # sysmeta: system metadata dict shared with the caller.
        self.sysmeta = sysmeta
        # meta_chunk: list of chunk descriptions; element [0] is updated
        # with 'size' and 'hash' during the upload.
        self.meta_chunk = meta_chunk
        # checksum: running content-level hash, updated across chunks.
        self.checksum = checksum
        self.storage_method = storage_method
        self.backblaze_info = backblaze_info

    def _upload_chunks(self, conn, size, sha1, md5, temp):
        """
        Upload the buffered data as a single B2 object, retrying with
        exponential backoff, then record size/hash on the metachunk.

        :returns: (uploaded size, meta_chunk list)
        :raises OioException: when all retries are exhausted
        """
        try_number = TRY_REQUEST_NUMBER
        while True:
            self.meta_chunk[0]['size'] = size
            try:
                conn['backblaze'].upload(self.backblaze_info['bucket_name'],
                                         self.sysmeta, temp, sha1)
                break
            except BackblazeException as b2e:
                # Rewind so the next attempt re-sends the same data.
                temp.seek(0)
                if try_number == 0:
                    logger.debug('headers sent: %s' % str(b2e.headers_send))
                    raise OioException('backblaze upload error: %s'
                                       % str(b2e))
                else:
                    # Honor the server's Retry-After if present, otherwise
                    # back off exponentially (1, 2, 4... seconds).
                    sleep_time_default = pow(
                        2, TRY_REQUEST_NUMBER - try_number)
                    sleep = b2e.headers_received.get("Retry-After",
                                                     sleep_time_default)
                    eventlet.sleep(sleep)
                    try_number -= 1
        self.meta_chunk[0]['hash'] = md5
        return self.meta_chunk[0]["size"], self.meta_chunk

    def _stream_small_chunks(self, source, conn, temp):
        """Buffer the whole source into temp and upload it in one shot."""
        size, sha1, md5 = _read_to_temp(
            conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE,
            source, self.checksum, temp)
        if size <= 0:
            # Nothing read: report no bytes and no chunks.
            return 0, list()
        return self._upload_chunks(conn, size, sha1, md5, temp)

    def _stream_big_chunks(self, source, conn, temp):
        """
        Upload the source as a B2 multi-part upload: begin, then one
        upload_part per max_chunk_size slice, then end. Each phase
        retries with exponential backoff.

        :returns: (total bytes read, meta_chunk list)
        :raises OioException: when a phase exhausts its retries
        """
        max_chunk_size = conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE
        sha1_array = list()
        res = None
        size, sha1, md5 = _read_to_temp(max_chunk_size, source,
                                        self.checksum, temp)
        if size <= 0:
            return 0, list()
        # obligated to read max_chunk_size + 1 bytes
        # if the size of the file is max_chunk_size
        # backblaze will not take it because
        # the upload part must have at least 2 parts
        first_byte = source.read(1)
        if not first_byte:
            # Source fits in one part after all: fall back to the
            # simple single-object upload.
            return self._upload_chunks(conn, size, sha1, md5, temp)
        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_begin(
                    self.backblaze_info['bucket_name'], self.sysmeta)
                break
            except BackblazeException as b2e:
                tries -= 1
                if tries == 0:
                    logger.debug('headers sent: %s' % str(b2e.headers_send))
                    raise OioException(
                        'Error at the beginning of upload: %s' % str(b2e))
                else:
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))
        file_id = res['fileId']
        part_num = 1
        # One byte was consumed as look-ahead above; count it now.
        bytes_read = size + 1
        tries = TRY_REQUEST_NUMBER
        while True:
            # Inner loop: upload the currently buffered part, retrying.
            while True:
                # NOTE(review): assumes meta_chunk[0]['size'] is the total
                # expected size — only guaranteed on the '>' branch of
                # stream(); confirm for the missing-size branch.
                if bytes_read + max_chunk_size > self.meta_chunk[0]['size']:
                    to_read = self.meta_chunk[0]['size'] - bytes_read
                else:
                    to_read = max_chunk_size
                try:
                    res, sha1 = conn['backblaze'].upload_part(
                        file_id, temp, part_num, sha1)
                    break
                except BackblazeException as b2e:
                    temp.seek(0)
                    tries = tries - 1
                    if tries == 0:
                        logger.debug("headers sent: %s"
                                     % str(b2e.headers_send))
                        raise OioException('Error during upload: %s'
                                           % str(b2e))
                    else:
                        val_tmp = pow(2, TRY_REQUEST_NUMBER - tries)
                        eventlet.sleep(b2e.headers_received.get('Retry-After',
                                                                val_tmp))
            part_num += 1
            sha1_array.append(sha1)
            # Reset the buffer before filling it with the next part.
            temp.seek(0)
            temp.truncate(0)
            size, sha1, md5 = _read_to_temp(to_read, source, self.checksum,
                                            temp, first_byte)
            # The look-ahead byte is only prepended to the first refill.
            first_byte = None
            bytes_read = bytes_read + size
            if size == 0:
                break
        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_end(file_id, sha1_array)
                break
            except BackblazeException as b2e:
                tries = tries - 1
                if tries == 0:
                    logger.warn('headers send: %s' % str(b2e.headers_send))
                    raise OioException('Error at the end of upload: %s'
                                       % str(b2e))
                else:
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))
        self.meta_chunk[0]['hash'] = md5
        return bytes_read, self.meta_chunk

    def stream(self, source):
        """
        Upload `source` to Backblaze, choosing the single-object or
        multi-part strategy from the declared metachunk size.

        :returns: (bytes transferred, meta_chunk list)
        """
        conn = _connect_put(self.meta_chunk, self.sysmeta,
                            self.backblaze_info)
        with TemporaryFile() as temp:
            # Unknown size: be safe and use the multi-part path.
            if "size" not in self.meta_chunk[0]:
                return self._stream_big_chunks(source, conn, temp)
            if self.meta_chunk[0]["size"] > \
                    conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE:
                return self._stream_big_chunks(source, conn, temp)
            return self._stream_small_chunks(source, conn, temp)
class BackblazeWriteHandler(io.WriteHandler):
    """Write handler that streams a content to Backblaze, one metachunk
    at a time, keeping a running MD5 of the whole content."""

    def __init__(self, source, sysmeta, chunk_prep,
                 storage_method, backblaze_info, **kwargs):
        super(BackblazeWriteHandler, self).__init__(
            source, sysmeta, chunk_prep, storage_method, **kwargs)
        self.backblaze_info = backblaze_info

    def stream(self):
        """Only works with files, for the moment, because we need
        a file size to know when to stop.

        :returns: (list of uploaded chunks, total bytes, content checksum)
        """
        content_hash = hashlib.md5()
        uploaded_chunks = []
        total_size = 0
        for meta_chunk in self.chunk_prep():
            chunk_handler = BackblazeChunkWriteHandler(
                self.sysmeta, meta_chunk, content_hash,
                self.storage_method, self.backblaze_info)
            nbytes, chunk_list = chunk_handler.stream(self.source)
            # A non-positive transfer means the source is exhausted.
            if nbytes <= 0:
                break
            uploaded_chunks += chunk_list
            total_size += nbytes
        return uploaded_chunks, total_size, content_hash.hexdigest()
class BackblazeDeleteHandler(object):
    """Delete a content's chunks from Backblaze, retrying on errors."""

    def __init__(self, meta, chunks, backblaze_info):
        # meta: system metadata dict, passed to _connect_put (which sets
        # its 'name' key per chunk — the dict is shared, not copied).
        self.meta = meta
        self.chunks = chunks
        self.backblaze_info = backblaze_info

    def _delete(self, conn):
        """
        Delete one chunk through the connection's Backblaze client,
        with exponential backoff between attempts.

        :raises OioException: when retries are exhausted
        """
        sysmeta = conn['sysmeta']
        try_number = TRY_REQUEST_NUMBER
        while True:
            try:
                conn['backblaze'].delete(self.backblaze_info['bucket_name'],
                                         sysmeta)
                break
            except BackblazeException as b2e:
                # NOTE(review): try_number starts at TRY_REQUEST_NUMBER and
                # is decremented after the sleep, so this actually performs
                # TRY_REQUEST_NUMBER + 1 attempts — one more than the
                # upload/download handlers, which decrement before checking.
                if try_number == 0:
                    raise OioException('backblaze delete error: %s'
                                       % str(b2e))
                else:
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - try_number))
                    try_number -= 1

    def delete(self):
        """Delete every chunk of the content, one request per chunk."""
        for chunk in self.chunks:
            conn = _connect_put(chunk, self.meta, self.backblaze_info)
            self._delete(conn)
class BackblazeChunkDownloadHandler(object):
    """Download one metachunk from Backblaze, with per-chunk retries."""

    def __init__(self, meta, chunks, offset, size,
                 headers=None, backblaze_info=None):
        # NOTE(review): failed_chunks is initialized but never appended
        # to in this class — possibly vestigial; confirm with callers.
        self.failed_chunks = list()
        self.chunks = chunks
        headers = headers or {}
        # NOTE(review): 'end' is never reassigned, so self.end is always
        # None — looks like dead range-tracking code.
        end = None
        if size > 0 or offset:
            # Build an HTTP Range header: a negative offset means a
            # suffix range, a known size a bounded range, else open-ended.
            if offset < 0:
                h_range = "bytes=%d" % offset
            elif size is not None:
                h_range = "bytes=%d-%d" % (offset, size + offset - 1)
            else:
                h_range = "bytes=%d-" % offset
            headers["Range"] = h_range
        self.headers = headers
        self.begin = offset
        self.end = end
        self.meta = meta
        self.backblaze_info = backblaze_info

    def get_stream(self):
        """Return the downloaded data, or None if no source is available."""
        source = self._get_chunk_source()
        stream = None
        if source:
            stream = self._make_stream(source)
        return stream

    def _get_chunk_source(self):
        """Build a Backblaze client from the stored credentials."""
        return Backblaze(self.backblaze_info['backblaze.account_id'],
                         self.backblaze_info['backblaze.application_key'],
                         self.backblaze_info['authorization'])

    def _make_stream(self, source):
        """
        Download each chunk in turn, retrying with exponential backoff.

        :returns: the data of the last chunk successfully downloaded
            (chunks are presumably replicas of the same data — confirm)
        :raises OioException: when retries are exhausted for a chunk
        """
        result = None
        data = None
        for chunk in self.chunks:
            # The chunk name is derived from its URL and written into the
            # shared meta dict before each download.
            self.meta['name'] = _get_name(chunk)
            try_number = TRY_REQUEST_NUMBER
            while True:
                try:
                    data = source.download(self.backblaze_info['bucket_name'],
                                           self.meta, self.headers)
                    break
                except BackblazeException as b2e:
                    if try_number == 0:
                        raise OioException('backblaze download error: %s'
                                           % str(b2e))
                    else:
                        eventlet.sleep(pow(2,
                                           TRY_REQUEST_NUMBER - try_number))
                        try_number -= 1
            if data:
                result = data
        return result
class BackblazeDownloadHandler(object):
    """Download a whole content from Backblaze, one metachunk at a time."""

    def __init__(self, sysmeta, meta_chunks, backblaze_info, headers,
                 range_start=None, range_end=None):
        self.sysmeta = sysmeta
        self.meta_chunks = meta_chunks
        self.backblaze_info = backblaze_info
        self.headers = headers

    def _get_streams(self):
        """Yield the data of each metachunk; fail fast on an empty one."""
        # offset=0, size=0 requests the full chunk (no Range header).
        for meta_chunk in self.meta_chunks:
            chunk_handler = BackblazeChunkDownloadHandler(
                self.sysmeta, meta_chunk, 0, 0, None, self.backblaze_info)
            data = chunk_handler.get_stream()
            if not data:
                raise OioException("Error while downloading")
            yield data

    def get_iter(self):
        """Yield a single generator over the metachunk streams."""
        yield self._get_streams()