Source code for oio.directory.indexer
# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
import six
from oio.blob.utils import check_volume_for_service_type
from oio.common import exceptions as exc
from oio.common.constants import STRLEN_REFERENCEID
from oio.common.daemon import Daemon
from oio.common.easy_value import int_value, boolean_value
from oio.common.green import ContextPool, ratelimit, time
from oio.common.http_urllib3 import get_pool_manager
from oio.common.logger import get_logger
from oio.common.utils import paths_gen
from oio.directory.client import DirectoryClient
from oio.rdir.client import RdirClient


class Meta2IndexingWorker(object):
"""
Indexing worker responsible for a single volume.
"""
def __init__(self, volume_path, conf, pool_manager=None):
"""
Initializes an Indexing worker for indexing meta2 databases.
Possible values of conf relating to this worker are:
- interval: (int) in sec time between two full scans. Default: half an
hour.
- report_interval: (int) in sec, time between two reports: Default: 300
- scanned_per_second: (int) maximum number of indexed databases /s.
- try_removing_faulty_indexes : In the event where we encounter a
database that's not supposed to be handled by this volume, attempt
to remove it from this volume rdir index if it exists
WARNING: The decision is based off of a proxy response, that could
be affected by cache inconsistencies for example, use at your own
risk. Default: False
:param volume_path: The volume path to be indexed
:param conf: The configuration to be passed to the needed services
:param pool_manager: A connection pool manager. If none is given, a
new one with a default size of 10 will be created.
"""
self.logger = get_logger(conf)
self._stop = False
self.volume = volume_path
self.success_nb = 0
self.failed_nb = 0
self.full_scan_nb = 0
self.last_report_time = 0
self.last_scan_time = 0
self.last_index_time = 0
self.start_time = 0
self.indexed_since_last_report = 0
self.scans_interval = int_value(
conf.get('interval'), 1800)
self.report_interval = int_value(
conf.get('report_interval'), 300)
self.max_indexed_per_second = int_value(
conf.get('scanned_per_second'), 3000)
self.namespace, self.volume_id = check_volume_for_service_type(
self.volume, "meta2")
self.attempt_bad_index_removal = boolean_value(
conf.get('try_removing_faulty_indexes'), False)
if not pool_manager:
pool_manager = get_pool_manager(pool_connections=10)
self.index_client = RdirClient(conf, logger=self.logger,
pool_manager=pool_manager)
self.dir_client = DirectoryClient(conf, logger=self.logger,
pool_manager=pool_manager)

    def report(self, tag):
"""
Log the status of indexer
:param tag: One of three: starting, running, ended.
"""
total = self.success_nb + self.failed_nb
now = time.time()
elapsed = (now - self.start_time) or 0.00001
since_last_rprt = (now - self.last_report_time) or 0.00001
self.logger.info(
'volume_id=%(volume_id)s %(tag)s=%(current_time)s '
'elapsed=%(elapsed).02f '
'pass=%(pass)d '
'errors=%(errors)d '
'containers_indexed=%(total_indexed)d %(index_rate).2f/s',
{
'volume_id': self.volume_id,
'tag': tag,
'current_time': datetime.fromtimestamp(
int(now)).isoformat(),
'pass': self.full_scan_nb,
'errors': self.failed_nb,
'total_indexed': total,
'index_rate': self.indexed_since_last_report / since_last_rprt,
'elapsed': elapsed
}
)
self.last_report_time = now
self.indexed_since_last_report = 0

    def warn(self, msg, container_id):
        self.logger.warning(
'volume_id=%(volume_id)s container_id=%(container_id)s %(error)s',
{
'volume_id': self.volume_id,
'container_id': container_id,
'error': msg
}
)

    def _attempt_index_removal(self, db_path, cid):
"""
Fail safe removal attempt.
"""
try:
self.index_client.meta2_index_delete(self.volume_id, db_path, cid)
except exc.OioException as exception:
self.warn(
container_id=cid,
msg="Unable to remove database from the volume "
"index : {0}".format(str(exception))
)

    def index_meta2_database(self, db_id):
"""
Add a meta2 database to the rdir index. Fails if the database isn't
handled by the current volume.
:param db_id: The ContentID representing the reference to the database.
"""
if len(db_id) < STRLEN_REFERENCEID:
self.warn('Not a valid container ID', db_id)
return
try:
srvcs = self.dir_client.list(cid=db_id)
account, container = srvcs['account'], srvcs['name']
is_peer = self.volume_id in [x['host'] for x in srvcs['srv'] if
x['type'] == 'meta2']
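            # Keep only the part before the first dot: the bare
            # container ID, without the ".<seq>" suffix.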
container_id = db_id.rsplit(".")[0]
if six.PY2:
if isinstance(account, six.text_type):
account = account.encode('utf-8')
if isinstance(container, six.text_type):
container = container.encode('utf-8')
cont_url = "{0}/{1}/{2}".format(self.namespace, account, container)
if not is_peer:
self.warn("Trying to index a container that isn't handled by"
"this volume", db_id)
if self.attempt_bad_index_removal:
self._attempt_index_removal(cont_url, container_id)
return
self.index_client.meta2_index_push(volume_id=self.volume_id,
container_url=cont_url,
mtime=time.time(),
container_id=container_id)
self.success_nb += 1
except exc.OioException as exception:
self.failed_nb += 1
self.warn("Unable to to index container: %s" % str(exception),
db_id)
self.indexed_since_last_report += 1

    def crawl_volume(self):
"""
Crawl the volume assigned to this worker, and index every database.
"""
paths = paths_gen(self.volume)
self.full_scan_nb += 1
self.success_nb = 0
self.failed_nb = 0
now = time.time()
self.last_report_time = now
self.report("starting")
for db_path in paths:
# Graceful exit, hopefully
if self._stop:
break
db_id = db_path.rsplit("/")[-1].rsplit(".")
if len(db_id) != 3:
self.warn("Malformed db file name !", db_path)
continue
db_id = ".".join(db_id[:2])
self.index_meta2_database(db_id)
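            # Throttle to at most max_indexed_per_second databases:
            # ratelimit() sleeps when needed and returns the timestamp
            # of the last allowed operation.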
self.last_index_time = ratelimit(
self.last_index_time,
self.max_indexed_per_second
)
now = time.time()
if now - self.last_report_time >= self.report_interval:
self.report("running")
self.report("ended")

    def run(self):
"""
Main worker loop
"""
self.start_time = time.time()
while not self._stop:
try:
self.crawl_volume()
self.last_scan_time = time.time()
time.sleep(self.scans_interval)
except exc.OioException as exception:
self.logger.exception("ERROR during indexing meta2: %s",
exception)

    def stop(self):
"""
Could be needed for eventually gracefully stopping.
"""
self._stop = True


class Meta2Indexer(Daemon):
"""
A daemon that spawns a greenlet running a Meta2IndexingWorker
for each volume.
"""
def __init__(self, conf):
super(Meta2Indexer, self).__init__(conf=conf)
self.logger = get_logger(conf)
if not conf.get("volume_list"):
            raise exc.OioException("No meta2 volumes provided to index!")
self.volumes = [x.strip() for x in conf.get('volume_list').split(',')]
self.pool = ContextPool(len(self.volumes))
self.volume_workers = [Meta2IndexingWorker(x, conf) for x in
self.volumes]
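    # Example (hypothetical paths): a conf containing
    #   'volume_list': '/srv/node/meta2-1, /srv/node/meta2-2'
    # yields two workers, one per volume, each spawned in its own
    # greenlet by run() below.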

    def run(self, *args, **kwargs):
for worker in self.volume_workers:
self.pool.spawn(worker.run)
self.pool.waitall()
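

# A minimal usage sketch (assumption: a valid namespace configuration;
# 'OPENIO' and the volume path are placeholders, not recommendations):
#
#   indexer = Meta2Indexer({'namespace': 'OPENIO',
#                           'volume_list': '/srv/node/meta2-1'})
#   indexer.run()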