# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
from oio.common.green import eventlet
import sys
import random
import errno
import signal
import time
import os
from oio.common.utils import CPU_COUNT, drop_privileges
from oio.common.easy_value import int_value
from oio.common.configuration import read_conf
from oio.common.logger import get_logger, redirect_stdio
[docs]class HaltServer(BaseException):
def __init__(self, reason, exit_status=1):
self.reason = reason
self.exit_status = exit_status
[docs]class Runner(object):
WORKER_START_ERROR = 3
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM".split()]
SIG_NAMES = dict(
(getattr(signal, name), name[3:].lower()) for name in dir(signal)
if name[:3] == "SIG" and name[3] != "_"
)
def __init__(self, conf_file, worker_class, **kwargs):
section_name = 'event-agent'
self.conf = read_conf(conf_file, section_name)
self.logger = get_logger(
self.conf, verbose=kwargs.pop('verbose', False))
redirect_stdio(self.logger)
drop_privileges(self.conf.get('user', 'openio'))
self.num_workers = int_value(self.conf.get('workers'), CPU_COUNT)
self.worker_class = worker_class
self.workers = {}
self.sig_queue = []
[docs] def start(self):
self.logger.info('Starting event-agent')
self.pid = os.getpid()
self.configure_signals()
[docs] def signal(self, sig, frame):
if len(self.sig_queue) < 5:
self.sig_queue.append(sig)
[docs] def run(self):
self.start()
try:
self.manage_workers()
while True:
sig = self.sig_queue.pop(0) if self.sig_queue else None
if sig is None:
eventlet.sleep(1)
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.logger.info('Ignoring unknown signal: %s', sig)
continue
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.logger.error("Unhandled signal: %s", signame)
continue
self.logger.info("Handling signal: %s", signame)
handler()
except StopIteration:
self.halt()
except KeyboardInterrupt:
self.halt()
except HaltServer as h:
self.halt(reason=h.reason, exit_status=h.exit_status)
except SystemExit:
raise
except Exception:
self.logger.info("Unhandled exception in main loop", exc_info=True)
self.stop(False)
sys.exit(-1)
[docs] def handle_chld(self, sig, frame):
self.reap_workers()
[docs] def handle_hup(self):
self.logger.info("Shutdown gracefully")
self.reload()
[docs] def handle_term(self):
raise StopIteration
[docs] def handle_int(self):
self.stop(False)
raise StopIteration
[docs] def handle_quit(self):
self.stop(False)
raise StopIteration
[docs] def halt(self, reason=None, exit_status=0):
self.stop()
self.logger.info("Shutting down")
if reason is not None:
self.logger.info("Reason: %s", reason)
sys.exit(exit_status)
[docs] def stop(self, graceful=True):
sig = signal.SIGTERM
if not graceful:
sig = signal.SIGQUIT
limit = time.time() + 5
self.kill_workers(sig)
while self.workers and time.time() < limit:
eventlet.sleep(0.1)
self.kill_workers(signal.SIGKILL)
[docs] def reap_workers(self):
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
exitcode = status >> 8
if exitcode == self.WORKER_START_ERROR:
reason = "Worker failed to start"
raise HaltServer(reason, self.WORKER_START_ERROR)
worker = self.workers.pop(wpid, None)
if not worker:
continue
except OSError as e:
if e.errno != errno.ECHILD:
raise
[docs] def manage_workers(self):
if len(self.workers) < self.num_workers:
self.spawn_workers()
workers = self.workers.items()
while len(workers) > self.num_workers:
(pid, _) = workers.pop(0)
self.kill_worker(pid, signal.SIGTERM)
[docs] def spawn_worker(self):
worker = self.worker_class(self.pid, self.conf, self.logger)
pid = os.fork()
if pid != 0:
self.workers[pid] = worker
return pid
# child process
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
worker_pid = os.getpid()
try:
self.logger.info("Booting worker with pid: %s", worker_pid)
worker.init()
sys.exit(0)
except SystemExit:
raise
except Exception:
self.logger.exception("Exception in worker process")
if not worker.started:
sys.exit(self.WORKER_START_ERROR)
sys.exit(-1)
finally:
self.logger.info("Worker exiting (pid: %s)", worker_pid)
[docs] def spawn_workers(self):
for i in range(self.num_workers - len(self.workers)):
self.spawn_worker()
eventlet.sleep(0.1 * random.random())
[docs] def kill_workers(self, sig):
worker_pids = list(self.workers.keys())
for pid in worker_pids:
self.kill_worker(pid, sig)
[docs] def kill_worker(self, pid, sig):
try:
os.kill(pid, sig)
except OSError as e:
if e.errno == errno.ESRCH:
try:
self.workers.pop(pid)
return
except (KeyError, OSError):
return
raise
[docs]def validate_msg(msg):
return len(msg) == 4