Source code for oio.xcute.client

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

import sys
import time

from oio.api.base import HttpApi
from oio.common.exceptions import OioException, OioNetworkException, reraise
from oio.common.logger import get_logger
from oio.conscience.client import ConscienceClient


class XcuteClient(HttpApi):
    """
    Simple client API for the xcute service.

    When no endpoint is provided at instantiation, the address of an
    xcute service is fetched from Conscience, then refreshed once
    `refresh_delay` seconds have elapsed, and after a network error.
    """

    def __init__(self, conf, endpoint=None, proxy_endpoint=None,
                 refresh_delay=3600.0, logger=None, **kwargs):
        """
        Initialize a client for the xcute service.

        :param conf: dictionary with at least the namespace name
        :type conf: `dict`
        :param endpoint: URL of an xcute service
        :param proxy_endpoint: URL of the proxy
        :param refresh_delay: time between refreshes of the
            xcute service endpoint (if not provided at instantiation)
        :type refresh_delay: `float` seconds
        :param logger: logger to use; when omitted, one is built from `conf`
        """
        super(XcuteClient, self).__init__(
            endpoint=endpoint, service_type='xcute-service', **kwargs)
        self.conf = conf
        self.logger = logger or get_logger(self.conf)
        self.conscience = ConscienceClient(conf, endpoint=proxy_endpoint,
                                           logger=self.logger, **kwargs)
        # A negative delay disables the refresh logic: it is forced
        # whenever an endpoint is already known at this point.
        self._refresh_delay = refresh_delay if not self.endpoint else -1.0
        self._last_refresh = 0.0

    def _get_xcute_addr(self, **kwargs):
        """Fetch the address of an xcute service from Conscience."""
        xcute_instance = self.conscience.next_instance('xcute', **kwargs)
        return xcute_instance.get('addr')

    def _refresh_endpoint(self, now=None, **kwargs):
        """
        Refresh xcute service endpoint.

        :param now: timestamp to record as the time of the last refresh
            (defaults to the current time)
        :raises OioException: if the service record has no address
        """
        addr = self._get_xcute_addr(**kwargs)
        if not addr:
            # Fail with the exception type the callers handle, instead
            # of an obscure TypeError raised by str.join.
            raise OioException("No address found for the xcute service")
        self.endpoint = '/'.join(("http:/", addr, "v1.0/xcute"))
        if not now:
            now = time.time()
        self._last_refresh = now

    def _maybe_refresh_endpoint(self, **kwargs):
        """
        Refresh xcute service endpoint if delay has been reached.

        A refresh failure is only logged as long as a previously
        resolved endpoint is available; otherwise it is raised.
        """
        if self._refresh_delay < 0.0 and self.endpoint:
            # Refresh disabled and an endpoint is known: nothing to do.
            return
        now = time.time()
        if now - self._last_refresh <= self._refresh_delay:
            return
        try:
            self._refresh_endpoint(now, **kwargs)
        except OioNetworkException as exc:
            if not self.endpoint:
                # Cannot use the previous one
                raise
            self.logger.warning(
                "Failed to refresh xcute endpoint: %s", exc)
        except OioException:
            if not self.endpoint:
                # Cannot use the previous one
                raise
            self.logger.exception("Failed to refresh xcute endpoint")

    def xcute_request(self, method, action, params=None, **kwargs):
        """
        Make a request to the xcute service.

        :param method: HTTP method of the request
        :param action: path of the request, appended to the endpoint
        :param params: query parameters (an empty `dict` if omitted)
        :returns: the `(response, body)` pair returned by `_request`
        :raises OioNetworkException: on network error, after trying to
            refresh the endpoint (when refresh is enabled)
        """
        # NOTE(review): request-specific keyword arguments (e.g. `json`
        # from `job_create`) are also forwarded to the Conscience lookup
        # done while refreshing the endpoint -- confirm this is intended.
        self._maybe_refresh_endpoint(**kwargs)
        if not params:
            params = {}
        try:
            resp, body = self._request(
                method, action, params=params, **kwargs)
        except OioNetworkException as exc:
            # Keep the original error: it is re-raised after the refresh
            # attempt, whatever the outcome of that attempt.
            exc_info = sys.exc_info()
            if self._refresh_delay >= 0.0:
                self.logger.info(
                    "Refreshing xcute endpoint after error %s", exc)
                try:
                    self._refresh_endpoint(**kwargs)
                except Exception as refresh_exc:
                    # Best effort: the original error matters more.
                    self.logger.warning("%s", refresh_exc)
            reraise(exc_info[0], exc_info[1], exc_info[2])
        return resp, body

    def job_list(self, limit=None, marker=None):
        """
        Request the list of jobs (`GET /job/list`).

        :param limit: sent as the `limit` query parameter
        :param marker: sent as the `marker` query parameter
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'GET', '/job/list', params={'limit': limit, 'marker': marker})
        return data

    def job_create(self, job_type, job_config=None):
        """
        Request the creation of a job (`POST /job/create`).

        :param job_type: sent as the `type` query parameter
        :param job_config: passed as the `json` argument of the request
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'POST', '/job/create', params={'type': job_type},
            json=job_config)
        return data

    def job_show(self, job_id):
        """
        Request the description of a job (`GET /job/show`).

        :param job_id: sent as the `id` query parameter
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'GET', '/job/show', params={'id': job_id})
        return data

    def job_pause(self, job_id):
        """
        Request a job to be paused (`POST /job/pause`).

        :param job_id: sent as the `id` query parameter
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'POST', '/job/pause', params={'id': job_id})
        return data

    def job_resume(self, job_id):
        """
        Request a job to be resumed (`POST /job/resume`).

        :param job_id: sent as the `id` query parameter
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'POST', '/job/resume', params={'id': job_id})
        return data

    def job_delete(self, job_id):
        """
        Request the deletion of a job (`DELETE /job/delete`).

        :param job_id: sent as the `id` query parameter
        """
        self.xcute_request(
            'DELETE', '/job/delete', params={'id': job_id})

    def lock_list(self):
        """
        Request the list of locks (`GET /lock/list`).

        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'GET', '/lock/list')
        return data

    def lock_show(self, lock):
        """
        Request the description of a lock (`GET /lock/show`).

        :param lock: sent as the `lock` query parameter
        :returns: the body of the response
        """
        _, data = self.xcute_request(
            'GET', '/lock/show', params={'lock': lock})
        return data