# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
import signal
from oio.common.green import ratelimit, eventlet, threading, ContextPool, time
from oio.common.easy_value import int_value
from oio.common import exceptions
from oio.common.logger import get_logger
# TODO(FVE): rename class and module
class Rebuilder(object):
    """
    Base class for rebuilders or movers.

    Subclass and implement
      `_create_worker()`
      `_fill_queue()`
      `_item_to_string()`
      `_get_report()`
      `_read_retry_queue()`
    """

    def __init__(self, conf, logger, volume, input_file=None, **kwargs):
        # pylint: disable=no-member
        self.conf = conf
        self.logger = logger or get_logger(conf)
        self.namespace = conf['namespace']
        self.volume = volume
        self.input_file = input_file
        self.nworkers = int_value(conf.get('workers'), 1)
        self.success = True
        # Exit gracefully: on SIGINT/SIGTERM stop feeding the queue,
        # but let items already dispatched to workers finish.
        self.running = True
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
        # Counters, shared between the feeding thread and the workers.
        self.lock_counters = threading.Lock()
        self.items_processed = 0
        self.errors = 0
        self.total_items_processed = 0
        self.total_errors = 0
        # Periodic progress report.
        self.lock_report = threading.Lock()
        self.start_time = 0
        self.last_report = 0
        self.report_interval = int_value(conf.get('report_interval'), 3600)

    def exit_gracefully(self, signum, frame):
        """
        Signal handler: ask the main loop to stop sending new items,
        and mark the pass as unsuccessful (it did not run to completion).
        """
        self.logger.info(
            'Stop sending and wait for all results already sent')
        self.success = False
        self.running = False

    def rebuilder_pass(self, **kwargs):
        """
        Spawn workers, then feed them with items (from the main thread)
        until the input is exhausted or a stop signal is received.

        :returns: True if the pass completed without any error,
            False otherwise.
        """
        self.start_time = self.last_report = time.time()
        self.log_report('START', force=True)
        workers = list()
        with ContextPool(self.nworkers + 1) as pool:
            # Spawn one worker for the retry queue
            rqueue = eventlet.Queue(self.nworkers)
            pool.spawn(self._read_retry_queue, rqueue, **kwargs)
            # Spawn workers to rebuild
            queue = eventlet.Queue(self.nworkers * 10)
            for i in range(self.nworkers):
                worker = self._create_worker(**kwargs)
                workers.append(worker)
                pool.spawn(worker.rebuilder_pass, i, queue,
                           retry_queue=rqueue, **kwargs)
            # Fill the queue (with the main thread)
            try:
                self._fill_queue(queue, **kwargs)
            except Exception as exc:
                # When stopping gracefully, a failure of the feeder is
                # expected: do not report it as an error.
                if self.running:
                    self.logger.error("Failed to fill queue: %s", exc)
                    self.success = False
            # Block until all items are rebuilt
            queue.join()
            # Block until the retry queue is empty
            rqueue.join()
        self.log_report('DONE', force=True)
        return self.success and self.total_errors == 0

    def _create_worker(self, **kwargs):
        """
        Spawn a worker, subclass of `RebuilderWorker`.
        """
        raise NotImplementedError()

    def _fill_queue(self, queue, **kwargs):
        """
        Fill `queue` with items that will be passed to
        `RebuilderWorker#_rebuild_one()`.
        """
        raise NotImplementedError()

    def _read_retry_queue(self, queue, **kwargs):
        """
        Read `exceptions.RetryLater` items that return from workers,
        because they cannot be handled at the moment. They can either be
        put in the main queue again or be sent to an external one, or
        just dropped (default implementation).
        """
        while True:
            # Drop the item, just acknowledge it so queue.join() returns.
            queue.get()
            queue.task_done()

    def _item_to_string(self, item, **kwargs):
        """
        Pretty-print an item that is being worked on.
        """
        raise NotImplementedError()

    def _update_processed_without_lock(self, info, error=None, increment=1,
                                       **kwargs):
        # Caller must hold self.lock_counters.
        self.items_processed += increment
        if error is not None:
            self.errors += 1

    def update_processed(self, item, info, error=None, increment=1, **kwargs):
        """
        Register `increment` processed items (and one error if `error`
        is set), logging the faulty item when there is one.
        """
        if error is not None:
            self.logger.error('ERROR while rebuilding %s: %s',
                              self._item_to_string(item, **kwargs), error)
        with self.lock_counters:
            self._update_processed_without_lock(info, error=error,
                                                increment=increment, **kwargs)

    def _update_totals_without_lock(self, **kwargs):
        # Caller must hold self.lock_counters.
        # Move the per-interval counters into the totals, and reset them.
        items_processed = self.items_processed
        self.items_processed = 0
        self.total_items_processed += items_processed
        errors = self.errors
        self.errors = 0
        self.total_errors += errors
        return items_processed, errors, self.total_items_processed, \
            self.total_errors

    def update_totals(self, **kwargs):
        """
        Fold the per-interval counters into the totals.

        :returns: a tuple (items processed since last report,
            errors since last report, total items processed,
            total errors).
        """
        with self.lock_counters:
            return self._update_totals_without_lock(**kwargs)

    def _get_report(self, status, end_time, counters, **kwargs):
        """
        Build the report string logged by `log_report()`.
        """
        raise NotImplementedError()

    def log_report(self, status, force=False, **kwargs):
        """
        Log a progress report if the report interval has elapsed
        (or unconditionally when `force` is True).

        A non-blocking lock acquisition prevents several workers from
        emitting the same report concurrently.
        """
        end_time = time.time()
        if (force and self.lock_report.acquire()) \
                or (end_time - self.last_report >= self.report_interval
                    and self.lock_report.acquire(False)):
            try:
                counters = self.update_totals()
                self.logger.info(
                    self._get_report(status, end_time, counters, **kwargs))
                self.last_report = end_time
            finally:
                self.lock_report.release()
class RebuilderWorker(object):
    """
    Base class for rebuilder or mover workers.

    Subclass and implement `_rebuild_one()`.
    """

    def __init__(self, rebuilder, **kwargs):
        self.rebuilder = rebuilder
        self.conf = rebuilder.conf
        self.logger = rebuilder.logger
        self.namespace = rebuilder.namespace
        self.volume = rebuilder.volume
        self.items_run_time = 0
        self.max_items_per_second = int_value(
            rebuilder.conf.get('items_per_second'), 30)
        # Maximum random wait between two items, in microseconds.
        # Parse it: configuration values usually come as strings, and
        # random.randint() requires integers (0 is falsy, like the
        # previous None default, so the sleep stays disabled by default).
        self.random_wait = int_value(rebuilder.conf.get('random_wait'), 0)

    def update_processed(self, item, info, error=None, **kwargs):
        """Forward the counter update to the parent rebuilder."""
        return self.rebuilder.update_processed(item, info, error=error,
                                               **kwargs)

    def log_report(self, **kwargs):
        """Let the parent rebuilder log a progress report if needed."""
        return self.rebuilder.log_report('RUN', **kwargs)

    def rebuilder_pass(self, num, queue, retry_queue=None, **kwargs):
        """
        Consume items from `queue` forever and rebuild them one by one,
        enforcing the configured rate limit.

        :param num: index of this worker (given by the parent rebuilder)
        :param queue: queue of items to rebuild
        :param retry_queue: optional queue where items raising
            `exceptions.RetryLater` are pushed for later processing
        """
        while True:
            info = None
            err = None
            item = queue.get()
            try:
                info = self._rebuild_one(item, **kwargs)
            except exceptions.RetryLater as exc:
                if retry_queue:
                    # logger.warn() is a deprecated alias of warning()
                    self.logger.warning(
                        "Putting an item in the retry queue: %s", exc.args[1])
                    retry_queue.put(exc.args[0])
                else:
                    err = str(exc)
            except Exception as exc:
                err = str(exc)
            queue.task_done()
            self.update_processed(item, info, error=err, **kwargs)
            self.log_report(**kwargs)
            self.items_run_time = ratelimit(self.items_run_time,
                                            self.max_items_per_second)
            if self.random_wait:
                # Sleep up to random_wait microseconds to desynchronize
                # the workers.
                eventlet.sleep(random.randint(0, self.random_wait) / 1.0e6)

    def _rebuild_one(self, item, **kwargs):
        """
        Rebuild one item from the queue previously filled
        by `Rebuilder#_fill_queue()`.
        """
        raise NotImplementedError()