#!/usr/bin/env python
# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
try:
import simplejson as json
except ImportError:
import json # noqa
from collections import OrderedDict
import math
import re
import os
import pickle
from tarfile import TarInfo, REGTYPE, NUL, PAX_FORMAT, BLOCKSIZE, XHDTYPE, \
DIRTYPE, AREGTYPE, InvalidHeaderError
from md5py import MD5
from redis import ConnectionError
from werkzeug.wrappers import Response
from werkzeug.routing import Map, Rule
from werkzeug.exceptions import BadRequest, RequestedRangeNotSatisfiable, \
Conflict, UnprocessableEntity, ServiceUnavailable
from werkzeug.wsgi import wrap_file
from oio.api.object_storage import ObjectStorageApi, _sort_chunks
from oio.common import exceptions as exc
from oio.common.configuration import read_conf
from oio.common.logger import get_logger
from oio.common.wsgi import WerkzeugApp
from oio.common.redis_conn import RedisConn
from oio.common.storage_method import STORAGE_METHODS


RANGE_RE = re.compile(r"^bytes=(\d+)-(\d+)$")
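# Only single-part ranges are accepted: e.g. "bytes=0-1023" matches with
# groups ("0", "1023"), while multi-part ranges like "bytes=0-1,4-5" do not.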
# links:
# https://www.gnu.org/software/tar/manual/html_node/Standard.html
# https://www.cyberciti.biz/faq/linux-tar-rsync-preserving-acls-selinux-contexts/
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
# http://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html
NS = os.getenv("OIO_NS")
CONTAINER_PROPERTIES = ".__oio_container_properties"
CONTAINER_MANIFEST = ".__oio_container_manifest"
SCHILY = "SCHILY.xattr.user."
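# "SCHILY.xattr.*" is the PAX extended-header prefix used by star and
# GNU tar to carry extended attributes; user properties are stored there.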
SLO = 'x-static-large-object'
SLO_SIZE = 'x-object-sysmeta-slo-size'
SLO_ETAG = 'x-object-sysmeta-slo-etag'
SLO_HEADERS = (SLO, SLO_SIZE, SLO_ETAG)


class OioTarEntry(object):
def __init__(self, conn, account, container, name, data=None):
self._slo = None
self._buf = None
self.acct = account
self.ref = container
self.name = name
self._filesize = 0
        # MD5 of the single object, or a dict of per-chunk checksums
self._checksums = None
self.compute(conn, data)

    def compute(self, conn, data=None):
tarinfo = TarInfo()
tarinfo.name = self.name
        tarinfo.mode = 0o700
tarinfo.uid = 0
tarinfo.gid = 0
tarinfo.type = REGTYPE
tarinfo.linkname = ""
if self.name == CONTAINER_PROPERTIES:
meta = data or conn.container_get_properties(self.acct, self.ref)
tarinfo.size = len(json.dumps(meta['properties'], sort_keys=True))
self._filesize = tarinfo.size
self._buf = tarinfo.tobuf(format=PAX_FORMAT)
return
elif self.name == CONTAINER_MANIFEST:
tarinfo.size = len(json.dumps(data, sort_keys=True))
self._filesize = tarinfo.size
self._buf = tarinfo.tobuf(format=PAX_FORMAT)
return
entry = conn.object_get_properties(self.acct, self.ref, self.name)
properties = entry['properties']
# x-static-large-object
if properties.get(SLO, False):
tarinfo.size = int(properties.get(SLO_SIZE))
_, slo = conn.object_fetch(self.acct, self.ref, self.name,
properties=False)
self._slo = json.loads("".join(slo), object_pairs_hook=OrderedDict)
self._checksums = {}
            # format the MD5 to share the same layout as multi-chunk objects
offset = 0
for idx, ck in enumerate(self._slo):
self._checksums[idx] = {
'hash': ck['hash'].upper(),
'size': ck['bytes'],
'offset': offset
}
offset += ck['bytes']
else:
tarinfo.size = int(entry['length'])
meta, chunks = conn.object_locate(self.acct, self.ref, self.name,
properties=False)
storage_method = STORAGE_METHODS.load(meta['chunk_method'])
chunks = _sort_chunks(chunks, storage_method.ec)
for idx in chunks:
chunks[idx] = chunks[idx][0]
del chunks[idx]['url']
del chunks[idx]['score']
del chunks[idx]['pos']
self._checksums = chunks
self._filesize = tarinfo.size
# XATTR
# do we have to store basic properties like policy, ... ?
for key, val in properties.items():
assert isinstance(val, basestring), \
"Invalid type for %s:%s:%s" % (self.acct, self.name, key)
if self.slo and key in SLO_HEADERS:
continue
tarinfo.pax_headers[SCHILY + key] = val
tarinfo.pax_headers['mime_type'] = entry['mime_type']
self._buf = tarinfo.tobuf(format=PAX_FORMAT)
@property
def filesize(self):
return self._filesize
@property
def slo(self):
return self._slo
@property
def header_blocks(self):
"""Number of tar blocks required to store the entry header"""
assert self._buf
return len(self._buf) / BLOCKSIZE
@property
def data_blocks(self):
"""Number of tar blocks required to store the entry data"""
return ((self.filesize - 1) / BLOCKSIZE) + 1
@property
def buf(self):
return self._buf
@property
def checksums(self):
return self._checksums


class LimitedStream(object):
"""
Wrap a stream to read no more than size bytes from input stream.
Also verify checksums.
"""
def __init__(self, stream, size, entry=None, offset=0):
self.stream = stream
self.max_size = size
self.pos = 0
self.entry = entry
self.chk = None
if entry:
self.chk = entry.get('checksums')
self.offset = offset
self.md5 = None
self.current_chunk = None
self._find_chunk_for_current_offset()
self.invalid_checksum = False
def _find_chunk_for_current_offset(self):
if not self.chk:
self.current_chunk = None
return
for idx, item in self.chk.items():
val = item
if (self.offset >= val['offset']
and self.offset < val['offset'] + val['size']):
self.current_chunk = val
if val['offset'] == self.offset:
self.md5 = MD5()
else:
self.md5 = pickle.loads(val['md5'])
self.current_chunk_idx = idx
return
if self.offset < self.entry['size']:
raise Exception("No chunk found for current offset")
def _current_chunk_end(self):
return self.current_chunk['offset'] + self.current_chunk['size']
def _read_eof(self):
"""Reset the current chunk and return an empty data block."""
# save MD5 internal status in current_chunk
if self.current_chunk:
self.current_chunk['md5'] = pickle.dumps(self.md5)
self.md5 = None
self.current_chunk = None
return ""
def _update_checksum(self, data):
"""Update and verify the checksum of the current chunk."""
while self.offset + len(data) >= self._current_chunk_end():
# We read past the current chunk end. We must only do a partial
# update of the checksum in order to verify it.
remaining = (self._current_chunk_end() - self.offset)
self.md5.update(data[0:remaining])
if self.md5.hexdigest().upper() != self.current_chunk['hash']:
self.invalid_checksum = True
raise IOError("Chunk has invalid checksum, aborting")
self.current_chunk['verified'] = True
# align offset on chunk boundary
self.offset += remaining
self._find_chunk_for_current_offset()
data = data[remaining:]
if len(data) == 0:
break
self.offset += len(data)
self.md5.update(data)

    def read(self, size=-1):
if self.pos >= self.max_size:
return self._read_eof()
if size < 0:
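            # no explicit size requested: cap a single read at 10 MiB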
size = 1024 * 1024 * 10
size = min(size, self.max_size - self.pos)
data = self.stream.read(size)
self.pos += len(data)
if self.current_chunk:
self._update_checksum(data)
return data


class ContainerTarFile(object):
""" Expose a File Object API to be used with wrap_file """
def __init__(self, storage_api, account, container,
range_, oio_map, logger):
self.acct = account
self.container = container
self.range_ = range_
self.oio_map = oio_map
self.manifest = oio_map[:]
self.storage = storage_api
self.logger = logger
if len(range_) != 2:
self.logger.warn('no valid ranges provided for %s %s', account,
container)
def __iter__(self):
return self

    def next(self):
data = self.read()
if data == "":
raise StopIteration
return data
# FIXME: create_tar_oio_XXX functions should be merged
    def create_tar_oio_stream(self, entry, range_):
"""Extract data from entry from object"""
mem = ""
name = entry['name']
if range_[0] < entry['hdr_blocks']:
tar = OioTarEntry(self.storage, self.acct, self.container, name)
for bl in xrange(entry['hdr_blocks']):
if bl >= range_[0] and bl <= range_[1]:
mem += tar.buf[bl * BLOCKSIZE:bl * BLOCKSIZE + BLOCKSIZE]
range_ = (entry['hdr_blocks'], range_[1])
if range_[0] > range_[1]:
return mem
# for sanity, shift ranges
range_ = (range_[0] - entry['hdr_blocks'],
range_[1] - entry['hdr_blocks'])
# compute needed padding data
nb_blocks, remainder = divmod(entry['size'], BLOCKSIZE)
start = range_[0] * BLOCKSIZE
last = False
if remainder > 0 and nb_blocks == range_[1]:
last = True
end = entry['size'] - 1
else:
end = range_[1] * BLOCKSIZE + BLOCKSIZE - 1
if entry['slo']:
# we have now to compute which block(s) we need to read
slo_start = 0
for part in entry['slo']:
if start > part['bytes']:
start -= part['bytes']
end -= part['bytes']
continue
slo_end = min(end, part['bytes'])
slo_start = start
cnt, path = part['name'].strip('/').split('/', 1)
_, data = self.storage.object_fetch(
self.acct, cnt, path, ranges=[(slo_start, slo_end)],
properties=False)
mem += "".join(data)
start = max(0, start - part['bytes'])
end -= part['bytes']
if end <= 0:
break
else:
_, data = self.storage.object_fetch(
self.acct, self.container, name, ranges=[(start, end)],
properties=False)
mem += "".join(data)
if last:
mem += NUL * (BLOCKSIZE - remainder)
if not mem:
self.logger.error("no data extracted")
if divmod(len(mem), BLOCKSIZE)[1]:
self.logger.error("data written does not match blocksize")
return mem

    def create_tar_oio_properties(self, entry, range_, name):
"""
Extract data from fake object for :name:
CONTAINER_PROPERTIES: contains properties of container
CONTAINER_MANIFEST: map of object in Tar
"""
nb_blocks_to_serve = (range_[1] - range_[0] + 1) * BLOCKSIZE
if name == CONTAINER_PROPERTIES:
meta = self.storage.container_get_properties(self.acct,
self.container)
if not meta['properties']:
self.logger.error("container properties are empty")
struct = meta
data = json.dumps(meta['properties'], sort_keys=True)
elif name == CONTAINER_MANIFEST:
struct = self.manifest
data = json.dumps(self.manifest, sort_keys=True)
size = len(data)
mem = ""
if size != entry['size']:
self.logger.error("container properties has been updated")
if range_[0] < entry['hdr_blocks']:
tar = OioTarEntry(self.storage, self.acct, self.container,
name, data=struct)
for bl in xrange(entry['hdr_blocks']):
if bl >= range_[0] and bl <= range_[1]:
mem += tar.buf[bl * BLOCKSIZE:bl * BLOCKSIZE + BLOCKSIZE]
range_ = (entry['hdr_blocks'], range_[1])
if range_[0] > range_[1]:
return mem
# for sanity, shift blocks
range_ = (range_[0] - entry['hdr_blocks'],
range_[1] - entry['hdr_blocks'])
# compute needed padding data
nb_blocks, remainder = divmod(entry['size'], BLOCKSIZE)
start = range_[0] * BLOCKSIZE
last = False
if remainder > 0 and nb_blocks == range_[1]:
last = True
end = entry['size']
else:
end = range_[1] * BLOCKSIZE + BLOCKSIZE
mem += data[start:end]
if last:
mem += NUL * (BLOCKSIZE - remainder)
# add padding if needed
if len(mem) != nb_blocks_to_serve:
mem += NUL * (nb_blocks_to_serve - len(mem))
if not mem:
self.logger.error("no data extracted")
if divmod(len(mem), BLOCKSIZE)[1]:
self.logger.error("data written does not match blocksize")
return mem

    def read(self, size=-1):
"""
Stream TAR content
each call will send object by object, or by chunk of `size`
if object is too large
"""
        # Is there an API in the OIO SDK to stream data (to avoid copies)?
data = ""
        size = divmod(size, BLOCKSIZE)[0]
if self.range_[0] > self.range_[1]:
self.logger.debug("EOF reached")
return data
for val in self.oio_map[:]:
if self.range_[0] > val['end_block']:
self.oio_map.remove(val)
continue
if size > 0 and val['end_block'] - self.range_[0] > size:
# TODO (mbonfils) add a unit test
end_block = self.range_[0] + size
else:
end_block = min(self.range_[1], val['end_block'])
assert self.range_[0] >= val['start_block']
assert self.range_[0] <= self.range_[1], \
"Got start %d / end %d" % (self.range_[0], self.range_[1])
_s = val['start_block']
# map ranges to object range
range_ = (self.range_[0] - _s, end_block - _s)
self.range_ = (end_block + 1, self.range_[1])
if 'name' not in val:
data = NUL * (range_[1] - range_[0] + 1) * BLOCKSIZE
elif val['name'] in (CONTAINER_PROPERTIES, CONTAINER_MANIFEST):
data = self.create_tar_oio_properties(val, range_, val['name'])
else:
data = self.create_tar_oio_stream(val, range_)
if end_block == val['end_block']:
self.oio_map.remove(val)
break
return data

    def close(self):
if self.range_[0] <= self.range_[1]:
self.logger.info("data not all consumed")


class ContainerRestore(object):
MODE_FULL = 1
MODE_RANGE = 2
def __init__(self, redis, proxy, logger):
self.cur_state = {'offset_block': 0, 'offset': 0}
self._range = (0, 0)
self.req = None
self.req_size = -1
self.append = False
self.mode = self.MODE_FULL
# current file entry being processed
self.inf = None
self.state = {}
self.redis = redis
self.proxy = proxy
self.logger = logger

    def prepare(self, account, container):
assert (self.req)
if self.req.headers.get('range') is None:
return
rnge = ContainerBackup._extract_range(self.req, blocks=None)
self._range = [rnge[2], rnge[3]]
self.mode = self.MODE_RANGE
data = self.redis.get("restore:%s:%s" % (account, container))
if self._range[0] == 0:
if data:
raise UnprocessableEntity(
"A restoration has been already started")
self.cur_state = {
'start': -1,
'end': -1,
'manifest': None,
'entry': None, # current entry in process
# block offset when appending on existing object
'offset_block': 0,
# block offset in data (w/o headers) when appending
'offset': 0}
return
if not data:
raise UnprocessableEntity("First segment is not available")
self.cur_state = json.loads(data, object_pairs_hook=OrderedDict)
if self._range[0] != self.cur_state['end']:
raise UnprocessableEntity(
"Segment was already written "
"or an error has occured previously")
for entry in self.cur_state['manifest']:
if self._range[0] > entry['end_block']:
continue
if self._range[0] == entry['start_block']:
self.append = False
self.cur_state['offset_block'] = 0
self.cur_state['offset'] = 0
break
if self._range[0] >= entry['start_block'] \
+ entry['hdr_blocks']:
self.append = True
self.cur_state['entry'] = entry
self.inf = TarInfo()
self.inf.name = entry['name']
offset = (self._range[0] - entry['start_block']
- entry['hdr_blocks'])
self.cur_state['offset'] = offset * BLOCKSIZE
self.inf.size = entry['size'] - offset * BLOCKSIZE
self.inf.size = min(self.inf.size, self.req_size)
self.cur_state['offset_block'] = (self._range[0]
- entry['start_block'])
break
raise UnprocessableEntity('Header is broken')

    def read(self, size):
while (len(self.state['buf']) < size and
not self.req.stream.is_exhausted):
chunk = self.req.stream.read(size - len(self.state['buf']))
self.state['consumed'] += len(chunk)
self.state['buf'] += chunk
data = self.state['buf'][:size]
self.state['buf'] = self.state['buf'][size:]
if len(data) != size:
raise UnprocessableEntity("No enough data")
return data

    def parse_xhd_type(self, hdrs):
""" enrich hdrs with new headers """
buf = self.read(self.inf.size)
while buf:
length = buf.split(' ', 1)[0]
if length[0] == '\x00':
break
tmp = buf[len(length) + 1:int(length) - 1]
key, value = tmp.split('=', 1)
if key.startswith(SCHILY):
key = key[len(SCHILY):]
assert key not in hdrs, (
"%s already found in %s (object: %s)" %
(key, hdrs, self.inf.name))
hdrs[key] = value
buf = buf[int(length):]
def _restore_container_properties(self, hdrs, account, container):
assert not hdrs, "invalid sequence in TAR"
hdrs = json.loads(self.read(self.inf.size),
object_pairs_hook=OrderedDict)
self.proxy.container_set_properties(account, container, hdrs)
def _load_manifest(self, hdrs, account, container):
assert not hdrs, "invalid sequence in TAR"
manifest = json.loads(self.read(self.inf.size),
object_pairs_hook=OrderedDict)
self.cur_state['manifest'] = manifest
if self.mode == self.MODE_RANGE:
self.cur_state['last_block'] = max(
[x['end_block'] for x in manifest]) + 1
def _restore_object(self, hdrs, account, container):
kwargs = {}
if not self.append and hdrs and 'mime_type' in hdrs:
kwargs['mime_type'] = hdrs['mime_type']
del hdrs['mime_type']
data = LimitedStream(self.req.stream, self.inf.size,
entry=self.cur_state.get('entry'),
offset=self.cur_state.get('offset'))
try:
_, size, _ = self.proxy.object_create(
account, container, obj_name=self.inf.name, append=self.append,
file_or_path=data, **kwargs)
except Exception:
# No data is written if an error occurs during object_create.
# We just have to update our state_machine offset regarding
# the current object.
if self.cur_state.get('manifest') is None:
raise
entry = None
for entry in self.cur_state['manifest']:
if entry['name'] == self.inf.name:
break
else: # it should not happen
raise BadRequest("Invalid internal state")
            # Since an error has occurred, we have to reset the
# current offset to the start of the current chunk
# and remove the stored checksum if set.
if data.invalid_checksum:
self.logger.error("Invalid checksum detected for %s",
self.inf.name)
raise BadRequest("Checksum error for %s" % self.inf.name)
self.cur_state['end'] = (entry['start_block']
+ self.cur_state['offset_block'])
self.redis.set("restore:%s:%s" % (account, container),
json.dumps(self.cur_state, sort_keys=True),
ex=ContainerBackup.REDIS_TIMEOUT)
raise
# save properties before checking size, otherwise they'll be lost
if hdrs:
self.proxy.object_set_properties(account, container,
self.inf.name,
properties=hdrs)
if size != self.inf.size:
raise UnprocessableEntity(
"Object created is smaller than expected")
self.state['consumed'] += size
if self.mode == self.MODE_RANGE:
self.cur_state['offset_block'] = 0
self.cur_state['offset'] = 0
self.append = False

    def parse_reg_type(self, hdrs, account, container):
if self.inf.name == CONTAINER_PROPERTIES:
return self._restore_container_properties(hdrs, account, container)
elif self.inf.name == CONTAINER_MANIFEST:
return self._load_manifest(hdrs, account, container)
else:
return self._restore_object(hdrs, account, container)

    def restore(self, request, account, container):
"""Manage PUT method for restoring a container"""
self.req = request
self.req_size = int(self.req.headers['content-length'])
self.prepare(account, container)
self.proxy.container_create(account, container)
self.state = {'consumed': 0, 'buf': ''}
hdrs = {}
while self.state['consumed'] < self.req_size:
try:
if not self.extract_tar_entry():
# skip NULL blocks
continue
except InvalidHeaderError as ex:
self.logger.error("Tar entry have invalid checksum")
raise BadRequest(str(ex))
if self.inf.type not in (XHDTYPE, REGTYPE, AREGTYPE, DIRTYPE):
raise BadRequest('unsupported TAR attribute %s' %
self.inf.type)
if self.inf.type == XHDTYPE:
self.parse_xhd_type(hdrs)
elif self.inf.type in (REGTYPE, AREGTYPE):
self.parse_reg_type(hdrs, account, container)
hdrs = {}
if self.inf.size % BLOCKSIZE:
self.read(BLOCKSIZE - self.inf.size % BLOCKSIZE)
if self.req_size != self.state['consumed']:
raise UnprocessableEntity(
"Invalid length of data consumed by restoration")
if (self.mode == self.MODE_FULL
or self._range[1] == self.cur_state['last_block']):
code = 201
manifest = self.cur_state.get('manifest')
if manifest:
for entry in self.cur_state.get('manifest'):
verified = 0
nb = 0
# check that each chunk of each object has been checked
for idx in entry.get('checksums', []):
if (entry['checksums'][idx].get('verified')
or entry['checksums'][idx].get('size') == 0):
verified += 1
nb += 1
if verified != nb:
self.logger.warn("%s not verified !",
entry.get('name'))
else:
self.logger.info("no manifest, checksums not available")
self.redis.delete("restore:%s:%s" % (account, container))
else:
code = 206
self.cur_state['start'] = self._range[0]
self.cur_state['end'] = self._range[1]
self.redis.set("restore:%s:%s" % (account, container),
json.dumps(self.cur_state, sort_keys=True),
ex=ContainerBackup.REDIS_TIMEOUT)
return Response(status=code)


def redis_cnx(fct):
def wrapper(*args):
try:
return fct(*args)
except ConnectionError:
args[0].logger.error("Redis is not available")
raise ServiceUnavailable()
return wrapper


class ContainerBackup(RedisConn, WerkzeugApp):
"""WSGI Application to dump or restore a container."""
REDIS_TIMEOUT = 3600 * 24 # Redis keys will expire after one day
    STREAMING = 52428800  # 50 MiB
    # Serve blocks in groups aligned on this boundary, to avoid splitting
    # entry headers across ranges (2048 blocks = 1 MiB)
    BLOCK_ALIGNMENT = 2048
def __init__(self, conf):
if conf:
self.conf = read_conf(conf['key_file'],
section_name="admin-server")
else:
self.conf = {}
self.logger = get_logger(self.conf, name="ContainerBackup")
self.proxy = ObjectStorageApi(self.conf.get("namespace", NS),
logger=self.logger)
self.url_map = Map([
Rule('/v1.0/container/dump', endpoint='dump'),
Rule('/v1.0/container/restore', endpoint='restore'),
])
self.REDIS_TIMEOUT = self.conf.get("redis_cache_timeout",
self.REDIS_TIMEOUT)
super(ContainerBackup, self).__init__(self.conf)
WerkzeugApp.__init__(self, self.url_map, self.logger)
@property
def redis(self):
"""Redis connection object"""
return self.conn

    @redis_cnx
def generate_manifest(self, account, container):
"""
Generate a static manifest of a container.
It will help to find quickly which part of object app have to serve
Manifest is cached into Redis with REDIS_TIMEOUT delay
"""
if not container:
raise exc.NoSuchContainer()
        # TODO: hash_map should record whether the deleted or version
        # flags are set
hash_map = "container_streaming:{0}/{1}".format(account, container)
cache = self.redis.get(hash_map)
if cache:
self.logger.debug("using cache")
return json.loads(cache, object_pairs_hook=OrderedDict)
map_objs = []
start_block = 0
meta = self.proxy.container_get_properties(account, container)
if meta['properties']:
# create special file to save properties of container
tar = OioTarEntry(self.proxy, account, container,
CONTAINER_PROPERTIES, data=meta)
entry = {
'name': CONTAINER_PROPERTIES,
'size': tar.filesize,
'hdr_blocks': tar.header_blocks,
'blocks': tar.header_blocks + tar.data_blocks,
'start_block': start_block,
}
start_block += entry['blocks']
entry['end_block'] = start_block - 1
map_objs.append(entry)
objs = self.proxy.object_list(account, container)
for obj in sorted(objs['objects'], key=lambda x: x['name']):
# FIXME: should we backup deleted objects?
if obj['deleted']:
continue
tar = OioTarEntry(self.proxy, account, container, obj['name'])
if (start_block / self.BLOCK_ALIGNMENT) != \
((start_block + tar.header_blocks) / self.BLOCK_ALIGNMENT):
# header is over boundary, we have to add padding blocks
padding = (self.BLOCK_ALIGNMENT -
divmod(start_block, self.BLOCK_ALIGNMENT)[1])
map_objs.append({
'blocks': padding,
'size': padding * BLOCKSIZE,
'start_block': start_block,
'slo': None,
'hdr_blocks': padding,
'end_block': start_block + padding - 1
})
start_block += padding
entry = {
'name': obj['name'],
'size': tar.filesize,
'hdr_blocks': tar.header_blocks,
'blocks': tar.header_blocks + tar.data_blocks,
'start_block': start_block,
'slo': tar.slo,
'checksums': tar.checksums,
}
start_block += entry['blocks']
entry['end_block'] = start_block - 1
map_objs.append(entry)
if not map_objs:
return map_objs
entry = {
'name': CONTAINER_MANIFEST,
'size': 0,
            'hdr_blocks': 1,  # a simple PAX header consumes only 1 block
'blocks': 0,
'start_block': 0,
'slo': None,
}
map_objs.insert(0, entry)
entry['size'] = len(json.dumps(map_objs, sort_keys=True))
# ensure that we reserved enough blocks after recomputing offset
entry['blocks'] = \
1 + int(math.ceil(entry['size'] / float(BLOCKSIZE))) * 2
tar = OioTarEntry(self.proxy, account, container, CONTAINER_MANIFEST,
data=map_objs)
assert tar.header_blocks == 1, "Incorrect size for hdr_blocks"
assert tar.data_blocks <= entry['blocks']
# fix start_block and end_block
start = 0
for _entry in map_objs:
_entry['start_block'] = start
start += _entry['blocks']
_entry['end_block'] = start - 1
tar2 = OioTarEntry(self.proxy, account, container, CONTAINER_MANIFEST,
data=map_objs)
entry['size'] = tar2.filesize
assert tar2.header_blocks == tar.header_blocks
assert tar2.data_blocks <= entry['blocks'], \
"got %d instead of %d" % (tar2.data_blocks, tar.data_blocks)
self.logger.debug("add entry to cache")
self.redis.set(hash_map, json.dumps(map_objs, sort_keys=True),
ex=self.REDIS_TIMEOUT)
return map_objs
def _do_head(self, _, account, container):
"""
Manage HEAD method and response number of block
Note: Range header is unmanaged
"""
try:
results = self.generate_manifest(account, container)
except exc.NoSuchContainer:
self.logger.info("%s %s not found", account, container)
return Response(status=404)
if not results:
self.logger.info("no data for %s %s", account, container)
return Response(status=204)
hdrs = {
'X-Blocks': sum([i['blocks'] for i in results]),
'Content-Length': sum([i['blocks'] for i in results]) * BLOCKSIZE,
'Accept-Ranges': 'bytes',
'Content-Type': 'application/tar',
}
return Response(headers=hdrs, status=200)
@classmethod
def _extract_range(cls, req, blocks):
"""Convert byte range into block an performs validity check"""
# accept only single part range
val = req.headers['Range']
match = RANGE_RE.match(val)
if match is None:
raise RequestedRangeNotSatisfiable()
start = int(match.group(1))
end = int(match.group(2))
if start >= end:
raise RequestedRangeNotSatisfiable()
def check_range(value):
block, remainder = divmod(value, BLOCKSIZE)
if remainder or block < 0 or (blocks and block > blocks):
raise RequestedRangeNotSatisfiable()
return block
block_start = check_range(start)
        block_end = check_range(end + 1)  # HTTP ranges are inclusive
return start, end, block_start, block_end
def _do_get(self, req, account, container):
"""Manage GET method to dump a container"""
try:
results = self.generate_manifest(account, container)
except exc.NoSuchContainer:
self.logger.info("%s %s not found", account, container)
return Response(status=404)
if not results:
self.logger.info("no data for %s %s", account, container)
return Response(status=204)
blocks = sum([i['blocks'] for i in results])
length = blocks * BLOCKSIZE
if 'Range' not in req.headers:
tar = ContainerTarFile(self.proxy, account, container,
(0, blocks-1), results, self.logger)
return Response(wrap_file(req.environ, tar,
buffer_size=self.STREAMING),
headers={
'Accept-Ranges': 'bytes',
'Content-Type': 'application/tar',
'Content-Length': length,
}, status=200)
start, end, block_start, block_end = self._extract_range(req, blocks)
tar = ContainerTarFile(self.proxy, account, container,
(block_start, block_end - 1),
results, self.logger)
return Response(wrap_file(req.environ, tar,
buffer_size=self.STREAMING),
headers={
'Accept-Ranges': 'bytes',
'Content-Type': 'application/tar',
'Content-Range': 'bytes %d-%d/%d' %
(start, end, length),
'Content-Length': end - start + 1,
}, status=206)

    def on_dump(self, req):
"""Entry point for dump rule"""
# extract account and container
account = req.args.get('acct')
container = req.args.get('ref')
if not account:
raise BadRequest('Missing Account name')
if not container:
raise BadRequest('Missing Container name')
if req.method == 'HEAD':
return self._do_head(req, account, container)
if req.method == 'GET':
return self._do_get(req, account, container)
return Response("Not supported", 405)
@redis_cnx
def _do_put_head(self, req, account, container):
results = self.redis.get("restore:%s:%s" % (account,
container))
if not results:
            raise UnprocessableEntity("No restoration in progress")
in_progress = self.redis.get('restore:%s:%s:lock' % (account,
container)) or '0'
results = json.loads(results)
blocks = sum(i['blocks'] for i in results['manifest'])
return Response(headers={
'X-Tar-Size': blocks * BLOCKSIZE,
'X-Consumed-Size': results['end'] * BLOCKSIZE,
'X-Upload-In-Progress': in_progress
}, status=200)
@redis_cnx
def _do_put(self, req, account, container):
"""Manage PUT method for restoring a container"""
obj = ContainerRestore(self.redis, self.proxy, self.logger)
key = "restore:%s:%s:lock" % (account, container)
if not self.redis.set(key, 1, nx=True):
raise UnprocessableEntity("A restore is already in progress")
try:
return obj.restore(req, account, container)
finally:
self.redis.delete(key)

    def on_restore(self, req):
"""Entry point for restore rule"""
account = req.args.get('acct')
container = req.args.get('ref')
if not account:
raise BadRequest('Missing Account name')
if not container:
raise BadRequest('Missing Container name')
if req.method not in ('PUT', 'HEAD'):
return Response("Not supported", 405)
try:
self.proxy.container_get_properties(account, container)
if not req.headers.get('range') and req.method == 'PUT':
raise Conflict('Container already exists')
except exc.NoSuchContainer:
pass
except Conflict:
raise
except Exception:
            raise BadRequest('Failed to verify container')
if req.method == 'HEAD':
return self._do_put_head(req, account, container)
return self._do_put(req, account, container)