# Source code for oio.api.object_storage

# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.


from __future__ import absolute_import
from six import PY2, string_types
from six.moves.urllib_parse import unquote

from io import BytesIO
from functools import partial
import os
import warnings
import time
import random

from oio.common import exceptions as exc
from oio.api.ec import ECWriteHandler
from oio.api.io import MetachunkPreparer, LinkHandler
from oio.api.replication import ReplicatedWriteHandler
from oio.common.utils import cid_from_name, GeneratorIO, monotonic_time, \
    depaginate, set_deadline_from_read_timeout, compute_perfdata_stats
from oio.common.easy_value import float_value, true_value
from oio.common.logger import get_logger
from oio.common.decorators import ensure_headers, ensure_request_id, \
    ensure_request_id2, handle_account_not_found, \
    handle_container_not_found, handle_object_not_found, patch_kwargs
from oio.common.storage_method import STORAGE_METHODS
from oio.common.constants import OIO_VERSION, HEADER_PREFIX, TIMEOUT_KEYS
from oio.common.storage_functions import _sort_chunks, fetch_stream, \
    fetch_stream_ec
from oio.common.fullpath import encode_fullpath
from oio.common.cache import del_cached_object_metadata


class ObjectStorageApi(object):
    """
    The Object Storage API.

    High level API that wraps `AccountClient`, `ContainerClient` and
    `DirectoryClient` classes.

    Every method that takes a `kwargs` argument accepts at least
    the following keywords:

    - `headers`: `dict` of extra headers to pass to the proxy
    - `connection_timeout`: `float`
    - `read_timeout`: `float`
    - `write_timeout`: `float`
    """

    EXTRA_KEYWORDS = ('chunk_checksum_algo', 'autocreate',
                      'chunk_buffer_min', 'chunk_buffer_max',
                      'cache', 'tls')

    def __init__(self, namespace, logger=None, perfdata=None, **kwargs):
        """
        Initialize the object storage API.

        :param namespace: name of the namespace to interact with
        :type namespace: `str`

        :keyword connection_timeout: connection timeout towards proxy and
            rawx services
        :type connection_timeout: `float` seconds
        :keyword read_timeout: timeout for rawx responses and data reads
            from the caller (when uploading), and metadata requests
        :type read_timeout: `float` seconds
        :keyword write_timeout: timeout for rawx write requests
        :type write_timeout: `float` seconds
        :keyword pool_manager: a pooled connection manager that will be used
            for all HTTP based APIs (except rawx)
        :type pool_manager: `urllib3.PoolManager`
        :keyword chunk_checksum_algo: algorithm to use for chunk checksums.
            Only 'md5' and `None` are supported at the moment.
        :keyword autocreate: if set, containers will be created
            automatically. Default value is True.
        :type autocreate: `bool`
        :keyword endpoint: network location of the oio-proxy to talk to.
        :type endpoint: `str`
        :keyword cache: dict-like object used as a cache for object metadata.
        """
        self.namespace = namespace
        conf = {"namespace": self.namespace}
        self.logger = logger or get_logger(conf)
        self.perfdata = perfdata
        self._global_kwargs = {tok: float_value(tov, None)
                               for tok, tov in kwargs.items()
                               if tok in TIMEOUT_KEYS}
        self._global_kwargs['autocreate'] = True
        if self.perfdata is not None:
            self._global_kwargs['perfdata'] = self.perfdata
        for key in self.__class__.EXTRA_KEYWORDS:
            if key in kwargs:
                self._global_kwargs[key] = kwargs[key]
        self.logger.debug("Global API parameters: %s", self._global_kwargs)

        from oio.account.client import AccountClient
        from oio.container.client import ContainerClient
        from oio.directory.client import DirectoryClient
        # TODO(FVE): we may want to share the PoolManager instance
        self.directory = DirectoryClient(conf, logger=self.logger, **kwargs)
        self.container = ContainerClient(conf, logger=self.logger, **kwargs)
        # In AccountClient, "endpoint" is the account service, not the proxy
        acct_kwargs = kwargs.copy()
        acct_kwargs["proxy_endpoint"] = acct_kwargs.pop("endpoint", None)
        self.account = AccountClient(conf, logger=self.logger, **acct_kwargs)
        self._blob_client = None
        self._proxy_client = None

    @property
    def blob_client(self):
        """
        A low-level client to rawx services.

        :rtype: `oio.blob.client.BlobClient`
        """
        if self._blob_client is None:
            from oio.blob.client import BlobClient
            connection_pool = self.container.pool_manager
            self._blob_client = BlobClient(
                conf={"namespace": self.namespace}, logger=self.logger,
                connection_pool=connection_pool, perfdata=self.perfdata)
        return self._blob_client

    # FIXME(FVE): this property should not exist.
    # This high-level API should use lower-level APIs,
    # not do requests directly to the proxy.
    @property
    def proxy_client(self):
        if self._proxy_client is None:
            from oio.common.client import ProxyClient
            conf = self.container.conf
            pool_manager = self.container.pool_manager
            self._proxy_client = ProxyClient(
                conf, pool_manager=pool_manager, no_ns_in_url=True,
                logger=self.logger)
        return self._proxy_client
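
    # A minimal usage sketch. The namespace name 'OPENIO', the account and
    # container names, and the endpoint URL below are placeholders: use the
    # values of your own deployment.
    #
    #   from oio import ObjectStorageApi
    #   api = ObjectStorageApi('OPENIO', endpoint='http://127.0.0.1:6006',
    #                          connection_timeout=2.0, read_timeout=30.0)
    #   api.account_create('myaccount')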

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_create(self, account, **kwargs):
        """
        Create an account.

        :param account: name of the account to create
        :type account: `str`
        :returns: `True` if the account has been created
        """
        return self.account.account_create(account, **kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_delete(self, account, **kwargs):
        """
        Delete an account.

        :param account: name of the account to delete
        :type account: `str`
        """
        self.account.account_delete(account, **kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_show(self, account, **kwargs):
        """
        Get information about an account.
        """
        return self.account.account_show(account, **kwargs)

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_list(self, **kwargs):
        """
        List known accounts.

        Notice that account creation is asynchronous, and an autocreated
        account may appear in the listing only after several seconds.
        """
        return self.account.account_list(**kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_get_properties(self, account, **kwargs):
        """
        Get information about an account, including account properties.
        """
        res = self.account.account_show(account, **kwargs)
        # Deal with the previous protocol which
        # returned 'metadata' instead of 'properties'.
        props = res.pop('metadata', dict())
        res.setdefault('properties', dict()).update(props)
        return res

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_set_properties(self, account, properties, **kwargs):
        """
        Set properties on the specified account.
        """
        self.account.account_update(account, properties, None, **kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_del_properties(self, account, properties, **kwargs):
        """
        Delete some properties from the specified account.
        """
        self.account.account_update(account, None, list(properties),
                                    **kwargs)
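
    # Account properties sketch (names are placeholders). Note that
    # account_get_properties() folds the legacy 'metadata' key into
    # 'properties', so callers can rely on the 'properties' key only.
    #
    #   api.account_set_properties('myaccount', {'owner': 'alice'})
    #   info = api.account_get_properties('myaccount')
    #   print(info['properties'].get('owner'))   # 'alice'
    #   api.account_del_properties('myaccount', ['owner'])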

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def resolve_cid(self, cid, **kwargs):
        """Resolve a CID into account and container names."""
        md = self.directory.show(cid=cid, **kwargs)
        return md.get('account'), md.get('name')

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_create(self, account, container, properties=None,
                         **kwargs):
        """
        Create a container.

        :param account: account in which to create the container
        :type account: `str`
        :param container: name of the container
        :type container: `str`
        :param properties: properties to set on the container
        :type properties: `dict`
        :returns: True if the container has been created,
                  False if it already exists
        """
        return self.container.container_create(
            account, container, properties=properties, **kwargs)
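
    # Container creation sketch (names are placeholders):
    #
    #   created = api.container_create('myaccount', 'photos',
    #                                  properties={'purpose': 'demo'})
    #   # created is True on the first call,
    #   # False if 'photos' already existed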

    def _recompute_missing_chunks(self, account, container, **kwargs):
        damaged_objects = 0
        missing_chunks = 0
        for obj_meta in depaginate(
                self.object_list,
                listing_key=lambda x: x['objects'],
                marker_key=lambda x: x.get('next_marker'),
                truncated_key=lambda x: x['truncated'],
                account=account, container=container,
                properties=True, versions=True, **kwargs):
            try:
                obj_meta, chunks = self.object_locate(
                    account, container, obj_meta['name'],
                    version=obj_meta['version'], **kwargs)
                stg_met = STORAGE_METHODS.load(obj_meta['chunk_method'])
                chunks_by_pos = _sort_chunks(chunks, stg_met.ec,
                                             logger=self.logger)
                expected_chunks = len(chunks_by_pos) * stg_met.expected_chunks
                diff = expected_chunks - len(chunks)
                if diff > 0:
                    damaged_objects += 1
                    missing_chunks += diff
            except exc.NoSuchObject:
                pass
        return damaged_objects, missing_chunks

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_touch(self, account, container, recompute=False, **kwargs):
        """
        Trigger a notification about the container state.

        :param account: account the container belongs to
        :type account: `str`
        :param container: name of the container
        :type container: `str`
        """
        damaged_objects = None
        missing_chunks = None
        if recompute:
            damaged_objects, missing_chunks = self._recompute_missing_chunks(
                account, container, **kwargs)
        self.container.container_touch(
            account, container, recompute=recompute,
            damaged_objects=damaged_objects, missing_chunks=missing_chunks,
            **kwargs)

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_create_many(self, account, containers, properties=None,
                              **kwargs):
        """
        Create many containers.

        :param account: account in which to create the containers
        :type account: `str`
        :param containers: names of the containers
        :type containers: `list`
        :param properties: properties to set on the containers
        :type properties: `dict`
        """
        return self.container.container_create_many(
            account, containers, properties=properties, **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_delete(self, account, container, **kwargs):
        """
        Delete a container.

        :param account: account from which to delete the container
        :type account: `str`
        :param container: name of the container
        :type container: `str`
        """
        self.container.container_delete(account, container, **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_flush(self, account, container, fast=False, **kwargs):
        """
        Flush a container (delete all its objects).

        :param account: account the container belongs to
        :type account: `str`
        :param container: name of the container to flush
        :type container: `str`
        :param fast: flush the container quickly, may put high pressure
            on the event system
        :type fast: `bool`
        """
        if fast:
            truncated = True
            while truncated:
                resp = self.container.container_flush(account, container,
                                                      **kwargs)
                truncated = resp['truncated']
            return

        while True:
            # No need to keep a marker: we are deleting objects
            resp = self.object_list(account, container, **kwargs)
            if not resp['objects']:
                break
            objects = [obj['name'] for obj in resp['objects']]
            deleted = self.object_delete_many(
                account, container, objects, **kwargs)
            if not any(x[1] for x in deleted):
                raise exc.OioException(
                    'None of the %d objects could be deleted'
                    % len(deleted))
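
    # Flush sketch: the default mode lists and deletes objects page by
    # page; fast=True asks the backend to do it, which is quicker but may
    # put high pressure on the event system.
    #
    #   api.container_flush('myaccount', 'tmp-data')
    #   api.container_flush('myaccount', 'tmp-data', fast=True)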

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_list(self, account, limit=None, marker=None,
                       end_marker=None, prefix=None, delimiter=None,
                       s3_buckets_only=False, **kwargs):
        """
        Get the list of containers of an account.

        :param account: account from which to get the container list
        :type account: `str`
        :keyword limit: maximum number of results to return
        :type limit: `int`
        :keyword marker: name of the container from where to start
            the listing
        :type marker: `str`
        :keyword end_marker: name of the container where to stop the listing
        :type end_marker: `str`
        :keyword prefix: list only the containers whose name starts with
            the given prefix
        :type prefix: `str`
        :keyword delimiter: character used to group containers sharing a
            common prefix
        :type delimiter: `str`
        :keyword s3_buckets_only: list only S3 buckets
        :type s3_buckets_only: `bool`
        :return: the list of containers of an account
        :rtype: `list` of items (`list`) with 5 fields:
            name, number of objects, number of bytes, 1 if the item
            is a prefix or 0 if the item is actually a container,
            and modification time
        """
        resp = self.account.container_list(account, limit=limit,
                                           marker=marker,
                                           end_marker=end_marker,
                                           prefix=prefix,
                                           delimiter=delimiter,
                                           s3_buckets_only=s3_buckets_only,
                                           **kwargs)
        return resp["listing"]

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_show(self, account, container, **kwargs):
        """
        Get information about a container (user properties).

        :param account: account in which the container is
        :type account: `str`
        :param container: name of the container
        :type container: `str`
        :returns: a `dict` with "properties" containing a `dict`
            of user properties.
        """
        return self.container.container_show(account, container, **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_snapshot(self, account, container, dst_account,
                           dst_container, batch_size=100, **kwargs):
        """
        Take a snapshot of a container.

        Create a separate database containing all information about the
        contents from the original database, but with copies of the chunks
        at the time of the snapshot. This new database is frozen
        (you cannot write into it).

        Pay attention to the fact that the source container is frozen during
        the snapshot capture. The capture may take some time, depending on
        the number of objects hosted by the container. You should consider
        setting a long read_timeout on the request.

        :param account: account in which the source container is.
        :type account: `str`
        :param container: name of the source container.
        :type container: `str`
        :param dst_account: account in which the snapshot will be created.
        :type dst_account: `str`
        :param dst_container: name of the new container (i.e. the snapshot).
        :type dst_container: `str`
        :keyword batch_size: number of chunks to copy at a time.
        """
        try:
            self.container.container_freeze(account, container, **kwargs)
            self.container.container_snapshot(
                account, container, dst_account, dst_container, **kwargs)
            obj_gen = depaginate(
                self.object_list,
                listing_key=lambda x: x['objects'],
                marker_key=lambda x: x.get('next_marker'),
                truncated_key=lambda x: x['truncated'],
                account=dst_account, container=dst_container,
                properties=False, versions=True, **kwargs)
            target_beans = []
            copy_beans = []
            for obj in obj_gen:
                _, chunks = self.object_locate(
                    account, container, obj["name"],
                    version=obj['version'], **kwargs)
                fullpath = encode_fullpath(
                    dst_account, dst_container, obj['name'],
                    obj['version'], obj['content'])
                storage_method = STORAGE_METHODS.load(obj['chunk_method'])
                chunks_by_pos = _sort_chunks(chunks, storage_method.ec,
                                             logger=self.logger)
                handler = LinkHandler(
                    fullpath, chunks_by_pos, storage_method,
                    self.blob_client, **kwargs)
                try:
                    chunks_copies = handler.link()
                except exc.UnfinishedUploadException as ex:
                    self.logger.warn(
                        'Failed to upload all data (%s), deleting chunks',
                        ex.exception)
                    self._delete_orphan_chunks(
                        ex.chunks_already_uploaded, obj['container_id'],
                        **kwargs)
                    ex.reraise()
                t_beans, c_beans = self._prepare_meta2_raw_update(
                    chunks, chunks_copies, obj['content'])
                target_beans.extend(t_beans)
                copy_beans.extend(c_beans)
                if len(target_beans) > batch_size:
                    self.container.container_raw_update(
                        target_beans, copy_beans,
                        dst_account, dst_container,
                        frozen=True, **kwargs)
                    target_beans = []
                    copy_beans = []
            if target_beans:
                self.container.container_raw_update(
                    target_beans, copy_beans, dst_account, dst_container,
                    frozen=True, **kwargs)
            self.container.container_touch(dst_account, dst_container)
        finally:
            self.container.container_enable(account, container, **kwargs)
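
    # Snapshot sketch: a long read_timeout is advisable since the source
    # container stays frozen during the whole capture (names are
    # placeholders).
    #
    #   api.container_snapshot('myaccount', 'prod-data',
    #                          'myaccount', 'prod-data-snap',
    #                          read_timeout=300.0)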

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_get_properties(self, account, container, properties=None,
                                 **kwargs):
        """
        Get information about a container (user and system properties).

        :param account: account in which the container is
        :type account: `str`
        :param container: name of the container
        :type container: `str`
        :param properties: *ignored*
        :returns: a `dict` with "properties" and "system" entries,
            containing respectively a `dict` of user properties and
            a `dict` of system properties.
        """
        return self.container.container_get_properties(
            account, container, properties=properties, **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_set_properties(self, account, container, properties=None,
                                 clear=False, **kwargs):
        """
        Set properties on a container.

        :param account: name of the account
        :type account: `str`
        :param container: name of the container where to set properties
        :type container: `str`
        :param properties: a dictionary of properties
        :type properties: `dict`
        :param clear: if True, the existing properties are removed first
        :type clear: `bool`
        :keyword system: dictionary of system properties to set
        """
        return self.container.container_set_properties(
            account, container, properties, clear=clear, **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_del_properties(self, account, container, properties,
                                 **kwargs):
        """
        Delete properties of a container.

        :param account: name of the account
        :type account: `str`
        :param container: name of the container to deal with
        :type container: `str`
        :param properties: a list of property keys
        :type properties: `list`
        """
        return self.container.container_del_properties(
            account, container, properties, **kwargs)

    def _delete_exceeding_versions(self, account, container, obj,
                                   versions, maxvers, **kwargs):
        if not versions:
            return
        exceeding_versions = versions[maxvers:]
        for version in exceeding_versions:
            self.object_delete(account, container, obj, version=version,
                               **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_purge(self, account, container, maxvers=None, **kwargs):
        if maxvers is None:
            props = self.container_get_properties(account, container,
                                                  **kwargs)
            maxvers = props['system'].get('sys.m2.policy.version', None)
            if maxvers is None:
                _, data = self.proxy_client._request('GET', "config")
                maxvers = data['meta2.max_versions']
        maxvers = int(maxvers)
        if maxvers < 0:
            return
        elif maxvers == 0:
            maxvers = 1

        truncated = True
        marker = None
        last_object_name = None
        versions = None
        while truncated:
            resp = self.object_list(account, container, versions=True,
                                    marker=marker, **kwargs)
            # FIXME: `next_marker` is an object name, hence `object_list`
            # may return the same version twice when a page boundary falls
            # in the middle of an object's list of versions.
            last_object_name = None
            truncated = resp['truncated']
            marker = resp.get('next_marker', None)
            for obj in resp['objects']:
                if obj['name'] != last_object_name:
                    self._delete_exceeding_versions(
                        account, container, last_object_name, versions,
                        maxvers, **kwargs)
                    last_object_name = obj['name']
                    versions = list()
                if not obj['deleted']:
                    versions.append(obj['version'])
        self._delete_exceeding_versions(
            account, container, last_object_name, versions, maxvers,
            **kwargs)
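
    # Purge sketch: keep only the 2 most recent versions of each object,
    # deleting the older ones. When maxvers is not given, it defaults to
    # the container's 'sys.m2.policy.version' system property, then to the
    # namespace's 'meta2.max_versions' configuration.
    #
    #   api.container_purge('myaccount', 'mycontainer', maxvers=2)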

    def container_update(self, account, container, metadata, clear=False,
                         **kwargs):
        """
        :deprecated: use `container_set_properties`
        """
        warnings.warn("You'd better use container_set_properties()",
                      DeprecationWarning)
        if not metadata:
            self.container_del_properties(
                account, container, [], **kwargs)
        else:
            self.container_set_properties(
                account, container, metadata, clear, **kwargs)

    def object_create(self, account, container, *args, **kwargs):
        """
        See the documentation of object_create_ext for parameters.

        :returns: `list` of chunks, size and hash of what has been uploaded
        """
        ul_chunks, ul_bytes, obj_checksum, _ = \
            self.object_create_ext(account, container, *args, **kwargs)
        return ul_chunks, ul_bytes, obj_checksum

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id2(prefix="create-")
    def object_create_ext(self, account, container, file_or_path=None,
                          data=None, etag=None, obj_name=None,
                          mime_type=None, metadata=None, policy=None,
                          key_file=None, append=False, properties=None,
                          **kwargs):
        """
        Create an object or append data to an object in *container* of
        *account* with data taken from either *data* (`str` or `generator`)
        or *file_or_path* (path to a file or file-like object).
        The object will be named after *obj_name* if specified, or after
        the base name of *file_or_path*.

        :param account: name of the account where to create the object
        :type account: `str`
        :param container: name of the container where to create the object
        :type container: `str`
        :param file_or_path: file-like object or path to a file from which
            to read object data
        :type file_or_path: `str` or file-like object
        :param data: object data (if `file_or_path` is not set)
        :type data: `str` or `generator`
        :keyword etag: entity tag of the object
        :type etag: `str`
        :keyword obj_name: name of the object to create. If not set, will
            use the base name of `file_or_path`.
        :keyword mime_type: MIME type of the object
        :type mime_type: `str`
        :keyword properties: a dictionary of properties
        :type properties: `dict`
        :keyword policy: name of the storage policy
        :type policy: `str`
        :keyword key_file: path to the file containing credentials
        :param append: if set, data will be appended to the existing object
            (otherwise the object will be created)
        :type append: `bool`
        :keyword autocreate: if set to false, autocreation of the container
            will be disabled
        :type autocreate: `bool`
        :keyword perfdata: optional `dict` that will be filled with metrics
            of time spent to resolve the meta2 address, to do the meta2
            requests, and to upload chunks to rawx services.
        :keyword deadline: deadline for the request, in monotonic time
            (`oio.common.utils.monotonic_time`). This supersedes `timeout`
            or `read_timeout` keyword arguments.
        :type deadline: `float` seconds
        :keyword container_properties: when containers are automatically
            created, this keyword allows setting user and system properties.
        :type container_properties: `dict`
        :keyword tls: if set, try to use the TLS port exposed by rawx
        :type tls: `bool`

        :returns: `list` of chunks, size, hash and metadata of what has been
            uploaded
        """
        if (data, file_or_path) == (None, None):
            raise exc.MissingData()
        src = data if data is not None else file_or_path
        if src is file_or_path:
            # We are asked to read from a file path or a file-like object
            if isinstance(file_or_path, string_types):
                if not os.path.exists(file_or_path):
                    raise exc.FileNotFound("File '%s' not found."
                                           % file_or_path)
                file_name = os.path.basename(file_or_path)
            else:
                try:
                    file_name = os.path.basename(file_or_path.name)
                except AttributeError:
                    file_name = None
            obj_name = obj_name or file_name
        else:
            # We are asked to read from a buffer or an iterator
            if isinstance(src, string_types):
                try:
                    src = src.encode('utf-8')
                except UnicodeDecodeError:
                    # src is already encoded
                    pass
            try:
                src = BytesIO(src)
            except TypeError:
                src = GeneratorIO(src, sub_generator=PY2)

        if not obj_name:
            raise exc.MissingName(
                "No name for the object has been specified"
            )

        sysmeta = {'mime_type': mime_type, 'etag': etag}
        if metadata:
            warnings.warn(
                "You'd better use 'properties' instead of 'metadata'",
                DeprecationWarning, stacklevel=4)
            if not properties:
                properties = metadata
            else:
                properties.update(metadata)

        if isinstance(src, BytesIO) or hasattr(src, 'read'):
            return self._object_create(
                account, container, obj_name, src, sysmeta,
                properties=properties, policy=policy, key_file=key_file,
                append=append, **kwargs)
        else:
            with open(src, 'rb') as srcf:
                return self._object_create(
                    account, container, obj_name, srcf, sysmeta,
                    properties=properties, policy=policy,
                    key_file=key_file, append=append, **kwargs)
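
    # Upload sketches (names and paths are placeholders):
    #
    #   # From an in-memory buffer
    #   chunks, size, checksum = api.object_create(
    #       'myaccount', 'mycontainer', obj_name='hello.txt',
    #       data=b'Hello, world!')
    #
    #   # From a local file; the object is named after the file's base name
    #   api.object_create('myaccount', 'mycontainer',
    #                     file_or_path='/tmp/report.pdf')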

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_change_policy(self, account, container, obj, policy,
                             **kwargs):
        """
        Change the storage policy of an object.

        :param account: name of the account where to change
            the policy of the object
        :type account: `str`
        :param container: name of the container where to change
            the policy of the object
        :type container: `str`
        :param obj: name of the object whose policy must be changed
        :type obj: `str`
        :param policy: name of the new storage policy
        :type policy: `str`

        :returns: `list` of chunks, size, hash and metadata of the object
        """
        meta, stream = self.object_fetch(
            account, container, obj, **kwargs)
        kwargs['version'] = meta['version']
        return self.object_create_ext(
            account, container, obj_name=meta['name'], data=stream,
            policy=policy, change_policy=True, **kwargs)

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_touch(self, account, container, obj, version=None, **kwargs):
        """
        Trigger a notification about an object
        (as if it just had been created).

        :param account: name of the account where to touch the object
        :type account: `str`
        :param container: name of the container where to touch the object
        :type container: `str`
        :param obj: name of the object to touch
        :type obj: `str`
        """
        self.container.content_touch(account, container, obj,
                                     version=version, **kwargs)

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_drain(self, account, container, obj, version=None, **kwargs):
        """
        Remove all the chunks of a content, but keep all the metadata.

        :param account: name of the account where the object is present
        :type account: `str`
        :param container: name of the container where the object is present
        :type container: `str`
        :param obj: name of the object to drain
        """
        self.container.content_drain(account, container, obj,
                                     version=version, **kwargs)

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_delete(self, account, container, obj, version=None,
                      **kwargs):
        """
        Delete an object from a container. If versioning is enabled and
        no version is specified, the object will be marked as deleted but
        not actually deleted.

        :param account: name of the account the object belongs to
        :type account: `str`
        :param container: name of the container the object belongs to
        :type container: `str`
        :param obj: name of the object to delete
        :param version: version of the object to delete
        :returns: True on success
        """
        return self.container.content_delete(account, container, obj,
                                             version=version, **kwargs)

    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_delete_many(self, account, container, objs, **kwargs):
        """
        Delete several objects.

        :param objs: an iterable of object names
            (should not be a generator)
        :returns: a list of tuples with the name of the object and
            a boolean telling if the object has been successfully deleted
        :rtype: `list` of `tuple`
        """
        return self.container.content_delete_many(
            account, container, objs, **kwargs)
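
    # Bulk deletion sketch: the result tells which deletions succeeded.
    #
    #   results = api.object_delete_many(
    #       'myaccount', 'mycontainer', ['a.txt', 'b.txt', 'missing.txt'])
    #   for name, ok in results:
    #       if not ok:
    #           print('failed to delete %s' % name)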

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_truncate(self, account, container, obj,
                        version=None, size=None, **kwargs):
        """
        Truncate an object at the specified size. Only shrinking is
        supported. A download may occur if the size is not on a chunk
        boundary.

        :param account: name of the account in which the object is stored
        :param container: name of the container in which the object
            is stored
        :param obj: name of the object to truncate
        :param version: version of the object to truncate
        :param size: new size of the object
        """
        # code copied from object_fetch (should be factorized!)
        meta, raw_chunks = self.object_locate(
            account, container, obj, version=version,
            properties=False, **kwargs)
        chunk_method = meta['chunk_method']
        storage_method = STORAGE_METHODS.load(chunk_method)
        chunks = _sort_chunks(raw_chunks, storage_method.ec,
                              logger=self.logger)

        for pos in sorted(chunks.keys()):
            chunk = chunks[pos][0]
            if (size >= chunk['offset']
                    and size <= chunk['offset'] + chunk['size']):
                break
        else:
            raise exc.OioException("No chunk found at position %d" % size)

        if chunk['offset'] != size:
            # retrieve the partial chunk
            ret = self.object_fetch(account, container, obj,
                                    version=version,
                                    ranges=[(chunk['offset'], size - 1)])
            # TODO implement a proper object_update
            pos = int(chunk['pos'].split('.')[0])
            self.object_create(account, container, obj_name=obj,
                               data=ret[1], meta_pos=pos,
                               content_id=meta['id'])

        return self.container.content_truncate(account, container, obj,
                                               version=version, size=size,
                                               **kwargs)

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_list(self, account, container, limit=None, marker=None,
                    delimiter=None, prefix=None, end_marker=None,
                    properties=False, versions=False, deleted=False,
                    **kwargs):
        """
        List objects inside a container.

        :param properties: if True, list object properties along with
            objects
        :param versions: if True, list all versions of objects
        :param deleted: if True, list also the deleted objects
        :returns: a dict which contains:

           * 'objects': the `list` of object descriptions
           * 'prefixes': common prefixes (only if delimiter and prefix
             are set)
           * 'properties': a `dict` of container properties
           * 'system': a `dict` of system metadata
           * 'truncated': a `bool` telling if the listing was truncated
           * 'next_marker': a `str` to be used as `marker` to get the next
             page of results (in case the listing was truncated)
        """
        hdrs, resp_body = self.container.content_list(
            account, container, limit=limit, marker=marker,
            end_marker=end_marker, prefix=prefix, delimiter=delimiter,
            properties=properties, versions=versions, deleted=deleted,
            **kwargs)

        for obj in resp_body['objects']:
            try:
                obj['chunk_method'] = obj['chunk-method']
                del obj['chunk-method']
            except KeyError:
                obj['chunk_method'] = None
            try:
                obj['mime_type'] = obj['mime-type']
                del obj['mime-type']
            except KeyError:
                obj['mime_type'] = None

        resp_body['truncated'] = true_value(
            hdrs.get(HEADER_PREFIX + 'list-truncated'))
        marker_header = HEADER_PREFIX + 'list-marker'
        if marker_header in hdrs:
            resp_body['next_marker'] = unquote(hdrs.get(marker_header))
        return resp_body
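
    # Listing sketch: follow pagination markers with the module's own
    # depaginate() helper, as done in _recompute_missing_chunks() above
    # (the 'size' field is assumed from the listing format).
    #
    #   for obj in depaginate(
    #           api.object_list,
    #           listing_key=lambda x: x['objects'],
    #           marker_key=lambda x: x.get('next_marker'),
    #           truncated_key=lambda x: x['truncated'],
    #           account='myaccount', container='mycontainer'):
    #       print(obj['name'], obj['size'])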

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_locate(self, account, container, obj,
                      version=None, chunk_info=False, properties=True,
                      **kwargs):
        """
        Get a description of the object along with the list of its chunks.

        :param account: name of the account in which the object is stored
        :param container: name of the container in which the object
            is stored
        :param obj: name of the object to query
        :param version: version of the object to query
        :param chunk_info: if True, fetch additional information about
            chunks from rawx services (slow). The second element of the
            returned tuple becomes a generator (instead of a list).
        :param properties: should the request return object properties
            along with content description
        :type properties: `bool`
        :returns: a tuple with the object metadata `dict` as first element
            and the chunk `list` as second element
        """
        obj_meta, chunks = self.container.content_locate(
            account, container, obj, properties=properties,
            version=version, **kwargs)

        # FIXME(FVE): converting to float does not sort properly
        # the chunks of the same metachunk
        def _fetch_ext_info(chunks_):
            for chunk in sorted(chunks_, key=lambda x: float(x['pos'])):
                try:
                    ext_info = self.blob_client.chunk_head(
                        chunk['url'], **kwargs)
                    for key in ('chunk_size', 'chunk_hash', 'full_path'):
                        chunk[key] = ext_info.get(key)
                except exc.OioException as err:
                    chunk['error'] = str(err)
                yield chunk

        if not chunk_info:
            return obj_meta, chunks
        return obj_meta, _fetch_ext_info(chunks)

    def object_analyze(self, *args, **kwargs):
        """
        :deprecated: use `object_locate`
        """
        warnings.warn("You'd better use object_locate()",
                      DeprecationWarning)
        return self.object_locate(*args, **kwargs)

    def object_fastcopy(self, *args, **kwargs):
        """
        :deprecated: use `object_link`.
        """
        warnings.warn("You'd better use object_link()",
                      DeprecationWarning)
        return self.object_link(*args, **kwargs)

    @staticmethod
    def _cache_wrapper(account, container, obj, version, stream, **kwargs):
        try:
            for dat in stream:
                yield dat
        except exc.UnrecoverableContent:
            # The cache may no longer be valid
            del_cached_object_metadata(
                account=account, reference=container,
                path=obj, version=version, **kwargs)
            raise

    @staticmethod
    def _ttfb_wrapper(stream, req_start, download_start, perfdata):
        """Keep track of time-to-first-byte and time-to-last-byte."""
        perfdata_rawx = perfdata.setdefault('rawx', dict())
        size = 0
        for dat in stream:
            if 'ttfb' not in perfdata:
                perfdata['ttfb'] = monotonic_time() - req_start
            yield dat
            size += len(dat)
        req_end = monotonic_time()
        perfdata['ttlb'] = req_end - req_start
        perfdata_rawx['overall'] = perfdata_rawx.get('overall', 0.0) \
            + req_end - download_start
        perfdata['data_size'] = size
        compute_perfdata_stats(perfdata, 'connect.')
        compute_perfdata_stats(perfdata, 'download.')

    def _object_fetch_impl(self, account, container, obj, version=None,
                           ranges=None, key_file=None, **kwargs):
        """
        Actual implementation of object fetch logic.
        """
        perfdata = kwargs.get('perfdata', None)
        if perfdata is not None:
            req_start = monotonic_time()

        # Check cid format
        cid_arg = kwargs.get('cid')
        if cid_arg is not None:
            cid_arg = cid_arg.upper()
            cid_seq = cid_arg.split('.', 1)
            try:
                int(cid_seq[0], 16)
                if len(cid_seq) > 1:
                    int(cid_seq[1], 10)
            except ValueError:
                raise exc.OioException("Invalid cid: " + cid_arg)

        meta, raw_chunks = self.object_locate(
            account, container, obj, version=version, **kwargs)
        chunk_method = meta['chunk_method']
        storage_method = STORAGE_METHODS.load(chunk_method)
        chunks = _sort_chunks(raw_chunks, storage_method.ec,
                              logger=self.logger)
        meta['container_id'] = (
            cid_arg or cid_from_name(account, container).upper())
        meta['ns'] = self.namespace
        kwargs['logger'] = self.logger

        if perfdata is not None:
            download_start = monotonic_time()
        if storage_method.ec:
            stream = fetch_stream_ec(chunks, ranges, storage_method,
                                     **kwargs)
        elif storage_method.backblaze:
            stream = self._fetch_stream_backblaze(meta, chunks, ranges,
                                                  storage_method, key_file,
                                                  **kwargs)
        else:
            stream = fetch_stream(chunks, ranges, storage_method, **kwargs)

        if perfdata is not None:
            stream = self._ttfb_wrapper(
                stream, req_start, download_start, perfdata)
        if kwargs.get('cache'):
            stream = self._cache_wrapper(
                account, container, obj, version, stream, **kwargs)
        return meta, stream

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_fetch(self, account, container, obj, version=None,
                     ranges=None, key_file=None, **kwargs):
        """
        Download an object.

        :param account: name of the account in which the object is stored
        :param container: name of the container in which the object
            is stored
        :param obj: name of the object to fetch
        :param version: version of the object to fetch
        :type version: `str`
        :param ranges: a list of object ranges to download
        :type ranges: `list` of `tuple`
        :param key_file: path to the file containing credentials

        :keyword properties: should the request return object properties
            along with content description (True by default)
        :type properties: `bool`
        :keyword perfdata: optional `dict` that will be filled with metrics
            of time spent to resolve the meta2 address, to do the meta2
            request, and the time-to-first-byte, as seen by this API.

        :returns: a dictionary of object metadata and
            a stream of object data
        :rtype: tuple
        """
        # Fetch object metadata (possibly from cache) and object stream.
        meta, stream = self._object_fetch_impl(
            account, container, obj, version=version, ranges=ranges,
            key_file=key_file, **kwargs)

        def _data_error_wrapper(buggy_stream):
            blocks = 0
            try:
                for dat in buggy_stream:
                    yield dat
                    blocks += 1
            except exc.UnrecoverableContent:
                # Maybe we got this error because the cached object
                # metadata was stale.
                cache = kwargs.pop('cache', None)
                if cache is None:
                    # No cache configured: nothing more to do.
                    raise
                elif blocks >= 1:
                    # The first blocks of data were already sent to the
                    # caller, we cannot start again.
                    raise
                # Retry the request without reading from the cache.
                new_meta, new_stream = self._object_fetch_impl(
                    account, container, obj, version=version,
                    ranges=ranges, key_file=key_file, cache=None, **kwargs)
                # Hack the metadata dictionary which has already been
                # returned to the caller.
                meta.update(new_meta)
                # Send data from the new stream.
                for dat in new_stream:
                    yield dat

        return meta, _data_error_wrapper(stream)
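
    # Download sketch. Ranges are assumed to be (first, last) byte offsets,
    # inclusive, as used in object_truncate() above.
    #
    #   meta, stream = api.object_fetch('myaccount', 'mycontainer',
    #                                   'hello.txt')
    #   data = b''.join(stream)
    #
    #   # Only the first 128 bytes
    #   _, stream = api.object_fetch('myaccount', 'mycontainer',
    #                                'hello.txt', ranges=[(0, 127)])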

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_get_properties(self, account, container, obj, **kwargs):
        """
        Get the description of an object along with its user properties.

        :param account: name of the account in which the object is stored
        :param container: name of the container in which the object
            is stored
        :param obj: name of the object to query
        :returns: a `dict` describing the object

        .. py:data:: example

            {'hash': '6BF60C17CC15EEA108024903B481738F',
             'ctime': '1481031763',
             'deleted': 'False',
             'properties': {u'projet': u'OpenIO-SDS'},
             'length': '43518',
             'hash_method': 'md5',
             'chunk_method': 'ec/algo=liberasurecode_rs_vand,k=6,m=3',
             'version': '1481031762951972',
             'policy': 'EC',
             'id': '20BF2194FD420500CD4729AE0B5CBC07',
             'mime_type': 'application/octet-stream',
             'name': 'Makefile'}
        """
        return self.container.content_get_properties(
            account, container, obj, **kwargs)

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_show(self, account, container, obj, version=None, **kwargs):
        """
        Get the description of an object along with
        the dictionary of user-set properties.

        :deprecated: prefer using `object_get_properties`,
            for consistency with `container_get_properties`.
        """
        # stacklevel=5 because we are using 3 decorators
        warnings.warn("You'd better use object_get_properties()",
                      DeprecationWarning, stacklevel=5)
        return self.container.content_get_properties(
            account, container, obj, version=version, **kwargs)

    def object_update(self, account, container, obj, metadata, version=None,
                      clear=False, **kwargs):
        """
        :deprecated: use `object_set_properties`
        """
        warnings.warn("You'd better use object_set_properties()",
                      DeprecationWarning, stacklevel=2)
        if clear and not metadata:
            self.object_del_properties(
                account, container, obj, [], version=version, **kwargs)
            return
        self.object_set_properties(
            account, container, obj, metadata, version=version,
            clear=clear, **kwargs)

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_set_properties(self, account, container, obj, properties,
                              version=None, clear=False, **kwargs):
        """
        Set properties on an object.

        :param account: name of the account in which the object is stored
        :param container: name of the container in which the object
            is stored
        :param obj: name of the object to query
        :param properties: dictionary of properties
        """
        return self.container.content_set_properties(
            account, container, obj, version=version,
            properties={'properties': properties}, clear=clear, **kwargs)
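
    # Object properties sketch (names are placeholders):
    #
    #   api.object_set_properties('myaccount', 'mycontainer', 'hello.txt',
    #                             {'color': 'blue'})
    #   props = api.object_get_properties(
    #       'myaccount', 'mycontainer', 'hello.txt')['properties']
    #   api.object_del_properties('myaccount', 'mycontainer', 'hello.txt',
    #                             ['color'])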

    @handle_object_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def object_del_properties(self, account, container, obj, properties,
                              version=None, **kwargs):
        """
        Delete some properties from an object.

        :param properties: list of property keys to delete
        :type properties: `list`
        :returns: True if the properties have been deleted
            (or were missing)
        """
        return self.container.content_del_properties(
            account, container, obj, properties=properties,
            version=version, **kwargs)

    def _object_upload(self, ul_handler, **kwargs):
        """Upload data to rawx, and measure the time it takes."""
        perfdata = kwargs.get('perfdata', None)
        if perfdata is not None:
            upload_start = monotonic_time()
        ul_chunks, ul_bytes, obj_checksum = ul_handler.stream()
        if perfdata is not None:
            upload_end = monotonic_time()
            perfdata_rawx = perfdata.setdefault('rawx', dict())
            perfdata_rawx['overall'] = perfdata_rawx.get('overall', 0.0) \
                + upload_end - upload_start
            perfdata['data_size'] = ul_bytes
            compute_perfdata_stats(perfdata, 'connect.')
            compute_perfdata_stats(perfdata, 'upload.')
        return ul_chunks, ul_bytes, obj_checksum

    def _object_prepare(self, account, container, obj_name, source,
                        sysmeta, policy=None, key_file=None,
                        link=False, **kwargs):
        """Call content/prepare, initialize chunk uploaders."""
        chunk_prep = MetachunkPreparer(
            self.container, account, container, obj_name,
            policy=policy, **kwargs)
        obj_meta = chunk_prep.obj_meta
        obj_meta.update(sysmeta)
        obj_meta['content_path'] = obj_name
        obj_meta['container_id'] = cid_from_name(account, container).upper()
        obj_meta['ns'] = self.namespace
        obj_meta['full_path'] = encode_fullpath(
            account, container, obj_name, obj_meta['version'],
            obj_meta['id'])
        obj_meta['oio_version'] = (obj_meta.get('oio_version')
                                   or OIO_VERSION)

        storage_method = STORAGE_METHODS.load(obj_meta['chunk_method'])
        if link:
            handler = LinkHandler(
                obj_meta['full_path'], None, storage_method,
                self.blob_client, **kwargs)
            return obj_meta, handler, None

        if storage_method.ec:
            write_handler_cls = ECWriteHandler
        elif storage_method.backblaze:
            from oio.api.backblaze import BackblazeWriteHandler
            from oio.api.backblaze_http import BackblazeUtils
            backblaze_info = BackblazeUtils.get_credentials(storage_method,
                                                            key_file)
            write_handler_cls = partial(BackblazeWriteHandler,
                                        backblaze_info=backblaze_info)
        else:
            write_handler_cls = ReplicatedWriteHandler
        kwargs['logger'] = self.logger
        handler = write_handler_cls(
            source, obj_meta, chunk_prep, storage_method, **kwargs)
        return obj_meta, handler, chunk_prep

    def _object_create(self, account, container, obj_name, source,
                       sysmeta, properties=None, policy=None,
                       key_file=None, **kwargs):
        obj_meta, ul_handler, chunk_prep = self._object_prepare(
            account, container, obj_name, source, sysmeta,
            policy=policy, key_file=key_file, **kwargs)

        # XXX content_id and version are necessary
        # to update an existing object
        kwargs['content_id'] = obj_meta['id']
        kwargs['version'] = obj_meta['version']

        try:
            ul_chunks, ul_bytes, obj_checksum = self._object_upload(
                ul_handler, **kwargs)
        except exc.OioException as ex:
            self.logger.warn(
                'Failed to upload all data (%s), deleting chunks', ex)
            self._delete_orphan_chunks(
                chunk_prep.all_chunks_so_far(), obj_meta['container_id'],
                **kwargs)
            raise

        etag = obj_meta.get('etag')
        if etag and etag.lower() != obj_checksum.lower():
            raise exc.EtagMismatch(
                "given etag %s != computed %s" % (etag, obj_checksum))
        obj_meta['etag'] = obj_checksum

        # obj_meta['properties'] contains special properties
        # describing the quality of selected chunks.
        if properties:
            obj_meta['properties'].update(properties)

        # If we are here, we know that the metadata server is fine
        # (it provided us with chunk addresses) and the client is still
        # listening (it just uploaded all data). It seems a good idea to
        # postpone the deadline.
        set_deadline_from_read_timeout(kwargs, force=True)

        data = {'chunks': ul_chunks,
                'properties': obj_meta['properties'],
                'container_properties': kwargs.get('container_properties')}
        try:
            # FIXME: we may just pass **obj_meta
            self.container.content_create(
                account, container, obj_name, size=ul_bytes,
                checksum=obj_checksum, data=data,
                stgpol=obj_meta['policy'],
                mime_type=obj_meta['mime_type'],
                chunk_method=obj_meta['chunk_method'],
                **kwargs)
        except (exc.Conflict, exc.DeadlineReached) as ex:
            self.logger.warn(
                'Failed to commit to meta2 (%s), deleting chunks', ex)
            self._delete_orphan_chunks(ul_chunks, obj_meta['container_id'],
                                       **kwargs)
            raise
        return ul_chunks, ul_bytes, obj_checksum, obj_meta

    def _delete_orphan_chunks(self, chunks, cid, **kwargs):
        """Delete chunks that have been orphaned by an unfinished upload."""
        # FIXME(FVE): won't work with BackBlaze
        del_resps = self.blob_client.chunk_delete_many(chunks, cid, **kwargs)
        for resp in del_resps:
            if isinstance(resp, Exception):
                self.logger.warn('failed to delete chunk %s (%s)',
                                 resp.chunk.get('real_url',
                                                resp.chunk['url']),
                                 resp)
            elif resp.status not in (204, 404):
                self.logger.warn('failed to delete chunk %s (HTTP %s)',
                                 resp.chunk.get('real_url',
                                                resp.chunk['url']),
                                 resp.status)

    def _fetch_stream_backblaze(self, meta, chunks, ranges,
                                storage_method, key_file, **kwargs):
        from oio.api.backblaze import BackblazeChunkDownloadHandler
        from oio.api.backblaze_http import BackblazeUtils
        backblaze_info = BackblazeUtils.get_credentials(storage_method,
                                                        key_file)
        total_bytes = 0
        current_offset = 0
        size = None
        offset = 0
        for pos in range(len(chunks)):
            if ranges:
                offset = ranges[pos][0]
                size = ranges[pos][1]

            if size is None:
                size = int(meta["length"])
            chunk_size = int(chunks[pos][0]["size"])
            if total_bytes >= size:
                break
            if current_offset + chunk_size > offset:
                if current_offset < offset:
                    _offset = offset - current_offset
                else:
                    _offset = 0
                if chunk_size + total_bytes > size:
                    _size = size - total_bytes
                else:
                    _size = chunk_size
                handler = BackblazeChunkDownloadHandler(
                    meta, chunks[pos], _offset, _size,
                    backblaze_info=backblaze_info)
                stream = handler.get_stream()
                if not stream:
                    raise exc.OioException("Error while downloading")
                total_bytes += len(stream)
                yield stream
            current_offset += chunk_size

    @handle_container_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def container_refresh(self, account, container, attempts=3, **kwargs):
        """
        Reset the statistics of the specified container,
        and trigger an event that will update them.
        If the container does not exist, remove it from the account.
        """
        for i in range(attempts):
            try:
                self.account.container_reset(account, container,
                                             time.time(), **kwargs)
                break
            except exc.Conflict:
                if i >= attempts - 1:
                    raise
        try:
            self.container.container_touch(account, container, **kwargs)
        except exc.ClientException as err:
            if err.status != 406 and err.status != 431:
                raise
            # CODE_USER_NOTFOUND or CODE_CONTAINER_NOTFOUND
            metadata = dict()
            metadata["dtime"] = time.time()
            self.account.container_update(account, container, metadata,
                                          **kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_refresh(self, account=None, **kwargs):
        """
        Refresh the counters of an account.

        :param account: name of the account to refresh,
            or None to refresh all accounts (slow)
        :type account: `str`
        """
        if account is None:
            accounts = self.account_list(**kwargs)
            for account in accounts:
                try:
                    self.account_refresh(account, **kwargs)
                except exc.NoSuchAccount:
                    # account removed in the meantime
                    pass
            return

        self.account.account_refresh(account, **kwargs)

        containers = self.container_list(account, **kwargs)
        for container in containers:
            try:
                self.container_refresh(account, container[0], **kwargs)
            except exc.NoSuchContainer:
                # container removed in the meantime
                pass

        while containers:
            marker = containers[-1][0]
            containers = self.container_list(account, marker=marker,
                                             **kwargs)
            if containers:
                for container in containers:
                    try:
                        self.container_refresh(account, container[0],
                                               **kwargs)
                    except exc.NoSuchContainer:
                        # container removed in the meantime
                        pass

    def all_accounts_refresh(self, **kwargs):
        """
        :deprecated: call `account_refresh(None)` instead
        """
        warnings.warn("You'd better use account_refresh(None)",
                      DeprecationWarning, stacklevel=2)
        return self.account_refresh(None, **kwargs)

    @handle_account_not_found
    @patch_kwargs
    @ensure_headers
    @ensure_request_id
    def account_flush(self, account, **kwargs):
        """
        Flush all containers of an account.

        :param account: name of the account to flush
        :type account: `str`
        """
        self.account.account_flush(account, **kwargs)

    def _random_buffer(self, dictionary, num_chars):
        """
        :rtype: `str`
        :returns: `num_chars` characters randomly taken from `dictionary`
        """
        return ''.join(random.choice(dictionary) for _ in range(num_chars))

    def _link_chunks(self, targets, fullpath, **kwargs):
        """
        Create chunk hard links.

        :param targets: original chunk URLs
        :param fullpath: full path to the object whose chunks will be
            hard linked
        """
        new_chunks = list()
        # TODO(FVE): use a GreenPool to parallelize
        for chunk in targets:
            resp, new_chunk_url = self.blob_client.chunk_link(
                chunk['url'], None, fullpath, **kwargs)
            new_chunk = chunk.copy()
            new_chunk['url'] = new_chunk_url
            new_chunks.append(new_chunk)
        return new_chunks

    def _prepare_meta2_raw_update(self, targets, copies, content):
        """
        Generate the lists of original and replacement chunk beans
        to be used as input for `container_raw_update`.
        """
        targets_beans = []
        copies_beans = []
        for target, copy in zip(targets, copies):
            targets_beans.append(self._m2_chunk_bean(target, content))
            copies_beans.append(self._m2_chunk_bean(copy, content))
        return targets_beans, copies_beans

    @staticmethod
    def _m2_chunk_bean(meta, content):
        """
        Prepare a dictionary to be used as a chunk "bean"
        (in the meta2 sense).
        """
        return {"type": "chunk",
                "id": meta['url'],
                "hash": meta['hash'],
                "size": int(meta["size"]),
                "pos": meta["pos"],
                "content": content}

    def object_head(self, account, container, obj, trust_level=0, **kwargs):
        """
        Check for the presence of an object in a container.

        :param trust_level: 0: do not check chunks;
            1: check if there are enough chunks to read the object;
            2: check if all chunks are present.
        :type trust_level: `int`
        """
        try:
            if trust_level == 0:
                self.object_get_properties(account, container, obj,
                                           **kwargs)
            elif trust_level == 1:
                raise NotImplementedError()
            elif trust_level == 2:
                _, chunks = self.object_locate(account, container, obj,
                                               **kwargs)
                for chunk in chunks:
                    self.blob_client.chunk_head(chunk['url'])
            else:
                raise ValueError(
                    '`trust_level` must be between 0 and 2')
        except (exc.NotFound, exc.NoSuchObject, exc.NoSuchContainer):
            return False
        return True
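
    # Presence-check sketch: trust_level=2 also sends a HEAD request to
    # every chunk on the rawx services, which is slower but stronger than
    # a metadata-only lookup.
    #
    #   if api.object_head('myaccount', 'mycontainer', 'hello.txt',
    #                      trust_level=2):
    #       print('object fully readable')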


class ObjectStorageAPI(ObjectStorageApi):
    """
    :deprecated: transitional wrapper for ObjectStorageApi
    """

    def __init__(self, namespace, endpoint=None, **kwargs):
        super(ObjectStorageAPI, self).__init__(namespace,
                                               endpoint=endpoint, **kwargs)
        warnings.simplefilter('once')
        warnings.warn(
            "oio.api.ObjectStorageAPI is deprecated, "
            "use oio.ObjectStorageApi",
            DeprecationWarning, stacklevel=2)