Source code for oio.xcute.server

# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from functools import wraps
from werkzeug.exceptions import HTTPException, \
    BadRequest as HTTPBadRequest, Forbidden as HTTPForbidden, \
    NotFound as HTTPNotFound, InternalServerError as HTTPInternalServerError
from werkzeug.routing import Map, Rule, Submount
from werkzeug.wrappers import Response

from oio.common.easy_value import int_value
from oio.common.exceptions import Forbidden, NotFound
from oio.common.green import time
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.wsgi import WerkzeugApp
from oio.xcute.common.backend import XcuteBackend
from oio.xcute.jobs import JOB_TYPES


[docs]def access_log(func): @wraps(func) def access_log_wrapper(self, req, *args, **kwargs): code = -1 pre = time.time() try: rc = func(self, req, *args, **kwargs) code = rc._status_code return rc except HTTPException as exc: code = exc.code raise finally: post = time.time() # remote method code time size user reqid uri self.logger.info( '%s %s %d %d %s %s %s %s', req.environ['HTTP_HOST'], req.environ['REQUEST_METHOD'], code, int((post - pre) * 1000000), '-', '-', '-', req.environ['RAW_URI']) return access_log_wrapper
[docs]def handle_exceptions(func): @wraps(func) def handle_exceptions_wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except HTTPException: raise except NotFound as exc: raise HTTPNotFound(exc.message) except Forbidden as exc: raise HTTPForbidden(exc.message) except Exception as exc: self.logger.exception('Internal error: %s', exc) raise HTTPInternalServerError(str(exc)) return handle_exceptions_wrapper
[docs]class XcuteServer(WerkzeugApp): def __init__(self, conf, logger=None): self.conf = conf self.logger = logger or get_logger(self.conf) self.backend = XcuteBackend(self.conf, logger=self.logger) url_map = Map([ Rule('/status', endpoint='status'), Submount('/v1.0/xcute', [ Rule('/job/list', endpoint='job_list', methods=['GET']), Rule('/job/create', endpoint='job_create', methods=['POST']), Rule('/job/show', endpoint='job_show', methods=['GET']), Rule('/job/pause', endpoint='job_pause', methods=['POST']), Rule('/job/resume', endpoint='job_resume', methods=['POST']), Rule('/job/delete', endpoint='job_delete', methods=['DELETE']), Rule('/lock/list', endpoint='lock_list', methods=['GET']), Rule('/lock/show', endpoint='lock_show', methods=['GET']), ]) ]) super(XcuteServer, self).__init__(url_map, logger)
[docs] @handle_exceptions def on_status(self, req): status = self.backend.status() return Response(json.dumps(status), mimetype='application/json')
[docs] @access_log @handle_exceptions def on_job_list(self, req): limit = int_value(req.args.get('limit'), None) marker = req.args.get('marker') job_infos = self.backend.list_jobs(limit=limit, marker=marker) return Response( json.dumps(job_infos), mimetype='application/json')
[docs] @access_log @handle_exceptions def on_job_create(self, req): job_type = req.args.get('type') if not job_type: raise HTTPBadRequest('Missing job type') job_class = JOB_TYPES.get(job_type) if job_class is None: raise HTTPBadRequest('Unknown job type') job_config, lock = job_class.sanitize_config( json.loads(req.data or '{}')) job_id = self.backend.create(job_type, job_config, lock) job_info = self.backend.get_job_info(job_id) return Response( json.dumps(job_info), mimetype='application/json', status=202)
def _get_job_id(self, req): """Fetch job ID from request query string.""" job_id = req.args.get('id') if not job_id: raise HTTPBadRequest('Missing job ID') return job_id
[docs] @access_log @handle_exceptions def on_job_show(self, req): job_id = self._get_job_id(req) job_info = self.backend.get_job_info(job_id) return Response(json.dumps(job_info), mimetype='application/json')
[docs] @access_log @handle_exceptions def on_job_pause(self, req): job_id = self._get_job_id(req) self.backend.request_pause(job_id) job_info = self.backend.get_job_info(job_id) return Response( json.dumps(job_info), mimetype='application/json', status=202)
[docs] @access_log @handle_exceptions def on_job_resume(self, req): job_id = self._get_job_id(req) self.backend.resume(job_id) job_info = self.backend.get_job_info(job_id) return Response( json.dumps(job_info), mimetype='application/json', status=202)
[docs] @access_log @handle_exceptions def on_job_delete(self, req): job_id = self._get_job_id(req) self.backend.delete(job_id) return Response(status=204)
[docs] @access_log @handle_exceptions def on_lock_list(self, req): locks = self.backend.list_locks() return Response(json.dumps(locks), mimetype='application/json')
[docs] @access_log @handle_exceptions def on_lock_show(self, req): lock = req.args.get('lock') if not lock: raise HTTPBadRequest('Missing lock') lock_info = self.backend.get_lock_info(lock) return Response(json.dumps(lock_info), mimetype='application/json')
[docs]def create_app(conf): logger = get_logger(conf) app = XcuteServer(conf, logger) return app