# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from string import hexdigits
from oio.blob.utils import check_volume, read_chunk_metadata
from oio.common.constants import STRLEN_CHUNKID, CHUNK_SUFFIX_PENDING
from oio.common.easy_value import int_value
from oio.common.exceptions import Conflict, NotFound
from oio.common.green import ratelimit, time
from oio.common.utils import paths_gen
from oio.container.client import ContainerClient
class BlobRegistrator(object):
    """Crawl the chunk files of a volume and (re-)register each chunk's
    beans (alias, content header, chunk) in its container service.

    The operation applied to each bean is selected by ``conf['action']``:
    ``insert``, ``update`` or ``check`` (``check`` is not implemented yet).
    Progress is logged periodically through :meth:`log_report`.
    """

    DEFAULT_CHUNKS_PER_SECOND = 30
    DEFAULT_REPORT_INTERVAL = 3600
    BEAN_TYPES = ('alias', 'header', 'chunk')

    def __init__(self, conf, logger, volume, container_ids):
        """
        :param conf: configuration dict; must contain 'namespace' and
            'action' ('insert', 'update' or 'check'); may contain
            'chunks_per_second' and 'report_interval'.
        :param logger: logger used for reports and error messages
        :param volume: path to the chunk volume to crawl
        :param container_ids: optional list of container IDs; chunk files
            whose container ID is in this list are skipped.
            NOTE(review): skipping IDs that ARE in the list looks inverted
            for a "limit to these containers" option — confirm intent.
        :raises ValueError: on namespace mismatch or unknown action
        """
        self.conf = conf
        self.logger = logger
        self.volume = volume
        self.volume_ns, self.volume_id = check_volume(self.volume)
        self.container_ids = container_ids or list()
        # Container IDs are hexadecimal: normalize for comparison with
        # the (upper-case) IDs read from chunk metadata.
        self.container_ids = [container_id.upper()
                              for container_id in self.container_ids]
        self.namespace = self.conf['namespace']
        if self.namespace != self.volume_ns:
            # Format eagerly: ValueError does not interpolate %-style
            # arguments the way logging calls do.
            raise ValueError(
                'Namespace (%s) mismatch with volume namespace (%s)' % (
                    self.namespace, self.volume_ns))
        # action
        self.action_name = self.conf['action'].lower()
        if self.action_name == 'insert':
            self.action = self._insert_bean
        elif self.action_name == 'update':
            self.action = self._update_bean
        elif self.action_name == 'check':
            self.action = self._check_bean
        else:
            raise ValueError('Unknown action (%s)' % self.action_name)
        # speed
        self.chunks_run_time = 0
        self.max_chunks_per_second = int_value(
            self.conf.get('chunks_per_second'),
            self.DEFAULT_CHUNKS_PER_SECOND)
        # counters
        self.chunks_processed = 0
        self.chunk_errors = 0
        self.beans_processed = dict()
        self.bean_successes = dict()
        self.bean_already_exists = dict()
        self.bean_orphans = dict()
        self.bean_errors = dict()
        for bean_type in self.BEAN_TYPES:
            self.beans_processed[bean_type] = 0
            self.bean_successes[bean_type] = 0
            self.bean_already_exists[bean_type] = 0
            self.bean_orphans[bean_type] = 0
            self.bean_errors[bean_type] = 0
        # report
        self.start_time = 0
        self.last_report = 0
        self.report_interval = int_value(
            conf.get('report_interval'),
            self.DEFAULT_REPORT_INTERVAL)
        self.client = ContainerClient(
            {'namespace': self.namespace}, logger=self.logger)
        # All beans (re-)created during this pass share one timestamp.
        self.ctime = int(time.time())

    def _beans_from_meta(self, meta):
        """Build the three container beans (alias, header, chunk)
        described by one chunk's metadata dict.

        :param meta: chunk metadata as returned by read_chunk_metadata
        :returns: a list of three bean dicts, in alias/header/chunk order
        """
        return \
            [{
                'type': 'alias',
                'name': meta['content_path'],
                'version': int(meta['content_version']),
                'ctime': self.ctime,
                'mtime': self.ctime,
                'deleted': False,
                'header': meta['content_id']
            }, {
                'type': 'header',
                'id': meta['content_id'],
                # Real size is unknown from a single chunk; 0 is a
                # placeholder for the registration.
                'size': 0,
                'ctime': self.ctime,
                'mtime': self.ctime,
                'policy': meta['content_policy'],
                'chunk-method': meta['content_chunkmethod'],
                'mime-type': 'application/octet-stream'
            }, {
                'type': 'chunk',
                'id': 'http://' + self.volume_id + '/' + meta['chunk_id'],
                # Prefer the metachunk hash (EC) when present.
                'hash': meta.get('metachunk_hash') or meta['chunk_hash'],
                'size': int(meta['chunk_size']),
                'ctime': self.ctime,
                'pos': meta['chunk_pos'],
                'content': meta['content_id']
            }]

    def _check_bean(self, meta, bean):
        """Check that the bean is registered (not implemented)."""
        raise Exception("CHECK not yet implemented")

    def _insert_bean(self, meta, bean):
        """Insert the bean in its container (fails with Conflict if it
        already exists)."""
        self.client.container_raw_insert(bean, cid=meta['container_id'])

    def _update_bean(self, meta, bean):
        """Update the bean in place: the same bean is used as both the
        'old' and 'new' value of the raw update."""
        self.client.container_raw_update(
            [bean], [bean], cid=meta['container_id'])

    def _get_report(self, status, end_time):
        """Format a progress report line.

        :param status: report tag ('START', 'RUN' or 'DONE')
        :param end_time: timestamp at which the report is generated
        :returns: a single log line with global and per-bean-type counters
        """
        # Avoid division by zero on the very first report.
        time_since_last_report = (end_time - self.last_report) or 0.00001
        total_time = (end_time - self.start_time) or 0.00001
        report = (
            '%(status)s volume=%(volume)s '
            'start_time=%(start_time)s %(total_time).2fs '
            'last_report=%(last_report)s %(time_since_last_report).2fs '
            'chunks_processed=%(chunks_processed)d '
            '%(chunks_processed_rate).2f/s '
            'chunk_errors=%(chunk_errors)d '
            '%(chunk_errors_rate).2f%% ' % {
                'status': status,
                'volume': self.volume_id,
                'start_time': datetime.fromtimestamp(
                    int(self.start_time)).isoformat(),
                'total_time': total_time,
                'last_report': datetime.fromtimestamp(
                    int(self.last_report)).isoformat(),
                'time_since_last_report': time_since_last_report,
                'chunks_processed': self.chunks_processed,
                'chunks_processed_rate':
                    self.chunks_processed / total_time,
                'chunk_errors': self.chunk_errors,
                'chunk_errors_rate':
                    100 * self.chunk_errors
                    / float(self.chunks_processed or 1),
            })
        for bean_type in self.BEAN_TYPES:
            report = (
                '%(report)s '
                'bean_%(bean_type)s_processed=%(beans_processed)d '
                '%(beans_processed_rate).2f/s '
                'bean_%(bean_type)s_successes=%(bean_successes)d '
                '%(bean_successes_rate).2f%% '
                'bean_%(bean_type)s_already_exists=%(bean_already_exists)d '
                '%(bean_already_exists_rate).2f%% '
                'bean_%(bean_type)s_orphans=%(bean_orphans)d '
                '%(bean_orphans_rate).2f%% '
                'bean_%(bean_type)s_errors=%(bean_errors)d '
                '%(bean_errors_rate).2f%%' % {
                    'report': report,
                    'bean_type': bean_type,
                    'beans_processed': self.beans_processed[bean_type],
                    'beans_processed_rate':
                        self.beans_processed[bean_type]
                        / total_time,
                    'bean_successes': self.bean_successes[bean_type],
                    'bean_successes_rate':
                        100 * self.bean_successes[bean_type]
                        / float(self.beans_processed[bean_type] or 1),
                    'bean_already_exists': self.bean_already_exists[bean_type],
                    'bean_already_exists_rate':
                        100 * self.bean_already_exists[bean_type]
                        / float(self.beans_processed[bean_type] or 1),
                    'bean_orphans': self.bean_orphans[bean_type],
                    'bean_orphans_rate':
                        100 * self.bean_orphans[bean_type]
                        / float(self.beans_processed[bean_type] or 1),
                    'bean_errors': self.bean_errors[bean_type],
                    'bean_errors_rate':
                        100 * self.bean_errors[bean_type]
                        / float(self.beans_processed[bean_type] or 1)
                })
        return report

    def log_report(self, status, force=False):
        """Log a progress report, rate-limited by ``report_interval``.

        :param status: report tag to include in the log line
        :param force: log the report even if the interval has not elapsed
        """
        end_time = time.time()
        if force or (end_time - self.last_report >= self.report_interval):
            self.logger.info(self._get_report(status, end_time))
            self.last_report = end_time

    def pass_volume(self):
        """Process every chunk file of the volume.

        :returns: True if no chunk-level nor bean-level error occurred
        """
        self.start_time = self.last_report = time.time()
        self.log_report('START', force=True)

        paths = paths_gen(self.volume)
        for path in paths:
            try:
                self.pass_chunk_file(path)
                self.chunks_processed += 1
            except Exception as exc:
                self.logger.error(
                    'Failed to pass chunk file (chunk_file=%s): %s',
                    path, exc)
                self.chunk_errors += 1

            self.log_report('RUN')
            # Throttle to the configured chunks-per-second rate.
            self.chunks_run_time = ratelimit(
                self.chunks_run_time, self.max_chunks_per_second)
        self.log_report('DONE', force=True)

        return self.chunk_errors == 0 \
            and all(errors == 0 for errors in self.bean_errors.values())

    def pass_chunk_file(self, path):
        """Process a single chunk file: read its metadata, build its
        beans and apply the configured action to each of them.

        :param path: path to the chunk file
        """
        chunk_id = path.rsplit('/', 1)[-1]
        if len(chunk_id) != STRLEN_CHUNKID:
            if chunk_id.endswith(CHUNK_SUFFIX_PENDING):
                self.logger.info('Skipping pending chunk %s', path)
            else:
                self.logger.warning('WARN Not a chunk %s', path)
            return
        if any(char not in hexdigits for char in chunk_id):
            self.logger.warning('WARN Not a chunk %s', path)
            return

        with open(path) as f:
            meta, _ = read_chunk_metadata(f, chunk_id)
            # NOTE(review): this skips chunks whose container IS in the
            # requested list; confirm the filter is not inverted.
            if self.container_ids \
                    and meta['container_id'] in self.container_ids:
                self.logger.debug(
                    'Skipping chunk file (container_id=%s content_path=%s '
                    'content_version=%s content_id=%s chunk_id=%s '
                    'chunk_pos=%s)', meta['container_id'],
                    meta['content_path'], meta['content_version'],
                    meta['content_id'], meta['chunk_id'], meta['chunk_pos'])
                return

            beans = self._beans_from_meta(meta)
            for bean in beans:
                try:
                    self.pass_bean(meta, bean)
                except Exception as exc:
                    self.logger.error(
                        'Failed to pass chunk file (container_id=%s '
                        'content_path=%s content_version=%s content_id=%s '
                        'chunk_id=%s chunk_pos=%s): %s', meta['container_id'],
                        meta['content_path'], meta['content_version'],
                        meta['content_id'], meta['chunk_id'],
                        meta['chunk_pos'], exc)
                    self.bean_errors[bean['type']] = \
                        self.bean_errors[bean['type']] + 1

    def pass_bean(self, meta, bean):
        """Apply the configured action to one bean and update the
        per-bean-type counters.

        Conflict responses count as "already exists", NotFound as
        "orphan"; any other exception counts as an error.

        :param meta: metadata of the chunk the bean was built from
        :param bean: the bean dict to register
        """
        try:
            self.beans_processed[bean['type']] = \
                self.beans_processed[bean['type']] + 1
            self.action(meta, bean)
            self.logger.debug(
                'Passed %s (container_id=%s content_path=%s '
                'content_version=%s content_id=%s chunk_id=%s chunk_pos=%s)',
                bean['type'], meta['container_id'], meta['content_path'],
                meta['content_version'], meta['content_id'],
                meta['chunk_id'], meta['chunk_pos'])
            self.bean_successes[bean['type']] = \
                self.bean_successes[bean['type']] + 1
        except Conflict as exc:
            self.logger.info(
                'Already exists %s (container_id=%s content_path=%s '
                'content_version=%s content_id=%s chunk_id=%s chunk_pos=%s): '
                '%s', bean['type'], meta['container_id'], meta['content_path'],
                meta['content_version'], meta['content_id'],
                meta['chunk_id'], meta['chunk_pos'], exc)
            self.bean_already_exists[bean['type']] = \
                self.bean_already_exists[bean['type']] + 1
        except NotFound as exc:
            self.logger.info(
                'Orphan %s (container_id=%s content_path=%s '
                'content_version=%s content_id=%s chunk_id=%s chunk_pos=%s): '
                '%s', bean['type'], meta['container_id'], meta['content_path'],
                meta['content_version'], meta['content_id'],
                meta['chunk_id'], meta['chunk_pos'], exc)
            self.bean_orphans[bean['type']] = \
                self.bean_orphans[bean['type']] + 1
        except Exception as exc:
            self.logger.error(
                'Failed to pass %s (container_id=%s content_path=%s '
                'content_version=%s content_id=%s chunk_id=%s chunk_pos=%s): '
                '%s', bean['type'], meta['container_id'], meta['content_path'],
                meta['content_version'], meta['content_id'],
                meta['chunk_id'], meta['chunk_pos'], exc)
            self.bean_errors[bean['type']] = \
                self.bean_errors[bean['type']] + 1