# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from oio.common import exceptions as exc
from oio.common.logger import get_logger
from oio.blob.client import BlobClient
from oio.container.client import ContainerClient, extract_chunk_qualities
from oio.common.constants import OIO_VERSION
from oio.common.fullpath import encode_fullpath
def compare_chunk_quality(current, candidate):
"""
Compare the qualities of two chunks.

    :returns: > 0 if the candidate has better quality,
        0 if they are equal, < 0 if the candidate is worse.
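
    Example with illustrative quality dictionaries (the slot names are
    hypothetical):

    >>> current = {'final_dist': 1, 'expected_slot': 'rawx-odd',
    ...            'final_slot': 'rawx'}
    >>> candidate = {'final_dist': 2, 'final_slot': 'rawx-odd'}
    >>> compare_chunk_quality(current, candidate)
    2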
"""
balance = 0
# Compare distance between chunks.
balance += candidate.get('final_dist', 1) - current.get('final_dist', 1)
# Compare use of fallback mechanisms.
expected_slot = current.get('expected_slot')
if (current.get('final_slot') != expected_slot and
candidate.get('final_slot') == expected_slot):
# The current slot is not the expected one,
# and the candidate slot is the expected one.
balance += 1
elif (current.get('final_slot') == expected_slot and
candidate.get('final_slot') != expected_slot):
# The current slot is the expected one,
        # but the proposal is to replace it with a different one.
# The final balance may still be positive if the distance
# has been drastically increased.
balance -= 1
return balance
def ensure_better_chunk_qualities(current_chunks, candidates, threshold=1):
"""
    Ensure that the set of spare chunks is really an improvement over
    the set of current chunks; raise a SpareChunkException if it is not.
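
    :param threshold: the minimum total quality improvement (balance)
        required to accept the set of spare chunks.
    :returns: the total quality improvement.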
"""
balance = 0
for current, candidate in zip(current_chunks, candidates.keys()):
balance += compare_chunk_quality(current.quality,
candidates[candidate])
if balance < threshold:
raise exc.SpareChunkException(
"the spare chunks found would not improve the quality "
"(balance=%d, threshold=%d)" % (balance, threshold))
return balance
def cmp(x, y):
"""cmp function as a workaround for python3"""
return (x > y) - (x < y)
class Content(object):
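    """
    Represent an object (content) stored in a container: its metadata,
    its list of chunks, and operations on those chunks.
    """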
# FIXME: no need for container_id since we have account and container name
def __init__(self, conf, container_id, metadata, chunks, storage_method,
account, container_name, blob_client=None,
container_client=None, logger=None):
self.conf = conf
self.container_id = container_id
self.metadata = metadata
self.chunks = ChunksHelper(chunks)
self.storage_method = storage_method
self.logger = logger or get_logger(self.conf)
self.blob_client = (blob_client or BlobClient(conf))
self.container_client = (container_client
or ContainerClient(self.conf,
logger=self.logger))
# FIXME: all these may be properties
self.content_id = self.metadata["id"]
self.path = self.metadata["name"]
self.length = int(self.metadata["length"])
self.version = self.metadata["version"]
self.checksum = self.metadata["hash"]
self.chunk_method = self.metadata["chunk_method"]
self.account = account
self.container_name = container_name
if 'full_path' in self.metadata:
self.full_path = metadata['full_path']
else:
self.full_path = encode_fullpath(
self.account, self.container_name, self.path, self.version,
self.content_id)
@property
def mime_type(self):
return self.metadata["mime_type"]
@mime_type.setter
def mime_type(self, value):
self.metadata["mime_type"] = value
@property
def policy(self):
return self.metadata["policy"]
@policy.setter
def policy(self, value):
self.metadata["policy"] = value
@property
def properties(self):
return self.metadata.get('properties')
@properties.setter
def properties(self, value):
if not isinstance(value, dict):
raise ValueError("'value' must be a dict")
self.metadata['properties'] = value
def _get_spare_chunk(self, chunks_notin, chunks_broken,
max_attempts=3, check_quality=False,
fake_excluded_chunks=None, **kwargs):
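        """
        Ask the container service for spare chunk locations.

        :param chunks_notin: chunks of the same metachunk that must be
            kept (passed as 'notin' in the spare request).
        :param chunks_broken: chunks to be replaced (passed as 'broken'
            in the spare request).
        :param check_quality: when True, verify that the proposed spare
            chunks improve the quality of the chunks being replaced.
        :param fake_excluded_chunks: extra chunks to exclude, appended to
            the 'broken' list with the first broken chunk's hash, position
            and size.
        :returns: a tuple with the list of spare chunk URLs and a
            dictionary of chunk qualities.
        """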
notin = ChunksHelper(chunks_notin, False).raw()
broken = ChunksHelper(chunks_broken, False).raw()
if fake_excluded_chunks:
for fake_excluded_chunk in fake_excluded_chunks:
chunk = fake_excluded_chunk.copy()
chunk['hash'] = broken[0]['hash']
chunk['pos'] = broken[0]['pos']
chunk['size'] = broken[0]['size']
broken.append(chunk)
spare_data = {
"notin": notin,
"broken": broken
}
last_exc = None
bal = 0
for attempt in range(max_attempts):
try:
spare_resp = self.container_client.content_spare(
cid=self.container_id, path=self.content_id,
data=spare_data, stgpol=self.policy, **kwargs)
quals = extract_chunk_qualities(
spare_resp.get('properties', {}), raw=True)
if check_quality:
bal = ensure_better_chunk_qualities(chunks_broken, quals)
break
except (exc.ClientException, exc.SpareChunkException) as err:
self.logger.info(
"Failed to find spare chunk (attempt %d/%d): %s",
attempt + 1, max_attempts, err)
last_exc = err
# TODO(FVE): exponential backoff?
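        # This 'else' clause runs only if the loop above did not 'break',
        # i.e. every attempt to get spare chunks failed.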
else:
if isinstance(last_exc, exc.SpareChunkException):
exc.reraise(exc.SpareChunkException, last_exc)
raise exc.SpareChunkException(
"No spare chunk: %s" % str(last_exc))
url_list = []
for chunk in spare_resp["chunks"]:
url_list.append(chunk["id"])
if check_quality:
self.logger.info("Found %d spare chunks, that will improve "
"metachunk quality by %d", len(url_list), bal)
return url_list, quals
def _add_raw_chunk(self, current_chunk, url, **kwargs):
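        """Declare an extra location (url) for an existing chunk in the container."""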
data = {'type': 'chunk',
'id': url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
self.container_client.container_raw_insert(
data, cid=self.container_id, **kwargs)
def _update_spare_chunk(self, current_chunk, new_url, **kwargs):
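        """Replace the chunk's old URL with new_url in the container."""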
old = {'type': 'chunk',
'id': current_chunk.url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
new = {'type': 'chunk',
'id': new_url,
'hash': current_chunk.checksum,
'size': current_chunk.size,
'pos': current_chunk.pos,
'content': self.content_id}
self.container_client.container_raw_update(
[old], [new], cid=self.container_id, **kwargs)
def _generate_sysmeta(self):
sysmeta = dict()
sysmeta['id'] = self.content_id
sysmeta['version'] = self.version
sysmeta['policy'] = self.policy
sysmeta['mime_type'] = self.mime_type
sysmeta['chunk_method'] = self.chunk_method
sysmeta['chunk_size'] = self.metadata['chunk_size']
sysmeta['oio_version'] = OIO_VERSION
sysmeta['full_path'] = self.full_path
sysmeta['content_path'] = self.path
sysmeta['container_id'] = self.container_id
return sysmeta
def _create_object(self, **kwargs):
data = {'chunks': self.chunks.raw(),
'properties': self.properties}
self.container_client.content_create(
cid=self.container_id, path=self.path, content_id=self.content_id,
stgpol=self.policy, size=self.length, checksum=self.checksum,
version=self.version, chunk_method=self.chunk_method,
mime_type=self.mime_type, data=data,
**kwargs)
    def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None,
allow_frozen_container=False):
raise NotImplementedError()
    def create(self, stream, **kwargs):
raise NotImplementedError()
    def fetch(self):
raise NotImplementedError()
    def delete(self, **kwargs):
self.container_client.content_delete(
cid=self.container_id, path=self.path, **kwargs)
    def move_chunk(self, chunk_id, check_quality=False, dry_run=False,
max_attempts=3, **kwargs):
"""
Move a chunk to another place. Optionally ensure that the
new place is an improvement over the current one.
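
        Example (hypothetical service address):

            chunk = content.chunks.filter(host='127.0.0.1:6010').one()
            content.move_chunk(chunk, check_quality=True)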
"""
if isinstance(chunk_id, Chunk):
current_chunk = chunk_id
chunk_id = current_chunk.id
else:
current_chunk = self.chunks.filter(id=chunk_id).one()
if current_chunk is None or current_chunk not in self.chunks:
raise exc.OrphanChunk("Chunk not found in content")
other_chunks = self.chunks.filter(
metapos=current_chunk.metapos).exclude(id=chunk_id).all()
spare_urls, qualities = self._get_spare_chunk(
other_chunks, [current_chunk], check_quality=check_quality,
max_attempts=max_attempts, **kwargs)
if dry_run:
self.logger.info("Dry-run: would copy chunk from %s to %s",
current_chunk.url, spare_urls[0])
else:
self.logger.info("Copying chunk from %s to %s",
current_chunk.url, spare_urls[0])
# TODO(FVE): retry to copy (max_attempts times)
self.blob_client.chunk_copy(
current_chunk.url, spare_urls[0], chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version,
content_id=self.content_id, **kwargs)
self._update_spare_chunk(current_chunk, spare_urls[0])
try:
self.blob_client.chunk_delete(current_chunk.url, **kwargs)
except Exception as err:
self.logger.warn(
"Failed to delete chunk %s: %s", current_chunk.url, err)
current_chunk.url = spare_urls[0]
current_chunk.quality = qualities[current_chunk.url]
return current_chunk.raw()
    def move_linked_chunk(self, chunk_id, from_url):
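        """
        Replace the chunk identified by chunk_id with a link to the chunk
        at from_url, update the container reference, then delete the chunk
        at the old location.
        """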
current_chunk = self.chunks.filter(id=chunk_id).one()
if current_chunk is None:
raise exc.OrphanChunk("Chunk not found in content")
_, to_url = self.blob_client.chunk_link(from_url, None, self.full_path)
self.logger.debug("link chunk %s from %s to %s", chunk_id, from_url,
to_url)
self._update_spare_chunk(current_chunk, to_url)
try:
self.blob_client.chunk_delete(current_chunk.url)
except Exception as err:
self.logger.warn(
"Failed to delete chunk %s: %s", current_chunk.url, err)
current_chunk.url = to_url
return current_chunk.raw()
class Chunk(object):
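    """
    Wrap a raw chunk description (dictionary) and expose its fields
    as properties.
    """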
def __init__(self, chunk):
self._data = chunk
self._pos = chunk['pos']
d = self.pos.split('.', 1)
if len(d) > 1:
ec = True
self._metapos = int(d[0])
self._subpos = int(d[1])
else:
self._metapos = int(self._pos)
ec = False
self._ec = ec
@property
def ec(self):
return self._ec
@property
def url(self):
return self._data.get('url') or self._data['id']
@url.setter
def url(self, new_url):
self._data["url"] = new_url
self._data["real_url"] = new_url
@property
def pos(self):
return self._pos
@property
def metapos(self):
return self._metapos
@property
def subpos(self):
return self._subpos
@property
def size(self):
return self._data["size"]
@size.setter
def size(self, new_size):
self._data["size"] = new_size
@property
def id(self):
return self.url.split('/')[-1]
@property
def host(self):
return self.url.split('/')[2]
@property
def checksum(self):
return self._data["hash"].upper()
@checksum.setter
def checksum(self, new_checksum):
self._data["hash"] = new_checksum
@property
def data(self):
return self._data
@property
def imperfections(self):
"""
List imperfections of this chunk.
Tell how much the quality of this chunk can be improved.
:returns: a positive number telling how many criteria can be improved
(0 if all criteria are met).
"""
qual = self.quality
imperfections = list()
if qual.get('final_slot') != qual.get('expected_slot'):
imperfections.append('slot %s != %s' % (
qual.get('final_slot'),
qual.get('expected_slot')))
if qual['final_dist'] < qual['expected_dist']:
imperfections.append('dist %d < %d' % (
qual['final_dist'],
qual['expected_dist']))
return imperfections
@property
def quality(self):
"""
Get the "quality" of the chunk, i.e. a dictionary telling how it
        matched the request criteria when it was selected.
"""
return self._data.setdefault('quality',
{'final_dist': 0,
'expected_dist': 1})
@quality.setter
def quality(self, value):
self._data['quality'] = value
    def raw(self):
return self._data
def __str__(self):
return "[Chunk %s (%s)]" % (self.url, self.pos)
def __repr__(self):
return str(self)
def __cmp__(self, other):
if self.metapos != other.metapos:
return cmp(self.metapos, other.metapos)
if not self.ec:
return cmp(self.id, other.id)
return cmp(self.subpos, other.subpos)
def __lt__(self, other):
return self.__cmp__(other) < 0
def __le__(self, other):
return self.__cmp__(other) <= 0
def __eq__(self, other):
return self.__cmp__(other) == 0
def __ne__(self, other):
return self.__cmp__(other) != 0
def __ge__(self, other):
return self.__cmp__(other) >= 0
def __gt__(self, other):
return self.__cmp__(other) > 0
class ChunksHelper(object):
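    """
    Hold a sorted list of chunks and provide filtering helpers.
    """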
def __init__(self, chunks, raw_chunk=True):
if raw_chunk:
self.chunks = [Chunk(c) for c in chunks]
else:
self.chunks = chunks
self.chunks.sort()
    def filter(self, id=None, pos=None, metapos=None, subpos=None, host=None,
url=None):
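        """
        :returns: a new ChunksHelper containing only the chunks matching
            all of the specified criteria.
        """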
found = []
for c in self.chunks:
if id is not None and c.id != id:
continue
if pos is not None and c.pos != str(pos):
continue
if metapos is not None and c.metapos != metapos:
continue
if subpos is not None and c.subpos != subpos:
continue
if host is not None and c.host != host:
continue
if url is not None and c.url != url:
continue
found.append(c)
return ChunksHelper(found, False)
    def exclude(self, id=None, pos=None, metapos=None, subpos=None, host=None,
url=None):
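        """
        :returns: a new ChunksHelper containing only the chunks matching
            none of the specified criteria.
        """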
found = []
for c in self.chunks:
if id is not None and c.id == id:
continue
if pos is not None and c.pos == str(pos):
continue
if metapos is not None and c.metapos == metapos:
continue
if subpos is not None and c.subpos == subpos:
continue
if host is not None and c.host == host:
continue
if url is not None and c.url == url:
continue
found.append(c)
return ChunksHelper(found, False)
    def one(self):
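        """
        :returns: the only chunk of the list,
            or None if there is not exactly one.
        """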
if len(self.chunks) != 1:
return None
return self.chunks[0]
    def all(self):
return self.chunks
    def raw(self):
res = []
for c in self.chunks:
res.append(c.raw())
return res
def __len__(self):
return len(self.chunks)
def __iter__(self):
for c in self.chunks:
yield c
def __getitem__(self, item):
return self.chunks[item]