Source code for oio.event.filters.content_cleaner

# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from oio.blob.client import BlobClient
from oio.common.constants import REQID_HEADER
from oio.event.evob import Event, EventTypes
from oio.event.filters.base import Filter
from oio.common.exceptions import OioException
from oio.common.http_urllib3 import URLLIB3_POOLMANAGER_KWARGS
from oio.common.storage_method import STORAGE_METHODS, guess_storage_method
from oio.common.utils import request_id


class ContentReaperFilter(Filter):
    """Filter that deletes chunks on content deletion events."""

    def init(self):
        """Prepare the deletion handlers and the chunk client.

        Reads from ``self.conf``:

        - any key listed in ``URLLIB3_POOLMANAGER_KWARGS``, forwarded as
          keyword arguments to ``BlobClient``;
        - ``concurrency`` (default 3), passed as the ``concurrency``
          argument of chunk deletions;
        - ``timeout`` (default 5.0), passed as the ``timeout`` argument
          of chunk deletions.
        """
        # Dispatch table, keyed by the ``type`` attribute of the loaded
        # storage method (see ``_load_handler``).
        self.handlers = {
            "plain": self._handle_rawx,
            "ec": self._handle_rawx,
            "backblaze": self._handle_b2,
        }
        kwargs = {k: v for k, v in self.conf.items()
                  if k in URLLIB3_POOLMANAGER_KWARGS}
        self.blob_client = BlobClient(self.conf, logger=self.logger, **kwargs)
        self.chunk_concurrency = int(self.conf.get('concurrency', 3))
        self.chunk_timeout = float(self.conf.get('timeout', 5.0))

    def _handle_rawx(self, url, chunks, content_headers,
                     storage_method, reqid):
        """Delete chunks through the blob client and log the failures.

        :param url: mapping describing the content; only ``id`` is read
            and passed as ``cid`` to the blob client.
        :param chunks: chunk descriptions, each carrying a ``url`` key.
        :param content_headers: unused, kept for handler signature parity.
        :param storage_method: unused, kept for handler signature parity.
        :param reqid: request ID sent in the ``REQID_HEADER`` header.

        Failures are only logged, never raised by this method itself.
        """
        cid = url.get('id')
        headers = {REQID_HEADER: reqid,
                   'Connection': 'close'}

        resps = self.blob_client.chunk_delete_many(
            chunks, cid=cid, headers=headers,
            concurrency=self.chunk_concurrency, timeout=self.chunk_timeout)
        for resp in resps:
            if isinstance(resp, Exception):
                self.logger.warning(
                    'failed to delete chunk %s (%s)',
                    resp.chunk.get('real_url', resp.chunk['url']),
                    resp)
            # 204 and 404 are both accepted silently (404 presumably
            # means the chunk was already gone -- TODO confirm).
            elif resp.status not in (204, 404):
                self.logger.warning(
                    'failed to delete chunk %s (HTTP %s)',
                    resp.chunk.get('real_url', resp.chunk['url']),
                    resp.status)

    def _handle_b2(self, url, chunks, headers, storage_method, reqid):
        """Delete chunks through the Backblaze delete handler.

        :param url: mapping describing the content; ``url['id']`` is used
            as the container ID.
        :param chunks: chunk descriptions to delete.
        :param headers: unused, kept for handler signature parity.
        :param storage_method: storage method used to get the credentials.
        :param reqid: unused, kept for handler signature parity.

        An ``OioException`` raised by the deletion is logged, not raised.
        """
        # Imported here rather than at module level, presumably so the
        # Backblaze modules are only needed when this backend is used.
        from oio.api.backblaze import BackblazeDeleteHandler
        from oio.api.backblaze_http import BackblazeUtils
        meta = {'container_id': url['id']}
        key_file = self.conf.get('key_file')
        b2_creds = BackblazeUtils.get_credentials(
            storage_method, key_file)
        try:
            BackblazeDeleteHandler(meta, chunks, b2_creds).delete()
        except OioException as exc:
            self.logger.warning('delete failed: %s', exc)

    def _load_handler(self, chunk_method):
        """Find the deletion handler matching a chunk method.

        :param chunk_method: chunk method string, loaded with
            ``STORAGE_METHODS.load``.
        :returns: a ``(handler, storage_method)`` tuple.
        :raises OioException: when no handler is registered for the type
            of the loaded storage method.
        """
        storage_method = STORAGE_METHODS.load(chunk_method)
        handler = self.handlers.get(storage_method.type)
        if not handler:
            raise OioException("No handler found for chunk method [%s]" %
                               chunk_method)
        return handler, storage_method

    def process(self, env, beanstalkd, cb):
        """Delete the chunks listed in a content deletion event.

        Events of any other type, and deletion events listing no chunk,
        are forwarded untouched. In every case the event is then passed
        to the next application of the pipeline, whose result is returned.

        :param env: raw event, wrapped in an ``Event``.
        :param beanstalkd: forwarded as is to the next application.
        :param cb: forwarded as is to the next application.
        """
        event = Event(env)
        if event.event_type == EventTypes.CONTENT_DELETED:
            url = event.env.get('url')
            chunks = []
            content_headers = []

            for item in event.data:
                item_type = item.get('type')
                if item_type == 'chunks':
                    # The event contains "id" whereas the API uses "url".
                    item['url'] = item['id']
                    chunks.append(item)
                elif item_type == 'contents_headers':
                    content_headers.append(item)

            if chunks:
                reqid = event.reqid or request_id('content-cleaner-')
                if content_headers:
                    chunk_method = content_headers[0]['chunk-method']
                else:
                    # No content header in the event: fall back on a
                    # chunk method guessed from the first chunk ID.
                    chunk_method = guess_storage_method(chunks[0]['id']) + '/'
                handler, storage_method = self._load_handler(chunk_method)
                handler(url, chunks, content_headers, storage_method, reqid)

        return self.app(env, beanstalkd, cb)
def filter_factory(global_conf, **local_conf):
    """Build a factory wrapping an application in a ContentReaperFilter.

    :param global_conf: base configuration; it is copied, not modified.
    :param local_conf: extra settings, overriding those of `global_conf`.
    :returns: a callable taking the application to wrap and returning
        a `ContentReaperFilter` configured with the merged settings.
    """
    # Work on a copy so the caller's configuration is left untouched.
    merged_conf = global_conf.copy()
    merged_conf.update(local_conf)

    def reaper_filter(app):
        return ContentReaperFilter(app, merged_conf)

    return reaper_filter