# Source code for oio.api.backblaze

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

import logging
import hashlib
from tempfile import TemporaryFile
from urlparse import urlparse
from oio.api import io
from oio.common.exceptions import SourceReadError, OioException
from oio.api.backblaze_http import Backblaze, BackblazeException
import eventlet
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): WORD_LENGTH is not referenced anywhere in the visible part
# of this file -- confirm it is still needed.
WORD_LENGTH = 10
# Base retry budget for the Backblaze request loops in this module; each
# loop decides on its own how many attempts this value translates to.
TRY_REQUEST_NUMBER = 3


def _get_name(chunk):
    raw_url = chunk["url"]
    parsed = urlparse(raw_url)
    return parsed.path.split('/')[-1]


def _connect_put(chunk, sysmeta, backblaze_info):
    """Assemble the "connection" dict used by the Backblaze handlers.

    :param chunk: sequence of chunk descriptions; the URL of the first one
                  provides the object name.
    :param sysmeta: metadata dict. It is updated IN PLACE (its 'name' entry
                    is set) and the very same object is stored in the result.
    :param backblaze_info: dict holding the 'backblaze.account_id',
                           'backblaze.application_key', 'authorization' and
                           'upload_token' entries handed to ``Backblaze``.
    :returns: dict with the keys 'chunk', 'backblaze' and 'sysmeta'.
    """
    object_name = _get_name(chunk[0])
    client = Backblaze(backblaze_info['backblaze.account_id'],
                       backblaze_info['backblaze.application_key'],
                       backblaze_info['authorization'],
                       backblaze_info['upload_token'])
    # Only touch the caller's metadata once the client has been built.
    sysmeta['name'] = object_name
    return {
        'chunk': chunk,
        'backblaze': client,
        'sysmeta': sysmeta,
    }


def _read_to_temp(size, source, checksum, temp, first_byte=None):
    """Copy at most ``size`` bytes from ``source`` into ``temp``.

    The data is read in pieces of at most ``io.WRITE_CHUNK_SIZE`` bytes.
    Every piece is written to ``temp`` and fed both to a fresh SHA-1 digest
    and to the caller-supplied ``checksum`` object. ``temp`` is rewound to
    its beginning before returning.

    :param size: upper bound on the number of bytes to transfer, including
                 ``first_byte`` when it is given.
    :param source: object with a ``read(n)`` method.
    :param checksum: hash object (``update`` / ``hexdigest``) shared with the
                     caller; it is updated in place.
    :param temp: writable, seekable file object receiving the data.
    :param first_byte: optional data already consumed from ``source`` that
                       must be placed in front of what is read here.
    :returns: tuple ``(bytes_transferred, sha1_hexdigest, checksum_hexdigest)``.
    :raises SourceReadError: when reading ``source`` raises ``ValueError``
                             or ``IOError``.
    """
    sha1 = hashlib.sha1()
    bytes_transferred = 0
    if first_byte:
        # Count what was actually handed over instead of assuming one byte.
        bytes_transferred = len(first_byte)
        sha1.update(first_byte)
        checksum.update(first_byte)
        temp.write(first_byte)
    while True:
        remaining_bytes = size - bytes_transferred
        if remaining_bytes <= 0:
            # Never call read() with a negative size: file objects treat it
            # as "read until EOF", which would ignore the requested limit.
            break
        read_size = min(io.WRITE_CHUNK_SIZE, remaining_bytes)
        try:
            data = source.read(read_size)
        except (ValueError, IOError) as err:
            raise SourceReadError(str(err))
        if not data:
            # End of the source reached before ``size`` bytes were seen.
            break
        sha1.update(data)
        checksum.update(data)
        temp.write(data)
        bytes_transferred += len(data)
    temp.seek(0)
    return bytes_transferred, sha1.hexdigest(), checksum.hexdigest()


class BackblazeChunkWriteHandler(object):
    """Send the data of one meta chunk to Backblaze.

    Data is first spooled from the source into a temporary file, then
    uploaded either in a single request or through the multi-part calls
    (``upload_part_begin`` / ``upload_part`` / ``upload_part_end``) of the
    ``Backblaze`` client.
    """

    def __init__(self, sysmeta, meta_chunk, checksum,
                 storage_method, backblaze_info):
        """
        :param sysmeta: metadata dict passed to the Backblaze client.
        :param meta_chunk: sequence of chunk descriptions; only the first
                           one is read and updated ('size', 'hash').
        :param checksum: hash object updated with every byte read.
        :param storage_method: stored as-is; not used by this class.
        :param backblaze_info: dict of Backblaze settings ('bucket_name'
                               plus the entries read by ``_connect_put``).
        """
        self.sysmeta = sysmeta
        self.meta_chunk = meta_chunk
        self.checksum = checksum
        self.storage_method = storage_method
        self.backblaze_info = backblaze_info

    @staticmethod
    def _retry_delay(b2e, default):
        """Return how long to wait before retrying after ``b2e``.

        The 'Retry-After' entry of ``b2e.headers_received`` is used when it
        holds a usable number of seconds (header values usually arrive as
        strings, which a sleep call cannot consume directly); otherwise
        ``default`` is returned.
        """
        headers = getattr(b2e, 'headers_received', None) or {}
        try:
            delay = float(headers.get('Retry-After', default))
        except (TypeError, ValueError):
            # e.g. an HTTP-date instead of a number of seconds
            return default
        if not 0 <= delay < float('inf'):
            return default
        return delay

    def _upload_chunks(self, conn, size, sha1, md5, temp):
        """Upload the content of ``temp`` in one request, with retries.

        The first attempt may be followed by up to TRY_REQUEST_NUMBER
        retries, sleeping between attempts.

        :returns: tuple ``(size, meta_chunk)``.
        :raises OioException: when every attempt failed.
        """
        self.meta_chunk[0]['size'] = size
        try_number = TRY_REQUEST_NUMBER
        while True:
            try:
                conn['backblaze'].upload(self.backblaze_info['bucket_name'],
                                         self.sysmeta, temp, sha1)
                break
            except BackblazeException as b2e:
                # Rewind so that the next attempt re-sends the whole file.
                temp.seek(0)
                if try_number == 0:
                    logger.debug('headers sent: %s', b2e.headers_send)
                    raise OioException('backblaze upload error: %s'
                                       % str(b2e))
                backoff = pow(2, TRY_REQUEST_NUMBER - try_number)
                eventlet.sleep(self._retry_delay(b2e, backoff))
                try_number -= 1

        self.meta_chunk[0]['hash'] = md5
        return self.meta_chunk[0]["size"], self.meta_chunk

    def _stream_small_chunks(self, source, conn, temp):
        """Spool at most one maximum-sized chunk and upload it at once.

        :returns: ``(0, [])`` when the source is empty, otherwise the result
                  of ``_upload_chunks``.
        """
        size, sha1, md5 = _read_to_temp(
            conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE, source,
            self.checksum, temp)
        if size <= 0:
            return 0, list()
        return self._upload_chunks(conn, size, sha1, md5, temp)

    def _stream_big_chunks(self, source, conn, temp):
        """Upload the source as a multi-part file.

        Falls back to a single-request upload when the source turns out to
        hold no more than one maximum-sized chunk.

        :returns: tuple ``(bytes_read, meta_chunk)``, or ``(0, [])`` when
                  the source is empty.
        :raises OioException: when one of the upload steps keeps failing.
        """
        max_chunk_size = conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE
        size, sha1, md5 = _read_to_temp(max_chunk_size, source,
                                        self.checksum, temp)
        if size <= 0:
            return 0, list()

        # obligated to read max_chunk_size + 1 bytes
        # if the size of the file is max_chunk_size
        # backblaze will not take it because
        # the upload part must have at least 2 parts
        first_byte = source.read(1)
        if not first_byte:
            return self._upload_chunks(conn, size, sha1, md5, temp)

        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_begin(
                    self.backblaze_info['bucket_name'], self.sysmeta)
                break
            except BackblazeException as b2e:
                tries -= 1
                if tries == 0:
                    logger.debug('headers sent: %s', b2e.headers_send)
                    raise OioException(
                        'Error at the beginning of upload: %s' % str(b2e))
                eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))

        file_id = res['fileId']
        part_num = 1
        sha1_array = list()
        # NOTE(review): first_byte is counted here and again in the size
        # returned by the next _read_to_temp() call -- confirm the intended
        # accounting before relying on bytes_read.
        bytes_read = size + 1
        # stream() sends chunks without a 'size' entry here, so the expected
        # size may be unknown; in that case just read full parts until EOF.
        chunk_meta = self.meta_chunk[0]
        expected_size = chunk_meta['size'] if 'size' in chunk_meta else None
        while True:
            if (expected_size is not None and
                    bytes_read + max_chunk_size > expected_size):
                to_read = expected_size - bytes_read
            else:
                to_read = max_chunk_size

            # Each part gets its own retry budget, like the other steps.
            tries = TRY_REQUEST_NUMBER
            while True:
                try:
                    res, sha1 = conn['backblaze'].upload_part(
                        file_id, temp, part_num, sha1)
                    break
                except BackblazeException as b2e:
                    temp.seek(0)
                    tries -= 1
                    if tries == 0:
                        logger.debug("headers sent: %s", b2e.headers_send)
                        raise OioException('Error during upload: %s'
                                           % str(b2e))
                    backoff = pow(2, TRY_REQUEST_NUMBER - tries)
                    eventlet.sleep(self._retry_delay(b2e, backoff))

            part_num += 1
            sha1_array.append(sha1)
            # Recycle the temporary file for the next part.
            temp.seek(0)
            temp.truncate(0)
            size, sha1, md5 = _read_to_temp(to_read, source,
                                            self.checksum, temp, first_byte)
            first_byte = None
            bytes_read = bytes_read + size
            if size == 0:
                break

        tries = TRY_REQUEST_NUMBER
        while True:
            try:
                res = conn['backblaze'].upload_part_end(file_id, sha1_array)
                break
            except BackblazeException as b2e:
                tries -= 1
                if tries == 0:
                    logger.warning('headers send: %s', b2e.headers_send)
                    raise OioException('Error at the end of upload: %s'
                                       % str(b2e))
                eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - tries))

        self.meta_chunk[0]['hash'] = md5
        return bytes_read, self.meta_chunk

    def stream(self, source):
        """Upload the data read from ``source`` for this meta chunk.

        The multi-part path is taken when the chunk size is unknown or
        larger than the client's BACKBLAZE_MAX_CHUNK_SIZE.

        :returns: tuple ``(bytes_transferred, chunks)``.
        """
        conn = _connect_put(self.meta_chunk, self.sysmeta,
                            self.backblaze_info)
        max_chunk_size = conn['backblaze'].BACKBLAZE_MAX_CHUNK_SIZE
        chunk_meta = self.meta_chunk[0]
        with TemporaryFile() as temp:
            if 'size' not in chunk_meta or \
                    chunk_meta['size'] > max_chunk_size:
                return self._stream_big_chunks(source, conn, temp)
            return self._stream_small_chunks(source, conn, temp)


class BackblazeWriteHandler(io.WriteHandler):
    """Write handler uploading each prepared meta chunk to Backblaze."""

    def __init__(self, source, sysmeta, chunk_prep,
                 storage_method, backblaze_info, **kwargs):
        """Forward the generic arguments to the parent class and keep the
        Backblaze settings for later use."""
        super(BackblazeWriteHandler, self).__init__(
            source, sysmeta, chunk_prep, storage_method, **kwargs)
        self.backblaze_info = backblaze_info

    def stream(self):
        """Upload the source, one meta chunk after the other.

        Only works with files, for the moment, because we need a file size
        to know when to stop. Uploading ends as soon as a chunk handler
        reports that it transferred nothing.

        :returns: tuple ``(chunks, total_bytes, md5_hexdigest)``; the MD5
                  digest is shared by all the chunk handlers.
        """
        content_md5 = hashlib.md5()
        uploaded_chunks = []
        uploaded_bytes = 0
        for meta_chunk in self.chunk_prep():
            chunk_writer = BackblazeChunkWriteHandler(
                self.sysmeta, meta_chunk, content_md5,
                self.storage_method, self.backblaze_info)
            written, chunks = chunk_writer.stream(self.source)
            if written <= 0:
                break
            uploaded_chunks.extend(chunks)
            uploaded_bytes += written
        return uploaded_chunks, uploaded_bytes, content_md5.hexdigest()


class BackblazeDeleteHandler(object):
    """Delete the Backblaze objects backing a list of chunks."""

    def __init__(self, meta, chunks, backblaze_info):
        """
        :param meta: metadata dict handed to ``_connect_put`` for each chunk.
        :param chunks: iterable of chunk sequences to delete.
        :param backblaze_info: dict of Backblaze settings; 'bucket_name' is
                               read here, the rest by ``_connect_put``.
        """
        self.meta = meta
        self.chunks = chunks
        self.backblaze_info = backblaze_info

    def _delete(self, conn):
        """Issue the delete request, retrying with exponential back-off.

        One initial attempt plus up to TRY_REQUEST_NUMBER retries are made,
        sleeping 1, 2, 4, ... seconds in between.

        :raises OioException: when the last attempt fails as well.
        """
        bucket = self.backblaze_info
        target_meta = conn['sysmeta']
        for attempt in range(TRY_REQUEST_NUMBER + 1):
            try:
                conn['backblaze'].delete(bucket['bucket_name'], target_meta)
                return
            except BackblazeException as b2e:
                if attempt == TRY_REQUEST_NUMBER:
                    raise OioException('backblaze delete error: %s'
                                       % str(b2e))
                eventlet.sleep(pow(2, attempt))

    def delete(self):
        """Delete every chunk, one after the other."""
        for chunk in self.chunks:
            self._delete(_connect_put(chunk, self.meta, self.backblaze_info))


class BackblazeChunkDownloadHandler(object):
    """Download the data of one meta chunk from Backblaze."""

    def __init__(self, meta, chunks, offset, size,
                 headers=None, backblaze_info=None):
        """
        :param meta: metadata dict; its 'name' entry is overwritten for each
                     chunk that gets downloaded.
        :param chunks: iterable of chunk descriptions (each with a "url").
        :param offset: first byte wanted; a negative value asks for the last
                       ``-offset`` bytes.
        :param size: number of bytes wanted; ``None``, zero or a negative
                     value means "no explicit size".
        :param headers: optional extra request headers. The dict is copied,
                        the caller's object is left untouched.
        :param backblaze_info: dict of Backblaze settings.
        """
        self.failed_chunks = list()
        self.chunks = chunks
        # Private copy: adding "Range" must not leak into the caller's dict.
        headers = dict(headers) if headers else {}
        # ``size`` may be None, which cannot be compared to an int on
        # Python 3, so test for it explicitly.
        has_size = size is not None and size > 0
        if has_size or offset:
            if offset < 0:
                h_range = "bytes=%d" % offset
            elif has_size:
                h_range = "bytes=%d-%d" % (offset, size + offset - 1)
            else:
                # No usable size: ask for everything from ``offset`` on
                # rather than building a range whose end precedes its start.
                h_range = "bytes=%d-" % offset
            headers["Range"] = h_range
        self.headers = headers
        self.begin = offset
        # The end position is never computed; the attribute is kept for
        # compatibility with code that may read it.
        self.end = None
        self.meta = meta
        self.backblaze_info = backblaze_info

    def get_stream(self):
        """Return the downloaded data, or ``None`` when nothing was
        obtained."""
        source = self._get_chunk_source()
        stream = None
        if source:
            stream = self._make_stream(source)
        return stream

    def _get_chunk_source(self):
        """Build the ``Backblaze`` client used for the downloads."""
        return Backblaze(self.backblaze_info['backblaze.account_id'],
                         self.backblaze_info['backblaze.application_key'],
                         self.backblaze_info['authorization'])

    def _make_stream(self, source):
        """Download every chunk and return the last non-empty result.

        Each download is attempted once and then retried up to
        TRY_REQUEST_NUMBER times with exponential back-off.

        :raises OioException: when a chunk cannot be downloaded.
        """
        # NOTE(review): every chunk is downloaded even after one succeeded
        # and only the last non-empty result is kept -- confirm that this
        # is intended and not a missing early exit.
        result = None
        data = None
        for chunk in self.chunks:
            self.meta['name'] = _get_name(chunk)
            try_number = TRY_REQUEST_NUMBER
            while True:
                try:
                    data = source.download(
                        self.backblaze_info['bucket_name'],
                        self.meta, self.headers)
                    break
                except BackblazeException as b2e:
                    if try_number == 0:
                        raise OioException('backblaze download error: %s'
                                           % str(b2e))
                    eventlet.sleep(pow(2, TRY_REQUEST_NUMBER - try_number))
                    try_number -= 1
            if data:
                result = data
        return result


class BackblazeDownloadHandler(object):
    """Download an object made of several meta chunks from Backblaze."""

    def __init__(self, sysmeta, meta_chunks, backblaze_info, headers,
                 range_start=None, range_end=None):
        """
        :param sysmeta: metadata dict given to every chunk handler.
        :param meta_chunks: meta chunks, addressed by position (0, 1, ...).
        :param backblaze_info: dict of Backblaze settings.
        :param headers: stored on the instance; not forwarded to the chunk
                        handlers.
        :param range_start: accepted but not used.
        :param range_end: accepted but not used.
        """
        self.sysmeta = sysmeta
        self.meta_chunks = meta_chunks
        self.headers = headers
        self.backblaze_info = backblaze_info

    def _get_streams(self):
        """Yield the downloaded data of each meta chunk, in position order.

        :raises OioException: when a chunk handler returns nothing.
        """
        # Positions are looked up by index on purpose: this works whether
        # meta_chunks is a list or a dict keyed by position.
        position = 0
        while position < len(self.meta_chunks):
            chunk_handler = BackblazeChunkDownloadHandler(
                self.sysmeta, self.meta_chunks[position], 0, 0,
                None, self.backblaze_info)
            downloaded = chunk_handler.get_stream()
            if not downloaded:
                raise OioException("Error while downloading")
            yield downloaded
            position += 1

    def get_iter(self):
        """Yield a single item: the generator over the chunk streams."""
        yield self._get_streams()