# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from oio.common import exceptions as exc
from oio.common.exceptions import ClientException, OrphanChunk
from oio.common.logger import get_logger
from oio.blob.client import BlobClient
from oio.container.client import ContainerClient
from oio.common.constants import OIO_VERSION
from oio.common.fullpath import encode_fullpath
[docs]class Content(object):
# FIXME: no need for container_id since we have account and container name
def __init__(self, conf, container_id, metadata, chunks, storage_method,
account, container_name, blob_client=None,
container_client=None, logger=None):
self.conf = conf
self.container_id = container_id
self.metadata = metadata
self.chunks = ChunksHelper(chunks)
self.storage_method = storage_method
self.logger = logger or get_logger(self.conf)
self.blob_client = (blob_client or BlobClient(conf))
self.container_client = (container_client
or ContainerClient(self.conf,
logger=self.logger))
# FIXME: all these may be properties
self.content_id = self.metadata["id"]
self.path = self.metadata["name"]
self.length = int(self.metadata["length"])
self.version = self.metadata["version"]
self.checksum = self.metadata["hash"]
self.chunk_method = self.metadata["chunk_method"]
self.account = account
self.container_name = container_name
if 'full_path' in self.metadata:
self.full_path = metadata['full_path']
else:
self.full_path = encode_fullpath(
self.account, self.container_name, self.path, self.version,
self.content_id)
@property
def mime_type(self):
return self.metadata["mime_type"]
@mime_type.setter
def mime_type(self, value):
self.metadata["mime_type"] = value
@property
def policy(self):
return self.metadata["policy"]
@policy.setter
def policy(self, value):
self.metadata["policy"] = value
@property
def properties(self):
return self.metadata.get('properties')
@properties.setter
def properties(self, value):
if not isinstance(value, dict):
raise ValueError("'value' must be a dict")
self.metadata['properties'] = value
def _get_spare_chunk(self, chunks_notin, chunks_broken):
spare_data = {
"notin": ChunksHelper(chunks_notin, False).raw(),
"broken": ChunksHelper(chunks_broken, False).raw()
}
try:
spare_resp = self.container_client.content_spare(
cid=self.container_id, path=self.content_id,
data=spare_data, stgpol=self.policy)
except ClientException as e:
raise exc.SpareChunkException("No spare chunk (%s)" % e.message)
url_list = []
for c in spare_resp["chunks"]:
url_list.append(c["id"])
return url_list
def _add_raw_chunk(self, current_chunk, url):
data = {'type': 'chunk',
'id': url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
self.container_client.container_raw_insert(
data, cid=self.container_id)
def _update_spare_chunk(self, current_chunk, new_url):
old = {'type': 'chunk',
'id': current_chunk.url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
new = {'type': 'chunk',
'id': new_url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
self.container_client.container_raw_update(
[old], [new], cid=self.container_id)
def _generate_sysmeta(self):
sysmeta = dict()
sysmeta['id'] = self.content_id
sysmeta['version'] = self.version
sysmeta['policy'] = self.policy
sysmeta['mime_type'] = self.mime_type
sysmeta['chunk_method'] = self.chunk_method
sysmeta['chunk_size'] = self.metadata['chunk_size']
sysmeta['oio_version'] = OIO_VERSION
sysmeta['full_path'] = self.full_path
sysmeta['content_path'] = self.path
sysmeta['container_id'] = self.container_id
return sysmeta
def _create_object(self, **kwargs):
data = {'chunks': self.chunks.raw(),
'properties': self.properties}
self.container_client.content_create(
cid=self.container_id, path=self.path, content_id=self.content_id,
stgpol=self.policy, size=self.length, checksum=self.checksum,
version=self.version, chunk_method=self.chunk_method,
mime_type=self.mime_type, data=data,
**kwargs)
[docs] def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None):
raise NotImplementedError()
[docs] def create(self, stream, **kwargs):
raise NotImplementedError()
[docs] def fetch(self):
raise NotImplementedError()
[docs] def delete(self, **kwargs):
self.container_client.content_delete(
cid=self.container_id, path=self.path, **kwargs)
[docs] def move_chunk(self, chunk_id):
current_chunk = self.chunks.filter(id=chunk_id).one()
if current_chunk is None:
raise OrphanChunk("Chunk not found in content")
other_chunks = self.chunks.filter(
metapos=current_chunk.metapos).exclude(id=chunk_id).all()
spare_urls = self._get_spare_chunk(other_chunks, [current_chunk])
self.logger.debug("copy chunk from %s to %s",
current_chunk.url, spare_urls[0])
self.blob_client.chunk_copy(
current_chunk.url, spare_urls[0], chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version, content_id=self.content_id)
self._update_spare_chunk(current_chunk, spare_urls[0])
try:
self.blob_client.chunk_delete(current_chunk.url)
except Exception as err:
self.logger.warn(
"Failed to delete chunk %s: %s", current_chunk.url, err)
current_chunk.url = spare_urls[0]
return current_chunk.raw()
[docs] def move_linked_chunk(self, chunk_id, from_url):
current_chunk = self.chunks.filter(id=chunk_id).one()
if current_chunk is None:
raise OrphanChunk("Chunk not found in content")
_, to_url = self.blob_client.chunk_link(from_url, None, self.full_path)
self.logger.debug("link chunk %s from %s to %s", chunk_id, from_url,
to_url)
self._update_spare_chunk(current_chunk, to_url)
try:
self.blob_client.chunk_delete(current_chunk.url)
except Exception as err:
self.logger.warn(
"Failed to delete chunk %s: %s", current_chunk.url, err)
current_chunk.url = to_url
return current_chunk.raw()
[docs]class Chunk(object):
def __init__(self, chunk):
self._data = chunk
self._pos = chunk['pos']
d = self.pos.split('.', 1)
if len(d) > 1:
ec = True
self._metapos = int(d[0])
self._subpos = int(d[1])
else:
self._metapos = int(self._pos)
ec = False
self._ec = ec
@property
def ec(self):
return self._ec
@property
def url(self):
return self._data["url"]
@url.setter
def url(self, new_url):
self._data["url"] = new_url
@property
def pos(self):
return self._pos
@property
def metapos(self):
return self._metapos
@property
def subpos(self):
return self._subpos
@property
def size(self):
return self._data["size"]
@size.setter
def size(self, new_size):
self._data["size"] = new_size
@property
def id(self):
return self.url.split('/')[-1]
@property
def host(self):
return self.url.split('/')[2]
@property
def checksum(self):
return self._data["hash"].upper()
@checksum.setter
def checksum(self, new_checksum):
self._data["hash"] = new_checksum
@property
def data(self):
return self._data
[docs] def raw(self):
return self._data
def __str__(self):
return "[Chunk %s (%s)]" % (self.url, self.pos)
def __cmp__(self, other):
if self.metapos != other.metapos:
return cmp(self.metapos, other.metapos)
if not self.ec:
return cmp(self.id, other.id)
return cmp(self.subpos, other.subpos)
[docs]class ChunksHelper(object):
def __init__(self, chunks, raw_chunk=True):
if raw_chunk:
self.chunks = []
for c in chunks:
self.chunks.append(Chunk(c))
else:
self.chunks = chunks
self.chunks.sort()
[docs] def filter(self, id=None, pos=None, metapos=None, subpos=None, host=None):
found = []
for c in self.chunks:
if id is not None and c.id != id:
continue
if pos is not None and c.pos != str(pos):
continue
if metapos is not None and c.metapos != metapos:
continue
if subpos is not None and c.subpos != subpos:
continue
if host is not None and c.host != host:
continue
found.append(c)
return ChunksHelper(found, False)
[docs] def exclude(self, id=None, pos=None, metapos=None, subpos=None, host=None):
found = []
for c in self.chunks:
if id is not None and c.id == id:
continue
if pos is not None and c.pos == str(pos):
continue
if metapos is not None and c.metapos == metapos:
continue
if subpos is not None and c.subpos == subpos:
continue
if host is not None and c.host == host:
continue
found.append(c)
return ChunksHelper(found, False)
[docs] def one(self):
if len(self.chunks) != 1:
return None
return self.chunks[0]
[docs] def all(self):
return self.chunks
[docs] def raw(self):
res = []
for c in self.chunks:
res.append(c.raw())
return res
def __len__(self):
return len(self.chunks)
def __iter__(self):
for c in self.chunks:
yield c
def __getitem__(self, item):
return self.chunks[item]