Source code for oio.directory.admin

# Copyright (C) 2017-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from functools import wraps
from six import string_types
from oio.common.client import ProxyClient


[docs]def service_id_to_string(service_id): """Convert a list of service IDs to a comma separated string.""" if not service_id: return None elif isinstance(service_id, string_types): return service_id else: try: return ','.join(service_id) except Exception: raise ValueError("'service_id' must be a string or a list")
[docs]def loc_params(func): """Wrap database localization parameters in request parameters""" @wraps(func) def _wrapped(self, service_type=None, account=None, reference=None, cid=None, service_id=None, **kwargs): params = kwargs.pop('params', {}) if service_type: params['type'] = service_type elif 'type' not in params: raise ValueError("Missing value for service_type") if cid: params['cid'] = cid elif account and reference: params['acct'] = account params['ref'] = reference elif 'cid' not in params and \ ('acct' not in params or 'ref' not in params): raise ValueError("Missing value for account and reference or cid") if service_id: params['service_id'] = service_id_to_string(service_id) return func(self, params, **kwargs) return _wrapped
[docs]class AdminClient(ProxyClient): """Low level database administration client.""" def __init__(self, conf, **kwargs): super(AdminClient, self).__init__( conf, request_prefix="/admin", **kwargs) kwargs.pop('pool_manager', None) self._kwargs = kwargs self._cache_client = None self._forwarder = None @property def cache_client(self): """ Instanciate a client object for '/cache/*' proxy routes. """ if self._cache_client is None: self._cache_client = ProxyClient( self.conf, request_prefix="/cache", pool_manager=self.pool_manager, no_ns_in_url=True, **self._kwargs) return self._cache_client @property def forwarder(self): """ Instanciate a client object for '/forward/*' proxy routes. """ if self._forwarder is None: self._forwarder = ProxyClient( self.conf, request_prefix="/forward", pool_manager=self.pool_manager, no_ns_in_url=True, **self._kwargs) return self._forwarder
[docs] @loc_params def election_debug(self, params, **kwargs): """ Get debugging information about an election. """ _, body = self._request('POST', '/debug', params=params, **kwargs) return body
[docs] @loc_params def election_leave(self, params, **kwargs): """ Force all peers to leave the election. """ _, body = self._request('POST', '/leave', params=params, **kwargs) return body
[docs] @loc_params def election_ping(self, params, **kwargs): """ Trigger or refresh an election. """ _, body = self._request('POST', '/ping', params=params, **kwargs) return body
[docs] @loc_params def election_status(self, params, **kwargs): """ Get the status of an election (trigger it if necessary). :returns: a `dict` with 'master' (`str`), 'slaves' (`list`), 'peers' (`dict`) and 'type' (`str`) .. py:data:: example { 'peers': { '127.0.0.3:6014': { 'status': {'status': 303, 'message': '127.0.0.1:6015'}, 'body': u''}, '127.0.0.1:6015': { 'status': {'status': 200, 'message': 'OK'}, 'body': u''}, '127.0.0.2:6016': { 'status': {'status': 303, 'message': '127.0.0.1:6015'}, 'body': u''} }, 'master': '127.0.0.1:6015', 'slaves': ['127.0.0.3:6014', '127.0.0.2:6016'], 'type': 'meta1' } """ _, body = self._request('POST', '/status', params=params, **kwargs) resp = {'peers': body, 'type': params['type']} for svc_id in body.keys(): if body[svc_id]['status']['status'] == 200: resp['master'] = svc_id elif body[svc_id]['status']['status'] == 303: slaves = resp.get('slaves', []) slaves.append(svc_id) resp['slaves'] = slaves return resp
[docs] @loc_params def election_sync(self, params, **kwargs): """Try to synchronize a dubious election.""" _, body = self._request('POST', '/sync', params=params, **kwargs) return body
[docs] @loc_params def has_base(self, params, **kwargs): """ Ask each peer if base exists. """ _, body = self._request('POST', '/has', params=params, **kwargs) return body
[docs] @loc_params def set_properties(self, params, properties=None, system=None, **kwargs): """ Set user or system properties in the admin table of an sqliterepo base. """ data = dict() if properties: data['properties'] = properties if system: data['system'] = dict() for k, v in system: data['system'][k if k.startswith('sys.') else 'sys.' + k] = v self._request('POST', "/set_properties", params=params, json=data, **kwargs)
[docs] @loc_params def get_properties(self, params, **kwargs): """ Get user and system properties from the admin table of an sqliterepo base. """ _resp, body = self._request('POST', "/get_properties", params=params, data='', **kwargs) return body
[docs] @loc_params def set_peers(self, params, peers, **kwargs): """ Force the new peer set in the replicas of the old peer set. """ data = {'system': {'sys.peers': ','.join(sorted(peers))}} self._request('POST', "/set_properties", params=params, json=data, **kwargs)
[docs] @loc_params def copy_base_from(self, params, svc_from, svc_to, **kwargs): """ Copy a base to another service, using DB_PIPEFROM. :param svc_from: id of the source service. :param svc_to: id of the destination service. """ data = {'to': svc_to, 'from': svc_from} self._request('POST', "/copy", params=params, json=data, **kwargs)
[docs] @loc_params def copy_base_to(self, params, svc_to, **kwargs): """ Copy a base to another service, using DB_PIPETO. Source service is looked after in service directory. :param svc_to: id of the destination service. """ self._request('POST', "/copy", params=params, json={'to': svc_to}, **kwargs)
[docs] @loc_params def remove_base(self, params, **kwargs): """ Remove specific base. """ _, body = self._request('POST', '/remove', params=params, **kwargs) return body
[docs] @loc_params def vacuum_base(self, params, **kwargs): """ Vacuum (defragment) the database on the master service, then resynchronize it on the slaves. """ self._request('POST', '/vacuum', params=params, **kwargs)
# Proxy's cache and config actions ################################ def _proxy_endpoint(self, proxy_netloc=None): if proxy_netloc and proxy_netloc != self.cache_client.proxy_netloc: return self.cache_client.endpoint.replace( self.cache_client.proxy_netloc, proxy_netloc) else: return self.cache_client.endpoint
[docs] def proxy_flush_cache(self, high=True, low=True, proxy_netloc=None, **kwargs): """ Flush "high" and "low" proxy caches. By default, flush the cache of the local proxy. If `proxy_netloc` is provided, flush the cache of this proxy. """ endpoint = self._proxy_endpoint(proxy_netloc) if high: url = endpoint + '/flush/high' self.cache_client._direct_request('POST', url, **kwargs) if low: url = endpoint + '/flush/low' self.cache_client._direct_request('POST', url, **kwargs)
[docs] def proxy_get_cache_status(self, proxy_netloc=None, **kwargs): """ Get the status of the high (conscience and meta0) and low (meta1) cache, including the current number of entries. """ endpoint = self._proxy_endpoint(proxy_netloc) url = endpoint + '/status' _resp, body = self.cache_client._direct_request('GET', url, **kwargs) return body
[docs] def proxy_get_live_config(self, proxy_netloc=None, **kwargs): """ Get all configuration parameters from the specified proxy service. :returns: a dictionary with all configuration keys the service recognizes, and their current value. :rtype: `dict` """ if not proxy_netloc: proxy_netloc = self.proxy_netloc _resp, body = self._direct_request( 'GET', 'http://' + proxy_netloc + '/v3.0/config', **kwargs) return body
[docs] def proxy_set_live_config(self, proxy_netloc=None, config=None, **kwargs): """ Set configuration parameters on the specified proxy service. """ if not proxy_netloc: proxy_netloc = self.proxy_netloc if config is None: raise ValueError("Missing value for 'config'") _resp, body = self._direct_request( 'POST', 'http://' + proxy_netloc + '/v3.0/config', json=config, **kwargs) return body
# Forwarded actions ############################################### def _forward_service_action(self, svc_id, action, method='POST', **kwargs): """Execute service-specific actions.""" _resp, body = self.forwarder._request( method, action, params={'id': svc_id}, **kwargs) return body
[docs] def service_flush_cache(self, svc_id, **kwargs): """Flush the resolver cache of an sqliterepo-based service.""" self._forward_service_action(svc_id, '/flush', **kwargs)
[docs] def service_get_live_config(self, svc_id, **kwargs): """ Get all configuration parameters from the specified service. Works on all services using ASN.1 protocol. :returns: a dictionary with all configuration keys the service recognizes, and their current value. :rtype: `dict` """ return self._forward_service_action( svc_id, '/config', method='GET', **kwargs)
[docs] def service_set_live_config(self, svc_id, config, **kwargs): """ Set some configuration parameters on the specified service. Works on all services using ASN.1 protocol. Notice that some parameters may not be taken into account, and no parameter will survice a service restart. """ return self._forward_service_action( svc_id, '/config', json=config, **kwargs)
[docs] def service_get_info(self, svc_id, **kwargs): """ Get all information from the specified service. Works on all services using ASN.1 protocol except conscience. :returns: a dictionary with all information keys the service recognizes, and their current value. :rtype: `dict` """ return self._forward_service_action( svc_id, '/info', method='GET', **kwargs)
[docs] def service_balance_elections(self, svc_id, max_ops=0, inactivity=0, **kwargs): """ Balance elections to get an acceptable slave/master ratio. :param svc_to: id of the service that should balance its elections. :param max_ops: maximum number of balancing operations. :param inactivity: avoid expiring election whose last activity is younger than the specified value. """ params = {'inactivity': int(inactivity), 'max': int(max_ops), 'id': svc_id} _resp, body = self.forwarder._request( 'POST', '/balance-masters', params=params, **kwargs) return _resp.status, body
[docs] def service_reload_lb(self, svc_id, **kwargs): """ Force the service to reload its internal load balancer. """ self._forward_service_action(svc_id, '/reload', **kwargs)