# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from oio.common import exceptions
from oio.common.easy_value import int_value
from oio.common.fullpath import encode_fullpath
from oio.common.green import sleep
from oio.common.json import json
from oio.common.utils import request_id
from oio.content.factory import ContentFactory
from oio.event.beanstalk import BeanstalkdListener, BeanstalkdSender
from oio.event.evob import EventTypes
from oio.rebuilder.rebuilder import Rebuilder, RebuilderWorker
# Default beanstalkd tube the improver listens on (and re-posts failed
# jobs to); overridable with the 'beanstalkd_tube' configuration key.
DEFAULT_IMPROVER_TUBE = 'oio-improve'
class BlobImprover(Rebuilder):
    """
    Move chunks of objects declared as "perfectible",
    if possible to improve them (increased distance between chunks or
    better hosting service).
    """

    # Event types this tool accepts; anything else is buried.
    supported_events = (EventTypes.CONTENT_PERFECTIBLE, )

    def __init__(self, conf, logger, beanstalkd_addr, **kwargs):
        super(BlobImprover, self).__init__(conf, logger, volume=None, **kwargs)
        self.content_factory = ContentFactory(self.conf, logger=self.logger)
        beanstalkd_tube = self.conf.get('beanstalkd_tube',
                                        DEFAULT_IMPROVER_TUBE)
        # Listen for "perfectible" events on the tube, and send jobs we
        # could not handle back to the same tube (with a delay).
        self.listener = BeanstalkdListener(beanstalkd_addr, beanstalkd_tube,
                                           self.logger, **kwargs)
        self.sender = BeanstalkdSender(beanstalkd_addr, beanstalkd_tube,
                                       self.logger, **kwargs)
        self.retry_delay = int_value(self.conf.get('retry_delay'), 30)
        self.reqid_prefix = 'blob-impr-'

    def exit_gracefully(self, signum, frame):
        """Ask the main loop and the beanstalkd listener to stop."""
        super(BlobImprover, self).exit_gracefully(signum, frame)
        self.listener.running = False

    def _event_from_job(self, job_id, data, **kwargs):
        """
        Decode a JSON string into an event dictionary.

        :raises oio.common.exceptions.ExplicitBury: if the event type is
            not one of `supported_events`.
        """
        # pylint: disable=no-member
        event = json.loads(data)
        type_ = event.get('event')
        # Bury events that should not be there
        if type_ not in self.__class__.supported_events:
            msg = 'Discarding event %s (type=%s)' % (
                event.get('job_id'), type_)
            self.logger.info(msg)
            raise exceptions.ExplicitBury(msg)
        yield event

    def _create_worker(self, **kwargs):
        """Instantiate one worker bound to this improver."""
        return BlobImproverWorker(self, **kwargs)

    def _fill_queue(self, queue, **kwargs):
        """
        Fetch events from beanstalkd and push them onto the workers' queue,
        until `max_events` events have been forwarded (0 or unset means
        no limit) or the improver is asked to stop.
        """
        # BUGFIX: kwargs.get('max_events') may be None, and 'None > 0'
        # raises TypeError under Python 3. Treat missing as "no limit".
        max_events = int_value(kwargs.get('max_events'), 0)
        sent_events = 0
        # Do not block more than 2 seconds
        events = self.listener.fetch_jobs(self._event_from_job,
                                          reserve_timeout=2, **kwargs)
        for event in events:
            queue.put(event)
            sent_events += 1
            if max_events > 0 and sent_events >= max_events:
                self.logger.info('Max events (%d) reached, exiting',
                                 max_events)
                break
            if not self.running:
                break
        events.close()

    def _read_retry_queue(self, queue, **kwargs):
        """
        Reschedule failed jobs by sending them back to the beanstalkd
        tube with a delay. Loops forever; retries the send every second
        until it succeeds.
        """
        while True:
            # Reschedule jobs we were not able to handle.
            item = queue.get()
            sent = False
            while not sent:
                sent = self.sender.send_job(json.dumps(item),
                                            delay=self.retry_delay)
                if not sent:
                    sleep(1.0)
            self.sender.job_done()
            queue.task_done()

    def _item_to_string(self, item, **kwargs):
        """Build a human-readable description of an event, for logging."""
        try:
            url = item['url']
            fullpath = encode_fullpath(
                url['account'], url['user'], url['path'],
                url.get('version', 1), url['content'])
            # TODO(FVE): maybe tell some numbers about chunks
            if item.get('event') == EventTypes.CONTENT_PERFECTIBLE:
                return 'perfectible object %s' % (fullpath, )
            else:
                return 'object %s' % (fullpath, )
        except (KeyError, ValueError) as err:
            return '<unknown item> ({0})'.format(repr(err))

    def _get_report(self, status, end_time, counters, **kwargs):
        """
        Format a one-line progress report (rates since last report and
        since start). Durations are floored to a tiny positive value to
        avoid divisions by zero.
        """
        items_processed, errors, total_items_processed, total_errors = counters
        time_since_last_report = (end_time - self.last_report) or 0.00001
        total_time = (end_time - self.start_time) or 0.00001
        return ('%(status)s volume=%(volume)s '
                'last_report=%(last_report)s %(time_since_last_report).2fs '
                'chunks=%(chunks)d %(chunks_rate).2f/s '
                'errors=%(errors)d %(errors_rate).2f%% '
                'start_time=%(start_time)s %(total_time).2fs '
                'total_chunks=%(total_chunks)d '
                '%(total_chunks_rate).2f/s '
                'total_errors=%(total_errors)d %(total_errors_rate).2f%%' % {
                    'status': status,
                    'volume': self.volume,
                    'last_report': datetime.fromtimestamp(
                        int(self.last_report)).isoformat(),
                    'time_since_last_report': time_since_last_report,
                    'chunks': items_processed,
                    'chunks_rate':
                        items_processed / time_since_last_report,
                    'errors': errors,
                    'errors_rate':
                        100 * errors / float(items_processed or 1),
                    'start_time': datetime.fromtimestamp(
                        int(self.start_time)).isoformat(),
                    'total_time': total_time,
                    'total_chunks': total_items_processed,
                    'total_chunks_rate':
                        total_items_processed / total_time,
                    'total_errors': total_errors,
                    'total_errors_rate': 100 * total_errors /
                        float(total_items_processed or 1)
                })
class BlobImproverWorker(RebuilderWorker):

    def __init__(self, rebuilder, **kwargs):
        super(BlobImproverWorker, self).__init__(rebuilder, **kwargs)

    @property
    def content_factory(self):
        # Shared factory, owned by the main improver instance.
        return self.rebuilder.content_factory

    def move_perfectible_from_event(self, event, dry_run=False,
                                    max_attempts=3, **kwargs):
        """
        Move one or more "perfectible" chunks described in a
        "storage.content.perfectible" event.

        :param event: decoded event dictionary, with 'url' and
            'data'/'chunks' entries.
        :param dry_run: if True, compute the moves without applying them.
        :param max_attempts: maximum number of attempts per chunk move.
        :returns: a tuple with the list of (raw_src, dst) moves done and
            the list of exceptions raised by failed moves.
        :raises oio.common.exceptions.PreconditionFailed: if a chunk
            mentioned in the event is no longer part of the object.
        """
        url = event['url']
        reqid = request_id(self.rebuilder.reqid_prefix)
        descr = self.rebuilder._item_to_string(event)
        self.logger.info('Working on %s (reqid=%s)', descr, reqid)
        # There are chances that the set of chunks of the object has
        # changed between the time the event has been emitted and now.
        # It seems a good idea to reload the object metadata and compare.
        content = self.content_factory.get(url['id'], url['content'],
                                           account=url.get('account'),
                                           container_name=url.get('user'),
                                           reqid=reqid)
        for chunk in event['data']['chunks']:
            found = content.chunks.filter(url=chunk['id']).one()
            if not found:
                raise exceptions.PreconditionFailed(
                    "Chunk %s not found in %s" % (chunk['id'], descr))
            # Chunk quality information is not saved along with object
            # metadata, thus we must fill it now.
            found.quality = chunk['quality']
        moveable = [chunk for chunk in content.chunks
                    if chunk.imperfections]
        # Deal with the most imperfect chunks first.
        moveable.sort(key=lambda x: x.imperfections)
        moves = list()
        errors = list()
        if not moveable:
            self.logger.info('Nothing to do for %s', descr)
            return moves, errors
        for chunk in moveable:
            try:
                src = str(chunk.url)
                # Must do a copy or bad things will happen.
                raw_src = dict(chunk.raw())
                self.logger.debug("Working on %s: %s",
                                  src, chunk.imperfections)
                # TODO(FVE): try to improve all chunks of a metachunk
                # in a single pass
                dst = content.move_chunk(chunk, check_quality=True,
                                         dry_run=dry_run, reqid=reqid,
                                         max_attempts=max_attempts,
                                         **kwargs)
                self.logger.debug("%s replaced by %s", src, dst['url'])
                moves.append((raw_src, dst))
            except exceptions.OioException as err:
                self.logger.warn("Could not improve %s: %s", chunk, err)
                errors.append(err)
        return moves, errors

    def _rebuild_one(self, item, dry_run=False, move_attempts=3, **kwargs):
        """
        Handle one "perfectible" event. Raise RetryLater when nothing
        could be improved (so the event is rescheduled), or after a
        dry run (which, by definition, improved nothing durably).
        """
        moves, errors = self.move_perfectible_from_event(
            item, dry_run=dry_run, max_attempts=move_attempts, **kwargs)
        if errors:
            if not moves:
                # Later we may want to limit attempts.
                item['attempts'] = item.get('attempts', 0) + 1
                raise exceptions.RetryLater(
                    item, 'Could not improve any chunk: %s' % errors)
            else:
                self.logger.info(
                    'Some chunks of %s have not been improved: %s',
                    self.rebuilder._item_to_string(item), errors)
                # TODO(FVE): build a new event, send it back
        else:
            # TODO(FVE): if there are no moves, should we reschedule?
            for move in moves:
                self.logger.debug('%s%s moved to %s',
                                  'dry-run: ' if dry_run else '',
                                  move[0], move[1])
            if dry_run:
                raise exceptions.RetryLater(item, 'Rescheduled after dry-run')