Source code for oio.api.base

# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from six import text_type, iteritems
from six.moves.urllib_parse import urlencode

from oio.common.easy_value import true_value
from oio.common.json import json as jsonlib

from oio.common.http_urllib3 import urllib3, get_pool_manager, \
    oio_exception_from_httperror, URLLIB3_REQUESTS_KWARGS
from oio.common import exceptions
from oio.common.utils import deadline_to_timeout, monotonic_time
from oio.common.constants import ADMIN_HEADER, \
    TIMEOUT_HEADER, PERFDATA_HEADER, FORCEMASTER_HEADER, \
    CONNECTION_TIMEOUT, READ_TIMEOUT, REQID_HEADER, STRLEN_REQID


class HttpApi(object):
    """
    Provides facilities to make HTTP requests towards the same endpoint,
    with a pool of connections.
    """

    def __init__(self, endpoint=None, pool_manager=None,
                 connection='keep-alive', service_type='unknown', **kwargs):
        """
        :param pool_manager: an optional pool manager that will be reused
        :type pool_manager: `urllib3.PoolManager`
        :param endpoint: base of the URL that will be requested
        :type endpoint: `str`
        :param connection: 'keep-alive' to keep connections open (default)
            or 'close' to explicitly close them. Sent as the `Connection`
            header unless the caller provides one.
        :param service_type: key under which request metrics are stored
            in the `perfdata` dictionary passed to the request methods.
        :type service_type: `str`
        :keyword admin_mode: allow talking to a slave/worm namespace
        :type admin_mode: `bool`
        :keyword force_master: make all requests run on master
            service only.
        :type force_master: `bool`

        Remaining keyword arguments are forwarded to `get_pool_manager`
        when no `pool_manager` is provided.
        """
        self.endpoint = endpoint

        if not pool_manager:
            # get_pool_manager filters its args
            pool_manager = get_pool_manager(**kwargs)
        self.pool_manager = pool_manager

        self.admin_mode = true_value(kwargs.get('admin_mode', False))
        self.force_master = true_value(kwargs.get('force_master', False))
        self.connection = connection
        self.service_type = service_type

    def __logger(self):
        """Try to get a logger from a child class, or create one."""
        if not hasattr(self, 'logger'):
            # Imported lazily: only needed when no child class set a logger.
            from oio.common.logger import get_logger
            setattr(self, 'logger', get_logger(None, self.__class__.__name__))
        return getattr(self, 'logger')

    @staticmethod
    def _accumulate_perfdata(service_perfdata, header_value):
        """
        Add the metrics of a performance data header to a dictionary.

        The header is read as a comma-separated list of `name=value`
        entries. Each value is divided by 1,000,000 (presumably to
        convert microseconds to seconds -- confirm against the server)
        and added to what `service_perfdata` already holds under `name`.
        Entries without '=' or with a non-numeric value are skipped:
        metrics are best-effort and must not fail a successful request.

        :param service_perfdata: dictionary to update in place
        :type service_perfdata: `dict`
        :param header_value: raw value of the performance data header
        :type header_value: `str`
        """
        for entry in header_value.split(','):
            name, sep, raw_value = entry.partition('=')
            if not sep:
                continue
            try:
                value = float(raw_value)
            except ValueError:
                continue
            service_perfdata[name] = \
                service_perfdata.get(name, 0.0) + value / 1000000.0

    def _direct_request(self, method, url, headers=None, data=None, json=None,
                        params=None, admin_mode=False, pool_manager=None,
                        force_master=False, **kwargs):
        """
        Make an HTTP request.

        :param method: HTTP method to use (e.g. "GET")
        :type method: `str`
        :param url: URL to request
        :type url: `str`
        :param data: body of the request. Replaced by the serialized
            `json` parameter when the latter is truthy.
        :param json: object to serialize as JSON and send as the body.
        :param params: query string parameters. Entries whose value is
            None are ignored.
        :type params: `dict`
        :param pool_manager: pool manager to use in place of
            `self.pool_manager`
        :keyword admin_mode: allow operations on slave or worm namespaces
        :type admin_mode: `bool`
        :keyword deadline: deadline for the request, in monotonic time.
            Supersedes `read_timeout`.
        :type deadline: `float` seconds
        :keyword timeout: optional timeout for the request (in seconds).
            May be a `urllib3.Timeout(connect=connection_timeout,
            read=read_timeout)`.
            This method also accepts `connection_timeout` and `read_timeout`
            as separate arguments.
        :type timeout: `float` or `urllib3.Timeout`
        :keyword headers: optional headers to add to the request
        :type headers: `dict`
        :keyword force_master: request will run on master service only.
        :type force_master: `bool`
        :keyword reqid: request ID to send to the service. Ignored when
            None; truncated to `STRLEN_REQID` characters.
        :keyword perfdata: optional dictionary that will be filled with
            metrics: the time spent doing the request (key 'overall') and
            the metrics returned by the service, all stored under
            `self.service_type`.
        :type perfdata: `dict`

        :returns: the response object and its body. The body is decoded
            from JSON when possible, left as raw data otherwise.
        :rtype: `tuple`

        :raise oio.common.exceptions.OioTimeout: in case of read, write
            or connection timeout
        :raise oio.common.exceptions.OioNetworkException: in case of
            connection error
        :raise oio.common.exceptions.OioException: in other case of HTTP
            error
        :raise oio.common.exceptions.ClientException: in case of HTTP status
            code >= 400
        """
        # Keep only the keyword arguments listed in URLLIB3_REQUESTS_KWARGS
        out_kwargs = {k: v for k, v in iteritems(kwargs)
                      if k in URLLIB3_REQUESTS_KWARGS}

        # Ensure headers are all strings
        if headers:
            out_headers = {k: text_type(v) for k, v in headers.items()}
        else:
            out_headers = dict()
        if self.admin_mode or admin_mode:
            out_headers[ADMIN_HEADER] = '1'
        if self.force_master or force_master:
            out_headers[FORCEMASTER_HEADER] = '1'

        # Look for a request deadline, deduce the timeout from it.
        if kwargs.get('deadline', None) is not None:
            to = deadline_to_timeout(kwargs['deadline'], True)
            to = min(to, kwargs.get('read_timeout', to))
            out_kwargs['timeout'] = urllib3.Timeout(
                connect=kwargs.get('connection_timeout', CONNECTION_TIMEOUT),
                read=to)
            # Shorten the deadline by 1% to compensate for the time spent
            # connecting and reading response.
            out_headers[TIMEOUT_HEADER] = int(to * 990000.0)

        # Ensure there is a timeout
        if 'timeout' not in out_kwargs:
            out_kwargs['timeout'] = urllib3.Timeout(
                connect=kwargs.get('connection_timeout', CONNECTION_TIMEOUT),
                read=kwargs.get('read_timeout', READ_TIMEOUT))
        if TIMEOUT_HEADER not in out_headers:
            to = out_kwargs['timeout']
            if isinstance(to, urllib3.Timeout):
                to = to.read_timeout
            else:
                to = float(to)
            out_headers[TIMEOUT_HEADER] = int(to * 1000000.0)

        # Look for a request ID. An explicit None means "no request ID":
        # do not send the string "None" nor overwrite a caller's header.
        reqid = kwargs.get('reqid')
        if reqid is not None:
            out_headers[REQID_HEADER] = str(reqid)
        if len(out_headers.get(REQID_HEADER, '')) > STRLEN_REQID:
            out_headers[REQID_HEADER] = \
                out_headers[REQID_HEADER][:STRLEN_REQID]
            self.__logger().warning(
                'Request ID truncated to %d characters', STRLEN_REQID)

        # Convert json and add Content-Type.
        # NOTE: falsy payloads (e.g. an empty dict or list) are not
        # serialized, `data` is sent unchanged in that case.
        if json:
            out_headers["Content-Type"] = "application/json"
            data = jsonlib.dumps(json)

        # Trigger performance measurements
        perfdata = kwargs.get('perfdata', None)
        if perfdata is not None:
            out_headers[PERFDATA_HEADER] = 'enabled'

        # Explicitly keep or close the connection
        if 'Connection' not in out_headers:
            out_headers['Connection'] = self.connection

        out_kwargs['headers'] = out_headers
        out_kwargs['body'] = data

        # Add query string
        if params:
            out_param = []
            for k, v in params.items():
                if v is not None:
                    if isinstance(v, text_type):
                        v = text_type(v).encode('utf-8')
                    out_param.append((k, v))
            encoded_args = urlencode(out_param)
            url += '?' + encoded_args

        if not pool_manager:
            pool_manager = self.pool_manager

        try:
            if perfdata is not None:
                request_start = monotonic_time()
            resp = pool_manager.request(method, url, **out_kwargs)
            if perfdata is not None:
                request_end = monotonic_time()
                service_perfdata = perfdata.setdefault(
                    self.service_type, dict())
                service_perfdata['overall'] = service_perfdata.get(
                    'overall', 0.0) + request_end - request_start
            body = resp.data
            if body:
                # Best effort: keep the raw body if it is not valid JSON.
                try:
                    body = jsonlib.loads(body.decode('utf-8'))
                except (UnicodeDecodeError, ValueError):
                    pass
            if perfdata is not None and PERFDATA_HEADER in resp.headers:
                self._accumulate_perfdata(perfdata[self.service_type],
                                          resp.headers[PERFDATA_HEADER])
        except urllib3.exceptions.HTTPError as exc:
            oio_exception_from_httperror(exc,
                                         reqid=out_headers.get(REQID_HEADER),
                                         url=url)

        if resp.status >= 400:
            raise exceptions.from_response(resp, body)
        return resp, body

    def _request(self, method, url, endpoint=None, **kwargs):
        """
        Make a request to an HTTP endpoint.

        The endpoint and `url` are joined with exactly one '/', then the
        request is delegated to `_direct_request` with the remaining
        keyword arguments.

        :param method: HTTP method to use (e.g. "GET")
        :type method: `str`
        :param url: URL to request, relative to the endpoint
        :type url: `str`
        :param endpoint: endpoint to use in place of `self.endpoint`
        :type endpoint: `str`
        :keyword deadline: deadline for the request, in monotonic time.
            Supersedes `read_timeout`.
        :type deadline: `float` seconds
        :keyword timeout: optional timeout for the request (in seconds).
            May be a `urllib3.Timeout(connect=connection_timeout,
            read=read_timeout)`.
            This method also accepts `connection_timeout` and `read_timeout`
            as separate arguments.
        :type timeout: `float` or `urllib3.Timeout`
        :keyword headers: optional headers to add to the request
        :type headers: `dict`

        :raise ValueError: when no endpoint is set, neither in the call
            nor on the instance
        :raise oio.common.exceptions.OioTimeout: in case of read, write
            or connection timeout
        :raise oio.common.exceptions.OioNetworkException: in case of
            connection error
        :raise oio.common.exceptions.OioException: in other case of HTTP
            error
        :raise oio.common.exceptions.ClientException: in case of HTTP status
            code >= 400
        """
        if not endpoint:
            if not self.endpoint:
                raise ValueError("endpoint not set in function call" +
                                 " nor in class contructor")
            endpoint = self.endpoint
        url = '/'.join([endpoint.rstrip('/'), url.lstrip('/')])
        return self._direct_request(method, url, **kwargs)