# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Recursively check account, container, content and chunk integrity.
"""
from __future__ import print_function
from oio.common.green import Event, GreenPool, LightQueue, sleep, Semaphore,\
ratelimit_function_build
import os
import csv
import sys
from time import time
from oio.blob.rebuilder import BlobRebuilder
from oio.common import exceptions as exc
from oio.common.fullpath import decode_fullpath
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.storage_method import STORAGE_METHODS
from oio.common.utils import cid_from_name, CacheDict
from oio.event.beanstalk import BeanstalkdSender
from oio.api.object_storage import ObjectStorageApi
from oio.api.object_storage import _sort_chunks
from oio.rdir.client import RdirClient
DEFAULT_DEPTH = 4
IRREPARABLE_PREFIX = '#IRREPARABLE'
class Target(object):
"""
Identify the target of a check, hold a log of errors.
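
    Example (illustrative; the account, container and object names are
    placeholders)::

        target = Target('my_account', container='my_container', obj='my_obj')
        # target.type is 'object'; target.cid is computed lazily from
        # the account and container names.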
"""
def __init__(self, account, container=None, obj=None,
chunk=None, content_id=None, version=None,
cid=None):
self.account = account
self.container = container
self.obj = obj
self.content_id = content_id
self.version = version
self.chunk = chunk
self._cid = cid
# List of tuples with a timestamp as first element,
# and an ItemResult as second element.
self.error_log = list()
    def append_result(self, result):
self.error_log.append((time(), result))
@property
def cid(self):
if not self._cid and self.account and self.container:
self._cid = cid_from_name(self.account, self.container)
return self._cid
@cid.setter
def cid(self, cid):
if cid is not None:
self._cid = cid
self.account = None
self.container = None
    def copy(self):
return Target(
self.account,
self.container,
self.obj,
self.chunk,
self.content_id,
self.version)
    def copy_object(self):
return Target(self.account, self.container, self.obj,
content_id=self.content_id, version=self.version)
    def copy_container(self):
return Target(self.account, self.container)
    def copy_account(self):
return Target(self.account)
@property
def has_errors(self):
"""
Tell if this target still presents errors.
Will return False if it showed errors in the past but does not show
them anymore.
"""
return self.error_log and self.error_log[-1][1].errors
@property
def irreparable(self):
"""
Tell if the target presents irreparable errors.
Check only the latest result. The "irreparable" situation may have been
temporary, for example if a rawx went down then up again.
"""
return self.has_errors and self.latest_error_result().irreparable
    def latest_error_result(self):
if self.has_errors:
return self.error_log[-1][1]
return None
    def time_in_error(self):
"""
Tell for how long this target has shown errors.
:rtype: tuple
:returns: the duration (in seconds) since we detected an error,
and the number of consecutive error confirmations.
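
        Example (illustrative): a return value of ``(42.0, 2)`` means the
        oldest unresolved error was detected 42 seconds ago and has been
        confirmed by 2 subsequent checks.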
"""
if not self.has_errors:
return 0.0, 0
consecutive = list()
for res in reversed(self.error_log):
            if not res[1].errors:
break
consecutive.append(res)
return time() - consecutive[-1][0], len(consecutive) - 1
def __repr__(self):
if self.type == 'chunk':
return 'chunk=' + self.chunk
out = 'account=%s' % self.account
if self.container:
out += ', container=' + self.container
if self.cid:
out += ', cid=' + self.cid
if self.obj:
out += ', obj=' + self.obj
if self.content_id:
out += ', content_id=' + self.content_id
if self.version:
out += ', version=' + self.version
if self.chunk:
out += ', chunk=' + self.chunk
return out
@property
def type(self):
"""Tell which type of item this object targets."""
if self.chunk:
return 'chunk'
elif self.obj:
return 'object'
elif self.container:
return 'container'
else:
return 'account'
class ItemResult(object):
"""
Hold the result of a check.
Must be serializable to be used in the Checker's return queue.
"""
def __init__(self, errors=None, irreparable=False):
self.errors = errors if errors is not None else list()
self.irreparable = irreparable
    def errors_to_str(self, separator='\n', err_format='%s'):
"""
Pretty print errors stored in this result.
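
        Example (illustrative)::

            result.errors_to_str(separator='; ', err_format='error: %s')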
"""
if not self.errors:
return str(None)
return separator.join(err_format % x for x in self.errors)
class Checker(object):
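    """
    Check the integrity of accounts, containers, objects and chunks,
    and report errors to an error file, a rebuild file and/or a
    beanstalkd tube, depending on the constructor arguments.
    """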
def __init__(self, namespace, concurrency=50,
error_file=None, rebuild_file=None, check_xattr=True,
limit_listings=0, request_attempts=1,
logger=None, verbose=False, check_hash=False,
min_time_in_error=0.0, required_confirmations=0,
beanstalkd_addr=None,
beanstalkd_tube=BlobRebuilder.DEFAULT_BEANSTALKD_WORKER_TUBE,
cache_size=2**24, **_kwargs):
self.pool = GreenPool(concurrency)
self.error_file = error_file
self.error_sender = None
self.check_xattr = bool(check_xattr)
self.check_hash = bool(check_hash)
self.logger = logger or get_logger({'namespace': namespace},
name='integrity', verbose=verbose)
# Optimisation for when we are only checking one object
# or one container.
# 0 -> do not limit
# 1 -> limit account listings (list of containers)
# 2 -> limit container listings (list of objects)
self.limit_listings = limit_listings
if self.error_file:
outfile = open(self.error_file, 'a')
self.error_writer = csv.writer(outfile, delimiter=' ')
self.rebuild_file = rebuild_file
if self.rebuild_file:
self.fd = open(self.rebuild_file, 'a')
self.rebuild_writer = csv.writer(self.fd, delimiter='|')
if beanstalkd_addr:
self.error_sender = BeanstalkdSender(
beanstalkd_addr, beanstalkd_tube, self.logger)
self.api = ObjectStorageApi(
namespace,
logger=self.logger,
max_retries=request_attempts - 1,
request_attempts=request_attempts)
self.rdir_client = RdirClient(
{"namespace": namespace}, logger=self.logger)
self.accounts_checked = 0
self.containers_checked = 0
self.objects_checked = 0
self.chunks_checked = 0
self.account_not_found = 0
self.container_not_found = 0
self.object_not_found = 0
self.chunk_not_found = 0
self.account_exceptions = 0
self.container_exceptions = 0
self.object_exceptions = 0
self.chunk_exceptions = 0
self.list_cache = CacheDict(cache_size)
self.running_tasks = {}
self.running_lock = Semaphore(1)
self.result_queue = LightQueue(concurrency)
self.running = True
self.run_time = 0
# Set of targets which must be checked again, to confirm
# or deny the issues reported by previous passes.
self.delayed_targets = dict()
# Minimum time in error and number of confirmations of the error
# before triggering a reconstruction action.
self.min_time_in_error = min_time_in_error
self.required_confirmations = required_confirmations
    def reset_stats(self):
self.accounts_checked = 0
self.containers_checked = 0
self.objects_checked = 0
self.chunks_checked = 0
self.account_not_found = 0
self.container_not_found = 0
self.object_not_found = 0
self.chunk_not_found = 0
self.account_exceptions = 0
self.container_exceptions = 0
self.object_exceptions = 0
self.chunk_exceptions = 0
def _spawn(self, func, target, *args, **kwargs):
"""
Spawn a task on the internal GreenPool.
        Discard the task if the pool is no longer running.
"""
if self.running:
return self.pool.spawn(func, target, *args, **kwargs)
self.logger.info("Discarding %s", target)
return None
def _spawn_n(self, func, target, *args, **kwargs):
"""
Spawn a task on the internal GreenPool, do not wait for the result.
        Discard the task if the pool is no longer running.
"""
if self.running:
return self.pool.spawn_n(func, target, *args, **kwargs)
self.logger.info("Discarding %s", target)
return None
    def send_result(self, target, errors=None, irreparable=False):
"""
Put an item in the result queue.
"""
# TODO(FVE): send to an external queue.
target.append_result(ItemResult(errors, irreparable))
self.result_queue.put(target)
    def send_chunk_job(self, target, irreparable=False):
"""
Send a "content broken" event, to trigger the
reconstruction of the chunk.
"""
item = (self.api.namespace, target.cid,
target.content_id, target.chunk)
ev_dict = BlobRebuilder.task_event_from_item(item)
if irreparable:
ev_dict['data']['irreparable'] = irreparable
job = json.dumps(ev_dict)
self.error_sender.send_job(job)
self.error_sender.job_done() # Don't expect any response
    def write_error(self, target, irreparable=False):
if not self.error_file:
return
error = list()
if irreparable:
error.append(IRREPARABLE_PREFIX)
error.append(target.account)
if target.container:
error.append(target.container)
if target.obj:
error.append(target.obj)
if target.chunk:
error.append(target.chunk)
self.error_writer.writerow(error)
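
    def write_rebuilder_input(self, target, irreparable=False):
        # Hedged sketch: this helper is called by write_chunk_error() but is
        # not defined in this extract. It is assumed to append one
        # "container id|content id|chunk" line to the rebuild file, the
        # format suggested by the '--output-for-chunk-rebuild' option
        # ("suitable as 'openio-admin chunk rebuild' input").
        error = list()
        if irreparable:
            error.append(IRREPARABLE_PREFIX)
        error.append(target.cid)
        error.append(target.content_id)
        error.append(target.chunk)
        self.rebuild_writer.writerow(error)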
    def write_chunk_error(self, target,
chunk=None, irreparable=False):
if chunk is not None:
target = target.copy()
target.chunk = chunk
self.write_error(target, irreparable=irreparable)
if self.rebuild_file:
self.write_rebuilder_input(target,
irreparable=irreparable)
if self.error_sender:
self.send_chunk_job(target, irreparable=irreparable)
def _check_chunk_xattr(self, target, obj_meta, xattr_meta):
"""
Check coherency of chunk extended attributes with object metadata.
:returns: a list of errors
"""
errors = list()
# Composed position -> erasure coding
attr_prefix = 'meta' if '.' in obj_meta['pos'] else ''
attr_key = attr_prefix + 'chunk_size'
if str(obj_meta['size']) != xattr_meta.get(attr_key):
errors.append(
"'%s' xattr (%s) differs from size in meta2 (%s)" %
(attr_key, xattr_meta.get(attr_key), obj_meta['size']))
attr_key = attr_prefix + 'chunk_hash'
if obj_meta['hash'] != xattr_meta.get(attr_key):
errors.append(
"'%s' xattr (%s) differs from hash in meta2 (%s)" %
(attr_key, xattr_meta.get(attr_key), obj_meta['hash']))
return errors
def _check_chunk(self, target):
"""
Execute various checks on a chunk:
- does it appear in object's chunk list?
- is it reachable?
- are its extended attributes coherent?
:returns: the list of errors encountered,
and the chunk's owner object metadata.
"""
chunk = target.chunk
errors = list()
obj_meta = None
xattr_meta = None
cached = self._get_cached_or_lock(chunk)
if cached is not None:
return cached + (True, )
self.logger.debug('Checking chunk "%s"', target)
try:
xattr_meta = self.api.blob_client.chunk_head(
chunk, xattr=self.check_xattr, check_hash=self.check_hash)
except exc.NotFound as err:
self.chunk_not_found += 1
errors.append('Not found: %s' % (err, ))
except exc.FaultyChunk as err:
self.chunk_exceptions += 1
errors.append('Faulty: %r' % (err, ))
except Exception as err:
self.chunk_exceptions += 1
errors.append('Check failed: %s' % (err, ))
if not target.obj:
if xattr_meta:
self.complete_target_from_chunk_metadata(target, xattr_meta)
else:
self.recover_and_complete_object_meta(target, chunk)
if target.obj:
obj_listing, obj_meta = self.check_obj(target.copy_object())
if chunk not in obj_listing:
errors.append('Missing from object listing')
db_meta = dict()
else:
db_meta = obj_listing[chunk]
if db_meta and xattr_meta and self.check_xattr:
errors.extend(
self._check_chunk_xattr(target, db_meta, xattr_meta))
self.list_cache[chunk] = errors, obj_meta
self._unlock(chunk)
# Do not send errors directly, let the caller do it.
# Indeed, it may want to check if the chunks can be repaired or not.
self.chunks_checked += 1
return errors, obj_meta, False
    def check_chunk(self, target):
errors, _obj_meta, from_cache = self._check_chunk(target)
# If the result comes from the cache, we already reported it.
if not from_cache:
self.send_result(target, errors, target.irreparable)
return errors
def _check_metachunk(self, target, stg_met, pos, chunks,
recurse=0):
"""
Check that a metachunk has the right number of chunks.
:returns: the list of errors
"""
required = stg_met.expected_chunks
errors = list()
chunk_results = list()
if len(chunks) < required:
missing_chunks = required - len(chunks)
if stg_met.ec:
subs = {x['num'] for x in chunks}
for sub in range(required):
if sub not in subs:
chkt = target.copy()
chkt.chunk = '%d.%d' % (pos, sub)
err = "Missing chunk at position %s" % chkt.chunk
chunk_results.append((chkt, [err], False))
errors.append(err)
else:
                for _ in range(missing_chunks):
                    chkt = target.copy()
                    chkt.chunk = '%d' % pos
                    err = "Missing chunk at position %d" % pos
                    chunk_results.append((chkt, [err], False))
                    errors.append(err)
if recurse > 0:
for chunk in chunks:
tcopy = target.copy()
tcopy.chunk = chunk['url']
chunk_errors, _, from_cache = self._check_chunk(tcopy)
chunk_results.append((tcopy, chunk_errors, from_cache))
if chunk_errors:
errors.append("Unusable chunk %s at position %s" % (
chunk['url'], chunk['pos']))
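        # Illustration (assuming an EC 6+3 storage policy):
        # expected_chunks is 9 and min_chunks_to_read is 6, so the
        # metachunk is declared irreparable as soon as 4 or more of its
        # 9 chunks are missing or unusable.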
irreparable = required - len(errors) < stg_met.min_chunks_to_read
if irreparable:
errors.append(
"Unavailable metachunk at position %s "
"(%d/%d chunks available, %d/%d required)" % (
pos, required - len(errors), stg_met.expected_chunks,
stg_met.min_chunks_to_read, stg_met.expected_chunks))
for tgt, errs, from_cache in chunk_results:
# If the result comes from the cache, we already reported it.
if not from_cache:
self.send_result(tgt, errs, irreparable)
# Since the "metachunk" is not an official item type,
# this method does not report errors itself. Errors will
# be reported as object errors.
return errors
def _check_obj_policy(self, target, obj_meta, chunks, recurse=0):
"""
Check that the list of chunks of an object matches
the object's storage policy.
:returns: the list of errors encountered
"""
stg_met = STORAGE_METHODS.load(obj_meta['chunk_method'])
chunks_by_pos = _sort_chunks(chunks, stg_met.ec)
tasks = list()
        for pos, pchunks in chunks_by_pos.items():
tasks.append((pos, self._spawn(
self._check_metachunk,
target.copy(), stg_met, pos, pchunks,
recurse=recurse)))
errors = list()
for pos, task in tasks:
if not task and not self.running:
errors.append("Pos %d skipped: checker is exiting" % pos)
continue
try:
errors.extend(task.wait())
except Exception as err:
errors.append("Check failed: pos %d: %s" % (pos, err))
return errors
    def check_obj_versions(self, target, versions, recurse=0):
"""
Run checks of all versions of the targeted object in parallel.
"""
tasks = list()
for ov in versions:
tcopy = target.copy_object()
tcopy.content_id = ov['id']
tcopy.version = str(ov['version'])
tasks.append((tcopy.version,
self._spawn(self.check_obj,
tcopy, recurse=recurse)))
errors = list()
for version, task in tasks:
if not task and not self.running:
errors.append(
"Version %s skipped: checker is exiting" % version)
continue
try:
task.wait()
except Exception as err:
errors.append("Check failed: version %s: %s" % (version, err))
if errors:
# Send a result with the target without version to tell
# we were not able to check all versions of the object.
self.send_result(target, errors)
def _load_obj_meta(self, target, errors):
"""
Load object metadata and chunks.
:param target: which object to check.
:param errors: list of errors that will be appended
in case any error occurs.
:returns: a tuple with object metadata and a list of chunks.
"""
try:
return self.api.object_locate(
target.account, target.container, target.obj,
version=target.version, properties=False)
except exc.NoSuchObject as err:
self.object_not_found += 1
errors.append('Not found: %s' % (err, ))
except Exception as err:
self.object_exceptions += 1
errors.append('Check failed: %s' % (err, ))
return None, []
def _get_cached_or_lock(self, lock_key):
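        # Note: per-key "compute once" guard. The first caller registers an
        # Event in self.running_tasks and computes the listing itself;
        # concurrent callers wait on that Event, then read the result from
        # self.list_cache. The caller must store its result in the cache and
        # call _unlock() once done.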
# If something is running, wait for it
with self.running_lock:
event = self.running_tasks.get(lock_key)
if event:
event.wait()
event = None
# Maybe get a cached result
if lock_key in self.list_cache:
return self.list_cache[lock_key]
# No cached result, try to compute the thing ourselves
while True:
with self.running_lock:
# Another check while locked
if lock_key in self.list_cache:
return self.list_cache[lock_key]
# Still nothing cached
event = self.running_tasks.get(lock_key)
if event is None:
self.running_tasks[lock_key] = Event()
return None
event.wait()
def _unlock(self, lock_key):
with self.running_lock:
event = self.running_tasks[lock_key]
del self.running_tasks[lock_key]
event.send(True)
    def check_obj(self, target, recurse=0):
"""
Check one object version.
If no version is specified, all versions of the object will be checked.
:returns: the result of the check of the most recent version,
or the one that is explicitly targeted.
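
        Example (illustrative; ``checker`` stands for a Checker instance)::

            chunks, meta = checker.check_obj(
                Target('my_account', 'my_container', 'my_obj'), recurse=1)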
"""
account = target.account
container = target.container
obj = target.obj
vers = target.version # can be None
cached = self._get_cached_or_lock((account, container, obj, vers))
if cached is not None:
return cached
self.logger.info('Checking object "%s"', target)
container_listing, _ = self.check_container(target.copy_container())
errors = list()
if obj not in container_listing:
errors.append('Missing from container listing')
# checksum = None
else:
versions = container_listing[obj]
if vers is None:
if target.content_id is None:
# No version specified, check all versions
self.check_obj_versions(target.copy_object(), versions,
recurse=recurse)
# Now return the cached result of the most recent version
target.content_id = versions[0]['id']
target.version = str(versions[0]['version'])
res = self.check_obj(target, recurse=0)
self._unlock((account, container, obj, vers))
return res
else:
for ov in versions:
if ov['id'] == target.content_id:
vers = str(ov['version'])
target.version = vers
break
else:
errors.append('Missing from container listing')
# TODO check checksum match
# checksum = container_listing[obj]['hash']
pass
meta, chunks = self._load_obj_meta(target, errors)
chunk_listing = {c['url']: c for c in chunks}
if meta:
if target.content_id is None:
target.content_id = meta['id']
if target.version is None:
target.version = str(meta['version'])
self.list_cache[(account, container, obj, vers)] = \
(chunk_listing, meta)
self.objects_checked += 1
self._unlock((account, container, obj, vers))
# Skip the check if we could not locate the object
if meta:
errors.extend(
self._check_obj_policy(target, meta, chunks, recurse=recurse))
self.send_result(target, errors)
return chunk_listing, meta
    def check_container(self, target, recurse=0):
account = target.account
container = target.container
cached = self._get_cached_or_lock((account, container))
if cached is not None:
return cached
self.logger.info('Checking container "%s"', target)
account_listing = self.check_account(target.copy_account())
errors = list()
if container not in account_listing:
errors.append('Missing from account listing')
marker = None
results = []
ct_meta = dict()
extra_args = dict()
if self.limit_listings > 1 and target.obj:
# When we are explicitly checking one object, start the listing
# where this object is supposed to be. Do not use a limit,
# but an end marker, in order to fetch all versions of the object.
extra_args['prefix'] = target.obj
extra_args['end_marker'] = target.obj + '\x00' # HACK
while True:
try:
resp = self.api.object_list(
account, container, marker=marker, versions=True,
**extra_args)
except exc.NoSuchContainer as err:
self.container_not_found += 1
errors.append('Not found: %s' % (err, ))
break
except Exception as err:
self.container_exceptions += 1
errors.append('Check failed: %s' % (err, ))
break
truncated = resp.get('truncated', False)
if truncated:
marker = resp['next_marker']
if resp['objects']:
# safeguard, probably useless
if not marker:
marker = resp['objects'][-1]['name']
results.extend(resp['objects'])
if not truncated or self.limit_listings > 1:
break
else:
ct_meta = resp
ct_meta.pop('objects')
break
container_listing = dict()
# Save all object versions, with the most recent first
for obj in results:
container_listing.setdefault(obj['name'], list()).append(obj)
for versions in container_listing.values():
versions.sort(key=lambda o: o['version'], reverse=True)
if self.limit_listings <= 1:
# We just listed the whole container, keep the result in a cache
self.containers_checked += 1
self.list_cache[(account, container)] = container_listing, ct_meta
self._unlock((account, container))
if recurse > 0:
for obj_vers in container_listing.values():
for obj in obj_vers:
tcopy = target.copy_object()
tcopy.obj = obj['name']
tcopy.content_id = obj['id']
tcopy.version = str(obj['version'])
self._spawn_n(self.check_obj, tcopy, recurse - 1)
self.send_result(target, errors)
return container_listing, ct_meta
    def check_account(self, target, recurse=0):
account = target.account
cached = self._get_cached_or_lock(account)
if cached is not None:
return cached
self.logger.info('Checking account "%s"', target)
errors = list()
marker = None
results = []
extra_args = dict()
if self.limit_listings > 0 and target.container:
# When we are explicitly checking one container, start the listing
# where this container is supposed to be, and list only one
# container.
extra_args['prefix'] = target.container
extra_args['limit'] = 1
while True:
try:
resp = self.api.container_list(
account, marker=marker, **extra_args)
except Exception as err:
self.account_exceptions += 1
errors.append('Check failed: %s' % (err, ))
break
if resp:
marker = resp[-1][0]
results.extend(resp)
if self.limit_listings > 0:
break
else:
break
containers = dict()
for container in results:
# Name, number of objects, number of bytes
containers[container[0]] = (container[1], container[2])
if self.limit_listings <= 0:
# We just listed the whole account, keep the result in a cache
self.accounts_checked += 1
self.list_cache[account] = containers
self._unlock(account)
if recurse > 0:
for container in containers:
tcopy = target.copy_account()
tcopy.container = container
self._spawn_n(self.check_container, tcopy, recurse - 1)
self.send_result(target, errors)
return containers
    def check(self, target, recurse=0):
if target.type == 'chunk':
self._spawn_n(self.check_chunk, target)
elif target.type == 'object':
self._spawn_n(self.check_obj, target, recurse)
elif target.type == 'container':
self._spawn_n(self.check_container, target, recurse)
else:
self._spawn_n(self.check_account, target, recurse)
    def check_all_accounts(self, recurse=0):
all_accounts = self.api.account_list()
for acct in all_accounts:
self.check(Target(acct), recurse=recurse)
    def fetch_results(self, rate_limiter=None):
while self.running and not self.result_queue.empty():
res = self.result_queue.get(True)
yield res
# Rate limiting is done on the result queue for now.
# Someday we could implement a submission queue instead of
# letting each worker submit tasks to the pool, and do
# the rate limiting on this queue.
if rate_limiter is not None:
self.run_time = rate_limiter(self.run_time)
    def merge_with_delayed_target(self, target):
"""
Merge the specified target with a delayed one.
:returns: the delayed target, if there is one, with an error log
including the errors of the new target. Return the new target
otherwise.
"""
tkey = repr(target)
prev_target = self.delayed_targets.get(tkey, target)
if prev_target is not target:
errors = dict(prev_target.error_log)
errors.update(target.error_log)
prev_target.error_log = sorted(errors.items())
return prev_target
    def log_result(self, target):
"""
Log a check result, if it shows errors. Dispatch the errors to the
appropriate destinations (log files, queues, etc.).
"""
# The result may come from a new target, or from an old target
# we checked another time, or both.
target = self.merge_with_delayed_target(target)
if target.has_errors:
time_in_error, confirmations = target.time_in_error()
if (time_in_error < self.min_time_in_error or
confirmations < self.required_confirmations):
self.logger.info("Delaying check for %s, %d/%d confirmations",
target, confirmations,
self.required_confirmations)
self.delayed_targets[repr(target)] = target
else:
if target.type == 'chunk':
self.logger.info(
"Writing error for %s, %d/%d confirmations",
target, confirmations, self.required_confirmations)
self.write_chunk_error(target,
irreparable=target.irreparable)
else:
self.write_error(target, irreparable=target.irreparable)
self.delayed_targets.pop(repr(target), None)
self.logger.warn(
'%s:%s\n%s',
target,
' irreparable' if target.irreparable else '',
target.latest_error_result().errors_to_str(err_format=' %s'))
    def run(self, rate_limiter=None):
"""
Fetch results and write logs until all jobs have finished.
:returns: a generator yielding check results.
"""
while self.running and (self.pool.running() + self.pool.waiting()):
for result in self.fetch_results(rate_limiter):
self.log_result(result)
yield result
sleep(0.1)
if self.running:
self.pool.waitall()
# No rate limiting
for result in self.fetch_results():
self.log_result(result)
yield result
self.list_cache = CacheDict(self.list_cache.size)
    def stop(self):
self.logger.info("Stopping")
self.running = False
    def report(self):
success = True
def _report_stat(name, stat):
print("{0:18}: {1}".format(name, stat))
print()
print('Report')
_report_stat("Accounts checked", self.accounts_checked)
if self.account_not_found:
success = False
_report_stat("Missing accounts", self.account_not_found)
if self.account_exceptions:
success = False
_report_stat("Exceptions", self.account_exceptions)
print()
_report_stat("Containers checked", self.containers_checked)
if self.container_not_found:
success = False
_report_stat("Missing containers", self.container_not_found)
if self.container_exceptions:
success = False
_report_stat("Exceptions", self.container_exceptions)
print()
_report_stat("Objects checked", self.objects_checked)
if self.object_not_found:
success = False
_report_stat("Missing objects", self.object_not_found)
if self.object_exceptions:
success = False
_report_stat("Exceptions", self.object_exceptions)
print()
_report_stat("Chunks checked", self.chunks_checked)
if self.chunk_not_found:
success = False
_report_stat("Missing chunks", self.chunk_not_found)
if self.chunk_exceptions:
success = False
_report_stat("Exceptions", self.chunk_exceptions)
return success
def run_once(checker, entries=None, rate_limiter=None):
if entries:
for entry in entries:
if isinstance(entry, Target):
checker.check(entry, recurse=DEFAULT_DEPTH)
else:
checker.check(Target(*entry), recurse=DEFAULT_DEPTH)
else:
checker.check_all_accounts(recurse=DEFAULT_DEPTH)
for _ in checker.run(rate_limiter):
pass
if not checker.report():
return 1
return 0
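
# Illustrative programmatic use of run_once() (the namespace, account and
# container names are placeholders):
#   checker = Checker('OPENIO', concurrency=10)
#   exit_code = run_once(checker, entries=[('my_account', 'my_container')])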
def run_indefinitely(checker, entries=None, rate_limiter=None,
pause_between_passes=0.0):
def _stop(*args):
checker.stop()
import signal
signal.signal(signal.SIGINT, _stop)
signal.signal(signal.SIGQUIT, _stop)
signal.signal(signal.SIGTERM, _stop)
while checker.running:
if checker.delayed_targets:
run_once(checker,
entries=checker.delayed_targets.values(),
rate_limiter=rate_limiter)
run_once(checker, entries, rate_limiter)
checker.reset_stats()
if checker.running and pause_between_passes > 0.0:
checker.logger.info("Pausing for %.3fs", pause_between_passes)
iterations, rest = divmod(pause_between_passes, 1)
sleep(rest)
for _ in range(int(iterations)):
if not checker.running:
break
sleep(1.0)
def main():
"""
Main function for legacy integrity crawler.
"""
import argparse
from oio.cli import get_logger_from_args, make_logger_args_parser
parser = argparse.ArgumentParser(description=__doc__,
parents=[make_logger_args_parser()])
parser.add_argument('namespace', help='Namespace name')
parser.add_argument(
'account', nargs='?', help="Account (if not set, check all accounts)")
t_help = "Element whose integrity should be checked. " \
"Can be empty (check the whole account), " \
"CONTAINER (check all objects of the container), " \
"CONTAINER CONTENT (check all chunks of the object) " \
"or CONTAINER CONTENT CHUNK (check only one chunk). " \
"When reading from stdin, expect one element per line " \
"(starting with account)."
parser.add_argument('target', metavar='T', nargs='*',
help=t_help)
parser.add_argument('--attempts', type=int, default=1,
help=('Number of attempts for '
'listing requests (default: 1).'))
parser.add_argument('--beanstalkd', metavar='IP:PORT',
help=("BETA: send broken chunks events to a "
"beanstalkd tube. Do not enable this without "
"also enabling --confirmations and/or "
"--time-in-error, or the system may be "
"rebuilding temporary unavailable chunks."))
parser.add_argument('--beanstalkd-tube',
default=BlobRebuilder.DEFAULT_BEANSTALKD_WORKER_TUBE,
help=("The beanstalkd tube to send broken chunks "
"events to (default=%s)." %
BlobRebuilder.DEFAULT_BEANSTALKD_WORKER_TUBE))
parser.add_argument('--concurrency', '--workers', type=int,
default=50,
help='Number of concurrent checks (default: 50).')
parser.add_argument('--confirmations', type=int,
default=0,
help=("BETA: report an error only after this number "
"of confirmations (default: 0, report "
"immediately). Makes sense only when running "
"this tool as a daemon."))
parser.add_argument('--daemon',
action='store_true',
help=("Loop indefinitely, until killed."))
parser.add_argument('-o', '--output',
help=('Output file. Will contain elements in error. '
'Can later be passed to stdin to re-check only '
'these elements.'))
parser.add_argument('--output-for-chunk-rebuild',
'--output-for-blob-rebuilder',
dest='output_for_chunk_rebuild',
help="Write chunk errors in a file with a format " +
"suitable as 'openio-admin chunk rebuild' input.")
parser.add_argument('--pause-between-passes', type=float, default=0.0,
help=("When running as a daemon, make a pause before "
"restarting from the beginning "
"(default: 0.0 seconds)."))
parser.add_argument('-p', '--presence',
action='store_true', default=False,
help="Presence check, the xattr check is skipped.")
parser.add_argument('-r', '--ratelimit',
help=('Set the hour-based rate limiting policy. '
'Ex: "0h30:10;6h45:2;15h30:3;9h45:5;20h00:8".'))
parser.add_argument('--time-in-error', type=float,
default=0.0,
help=("BETA: report an error only after the item has "
"shown errors for this amount of time "
"(default: 0.0 seconds, report immediately). "
"Makes sense only when running this tool "
"as a daemon."))
args = parser.parse_args()
if args.attempts < 1:
raise ValueError('attempts must be at least 1')
if not os.isatty(sys.stdin.fileno()):
source = sys.stdin
limit_listings = 0 # do full listings, cache the results
entries = csv.reader(source, delimiter=' ')
else:
if args.account:
entries = [[args.account] + args.target]
limit_listings = len(args.target)
else:
entries = None
limit_listings = 0
logger = get_logger_from_args(args)
checker = Checker(
args.namespace,
error_file=args.output,
concurrency=args.concurrency,
rebuild_file=args.output_for_chunk_rebuild,
check_xattr=not args.presence,
limit_listings=limit_listings,
request_attempts=args.attempts,
logger=logger,
verbose=not args.quiet,
min_time_in_error=args.time_in_error,
required_confirmations=args.confirmations,
beanstalkd_addr=args.beanstalkd,
beanstalkd_tube=args.beanstalkd_tube,
)
if args.ratelimit:
rate_limiter = ratelimit_function_build(args.ratelimit)
else:
rate_limiter = None
if args.daemon:
run_indefinitely(checker, entries, rate_limiter,
pause_between_passes=args.pause_between_passes)
else:
return run_once(checker, entries, rate_limiter)