# Copyright (C) 2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import wraps
from oio.common.client import ProxyClient
[docs]def service_id_to_string(service_id):
"""Convert a list of service IDs to a comma separated string."""
if not service_id:
return None
elif isinstance(service_id, basestring):
return service_id
else:
try:
return ','.join(service_id)
except Exception:
raise ValueError("'service_id' must be a string or a list")
[docs]def loc_params(func):
"""Wrap database localization parameters in request parameters"""
@wraps(func)
def _wrapped(self, service_type=None, account=None, reference=None,
cid=None, service_id=None, **kwargs):
params = kwargs.pop('params', {})
if service_type:
params['type'] = service_type
elif 'type' not in params:
raise ValueError("Missing value for service_type")
if cid:
params['cid'] = cid
elif account and reference:
params['acct'] = account
params['ref'] = reference
elif 'cid' not in params and \
('acct' not in params or 'ref' not in params):
raise ValueError("Missing value for account and reference or cid")
if service_id:
params['service_id'] = service_id_to_string(service_id)
return func(self, params, **kwargs)
return _wrapped
[docs]class AdminClient(ProxyClient):
"""Low level database administration client."""
def __init__(self, conf, **kwargs):
super(AdminClient, self).__init__(
conf, request_prefix="/admin", **kwargs)
kwargs.pop('pool_manager', None)
self.forwarder = ProxyClient(
conf, request_prefix="/forward", pool_manager=self.pool_manager,
no_ns_in_url=True, **kwargs)
[docs] @loc_params
def election_debug(self, params, **kwargs):
"""
Get debugging information about an election.
"""
_, body = self._request('POST', '/debug', params=params, **kwargs)
return body
[docs] @loc_params
def election_leave(self, params, **kwargs):
"""
Force all peers to leave the election.
"""
_, body = self._request('POST', '/leave', params=params, **kwargs)
return body
[docs] @loc_params
def election_ping(self, params, **kwargs):
"""
Trigger or refresh an election.
"""
_, body = self._request('POST', '/ping', params=params, **kwargs)
return body
[docs] @loc_params
def election_status(self, params, **kwargs):
"""
Get the status of an election (trigger it if necessary).
:returns: a `dict` with 'master' (`str`), 'slaves' (`list`),
'peers' (`dict`) and 'type' (`str`)
.. py:data:: example
{
'peers': {
'127.0.0.3:6014': {
'status':
{'status': 303,
'message': '127.0.0.1:6015'},
'body': u''},
'127.0.0.1:6015': {
'status':
{'status': 200,
'message': 'OK'},
'body': u''},
'127.0.0.2:6016': {
'status':
{'status': 303,
'message': '127.0.0.1:6015'},
'body': u''}
},
'master': '127.0.0.1:6015',
'slaves': ['127.0.0.3:6014', '127.0.0.2:6016'],
'type': 'meta1'
}
"""
_, body = self._request('POST', '/status', params=params, **kwargs)
resp = {'peers': body, 'type': params['type']}
for svc_id in body.keys():
if body[svc_id]['status']['status'] == 200:
resp['master'] = svc_id
elif body[svc_id]['status']['status'] == 303:
slaves = resp.get('slaves', [])
slaves.append(svc_id)
resp['slaves'] = slaves
return resp
[docs] @loc_params
def election_sync(self, params, **kwargs):
"""Try to synchronize a dubious election."""
_, body = self._request('POST', '/sync', params=params, **kwargs)
return body
[docs] @loc_params
def has_base(self, params, **kwargs):
"""
Ask each peer if base exists.
"""
_, body = self._request('POST', '/has', params=params, **kwargs)
return body
[docs] @loc_params
def set_properties(self, params,
properties=None, system=None, **kwargs):
"""
Set user or system properties in the admin table of an sqliterepo base.
"""
data = dict()
if properties:
data['properties'] = properties
if system:
data['system'] = dict()
for k, v in system:
data['system'][k if k.startswith('sys.') else 'sys.' + k] = v
self._request('POST', "/set_properties",
params=params, json=data, **kwargs)
[docs] @loc_params
def get_properties(self, params, **kwargs):
"""
Get user and system properties from the admin table of an
sqliterepo base.
"""
_resp, body = self._request('POST', "/get_properties",
params=params, data='', **kwargs)
return body
[docs] @loc_params
def set_peers(self, params, peers, **kwargs):
"""
Force the new peer set in the replicas of the old peer set.
"""
data = {'system': {'sys.peers': ','.join(sorted(peers))}}
self._request('POST', "/set_properties",
params=params, json=data, **kwargs)
[docs] @loc_params
def copy_base_from(self, params, svc_from, svc_to, **kwargs):
"""
Copy a base to another service, using DB_PIPEFROM.
:param svc_from: id of the source service.
:param svc_to: id of the destination service.
"""
data = {'to': svc_to, 'from': svc_from}
self._request('POST', "/copy",
params=params, json=data, **kwargs)
[docs] @loc_params
def copy_base_to(self, params, svc_to, **kwargs):
"""
Copy a base to another service, using DB_PIPETO.
Source service is looked after in service directory.
:param svc_to: id of the destination service.
"""
self._request('POST', "/copy",
params=params, json={'to': svc_to}, **kwargs)
[docs] @loc_params
def remove_base(self, params, **kwargs):
"""
Remove specific base.
"""
_, body = self._request('POST', '/remove', params=params, **kwargs)
return body
def _forward_service_action(self, svc_id, action, **kwargs):
"""Execute service-specific actions."""
self.forwarder._request('POST', action,
params={'id': svc_id}, **kwargs)
[docs] def service_flush_cache(self, svc_id, **kwargs):
"""Flush the resolver cache of an sqlx-bases service."""
self._forward_service_action(svc_id, '/flush', **kwargs)
[docs] def service_set_live_config(self, svc_id, config, **kwargs):
"""
Set some configuration parameters on the specified service.
Works on all services using ASN.1 protocol.
Notice that some parameters may not be taken into account,
and no parameter will survice a service restart.
"""
self._forward_service_action(svc_id, '/config', json=config, **kwargs)