Source code for oio.blob.mover

# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from string import hexdigits
import time

from oio.blob.client import BlobClient
from oio.blob.utils import check_volume, read_chunk_metadata
from oio.common.exceptions import ContentNotFound
from oio.container.client import ContainerClient
from oio.common.daemon import Daemon
from oio.common import exceptions as exc
from oio.common.utils import paths_gen, statfs, cid_from_name
from oio.common.easy_value import int_value, true_value
from oio.common.logger import get_logger
from oio.common.green import ratelimit
from oio.common.constants import STRLEN_CHUNKID
from oio.common.fullpath import decode_fullpath
from oio.content.factory import ContentFactory

SLEEP_TIME = 30
READ_BUFFER_SIZE = 65535


[docs]class BlobMoverWorker(object): def __init__(self, conf, logger, volume): self.conf = conf self.logger = logger or get_logger(conf) self.volume = volume self.namespace, self.address = check_volume(self.volume) self.run_time = 0 self.passes = 0 self.errors = 0 self.last_reported = 0 self.last_usage_check = 0 self.chunks_run_time = 0 self.bytes_running_time = 0 self.bytes_processed = 0 self.total_bytes_processed = 0 self.total_chunks_processed = 0 self.usage_target = int_value( conf.get('usage_target'), 0) self.usage_check_interval = int_value( conf.get('usage_check_interval'), 3600) self.report_interval = int_value( conf.get('report_interval'), 3600) self.max_chunks_per_second = int_value( conf.get('chunks_per_second'), 30) self.max_bytes_per_second = int_value( conf.get('bytes_per_second'), 10000000) self.limit = int_value(conf.get('limit'), 0) self.allow_links = true_value(conf.get('allow_links', True)) self.blob_client = BlobClient(conf) self.container_client = ContainerClient(conf, logger=self.logger) self.content_factory = ContentFactory(conf)
[docs] def mover_pass(self, **kwargs): start_time = report_time = time.time() total_errors = 0 mover_time = 0 paths = paths_gen(self.volume) for path in paths: loop_time = time.time() now = time.time() if now - self.last_usage_check >= self.usage_check_interval: free_ratio = statfs(self.volume) usage = (1-float(free_ratio)) * 100 if usage <= self.usage_target: self.logger.info( 'current usage %.2f%%: target reached (%.2f%%)', usage, self.usage_target) self.last_usage_check = now break self.safe_chunk_move(path) self.chunks_run_time = ratelimit( self.chunks_run_time, self.max_chunks_per_second ) self.total_chunks_processed += 1 now = time.time() if now - self.last_reported >= self.report_interval: self.logger.info( '%(start_time)s ' '%(passes)d ' '%(errors)d ' '%(c_rate).2f ' '%(b_rate).2f ' '%(total).2f ' '%(mover_time).2f' '%(mover_rate).2f' % { 'start_time': time.ctime(report_time), 'passes': self.passes, 'errors': self.errors, 'c_rate': self.passes / (now - report_time), 'b_rate': self.bytes_processed / (now - report_time), 'total': (now - start_time), 'mover_time': mover_time, 'mover_rate': mover_time / (now - start_time) } ) report_time = now total_errors += self.errors self.passes = 0 self.bytes_processed = 0 self.last_reported = now mover_time += (now - loop_time) if self.limit != 0 and self.total_chunks_processed >= self.limit: break elapsed = (time.time() - start_time) or 0.000001 self.logger.info( '%(elapsed).02f ' '%(errors)d ' '%(chunk_rate).2f ' '%(bytes_rate).2f ' '%(mover_time).2f ' '%(mover_rate).2f' % { 'elapsed': elapsed, 'errors': total_errors + self.errors, 'chunk_rate': self.total_chunks_processed / elapsed, 'bytes_rate': self.total_bytes_processed / elapsed, 'mover_time': mover_time, 'mover_rate': mover_time / elapsed } )
[docs] def safe_chunk_move(self, path): chunk_id = path.rsplit('/', 1)[-1] if len(chunk_id) != STRLEN_CHUNKID: self.logger.warn('WARN Not a chunk %s' % path) return for c in chunk_id: if c not in hexdigits: self.logger.warn('WARN Not a chunk %s' % path) return try: self.chunk_move(path, chunk_id) except Exception as e: self.errors += 1 self.logger.error('ERROR while moving chunk %s: %s', path, e) self.passes += 1
[docs] def load_chunk_metadata(self, path, chunk_id): with open(path) as f: meta, _ = read_chunk_metadata(f, chunk_id) return meta
[docs] def chunk_move(self, path, chunk_id): meta = self.load_chunk_metadata(path, chunk_id) container_id = meta['container_id'] content_id = meta['content_id'] chunk_id = meta['chunk_id'] try: content = self.content_factory.get(container_id, content_id) except ContentNotFound: raise exc.OrphanChunk('Content not found') new_chunk = content.move_chunk(chunk_id) self.logger.info( 'moved chunk http://%s/%s to %s', self.address, chunk_id, new_chunk['url']) if self.allow_links: old_links = meta['links'] for chunk_id, fullpath in old_links.iteritems(): account, container, _, _, content_id = \ decode_fullpath(fullpath) container_id = cid_from_name(account, container) try: content = self.content_factory.get(container_id, content_id) except ContentNotFound: raise exc.OrphanChunk('Content not found') new_linked_chunk = content.move_linked_chunk( chunk_id, new_chunk['url']) self.logger.info( 'moved chunk http://%s/%s to %s', self.address, chunk_id, new_linked_chunk['url'])
[docs]class BlobMover(Daemon): def __init__(self, conf, **kwargs): super(BlobMover, self).__init__(conf) self.logger = get_logger(conf) volume = conf.get('volume') if not volume: raise exc.ConfigurationException('No volume specified for mover') self.volume = volume global SLEEP_TIME if SLEEP_TIME > int(conf.get('report_interval', 3600)): SLEEP_TIME = int(conf.get('report_interval', 3600))
[docs] def run(self, *args, **kwargs): work = True while work: try: worker = BlobMoverWorker(self.conf, self.logger, self.volume) worker.mover_pass(**kwargs) work = False except Exception as err: self.logger.exception('ERROR in mover: %s', err) if kwargs.get('daemon'): work = True self._sleep()
def _sleep(self): time.sleep(SLEEP_TIME)