Source code for oio.check_service.common
# Copyright (C) 2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import random
from urllib3.response import HTTPResponse
from oio.conscience.client import ConscienceClient
from oio.common.exceptions import ClientException
from oio.api.base import HttpApi
[docs]def random_buffer(dictionary, n):
slot = 512
pattern = ''.join(random.choice(dictionary) for _ in range(slot))
t = []
while len(t) * slot < n:
t.append(pattern)
return ''.join(t)[:n]
[docs]class CheckService(HttpApi):
"""
Make a cycle `PUT/GET/DELETE` on each host for the service type
"""
def __init__(self, namespace, service_type, **kwargs):
"""
Collect the list of hosts for a service type
"""
super(CheckService, self).__init__(**kwargs)
self.ns = namespace
self.service_type = service_type
self.all_services = ConscienceClient(
{"namespace": self.ns}).all_services(self.service_type)
self.all_services_host = []
for service in self.all_services:
self.all_services_host.append(service["addr"])
def _compare_status(self, expected_status, actual_status):
if expected_status is None:
return None
return expected_status == actual_status
def _direct_request(self, method, url, expected_status=None, **kwargs):
try:
resp, body = super(CheckService, self)._direct_request(
method, url, **kwargs)
except ClientException as exc:
body = exc.message
resp = HTTPResponse(status=exc.http_status)
success = self._compare_status(expected_status, resp.status)
return resp, body, success
def _cycle(self, host):
"""
Must make a cycle `PUT/GET/DELETE` on the host
"""
raise NotImplementedError('_cycle not implemented')
[docs] def run(self):
"""
Make the cycle on each hosts
"""
for service_host in self.all_services_host:
print(self.service_type.upper() + " " + service_host),
try:
success = self._cycle(service_host)
if success:
print("OK")
else:
print("FAIL")
except Exception:
print("FAIL")