Source code for oio.account.client

# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from six import reraise
import json
import sys
import time
from oio.api.base import HttpApi
from oio.common.constants import TIMEOUT_KEYS
from oio.common.decorators import patch_kwargs
from oio.common.easy_value import float_value
from oio.common.exceptions import OioException, OioNetworkException
from oio.common.logger import get_logger
from oio.conscience.client import ConscienceClient


[docs]class AccountClient(HttpApi): """Simple client API for the account service.""" def __init__(self, conf, endpoint=None, proxy_endpoint=None, refresh_delay=3600.0, logger=None, **kwargs): """ Initialize a client for the account service. :param conf: dictionary with at least the namespace name :type conf: `dict` :param endpoint: URL of an account service :param proxy_endpoint: URL of the proxy :param refresh_interval: time between refreshes of the account service endpoint (if not provided at instantiation) :type refresh_interval: `float` seconds """ super(AccountClient, self).__init__( endpoint=endpoint, service_type='account-service', **kwargs) self.logger = logger or get_logger(conf) self.cs = ConscienceClient(conf, endpoint=proxy_endpoint, logger=self.logger, **kwargs) self._global_kwargs = {tok: float_value(tov, None) for tok, tov in kwargs.items() if tok in TIMEOUT_KEYS} self._refresh_delay = refresh_delay if not self.endpoint else -1.0 self._last_refresh = 0.0 def _get_account_addr(self, **kwargs): """Fetch IP and port of an account service from Conscience.""" acct_instance = self.cs.next_instance('account', **kwargs) acct_addr = acct_instance.get('addr') return acct_addr def _refresh_endpoint(self, now=None, **kwargs): """Refresh account service endpoint.""" addr = self._get_account_addr(**kwargs) self.endpoint = '/'. join(("http:/", addr, "v1.0/account")) if not now: now = time.time() self._last_refresh = now def _maybe_refresh_endpoint(self, **kwargs): """Refresh account service endpoint if delay has been reached.""" if self._refresh_delay >= 0.0 or not self.endpoint: now = time.time() if now - self._last_refresh > self._refresh_delay: try: self._refresh_endpoint(now, **kwargs) except OioNetworkException as exc: if not self.endpoint: # Cannot use the previous one raise self.logger.warn( "Failed to refresh account endpoint: %s", exc) except OioException: if not self.endpoint: # Cannot use the previous one raise self.logger.exception("Failed to refresh account endpoint") # Since all operations implemented in this class (as of 2019-08-08) result # in only one request to the account service, we can patch the keyword # arguments here. If this is changed, put the decorator on each public # method of this class.
[docs] @patch_kwargs def account_request(self, account, method, action, params=None, **kwargs): """Make a request to the account service.""" self._maybe_refresh_endpoint(**kwargs) if not params: params = dict() if account: # Do not quote account, _request() will urlencode query string params['id'] = account try: resp, body = self._request(method, action, params=params, **kwargs) except OioNetworkException as exc: exc_info = sys.exc_info() if self._refresh_delay >= 0.0: self.logger.info( "Refreshing account endpoint after error %s", exc) try: self._refresh_endpoint(**kwargs) except Exception as exc: self.logger.warn("%s", exc) reraise(exc_info[0], exc_info[1], exc_info[2]) return resp, body
[docs] def account_create(self, account, **kwargs): """ Create an account. :param account: name of the account to create :type account: `str` :returns: `True` if the account has been created """ resp, _body = self.account_request(account, 'PUT', 'create', **kwargs) return resp.status == 201
[docs] def account_delete(self, account, **kwargs): """ Delete an account. :param account: name of the account to delete :type account: `str` """ self.account_request(account, 'POST', 'delete', **kwargs)
[docs] def account_list(self, **kwargs): """ List accounts. """ _resp, body = self.account_request(None, 'GET', 'list', **kwargs) return body
[docs] def account_show(self, account, **kwargs): """ Get information about an account. """ _resp, body = self.account_request(account, 'GET', 'show', **kwargs) return body
[docs] def account_update(self, account, metadata, to_delete, **kwargs): """ Update metadata of the specified account. :param metadata: dictionary of properties that must be set or updated. :type metadata: `dict` :param to_delete: list of property keys that must be removed. :type to_delete: `list` """ data = json.dumps({"metadata": metadata, "to_delete": to_delete}) self.account_request(account, 'PUT', 'update', data=data, **kwargs)
[docs] def bucket_list(self, account, limit=None, marker=None, prefix=None, **kwargs): """ Get the list of buckets of an account. :param account: account from which to get the bucket list :type account: `str` :keyword limit: maximum number of results to return :type limit: `int` :keyword marker: name of the bucket from where to start the listing :type marker: `str` :keyword prefix: :rtype: `dict` with 'ctime' (`float`), 'buckets' (`int`), 'bytes' (`int`), 'objects' (`int`), 'containers' (`int`), 'id' (`str`), 'metadata' (`dict`), 'listing' (`list`), 'truncated' and 'next_marker'. 'listing' contains dicts of container metadata (name, number of objects, number of bytes and modification time). """ params = {"id": account, "limit": limit, "marker": marker, "prefix": prefix} _resp, body = self.account_request(account, 'GET', 'buckets', params=params, **kwargs) return body
[docs] def bucket_show(self, bucket, **kwargs): """ Get information about a bucket. """ _resp, body = self.account_request(bucket, 'GET', 'show-bucket', **kwargs) return body
[docs] def bucket_update(self, bucket, metadata, to_delete, **kwargs): """ Update metadata of the specified bucket. :param metadata: dictionary of properties that must be set or updated. :type metadata: `dict` :param to_delete: list of property keys that must be removed. :type to_delete: `list` """ data = json.dumps({"metadata": metadata, "to_delete": to_delete}) _resp, body = self.account_request(bucket, 'PUT', 'update-bucket', data=data, **kwargs) return body
[docs] def container_list(self, account, limit=None, marker=None, end_marker=None, prefix=None, delimiter=None, s3_buckets_only=False, **kwargs): """ Get the list of containers of an account. :param account: account from which to get the container list :type account: `str` :keyword limit: maximum number of results to return :type limit: `int` :keyword marker: name of the container from where to start the listing :type marker: `str` :keyword end_marker: :keyword prefix: :keyword delimiter: :keyword s3_buckets_only: list only S3 buckets. :type s3_buckets_only: `bool` :rtype: `dict` with 'ctime' (`float`), 'bytes' (`int`), 'objects' (`int`), 'containers' (`int`), 'id' (`str`), 'metadata' (`dict`) and 'listing' (`list`). 'listing' contains lists of container metadata (name, number of objects, number of bytes, whether it is a prefix, and modification time). """ params = {"id": account, "limit": limit, "marker": marker, "end_marker": end_marker, "prefix": prefix, "delimiter": delimiter, "s3_buckets_only": s3_buckets_only} _resp, body = self.account_request(account, 'GET', 'containers', params=params, **kwargs) return body
[docs] def container_show(self, account, container, **kwargs): """ Get information about a container. """ _resp, body = self.account_request(account, 'GET', 'show-container', params={'container': container}, **kwargs) return body
[docs] def container_update(self, account, container, metadata=None, **kwargs): """ Update account with container-related metadata. :param account: name of the account to update :type account: `str` :param container: name of the container whose metadata has changed :type container: `str` :param metadata: container metadata ("bytes", "objects", "mtime", "dtime") :type metadata: `dict` """ metadata['name'] = container _resp, body = self.account_request(account, 'PUT', 'container/update', data=json.dumps(metadata), **kwargs) return body
[docs] def container_reset(self, account, container, mtime, **kwargs): """ Reset container of an account :param account: name of the account :type account: `str` :param container: name of the container to reset :type container: `str` :param mtime: time of the modification """ metadata = dict() metadata["name"] = container metadata["mtime"] = mtime self.account_request(account, 'PUT', 'container/reset', data=json.dumps(metadata), **kwargs)
[docs] def account_refresh(self, account, **kwargs): """ Refresh counters of an account :param account: name of the account to refresh :type account: `str` """ self.account_request(account, 'POST', 'refresh', **kwargs)
[docs] def account_flush(self, account, **kwargs): """ Flush all containers of an account :param account: name of the account to flush :type account: `str` """ self.account_request(account, 'POST', 'flush', **kwargs)