Source code for oio.container.lifecycle

# Copyright (C) 2017 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

import time

try:
    from lxml import etree
except ImportError:
    from xml.etree import cElementTree as etree

from oio.common.exceptions import OioException
from oio.common.logger import get_logger
from oio.common.utils import cid_from_name, depaginate


# Name of the container property under which the lifecycle configuration
# (a LifecycleConfiguration XML document) is read and written.
LIFECYCLE_PROPERTY_KEY = "X-Container-Sysmeta-Swift3-Lifecycle"


def iso8601_to_int(text):
    """
    Convert an ISO 8601 date-time string to a UNIX timestamp.

    The date is interpreted as UTC, whatever the timezone of the running
    process is. A trailing 'Z' designator and fractional seconds are
    accepted and ignored.

    :param text: date and time formatted like "2017-01-31T00:00:00",
        optionally followed by a fraction of second and/or by 'Z'
    :type text: `str`
    :returns: the number of seconds elapsed since the epoch
    :rtype: `int`
    :raises ValueError: if `text` does not match the expected format
    """
    # Function-scope import, so this function does not depend on a change
    # of the module-level import list.
    import calendar

    # FIXME: use dateutil.parser?
    stamp = text.strip()
    if stamp.endswith('Z'):
        stamp = stamp[:-1]
    # Keep only what precedes the fractional seconds, if any.
    stamp = stamp.partition('.')[0]
    parsed = time.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
    # timegm() reads the broken-down time as UTC, whereas mktime() would
    # read it as local time and make the result depend on the machine.
    return int(calendar.timegm(parsed))
class ContainerLifecycle(object):
    """
    Lifecycle configuration of a container.

    Holds the set of lifecycle rules parsed from a LifecycleConfiguration
    XML document, and applies them on one object or on all objects of the
    container.
    """

    def __init__(self, api, account, container, logger=None):
        """
        :param api: API object used to access the container and its objects
        :param account: name of the account owning the container
        :param container: name of the container
        :param logger: logger to use, or None to get a default one
        """
        self.api = api
        self.account = account
        self.container = container
        self.logger = logger or get_logger(None, name=str(self.__class__))
        # Rules indexed by rule ID
        self._rules = dict()
        # XML document the current rules have been loaded from
        self.src_xml = None

    def load(self):
        """
        Load lifecycle rules from container property.

        :returns: True if a lifecycle configuration has been loaded
        """
        props = self.api.container_get_properties(self.account,
                                                  self.container)
        xml_str = props['properties'].get(LIFECYCLE_PROPERTY_KEY)
        if not xml_str:
            self.logger.info("No Lifecycle configuration for %s/%s",
                             self.account, self.container)
            return False
        self.load_xml(xml_str)
        return True

    def load_xml(self, xml_str):
        """
        Load lifecycle rules from LifecycleConfiguration XML document.

        The rules found in the document replace the ones loaded
        previously. If the document is invalid, the previous rules
        are left untouched.

        :param xml_str: the XML document to parse
        :raises ValueError: if the root tag is not 'LifecycleConfiguration'
            or if a rule is invalid
        """
        tree = etree.fromstring(xml_str)
        if tree.tag != 'LifecycleConfiguration':
            raise ValueError(
                "Expected 'LifecycleConfiguration' as root tag, got '%s'" %
                tree.tag)
        # Build the new rule set aside, and swap it in only once the whole
        # document has been parsed: rules from a previous load must not
        # survive, and a failure must not leave a half-updated set.
        rules = dict()
        for rule_elt in tree.findall('Rule'):
            rule = LifecycleRule.from_element(rule_elt, lifecycle=self)
            rules[rule.id] = rule
        self._rules = rules
        self.src_xml = xml_str

    def save(self, xml_str=None):
        """
        Save the lifecycle configuration in container property.

        :param xml_str: the configuration to save, or None to save the
            configuration that has been loaded previously
        :type xml_str: `str`
        :raises ValueError: if there is no configuration to save
        """
        xml_str = xml_str or self.src_xml
        if not xml_str:
            raise ValueError('You must call `load_xml()` or provide `xml_str`'
                             ' parameter before saving')
        self.api.container_set_properties(
            self.account, self.container,
            properties={LIFECYCLE_PROPERTY_KEY: xml_str})

    def apply(self, obj_meta, **kwargs):
        """
        Match then apply the set of rules of this lifecycle configuration
        on the specified object.

        :returns: tuples of (object metadata, rule name, action, status)
        :rtype: generator of 4-tuples

        :notice: you must consume the results or the rules won't be applied.
        """
        for rule in self._rules.values():
            res = rule.apply(obj_meta, **kwargs)
            if res:
                for action in res:
                    yield obj_meta, rule.id, action[0], action[1]
            else:
                # The rule did nothing on this object
                yield obj_meta, rule.id, "n/a", "n/a"

    def execute(self, **kwargs):
        """
        Match then apply the set of rules of the lifecycle configuration
        on all objects of the container.

        :returns: tuples of (object metadata, rule name, action, status)
        :rtype: generator of 4-tuples

        :notice: you must consume the results or the rules won't be applied.
        """
        for obj_meta in depaginate(
                self.api.object_list,
                listing_key=lambda x: x['objects'],
                marker_key=lambda x: x.get('next_marker'),
                truncated_key=lambda x: x['truncated'],
                account=self.account,
                container=self.container,
                properties=True,
                versions=True,
                **kwargs):
            try:
                results = self.apply(obj_meta, **kwargs)
                for res in results:
                    yield res
            except Exception as exc:
                # Report the failure and go on with the next object.
                self.logger.warning(
                    "Failed to apply lifecycle rules on %s/%s/%s: %s",
                    self.account, self.container, obj_meta['name'], exc)
                yield obj_meta, "n/a", "n/a", exc
class LifecycleRule(object):
    """Combination of a filter and a set of lifecycle actions."""

    def __init__(self, filter_, id_=None, enabled=True,
                 abort_multipart=None, actions=None):
        """
        :param filter_: filter telling on which objects the rule applies
        :param id_: identifier of the rule, or None to generate one
            from the filter
        :param enabled: whether the actions of the rule must be applied
        :type enabled: `bool`
        :param abort_multipart: stored as is, not used by this class
        :param actions: actions of the rule, indexed by action name
        :type actions: `dict`
        """
        self.filter = filter_
        self.id = id_ or self.filter.generate_id()
        self.enabled = enabled
        self.abort_multipart = abort_multipart
        self.actions = actions or dict()

    @classmethod
    def from_element(cls, rule_elt, lifecycle=None):
        """
        Load the rule from an XML element.

        :type rule_elt: `lxml.etree.Element`
        :param lifecycle: lifecycle configuration passed to each action
        :raises ValueError: if the 'Filter' or 'Status' element is missing,
            or if the rule holds no known action
        """
        filter_elt = rule_elt.find('Filter')
        if filter_elt is None:
            raise ValueError("Missing 'Filter' element")
        rule_filter = LifecycleRuleFilter.from_element(filter_elt)

        id_elt = rule_elt.find('./ID')
        id_ = id_elt.text if id_elt is not None else None

        status_elt = rule_elt.find('Status')
        if status_elt is None:
            raise ValueError("Missing 'Status' element")

        actions = dict()
        for name in ACTION_MAP.keys():
            element = rule_elt.find(name)
            if element is not None:
                actions[name] = action_from_element(element,
                                                    lifecycle=lifecycle)
        if not actions:
            raise ValueError("Missing one of %s" % ACTION_MAP.keys())

        # An empty 'Status' element has no text: consider it as not enabled
        # instead of failing on None.
        status = (status_elt.text or '').strip().lower()
        return cls(rule_filter, id_=id_,
                   enabled=(status == "enabled"),
                   actions=actions)

    def match(self, obj_meta, **kwargs):
        """
        Check if the specified object passes the filter of this rule.
        """
        return self.filter.match(obj_meta)

    def apply(self, obj_meta, **kwargs):
        """
        Apply the set of actions of this rule.

        Nothing is done if the rule is disabled or if the object
        does not pass the filter of the rule.

        :returns: the list of actions that have been applied
        :rtype: `list` of `tuple` of a class and a bool
            or a class and an exception instance
        """
        results = list()
        # A disabled rule must never trigger its actions.
        if self.enabled and self.match(obj_meta):
            for action in self.actions.values():
                try:
                    res = action.apply(obj_meta, **kwargs)
                    results.append((action.__class__.__name__, res))
                except OioException as exc:
                    # Report the failure in place of the action status
                    results.append((action.__class__.__name__, exc))
        return results
class LifecycleRuleFilter(object):
    """Filter to determine on which objects to apply a lifecycle rule."""

    # Counter used to name the rules which have neither prefix nor tags
    _rule_number = 0

    def __init__(self, prefix=None, tags=None):
        """
        :param prefix: prefix that objects must have to pass this filter
        :type prefix: `basestring`
        :param tags: tags that objects must have to pass this filter
        :type tags: `dict`
        """
        self.prefix = prefix
        self.tags = tags or dict()

    @classmethod
    def from_element(cls, filter_elt, **kwargs):
        """
        Load the filter from an XML element.

        :type filter_elt: `lxml.etree.Element`
        :raises ValueError: if a 'Tag' element has no 'Key'
            or no 'Value' element
        """
        prefix_elt = filter_elt.find('.//Prefix')
        prefix = prefix_elt.text if prefix_elt is not None else None

        tags = dict()
        for tag_elt in filter_elt.findall('.//Tag'):
            key_elt = tag_elt.find('Key')
            if key_elt is None:
                raise ValueError("Missing 'Key' element in 'Tag'")
            val_elt = tag_elt.find('Value')
            if val_elt is None:
                raise ValueError("Missing 'Value' element in 'Tag' (key=%s)" %
                                 key_elt.text)
            tags[key_elt.text] = val_elt.text
        return cls(prefix=prefix, tags=tags, **kwargs)

    def generate_id(self):
        """
        Generate a rule ID from prefix and/or tags.

        :returns: the prefix and the tags (sorted by key) joined by commas,
            or a numbered "anonymous-rule-N" when there is neither
            prefix nor tag
        """
        parts = list()
        if self.prefix:
            parts.append('prefix=%s' % self.prefix)
        for key, value in sorted(self.tags.items(), key=lambda kv: kv[0]):
            # The value is None when the 'Value' element is empty:
            # format explicitly, joining the pair would raise TypeError.
            parts.append('%s=%s' % (key, '' if value is None else value))
        if not parts:
            klass = self.__class__
            id_ = klass._rule_number
            klass._rule_number += 1
            return "anonymous-rule-%s" % id_
        return ','.join(parts)

    def match(self, obj_meta, **kwargs):
        """
        Check if an object matches the conditions defined by this filter.

        :param obj_meta: object metadata, with at least a 'name' key when
            the filter has a prefix, and optionally a 'properties' dict
        :returns: True if the name of the object starts with the prefix
            (if any) and if each tag of the filter is equal to the
            property of the same name
        """
        if self.prefix and not obj_meta['name'].startswith(self.prefix):
            return False
        if not self.tags:
            return True
        # Tolerate a missing or null 'properties' entry
        props = obj_meta.get('properties') or {}
        for key, expected in self.tags.items():
            if props.get(key) != expected:
                return False
        return True
class LifecycleActionFilter(object):
    """
    Base class of the conditions telling when a rule action takes effect.
    """

    def __init__(self, lifecycle=None, **kwargs):
        """
        :param lifecycle: the lifecycle configuration this condition
            belongs to

        Extra keyword arguments are accepted and ignored.
        """
        self.lifecycle = lifecycle

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if an object matches the conditions.

        Not implemented in this class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class DaysActionFilter(LifecycleActionFilter):
    """
    Condition on the age of the object: the action takes effect a given
    number of days after the creation of the object.
    """

    XML_TAG = 'Days'

    def __init__(self, days_elt, **kwargs):
        """
        :param days_elt: XML element whose text is the number of days
        """
        super(DaysActionFilter, self).__init__(**kwargs)
        self.days = int(days_elt.text)

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if the object has been created more than `self.days` days
        (of 86400 seconds) before `now`.

        :param obj_meta: object metadata, with a 'ctime' key
        :param now: reference timestamp, defaults to the current time
        """
        if not now:
            now = time.time()
        deadline = float(obj_meta['ctime']) + self.days * 86400
        return deadline < now
class DateActionFilter(LifecycleActionFilter):
    """
    Condition on the current date: the action takes effect
    once the specified date is passed.
    """

    XML_TAG = 'Date'

    def __init__(self, date_elt, **kwargs):
        """
        :param date_elt: XML element whose text is a date, in the format
            accepted by `iso8601_to_int`
        """
        super(DateActionFilter, self).__init__(**kwargs)
        timestamp = iso8601_to_int(date_elt.text)
        # Round down to a multiple of 86400 seconds
        self.date = timestamp - timestamp % 86400

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if `now` is after the configured date.
        The object metadata is not looked at.

        :param now: reference timestamp, defaults to the current time
        """
        if not now:
            now = time.time()
        return now > self.date
class CountActionFilter(LifecycleActionFilter):
    """
    Hold a count parsed from an XML element.
    This class does not implement `match()`.
    """

    XML_TAG = 'Count'

    def __init__(self, count_elt, **kwargs):
        """
        :param count_elt: XML element whose text is the count
        :raises ValueError: if the count is negative
        """
        super(CountActionFilter, self).__init__(**kwargs)
        parsed = int(count_elt.text)
        if parsed < 0:
            raise ValueError(
                "The count must be greater than or equal to zero.")
        self.count = parsed
class LifecycleAction(LifecycleActionFilter):
    """
    Interface for Lifecycle actions.
    """

    def __init__(self, filter, **kwargs):
        """
        :param filter: condition to check before applying the action,
            or None to apply the action unconditionally
        """
        super(LifecycleAction, self).__init__(**kwargs)
        self.filter = filter

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if the object passes the condition of this action.

        :returns: the result of the filter, or True if there is no filter
        """
        if self.filter is not None:
            return self.filter.match(obj_meta, now=now, **kwargs)
        return True

    def apply(self, obj_meta, **kwargs):
        """
        Match then apply the treatment on the object.

        Not implemented in this class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
# TODO: implement AbortIncompleteMultipartUpload
class Expiration(LifecycleAction):
    """
    Delete objects.
    """

    def apply(self, obj_meta, **kwargs):
        """
        Delete the object if it passes the condition of this action.

        :param obj_meta: object metadata, with a 'name' key
            and optionally a 'version' key
        :returns: "Deleted" if the object matched and the deletion request
            returned a true value, "Kept" otherwise
        """
        if not self.match(obj_meta, **kwargs):
            return "Kept"
        deleted = self.lifecycle.api.object_delete(
            self.lifecycle.account, self.lifecycle.container,
            obj_meta['name'], version=obj_meta.get('version'))
        if deleted:
            return "Deleted"
        return "Kept"

    @classmethod
    def from_element(cls, expiration_elt, **kwargs):
        """
        Load the expiration from an XML element

        :type expiration_elt: `lxml.etree.Element`
        :returns: a `DaysExpiration` or a `DateExpiration`, according to
            the sub-element found
        :raises ValueError: unless exactly one of the 'Days' and 'Date'
            sub-elements is present
        """
        days_elt = expiration_elt.find(DaysExpiration.XML_TAG)
        date_elt = expiration_elt.find(DateExpiration.XML_TAG)
        has_days = days_elt is not None
        has_date = date_elt is not None
        # Both present or both missing
        if has_days == has_date:
            raise ValueError(
                "Missing '%s' or '%s' element in '%s'" %
                (DaysExpiration.XML_TAG, DateExpiration.XML_TAG,
                 expiration_elt.tag))
        if has_days:
            return DaysExpiration(days_elt, **kwargs)
        return DateExpiration(date_elt, **kwargs)
class DaysExpiration(Expiration):
    """
    Delete objects older than a specified delay.
    """

    XML_TAG = DaysActionFilter.XML_TAG

    def __init__(self, days_elt, **kwargs):
        """
        :param days_elt: XML element holding the delay, in days
        """
        super(DaysExpiration, self).__init__(
            DaysActionFilter(days_elt, **kwargs), **kwargs)
class DateExpiration(Expiration):
    """
    Delete objects from the specified date.
    """

    XML_TAG = DateActionFilter.XML_TAG

    def __init__(self, date_elt, **kwargs):
        """
        :param date_elt: XML element holding the date
        """
        super(DateExpiration, self).__init__(
            DateActionFilter(date_elt, **kwargs), **kwargs)
class Transition(LifecycleAction):
    """
    Change object storage policy.
    """

    XML_POLICY = 'StorageClass'

    def __init__(self, filter, policy_elt, **kwargs):
        """
        :param filter: condition to check before changing the policy
        :param policy_elt: XML element whose text is the name of
            the storage policy to set
        """
        super(Transition, self).__init__(filter, **kwargs)
        self.policy = policy_elt.text
        # The factory can only be built when an API object is available
        self.factory = None
        if self.lifecycle and self.lifecycle.api:
            from oio.content.factory import ContentFactory
            self.factory = ContentFactory(
                self.lifecycle.api.container.conf,
                self.lifecycle.api.container)

    def apply(self, obj_meta, **kwargs):
        """
        Change the storage policy of the object if it passes
        the condition of this action.

        :param obj_meta: object metadata, with an 'id' key
        :returns: "Policy changed to <policy>" if the change has been
            requested, "Kept" if the object did not match or if there
            is no content factory
        """
        if not self.match(obj_meta, **kwargs):
            return "Kept"
        cid = cid_from_name(self.lifecycle.account,
                            self.lifecycle.container)
        if not self.factory:
            return "Kept"
        # TODO: avoid loading content description a second time
        self.factory.change_policy(cid, obj_meta['id'], self.policy)
        return "Policy changed to %s" % self.policy

    @classmethod
    def from_element(cls, transition_elt, **kwargs):
        """
        Load the transition from an XML element.

        :returns: a `DaysTransition` or a `DateTransition`, according to
            the sub-element found
        :raises ValueError: if the 'StorageClass' sub-element is missing,
            or unless exactly one of the 'Days' and 'Date' sub-elements
            is present
        """
        policy_elt = transition_elt.find(cls.XML_POLICY)
        if policy_elt is None:
            raise ValueError("Missing '%s' element in '%s'" %
                             (cls.XML_POLICY, transition_elt.tag))
        days_elt = transition_elt.find(DaysTransition.XML_TAG)
        date_elt = transition_elt.find(DateTransition.XML_TAG)
        has_days = days_elt is not None
        has_date = date_elt is not None
        # Both present or both missing
        if has_days == has_date:
            raise ValueError(
                "Missing '%s' or '%s' element in '%s'" %
                (DaysTransition.XML_TAG, DateTransition.XML_TAG,
                 transition_elt.tag))
        if has_days:
            return DaysTransition(days_elt, policy_elt, **kwargs)
        return DateTransition(date_elt, policy_elt, **kwargs)
class DaysTransition(Transition):
    """
    Change object storage policy after a specified delay.
    """

    XML_TAG = DaysActionFilter.XML_TAG

    def __init__(self, days_elt, policy_elt, **kwargs):
        """
        :param days_elt: XML element holding the delay, in days
        :param policy_elt: XML element holding the storage policy name
        """
        super(DaysTransition, self).__init__(
            DaysActionFilter(days_elt, **kwargs), policy_elt, **kwargs)
class DateTransition(Transition):
    """
    Change object storage policy from the specified date.
    """

    XML_TAG = DateActionFilter.XML_TAG

    def __init__(self, date_elt, policy_elt, **kwargs):
        """
        :param date_elt: XML element holding the date
        :param policy_elt: XML element holding the storage policy name
        """
        super(DateTransition, self).__init__(
            DateActionFilter(date_elt, **kwargs), policy_elt, **kwargs)
class NoncurrentVersionActionFilter(LifecycleActionFilter):
    """
    Apply the action on all versions of an object, except the latest.
    """

    def __init__(self, **kwargs):
        super(NoncurrentVersionActionFilter, self).__init__(**kwargs)
        # Unknown until the first call to `_match()`
        self.versioning = None

    def _match(self, obj_meta, now=None, **kwargs):
        """
        Check if versioning is enabled.

        The answer is computed on the first call, then kept
        in `self.versioning`.

        :returns: True if the maximum number of versions is greater than 1
            or negative
        """
        if self.versioning is not None:
            return self.versioning

        props = self.lifecycle.api.container_get_properties(
            self.lifecycle.account, self.lifecycle.container)
        max_versions = props['system'].get('sys.m2.policy.version', None)
        if max_versions is None:
            # Not set on the container: read the 'config' resource
            # through the proxy instead.
            from oio.common.client import ProxyClient
            proxy_client = ProxyClient(
                {"namespace": self.lifecycle.api.namespace},
                no_ns_in_url=True)
            _, config = proxy_client._request('GET', "config")
            max_versions = config['meta2.max_versions']
        max_versions = int(max_versions)
        self.versioning = max_versions > 1 or max_versions < 0
        return self.versioning

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if versioning is enabled and if the specified object version
        is not the one returned by `object_show` for the same name.

        :param obj_meta: object metadata, with 'name' and 'version' keys
        """
        if not self._match(obj_meta, now=now, **kwargs):
            return False
        # Load the description of the latest object version
        # TODO: use a cache to avoid requesting each time
        latest = self.lifecycle.api.object_show(self.lifecycle.account,
                                                self.lifecycle.container,
                                                obj_meta['name'])
        return str(latest['version']) != str(obj_meta['version'])
class NoncurrentVersionExpiration(Expiration):
    """
    Delete objects old versions.
    """

    def __init__(self, filter, **kwargs):
        """
        :param filter: extra condition to check on old versions
        """
        super(NoncurrentVersionExpiration, self).__init__(filter, **kwargs)
        self.noncurrent_version = NoncurrentVersionActionFilter(**kwargs)

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if the object is an old version, and then if it passes
        the condition of this action.
        """
        if not self.noncurrent_version.match(obj_meta, now=now, **kwargs):
            return False
        return super(NoncurrentVersionExpiration, self).match(
            obj_meta, now=now, **kwargs)

    @classmethod
    def from_element(cls, expiration_elt, **kwargs):
        """
        Load the expiration from an XML element.

        :returns: a `NoncurrentDaysExpiration` or
            a `NoncurrentCountExpiration`, according to the sub-element
            found
        :raises ValueError: unless exactly one of the 'NoncurrentDays'
            and 'NoncurrentCount' sub-elements is present
        """
        days_elt = expiration_elt.find(NoncurrentDaysExpiration.XML_TAG)
        count_elt = expiration_elt.find(NoncurrentCountExpiration.XML_TAG)
        has_days = days_elt is not None
        has_count = count_elt is not None
        # Both present or both missing
        if has_days == has_count:
            raise ValueError(
                "Missing '%s' or '%s' element in '%s'" %
                (NoncurrentDaysExpiration.XML_TAG,
                 NoncurrentCountExpiration.XML_TAG,
                 expiration_elt.tag))
        if has_days:
            return NoncurrentDaysExpiration(days_elt, **kwargs)
        return NoncurrentCountExpiration(count_elt, **kwargs)
class NoncurrentDaysExpiration(NoncurrentVersionExpiration):
    """
    Delete objects old versions after a specified delay.
    """

    XML_TAG = 'NoncurrentDays'

    def __init__(self, days_elt, **kwargs):
        """
        :param days_elt: XML element holding the delay, in days
        """
        super(NoncurrentDaysExpiration, self).__init__(
            DaysActionFilter(days_elt, **kwargs), **kwargs)
class NoncurrentCountExpiration(NoncurrentVersionExpiration):
    """
    Delete exceeding versions, and keep a maximum number of versions.
    """

    XML_TAG = 'NoncurrentCount'

    def __init__(self, count_elt, **kwargs):
        """
        :param count_elt: XML element holding the number of old versions
            to keep
        """
        super(NoncurrentCountExpiration, self).__init__(
            CountActionFilter(count_elt, **kwargs), **kwargs)
        # Name of the last object for which a purge has been requested
        self.last_object_name = None

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if versioning is enabled. Unlike the parent class, neither
        the version of the object nor the filter is checked.
        """
        return self.noncurrent_version._match(obj_meta, now=now, **kwargs)

    def apply(self, obj_meta, **kwargs):
        """
        Purge the exceeding versions of the object, then check if
        the specified version still exists.

        The purge is requested only when the name of the object differs
        from the one seen by the previous call.

        :param obj_meta: object metadata, with 'name' and 'version' keys
        :returns: "Deleted" if `object_head` returns a false value
            for this version after the purge, "Kept" otherwise
        """
        if not self.match(obj_meta, **kwargs):
            return "Kept"

        api = self.lifecycle.api
        name = obj_meta['name']
        if name != self.last_object_name:
            # Keep the latest version plus `count` older ones
            api.container.content_purge(
                self.lifecycle.account, self.lifecycle.container,
                name, maxvers=self.filter.count+1)
            self.last_object_name = name
        still_there = api.object_head(self.lifecycle.account,
                                      self.lifecycle.container,
                                      name,
                                      version=obj_meta['version'])
        if not still_there:
            return "Deleted"
        return "Kept"
class NoncurrentVersionTransition(Transition):
    """
    Change object storage policy for old versions of the object only.
    """

    def __init__(self, filter, policy_elt, **kwargs):
        """
        :param filter: extra condition to check on old versions
        :param policy_elt: XML element holding the storage policy name
        """
        super(NoncurrentVersionTransition, self).__init__(
            filter, policy_elt, **kwargs)
        self.noncurrent = NoncurrentVersionActionFilter(**kwargs)

    def match(self, obj_meta, now=None, **kwargs):
        """
        Check if the object is an old version, and then if it passes
        the condition of this action.
        """
        if not self.noncurrent.match(obj_meta, now=now, **kwargs):
            return False
        return super(NoncurrentVersionTransition, self).match(
            obj_meta, now=now, **kwargs)

    @classmethod
    def from_element(cls, transition_elt, **kwargs):
        """
        Load the transition from an XML element.

        :returns: a `NoncurrentDaysTransition`
        :raises ValueError: if the 'StorageClass' or the 'NoncurrentDays'
            sub-element is missing
        """
        policy_elt = transition_elt.find(cls.XML_POLICY)
        if policy_elt is None:
            raise ValueError("Missing '%s' element in '%s'" %
                             (cls.XML_POLICY, transition_elt.tag))
        days_elt = transition_elt.find(NoncurrentDaysTransition.XML_TAG)
        if days_elt is not None:
            return NoncurrentDaysTransition(days_elt, policy_elt, **kwargs)
        raise ValueError(
            "Missing '%s' element in '%s'" %
            (NoncurrentDaysTransition.XML_TAG, transition_elt.tag))
class NoncurrentDaysTransition(NoncurrentVersionTransition):
    """
    Change object storage policy after a specified delay,
    for old versions of the object only.
    """

    XML_TAG = 'NoncurrentDays'

    def __init__(self, days_elt, policy_elt, **kwargs):
        """
        :param days_elt: XML element holding the delay, in days
        :param policy_elt: XML element holding the storage policy name
        """
        super(NoncurrentDaysTransition, self).__init__(
            DaysActionFilter(days_elt, **kwargs), policy_elt, **kwargs)
# Action classes indexed by their name, which is also the tag
# of the XML element describing the action.
ACTION_MAP = {
    action_class.__name__: action_class
    for action_class in (
        Expiration,
        Transition,
        NoncurrentVersionExpiration,
        NoncurrentVersionTransition,
    )
}
def action_from_element(element, **kwargs):
    """
    Create a new `LifecycleAction` subclass instance
    from an XML description.

    The class is looked up in `ACTION_MAP` by the tag of the element,
    and the extra keyword arguments are passed to its `from_element()`.

    :param element: the XML description of the action
    :type element: `Element`
    :raises KeyError: if the tag of the element is not in `ACTION_MAP`
    """
    action_class = ACTION_MAP[element.tag]
    return action_class.from_element(element, **kwargs)