Source code for oio.content.plain

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.


from oio.api.replication import ReplicatedWriteHandler
from oio.common.storage_functions import _sort_chunks, fetch_stream
from oio.common.storage_method import STORAGE_METHODS
from oio.content.content import Content, Chunk
from oio.common import exceptions as exc
from oio.common.exceptions import UnrecoverableContent


[docs]class PlainContent(Content):
[docs] def fetch(self): storage_method = STORAGE_METHODS.load(self.chunk_method) chunks = _sort_chunks(self.chunks.raw(), storage_method.ec, logger=self.logger) stream = fetch_stream(chunks, None, storage_method) return stream
[docs] def create(self, stream, **kwargs): storage_method = STORAGE_METHODS.load(self.chunk_method) sysmeta = self._generate_sysmeta() chunks = _sort_chunks(self.chunks.raw(), storage_method.ec, logger=self.logger) # TODO deal with headers headers = {} handler = ReplicatedWriteHandler( stream, sysmeta, chunks, storage_method, headers=headers) final_chunks, bytes_transferred, content_checksum = handler.stream() # TODO sanity checks self.checksum = content_checksum.upper() self._create_object(**kwargs) return final_chunks, bytes_transferred, content_checksum
[docs] def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None, allow_frozen_container=False): # Identify the chunk to rebuild current_chunk = self.chunks.filter(id=chunk_id).one() if current_chunk is None and chunk_pos is None: raise exc.OrphanChunk("Chunk not found in content") elif chunk_pos is None: chunk_pos = current_chunk.pos duplicate_chunks = self.chunks.filter( pos=chunk_pos).exclude(id=chunk_id).all() if len(duplicate_chunks) == 0: raise UnrecoverableContent("No copy of missing chunk") if current_chunk is None: chunk = {} chunk['hash'] = duplicate_chunks[0].checksum chunk['size'] = duplicate_chunks[0].size chunk['url'] = '' chunk['pos'] = chunk_pos current_chunk = Chunk(chunk) # Find a spare chunk address broken_list = list() if not allow_same_rawx and chunk_id is not None: broken_list.append(current_chunk) spare_urls, _quals = self._get_spare_chunk( duplicate_chunks, broken_list) spare_url = spare_urls[0] # Actually create the spare chunk, by duplicating a good one uploaded = False for src in duplicate_chunks: try: self.blob_client.chunk_copy( src.url, spare_url, chunk_id=chunk_id, fullpath=self.full_path, cid=self.container_id, path=self.path, version=self.version, content_id=self.content_id) self.logger.debug('Chunk copied from %s to %s, registering it', src.url, spare_url) uploaded = True break except Exception as err: self.logger.warn( "Failed to copy chunk from %s to %s: %s %s", src.url, spare_url, type(err), str(err.message)) if not uploaded: raise UnrecoverableContent("No copy available of missing chunk") # Register the spare chunk in object's metadata if chunk_id is None: self._add_raw_chunk(current_chunk, spare_url, frozen=allow_frozen_container) else: self._update_spare_chunk(current_chunk, spare_url, frozen=allow_frozen_container) self.logger.info('Chunk %s repaired in %s', chunk_id or chunk_pos, spare_url)