# Copyright (C) 2017-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import time
import uuid
from datetime import datetime
try:
from lxml import etree
except ImportError:
from xml.etree import cElementTree as etree
from oio.common.exceptions import OioException
from oio.common.logger import get_logger
from oio.common.utils import depaginate
from oio.common.easy_value import true_value
from oio.common.constants import CH_ENCODED_SEPARATOR, CH_SEPARATOR
ALLOWED_STATUSES = ['enabled', 'disabled']
LIFECYCLE_PROPERTY_KEY = 'X-Container-Sysmeta-Swift3-Lifecycle'
TAGGING_KEY = 'x-object-sysmeta-swift3-tagging'
XMLNS_S3 = 'http://s3.amazonaws.com/doc/2006-03-01/'
[docs]def iso8601_to_int(when):
# FIXME: use dateutil.parser?
return int(time.mktime(time.strptime(when, "%Y-%m-%dT%H:%M:%S")))
[docs]def int_to_iso8601(when):
return datetime.utcfromtimestamp(when).isoformat()
[docs]class ProcessedVersions(object):
"""
Save the processed versions of the last object
"""
def __init__(self, **kwargs):
self.name = None
self.versions = None
[docs] def is_already_processed(self, obj_meta, **kwargs):
"""
Check if the version of this object is already processed.
"""
return obj_meta['name'] == self.name \
and int(obj_meta['version']) in self.versions
[docs] def is_current(self, obj_meta, **kwargs):
"""
Check if the object is the current version.
"""
return self.name != obj_meta['name']
[docs] def save_object(self, obj_meta, **kwargs):
"""
Save object as processed.
"""
if obj_meta['name'] != self.name:
self.name = obj_meta['name']
self.versions = [int(obj_meta['version'])]
else:
self.versions.append(int(obj_meta['version']))
[docs] def nb_processed(self, obj_meta, **kwargs):
"""
Get the number of processed versions.
"""
if obj_meta['name'] != self.name:
return 0
return len(self.versions)
[docs]class ContainerLifecycle(object):
def __init__(self, api, account, container, logger=None,
recursive=False):
self.api = api
self.account = account
self.container = container
self.logger = logger or get_logger(None, name=str(self.__class__))
self.recursive = recursive
self.rules = list()
self.processed_versions = None
[docs] def get_configuration(self):
"""
Get lifecycle configuration from container property.
"""
props = self.api.container_get_properties(self.account, self.container)
return props['properties'].get(LIFECYCLE_PROPERTY_KEY)
[docs] def load(self):
"""
Load lifecycle rules from container property.
:returns: True if a lifecycle configuration has been loaded
"""
xml = self.get_configuration()
if xml is None:
self.logger.info("No Lifecycle configuration for %s/%s",
self.account, self.container)
return False
else:
self.load_xml(xml)
return True
[docs] def load_xml(self, xml_str):
"""
Load lifecycle rules from LifecycleConfiguration XML document.
"""
tree = etree.fromstring(xml_str)
root_ns = tree.nsmap.get(None)
root_tag = 'LifecycleConfiguration'
if root_ns is not None:
root_tag = '{%s}%s' % (root_ns, root_tag)
if tree.tag != root_tag:
raise ValueError(
"Expected 'LifecycleConfiguration' as root tag, got '%s'" %
tree.tag)
for rule_elt in tree.findall('Rule', tree.nsmap):
rule = LifecycleRule.from_element(rule_elt, lifecycle=self)
self.rules.append(rule)
def _to_element_tree(self, **kwargs):
lifecycle_elt = etree.Element('LifecycleConfiguration')
for rule in self.rules:
rule_elt = rule._to_element_tree(**kwargs)
lifecycle_elt.append(rule_elt)
return lifecycle_elt
def __str__(self):
return etree.tostring(self._to_element_tree()).decode("utf-8")
[docs] def save(self, xml_str=None):
"""
Save the lifecycle configuration in container property.
:param xml_str: the configuration to save, or None to save the
configuration that has been loaded previously
:type xml_str: `str`
"""
if not self.rules:
raise ValueError('You must call `load_xml()`'
' parameter before saving')
self.api.container_set_properties(
self.account, self.container,
properties={LIFECYCLE_PROPERTY_KEY: str(self)})
[docs] def apply(self, obj_meta, **kwargs):
"""
Match then apply the set of rules of this lifecycle configuration
on the specified object.
:returns: tuples of (object metadata, rule name, action, status)
:rtype: generator of 4-tuples
:notice: you must consume the results or the rules won't be applied.
"""
if true_value(obj_meta['deleted']):
return
for rule in self.rules:
res = rule.apply(obj_meta, **kwargs)
if res:
for action in res:
yield obj_meta, rule.id, action[0], action[1]
if action[1] != 'Kept':
return
else:
yield obj_meta, rule.id, "n/a", "Kept"
[docs] def process_container(self, container, **kwargs):
"""
Match then apply the set of rules of the lifecycle configuration
on all objects of the container.
:returns: tuples of (object metadata, rule name, action, status)
:rtype: generator of 4-tuples
:notice: the results must be consumed or the rules won't be applied.
"""
for obj_meta in depaginate(
self.api.object_list,
listing_key=lambda x: x['objects'],
marker_key=lambda x: x.get('next_marker'),
truncated_key=lambda x: x['truncated'],
account=self.account,
container=container,
properties=True,
versions=True,
**kwargs):
try:
# Save the name of the object as it is in the container,
# for later use.
obj_meta['orig_name'] = obj_meta['name']
# And reconstruct the name of the object as it is
# when shown to the final user.
if self.recursive and CH_ENCODED_SEPARATOR in container:
obj_meta['name'] = container.split(CH_ENCODED_SEPARATOR,
1)[1]
obj_meta['name'].replace(CH_ENCODED_SEPARATOR,
CH_SEPARATOR)
obj_meta['name'] += CH_SEPARATOR + obj_meta['orig_name']
obj_meta['container'] = container
if self.processed_versions is not None \
and self.processed_versions.is_already_processed(
obj_meta, **kwargs):
continue
results = self.apply(obj_meta, **kwargs)
for res in results:
yield res
except Exception as exc:
self.logger.warn(
"Failed to apply lifecycle rules on %s/%s/%s: %s",
self.account, self.container, obj_meta['name'], exc)
yield obj_meta, "n/a", "n/a", exc
if self.processed_versions is not None:
self.processed_versions.save_object(obj_meta, **kwargs)
[docs] def execute(self, use_precessed_versions=True, **kwargs):
"""
Match then apply the set of rules of the lifecycle configuration
on all objects of the container (and its relatives, if recursive
mode is enabled).
:returns: tuples of (object metadata, rule name, action, status)
:rtype: generator of 4-tuples
:notice: the results must be consumed or the rules won't be applied.
"""
if use_precessed_versions:
self.processed_versions = ProcessedVersions()
results = self.process_container(self.container, **kwargs)
for res in results:
yield res
if self.recursive:
for container in depaginate(
self.api.container_list,
item_key=lambda x: x[0],
marker_key=lambda x: x[-1][0],
account=self.account,
prefix=self.container + CH_ENCODED_SEPARATOR):
results = self.process_container(container, **kwargs)
for res in results:
yield res
self.processed_versions = None
[docs] def is_current_version(self, obj_meta, **kwargs):
"""
Check if the object is the current version
"""
if self.processed_versions is None:
current_obj = self.api.object_get_properties(
self.account, obj_meta['container'], obj_meta['orig_name'])
return current_obj['version'] == obj_meta['version']
else:
return self.processed_versions.is_current(obj_meta, **kwargs)
[docs]class LifecycleRule(object):
"""Combination of a filter and a set of lifecycle actions."""
def __init__(self, id_, filter_, enabled, actions):
self.id = id_
self.filter = filter_
self.enabled = enabled
self.actions = actions
[docs] @classmethod
def from_element(cls, rule_elt, **kwargs):
"""
Load the rule from an XML element.
:type rule_elt: `lxml.etree.Element`
"""
nsmap = rule_elt.nsmap
try:
id_ = rule_elt.findall('ID', nsmap)[-1].text
if id_ is None:
raise ValueError("Missing value for 'ID' element")
except IndexError:
id_ = uuid.uuid4().hex
try:
filter_ = LifecycleRuleFilter.from_element(
rule_elt.findall('Filter', nsmap)[-1])
except IndexError:
raise ValueError("Missing 'Filter' element")
try:
status = rule_elt.findall('Status', nsmap)[-1].text
if status is None:
raise ValueError("Missing value for 'Status' element")
status = status.lower()
if status not in ALLOWED_STATUSES:
raise ValueError("Unknown 'Status' element")
enabled = status == 'enabled'
except IndexError:
raise ValueError("Missing 'Status' element")
actions = list()
try:
expiration = Expiration.from_element(
rule_elt.findall('Expiration', nsmap)[-1], **kwargs)
action_filter_type = type(expiration.filter)
actions.append(expiration)
except IndexError:
expiration = None
action_filter_type = None
transitions = list()
for transition_elt in rule_elt.findall('Transition', nsmap):
transition = Transition.from_element(transition_elt, **kwargs)
if action_filter_type is None:
action_filter_type = type(transition.filter)
elif type(transition.filter) != action_filter_type:
raise ValueError("'Date' and 'Days' in the same Rule")
transitions.append(transition)
if transitions:
if action_filter_type == DateActionFilter:
transitions = sorted(
transitions, key=lambda transition: transition.filter.date,
reverse=True)
elif action_filter_type == DaysActionFilter:
transitions = sorted(
transitions, key=lambda transition: transition.filter.days,
reverse=True)
if expiration:
if action_filter_type == DateActionFilter:
if expiration.filter.date <= transitions[0].filter.date:
raise ValueError(
"'Date' in the Expiration action "
"must be later than 'Date' "
"in the Transition action")
elif action_filter_type == DaysActionFilter:
if expiration.filter.days <= transitions[0].filter.days:
raise ValueError(
"'Days' in the Expiration action "
"must be greater than 'Days' "
"in the Transition action")
actions = actions + transitions
try:
expiration = NoncurrentVersionExpiration.from_element(
rule_elt.findall('NoncurrentVersionExpiration', nsmap)[-1],
**kwargs)
action_filter_type = type(expiration.filter)
actions.append(expiration)
except IndexError:
expiration = None
action_filter_type = None
transitions = list()
for transition_elt in rule_elt.findall('NoncurrentVersionTransition',
nsmap):
transition = NoncurrentVersionTransition.from_element(
transition_elt, **kwargs)
if action_filter_type is None:
action_filter_type = type(transition.filter)
elif type(transition.filter) != action_filter_type:
raise ValueError(
"'NoncurrentDays' and 'NoncurrentCount' in the same Rule")
transitions.append(transition)
if transitions:
if action_filter_type == NoncurrentDaysActionFilter:
transitions = sorted(
transitions,
key=lambda transition: transition.filter.days,
reverse=True)
elif action_filter_type == NoncurrentCountActionFilter:
transitions = sorted(
transitions,
key=lambda transition: transition.filter.count,
reverse=True)
if expiration:
if action_filter_type == NoncurrentDaysActionFilter:
if expiration.filter.days <= transitions[0].filter.days:
raise ValueError(
"'NoncurrentDays' "
"in the NoncurrentVersionExpiration "
"action must be greater than 'NoncurrentDays' "
"in the NoncurrentVersionTransition action")
elif action_filter_type == NoncurrentCountActionFilter:
if expiration.filter.count <= transitions[0].filter.count:
raise ValueError(
"'NoncurrentCount' "
"in the NoncurrentVersionExpiration "
"action must be greater than 'NoncurrentCount' "
"in the NoncurrentVersionTransition action")
actions = actions + transitions
if not actions:
raise ValueError(
"At least one action needs to be specified in a Rule")
return cls(id_, filter_, enabled, actions)
def _to_element_tree(self, **kwargs):
rule_elt = etree.Element("Rule")
id_elt = etree.Element("ID")
id_elt.text = self.id
rule_elt.append(id_elt)
filter_elt = self.filter._to_element_tree(**kwargs)
rule_elt.append(filter_elt)
status_elt = etree.Element("Status")
if self.enabled:
status_elt.text = 'Enabled'
else:
status_elt.text = 'Disabled'
rule_elt.append(status_elt)
for action in self.actions:
action_elt = action._to_element_tree(**kwargs)
rule_elt.append(action_elt)
return rule_elt
def __str__(self):
return etree.tostring(self._to_element_tree()).decode("utf-8")
[docs] def match(self, obj_meta, **kwargs):
"""
Check if the specified object passes the filter of this rule.
"""
return self.filter.match(obj_meta)
[docs] def apply(self, obj_meta, **kwargs):
"""
Apply the set of actions of this rule.
:returns: the list of actions that have been applied
:rtype: `list` of `tuple` of a class and a bool or
a class and an exception instance
"""
results = list()
if self.enabled and self.match(obj_meta):
for action in self.actions:
try:
res = action.apply(obj_meta, **kwargs)
results.append((action.__class__.__name__, res))
if res != 'Kept':
break
except OioException as exc:
results.append((action.__class__.__name__, exc))
return results
[docs]class LifecycleRuleFilter(object):
"""Filter to determine on which objects to apply a lifecycle rule."""
def __init__(self, prefix, tags):
"""
:param prefix: prefix that objects must have to pass this filter
:type prefix: `basestring`
:param tags: tags that objects must have to pass this filter
:type tags: `dict`
"""
self.prefix = prefix
self.tags = tags
[docs] @classmethod
def from_element(cls, filter_elt, **kwargs):
"""
Load the filter from an XML element.
:type filter_elt: `lxml.etree.Element`
"""
nsmap = filter_elt.nsmap
try:
and_elt = filter_elt.findall('And', nsmap)[-1]
try:
prefix = and_elt.findall('Prefix', nsmap)[-1].text
if prefix is None:
raise ValueError("Missing value for 'Prefix' element")
except IndexError:
prefix = None
tags = cls._tags_from_element(and_elt)
except IndexError:
try:
prefix = filter_elt.findall('Prefix', nsmap)[-1].text
if prefix is None:
raise ValueError("Missing value for 'Prefix' element")
except IndexError:
prefix = None
try:
k, v = cls._tag_from_element(
filter_elt.findall('Tag', nsmap)[-1])
tags = {k: v}
except IndexError:
tags = {}
if prefix and tags:
raise ValueError("Too many filters, use <And>")
return cls(prefix, tags, **kwargs)
def _to_element_tree(self, **kwargs):
filter_elt = _filter_elt = etree.Element('Filter')
nb_filters = len(self.tags)
if self.prefix is not None:
nb_filters += 1
if nb_filters == 0:
filter_elt.text = ''
return filter_elt
if nb_filters > 1:
and_elt = etree.Element('And')
_filter_elt = and_elt
filter_elt.append(and_elt)
if self.prefix is not None:
prefix_elt = etree.Element('Prefix')
prefix_elt.text = self.prefix
_filter_elt.append(prefix_elt)
for k, v in self.tags.items():
tag_elt = etree.Element('Tag')
key_elt = etree.Element('Key')
key_elt.text = k
tag_elt.append(key_elt)
value_elt = etree.Element('Value')
value_elt.text = v
tag_elt.append(value_elt)
_filter_elt.append(tag_elt)
return filter_elt
def __str__(self):
return etree.tostring(self._to_element_tree()).decode("utf-8")
[docs] def match(self, obj_meta, **kwargs):
"""
Check if an object matches the conditions defined by this filter.
"""
# Check the prefix
if self.prefix and not obj_meta['name'].startswith(self.prefix):
return False
# Check the tags
if self.tags:
tags = dict()
tagging_xml = obj_meta.get('properties', {}).get(TAGGING_KEY, None)
if tagging_xml is not None:
tagging_elt = etree.fromstring(tagging_xml)
expected_tag = 'Tagging'
root_ns = tagging_elt.nsmap.get(None)
if root_ns is not None:
expected_tag = '{%s}%s' % (root_ns, expected_tag)
if tagging_elt.tag != expected_tag:
raise ValueError(
"Expected 'Tagging' as root tag, got '%s'" %
tagging_elt.tag)
tags_elt = tagging_elt.find('TagSet', tagging_elt.nsmap)
if tags_elt is None:
raise ValueError("Missing 'TagSet' element in 'Tagging'")
tags = self._tags_from_element(tags_elt, tags_elt.nsmap)
for tagk in self.tags.keys():
if tags.get(tagk) != self.tags[tagk]:
return False
return True
@staticmethod
def _tag_from_element(tag_elt, nsmap=None):
try:
k = tag_elt.findall('Key', nsmap)[-1].text
if k is None:
raise ValueError("Missing value for 'Key' element")
except IndexError:
raise ValueError("Missing 'Key' element in 'Tag'")
try:
v = tag_elt.findall('Value', nsmap)[-1].text
if v is None:
raise ValueError("Missing value for 'Value' element")
except IndexError:
raise ValueError("Missing 'Value' element in 'Tag' (key=%s)" % k)
return k, v
@staticmethod
def _tags_from_element(tags_elt, nsmap=None):
tags = dict()
for tag_elt in tags_elt.findall('Tag', tags_elt.nsmap):
k, v = LifecycleRuleFilter._tag_from_element(tag_elt, nsmap=nsmap)
if tags.get(k, None) is not None:
raise ValueError("Duplicate Tag Keys are not allowed")
tags[k] = v
return tags
[docs]class LifecycleActionFilter(object):
"""
Specify conditions when the specific rule action takes effect.
"""
def __init__(self, lifecycle=None, **kwargs):
self.lifecycle = lifecycle
def _to_element_tree(self, **kwargs):
raise NotImplementedError
def __str__(self):
return etree.tostring(self._to_element_tree()).decode("utf-8")
[docs] def match(self, obj_meta, now=None, **kwargs):
"""
Check if an object matches the conditions.
"""
raise NotImplementedError
[docs]class DaysActionFilter(LifecycleActionFilter):
"""
Specify the number of days after object creation
when the specific rule action takes effect.
"""
def __init__(self, days, **kwargs):
super(DaysActionFilter, self).__init__(**kwargs)
self.days = days
[docs] @classmethod
def from_element(cls, days_elt, **kwargs):
try:
days = int(days_elt.text or '')
if days <= 0:
raise ValueError()
except ValueError:
raise ValueError(
"The days must be a positive integer")
return cls(days, **kwargs)
def _to_element_tree(self, **kwargs):
days_elt = etree.Element('Days')
days_elt.text = str(self.days)
return days_elt
[docs] def match(self, obj_meta, now=None, **kwargs):
now = now or time.time()
return float(obj_meta['mtime']) + self.days * 86400 < now
[docs]class NoncurrentDaysActionFilter(DaysActionFilter):
def _to_element_tree(self, **kwargs):
days_elt = etree.Element('NoncurrentDays')
days_elt.text = str(self.days)
return days_elt
[docs]class DateActionFilter(LifecycleActionFilter):
"""
Specify the date when the specific rule action takes effect.
"""
def __init__(self, date, **kwargs):
super(DateActionFilter, self).__init__(**kwargs)
self.date = date
[docs] @classmethod
def from_element(cls, date_elt, **kwargs):
date = iso8601_to_int(date_elt.text or '')
date = (date - (date % 86400))
return cls(date, **kwargs)
def _to_element_tree(self, **kwargs):
date_elt = etree.Element('Date')
date_elt.text = int_to_iso8601(self.date)
return date_elt
[docs] def match(self, obj_meta, now=None, **kwargs):
now = now or time.time()
return now > self.date and float(obj_meta['mtime']) < self.date
[docs]class NoncurrentCountActionFilter(LifecycleActionFilter):
def __init__(self, count, **kwargs):
super(NoncurrentCountActionFilter, self).__init__(**kwargs)
self.count = count
if self.lifecycle is None or self.lifecycle.processed_versions is None:
self.last_object_name = None
self.nb_noncurrent_version = 0
[docs] @classmethod
def from_element(cls, count_elt, **kwargs):
try:
count = int(count_elt.text or '')
if count < 0:
raise ValueError()
except ValueError:
raise ValueError(
"The count must be greater than or equal to zero")
return cls(count, **kwargs)
def _to_element_tree(self, **kwargs):
count_elt = etree.Element('NoncurrentCount')
count_elt.text = str(self.count)
return count_elt
[docs] def match(self, obj_meta, **kwargs):
if self.lifecycle is None or self.lifecycle.processed_versions is None:
if obj_meta['name'] != self.last_object_name:
self.last_object_name = obj_meta['name']
self.nb_noncurrent_version = 1
else:
self.nb_noncurrent_version += 1
return self.count < self.nb_noncurrent_version
processed = self.lifecycle.processed_versions.nb_processed(obj_meta)
return self.count < processed
[docs]class LifecycleAction(LifecycleActionFilter):
"""
Interface for Lifecycle actions.
Apply the action on the latest version.
"""
def __init__(self, filter_, **kwargs):
super(LifecycleAction, self).__init__(**kwargs)
self.filter = filter_
def _match_filter(self, obj_meta, **kwargs):
if self.filter is None:
return True
return self.filter.match(obj_meta, **kwargs)
[docs] def match(self, obj_meta, **kwargs):
return self.lifecycle.is_current_version(obj_meta, **kwargs) \
and self._match_filter(obj_meta, **kwargs)
[docs] def apply(self, obj_meta, **kwargs):
"""
Match then apply the treatment on the object.
"""
raise NotImplementedError
# TODO: implement AbortIncompleteMultipartUpload
[docs]class Expiration(LifecycleAction):
"""
Delete objects.
"""
[docs] @classmethod
def from_element(cls, expiration_elt, **kwargs):
"""
Load the expiration from an XML element
:type expiration_elt: `lxml.etree.Element`
"""
nsmap = expiration_elt.nsmap
try:
days_elt = expiration_elt.findall('Days', nsmap)[-1]
except IndexError:
days_elt = None
try:
date_elt = expiration_elt.findall('Date', nsmap)[-1]
except IndexError:
date_elt = None
if days_elt is None and date_elt is None:
raise ValueError(
"Missing 'Days' or 'Date' element in Expiration action")
elif days_elt is not None and date_elt is not None:
raise ValueError(
"'Days' and 'Date' in same Expiration action")
if days_elt is not None:
action_filter = DaysActionFilter.from_element(days_elt, **kwargs)
elif date_elt is not None:
action_filter = DateActionFilter.from_element(date_elt, **kwargs)
return cls(action_filter, **kwargs)
def _to_element_tree(self, **kwargs):
exp_elt = etree.Element('Expiration')
filter_elt = self.filter._to_element_tree(**kwargs)
exp_elt.append(filter_elt)
return exp_elt
[docs] def apply(self, obj_meta, version=None, **kwargs):
if self.match(obj_meta, **kwargs):
res = self.lifecycle.api.object_delete(
self.lifecycle.account, obj_meta['container'],
obj_meta['orig_name'], version=version)
return "Deleted" if res else "Kept"
return "Kept"
[docs]class Transition(LifecycleAction):
"""
Change object storage policy.
"""
STORAGE_POLICY_XML_TAG = 'StorageClass'
def __init__(self, filter_, policy, **kwargs):
super(Transition, self).__init__(filter_, **kwargs)
self.policy = policy
[docs] @classmethod
def from_element(cls, transition_elt, **kwargs):
"""
Load the transition from an XML element
:type transition_elt: `lxml.etree.Element`
"""
nsmap = transition_elt.nsmap
try:
policy = transition_elt.findall(
cls.STORAGE_POLICY_XML_TAG, nsmap)[-1].text
if policy is None:
raise ValueError("Missing value for '%s' element" %
cls.STORAGE_POLICY_XML_TAG)
except IndexError:
raise ValueError("Missing '%s' element in Transition action" %
cls.STORAGE_POLICY_XML_TAG)
try:
days_elt = transition_elt.findall('Days', nsmap)[-1]
except IndexError:
days_elt = None
try:
date_elt = transition_elt.findall('Date', nsmap)[-1]
except IndexError:
date_elt = None
if days_elt is None and date_elt is None:
raise ValueError(
"Missing 'Days' or 'Date' element in Transition action")
elif days_elt is not None and date_elt is not None:
raise ValueError(
"'Days' and 'Date' in same Transition action")
if days_elt is not None:
action_filter = DaysActionFilter.from_element(days_elt, **kwargs)
elif date_elt is not None:
action_filter = DateActionFilter.from_element(date_elt, **kwargs)
return cls(action_filter, policy, **kwargs)
def _to_element_tree(self, **kwargs):
trans_elt = etree.Element('Transition')
policy_elt = etree.Element(self.STORAGE_POLICY_XML_TAG)
policy_elt.text = self.policy
trans_elt.append(policy_elt)
filter_elt = self.filter._to_element_tree(**kwargs)
trans_elt.append(filter_elt)
return trans_elt
[docs] def apply(self, obj_meta, **kwargs):
if self.match(obj_meta, **kwargs):
if obj_meta['policy'] == self.policy:
return "Policy already changed to %s" % self.policy
self.lifecycle.api.object_change_policy(
self.lifecycle.account, obj_meta['container'],
obj_meta['orig_name'], self.policy,
version=obj_meta['version'])
return "Policy changed to %s" % self.policy
return "Kept"
[docs]class NoncurrentVersionExpiration(Expiration):
"""
Delete objects old versions.
"""
[docs] @classmethod
def from_element(cls, expiration_elt, **kwargs):
"""
Load the expiration from an XML element
:type expiration_elt: `lxml.etree.Element`
"""
nsmap = expiration_elt.nsmap
try:
days_elt = expiration_elt.findall('NoncurrentDays', nsmap)[-1]
except IndexError:
days_elt = None
try:
count_elt = expiration_elt.findall('NoncurrentCount', nsmap)[-1]
except IndexError:
count_elt = None
if days_elt is None and count_elt is None:
raise ValueError(
"Missing 'NoncurrentDays' or 'NoncurrentCount' element "
"in NoncurrentVersionExpiration action")
elif days_elt is not None and count_elt is not None:
raise ValueError(
"'NoncurrentDays' and 'NoncurrentCount' "
"in same NoncurrentVersionExpiration action")
if days_elt is not None:
action_filter = NoncurrentDaysActionFilter.from_element(
days_elt, **kwargs)
return cls(action_filter, **kwargs)
elif count_elt is not None:
action_filter = NoncurrentCountActionFilter.from_element(
count_elt, **kwargs)
return cls(action_filter, **kwargs)
def _to_element_tree(self, **kwargs):
exp_elt = etree.Element('NoncurrentVersionExpiration')
filter_elt = self.filter._to_element_tree(**kwargs)
exp_elt.append(filter_elt)
return exp_elt
[docs] def match(self, obj_meta, **kwargs):
return not self.lifecycle.is_current_version(
obj_meta, **kwargs) and self._match_filter(obj_meta, **kwargs)
[docs] def apply(self, obj_meta, **kwargs):
return super(NoncurrentVersionExpiration, self).apply(
obj_meta, version=obj_meta['version'], **kwargs)
[docs]class NoncurrentVersionTransition(Transition):
"""
Change object storage policy for old versions of the object only.
"""
[docs] @classmethod
def from_element(cls, transition_elt, **kwargs):
"""
Load the transition from an XML element
:type transition_elt: `lxml.etree.Element`
"""
nsmap = transition_elt.nsmap
try:
policy = transition_elt.findall(
cls.STORAGE_POLICY_XML_TAG, nsmap)[-1].text
if policy is None:
raise ValueError("Missing value for '%s' element" %
cls.STORAGE_POLICY_XML_TAG)
except IndexError:
raise ValueError(
"Missing '%s' element in NoncurrentVersionTransition action" %
(cls.STORAGE_POLICY_XML_TAG))
try:
days_elt = transition_elt.findall('NoncurrentDays', nsmap)[-1]
except IndexError:
days_elt = None
if days_elt is None:
raise ValueError(
"Missing 'NoncurrentDays' element "
"in NoncurrentVersionTransition action")
action_filter = NoncurrentDaysActionFilter.from_element(
days_elt, **kwargs)
return cls(action_filter, policy, **kwargs)
def _to_element_tree(self, **kwargs):
trans_elt = etree.Element('NoncurrentVersionTransition')
policy_elt = etree.Element(self.STORAGE_POLICY_XML_TAG)
policy_elt.text = self.policy
trans_elt.append(policy_elt)
filter_elt = self.filter._to_element_tree(**kwargs)
trans_elt.append(filter_elt)
return trans_elt
[docs] def match(self, obj_meta, **kwargs):
return not self.lifecycle.is_current_version(
obj_meta, **kwargs) and self._match_filter(obj_meta, **kwargs)