Source code for oio.blob.auditor

# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from oio.common.green import ratelimit

from contextlib import closing
from string import hexdigits
import hashlib
import time

from oio.blob.utils import check_volume, read_chunk_metadata
from oio.container.client import ContainerClient
from oio.common.daemon import Daemon
from oio.common import exceptions as exc
from oio.common.utils import paths_gen
from oio.common.easy_value import int_value
from oio.common.logger import get_logger
from oio.common.constants import STRLEN_CHUNKID


SLEEP_TIME = 30
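
# Configuration keys read by this module (defaults in parentheses):
#   volume            - path of the volume to audit (required)
#   report_interval   - seconds between progress reports (3600)
#   chunks_per_second - maximum number of chunks audited per second (30)
#   bytes_per_second  - maximum number of bytes read per second (10000000)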


class BlobAuditorWorker(object):
    def __init__(self, conf, logger, volume):
        self.conf = conf
        self.logger = logger
        self.volume = volume
        self.run_time = 0
        self.passes = 0
        self.errors = 0
        self.orphan_chunks = 0
        self.faulty_chunks = 0
        self.corrupted_chunks = 0
        self.last_reported = 0
        self.chunks_run_time = 0
        self.bytes_running_time = 0
        self.bytes_processed = 0
        self.total_bytes_processed = 0
        self.total_chunks_processed = 0
        self.report_interval = int_value(
            conf.get('report_interval'), 3600)
        self.max_chunks_per_second = int_value(
            conf.get('chunks_per_second'), 30)
        self.max_bytes_per_second = int_value(
            conf.get('bytes_per_second'), 10000000)
        self.container_client = ContainerClient(conf, logger=self.logger)

    def audit_pass(self):
        self.namespace, self.address = check_volume(self.volume)

        start_time = report_time = time.time()

        total_errors = 0
        total_corrupted = 0
        total_orphans = 0
        total_faulty = 0
        audit_time = 0

        paths = paths_gen(self.volume)
        for path in paths:
            loop_time = time.time()
            self.safe_chunk_audit(path)
            self.chunks_run_time = ratelimit(
                self.chunks_run_time,
                self.max_chunks_per_second
            )
            self.total_chunks_processed += 1
            now = time.time()

            if now - self.last_reported >= self.report_interval:
                # Periodic progress report: counters since the last report,
                # then the counters are reset and accumulated into totals.
                self.logger.info(
                    '%(start_time)s '
                    '%(passes)d '
                    '%(corrupted)d '
                    '%(faulty)d '
                    '%(orphans)d '
                    '%(errors)d '
                    '%(c_rate).2f '
                    '%(b_rate).2f '
                    '%(total).2f '
                    '%(audit_time).2f '
                    '%(audit_rate).2f' % {
                        'start_time': time.ctime(report_time),
                        'passes': self.passes,
                        'corrupted': self.corrupted_chunks,
                        'faulty': self.faulty_chunks,
                        'orphans': self.orphan_chunks,
                        'errors': self.errors,
                        'c_rate': self.passes / (now - report_time),
                        'b_rate': self.bytes_processed / (now - report_time),
                        'total': (now - start_time),
                        'audit_time': audit_time,
                        'audit_rate': audit_time / (now - start_time)
                    }
                )
                report_time = now
                total_corrupted += self.corrupted_chunks
                total_orphans += self.orphan_chunks
                total_faulty += self.faulty_chunks
                total_errors += self.errors
                self.passes = 0
                self.corrupted_chunks = 0
                self.orphan_chunks = 0
                self.faulty_chunks = 0
                self.errors = 0
                self.bytes_processed = 0
                self.last_reported = now
            audit_time += (now - loop_time)
        elapsed = (time.time() - start_time) or 0.000001
        # Final report for the whole pass.
        self.logger.info(
            '%(elapsed).02f '
            '%(corrupted)d '
            '%(faulty)d '
            '%(orphans)d '
            '%(errors)d '
            '%(chunk_rate).2f '
            '%(bytes_rate).2f '
            '%(audit_time).2f '
            '%(audit_rate).2f' % {
                'elapsed': elapsed,
                'corrupted': total_corrupted + self.corrupted_chunks,
                'faulty': total_faulty + self.faulty_chunks,
                'orphans': total_orphans + self.orphan_chunks,
                'errors': total_errors + self.errors,
                'chunk_rate': self.total_chunks_processed / elapsed,
                'bytes_rate': self.total_bytes_processed / elapsed,
                'audit_time': audit_time,
                'audit_rate': audit_time / elapsed
            }
        )

    def safe_chunk_audit(self, path):
        chunk_id = path.rsplit('/', 1)[-1]
        # TODO(FVE): if ".pending" suffix, check for stale upload
        if len(chunk_id) != STRLEN_CHUNKID:
            self.logger.warn('WARN Not a chunk %s' % path)
            return
        for c in chunk_id:
            if c not in hexdigits:
                self.logger.warn('WARN Not a chunk %s' % path)
                return

        try:
            self.chunk_audit(path, chunk_id)
        except exc.FaultyChunk as err:
            self.faulty_chunks += 1
            self.logger.error('ERROR faulty chunk %s: %s', path, err)
        except exc.CorruptedChunk as err:
            self.corrupted_chunks += 1
            self.logger.error('ERROR corrupted chunk %s: %s', path, err)
        except exc.OrphanChunk as err:
            self.orphan_chunks += 1
            self.logger.error('ERROR orphan chunk %s: %s', path, err)
        except Exception:
            self.errors += 1
            self.logger.exception('ERROR while auditing chunk %s', path)

        self.passes += 1

    def chunk_audit(self, path, chunk_id):
        # Open in binary mode: the checksum must be computed on raw bytes.
        with open(path, 'rb') as chunk_file:
            return self.chunk_file_audit(chunk_file, chunk_id)

    def chunk_file_audit(self, chunk_file, chunk_id):
        try:
            meta, _ = read_chunk_metadata(chunk_file, chunk_id)
        except exc.MissingAttribute as err:
            raise exc.FaultyChunk(err)
        size = int(meta['chunk_size'])
        md5_checksum = meta['chunk_hash'].lower()
        reader = ChunkReader(chunk_file, size, md5_checksum)
        with closing(reader):
            for buf in reader:
                buf_len = len(buf)
                self.bytes_running_time = ratelimit(
                    self.bytes_running_time,
                    self.max_bytes_per_second,
                    increment=buf_len)
                self.bytes_processed += buf_len
                self.total_bytes_processed += buf_len

        try:
            container_id = meta['container_id']
            content_id = meta['content_id']
            _obj_meta, data = self.container_client.content_locate(
                cid=container_id, content=content_id, properties=False)

            # Check chunk data
            chunk_data = None
            metachunks = set()
            for c in data:
                if c['url'].endswith(meta['chunk_id']):
                    metachunks.add(c['pos'].split('.', 2)[0])
                    chunk_data = c
            if not chunk_data:
                raise exc.OrphanChunk('Not found in content')

            metachunk_size = meta.get('metachunk_size')
            if metachunk_size is not None \
                    and chunk_data['size'] != int(metachunk_size):
                raise exc.FaultyChunk('Invalid metachunk size found')

            metachunk_hash = meta.get('metachunk_hash')
            if metachunk_hash is not None \
                    and chunk_data['hash'] != metachunk_hash:
                raise exc.FaultyChunk('Invalid metachunk hash found')

            if chunk_data['pos'] != meta['chunk_pos']:
                raise exc.FaultyChunk('Invalid chunk position found')

        except exc.NotFound:
            raise exc.OrphanChunk('Chunk not found in container')
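

# Example (not part of the upstream module): a minimal sketch of driving a
# single audit pass by hand. The conf values and volume path are hypothetical.
def example_single_audit_pass():
    conf = {
        'report_interval': 300,
        'chunks_per_second': 30,
        'bytes_per_second': 10000000,
    }
    logger = get_logger(conf)
    worker = BlobAuditorWorker(conf, logger, '/var/lib/oio/sds/vol1/blob')
    worker.audit_pass()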


class BlobAuditor(Daemon):
    """
    Walk through the chunks of a volume, and check for inconsistencies:
    missing extended attributes, invalid hash, position or size,
    or orphaned chunk.
    """

    def __init__(self, conf, **kwargs):
        super(BlobAuditor, self).__init__(conf)
        self.logger = get_logger(conf)
        volume = conf.get('volume')
        if not volume:
            raise exc.ConfigurationException(
                'No volume specified for auditor')
        self.volume = volume

    def run(self, *args, **kwargs):
        work = True
        while work:
            work = False
            try:
                worker = BlobAuditorWorker(
                    self.conf, self.logger, self.volume)
                worker.audit_pass()
            except Exception as e:
                self.logger.exception('ERROR in audit: %s', e)
            if kwargs.get('daemon'):
                work = True
                self._sleep()

    def _sleep(self):
        time.sleep(SLEEP_TIME)
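

# Example (not part of the upstream module): running the auditor once, or in
# daemon mode where it sleeps SLEEP_TIME seconds between passes. The volume
# path is hypothetical.
def example_run_auditor():
    conf = {'volume': '/var/lib/oio/sds/vol1/blob'}
    auditor = BlobAuditor(conf)
    auditor.run()  # single pass
    # auditor.run(daemon=True)  # loop forever, sleeping between passes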


class ChunkReader(object):
    """Read a chunk file, computing the MD5 of the data on the fly;
    verify size and checksum on close()."""

    def __init__(self, fp, size, md5_checksum):
        self.fp = fp
        self.size = size
        self.md5_checksum = md5_checksum
        self.bytes_read = 0
        self.iter_md5 = None

    def __iter__(self):
        self.iter_md5 = hashlib.md5()
        while True:
            buf = self.fp.read()
            if buf:
                self.iter_md5.update(buf)
                self.bytes_read += len(buf)
                yield buf
            else:
                break

    def close(self):
        if self.fp:
            self.md5_read = self.iter_md5.hexdigest()
            if self.bytes_read != self.size:
                raise exc.FaultyChunk('Invalid size for chunk')
            if self.md5_read != self.md5_checksum:
                raise exc.CorruptedChunk(
                    'checksum does not match %s != %s' % (
                        self.md5_read, self.md5_checksum))
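

# Example (not part of the upstream module): verifying a single chunk file
# directly with ChunkReader. close() performs the size and checksum checks,
# so the data must be consumed first; the path, size and checksum arguments
# are hypothetical.
def example_verify_chunk_file(path, expected_size, expected_md5):
    with open(path, 'rb') as fp:
        reader = ChunkReader(fp, expected_size, expected_md5.lower())
        with closing(reader):
            for _buf in reader:
                # Raises exc.FaultyChunk or exc.CorruptedChunk on close()
                # if the size or checksum does not match.
                pass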