Source code for oio.directory.meta1
# Copyright (C) 2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Meta1 client and meta2 balancing operations"""
from oio.directory.meta import MetaMapping
from oio.common.exceptions import OioException
from oio.directory.client import DirectoryClient
[docs]class Meta1RefMapping(MetaMapping):
"""Represents the content of the meta1 database"""
def __init__(self, namespace, directory_client=None, **kwargs):
super(Meta1RefMapping, self).__init__(
{'namespace': namespace}, ['meta2', 'sqlx'], **kwargs)
self._reference = directory_client
self.service_type_by_base = dict()
self.args_by_base = dict()
def _get_old_peers_by_base(self, base):
return self.raw_services_by_base.get(base, list())
def _get_peers_by_base(self, base):
return self.services_by_base.get(base, dict()).keys()
def _get_service_type_by_base(self, base):
return self.service_type_by_base.get(base, None)
def _get_args_by_base(self, base):
return self.args_by_base.get(base, None)
def _apply_link_services(self, moved_ok, **kwargs):
for base in moved_ok:
peers = self._get_peers_by_base(base)
service_type = self._get_service_type_by_base(base)
args = self._get_args_by_base(base)
cid, seq = self.get_cid_and_seq(base)
try:
self.reference.force(
service_type=service_type, cid=cid, replace=True,
services=dict(host=','.join(peers), type=service_type,
args=args, seq=seq))
"""
FIXME(ABO): This part can be removed when, either:
- meta1 sends the removed services bundled with the
account.services events.
- meta2 sends a storage.container.deleted event when the
sqliterepo layer is the one that notifies the deletion of
the databases.
"""
if service_type == 'meta2' and kwargs.get('src_service'):
self.rdir.meta2_index_delete(
volume_id=kwargs.get('src_service'),
container_id=cid
)
except OioException as exc:
self.logger.warn(
"Failed to link services for base %s (seq=%d): %s",
cid, seq, exc)
@property
def reference(self):
if not self._reference:
self._reference = DirectoryClient(self.conf)
return self._reference
def _service_id(self, service, service_type):
return self.conf['namespace'] + "|" + service_type + "|" + service
def _conscience_poll(self, service_type, known, avoid, **kwargs):
try:
services_found = self.conscience.poll(
service_type,
known=[self._service_id(svc, service_type) for svc in known],
avoid=[self._service_id(svc, service_type) for svc in avoid])
return [svc['addr'] for svc in services_found]
except OioException as exc:
self.logger.warn(
"Failed to poll services (type=%s, known=%s, avoid=%s): %s",
service_type, known, avoid, exc)
return list()
[docs] def move(self, src_service, dest_service, base_name, service_type,
**kwargs):
"""
Move a `base` of `src_service` to `dest_service`
"""
if service_type not in self.services_by_service_type.keys():
raise ValueError(
"service type must be %s"
% " or ".join(self.services_by_service_type.keys()))
cid, seq = self.get_cid_and_seq(base_name)
data = self.reference.list(cid=cid)
if dest_service is not None and dest_service not in \
self.services_by_service_type[service_type].keys():
raise ValueError(
"destination service must be a %s service" % service_type)
bases = dict()
for service in data['srv']:
if service['type'] != service_type:
continue
if seq is not None and seq != service['seq']:
continue
base = cid + "." + str(service['seq'])
raw_services = bases.get(base, None)
if raw_services is None:
raw_services = dict()
bases[base] = raw_services
host = service['host']
service.pop('host', None)
raw_services[host] = service
moved = set()
for base, raw_services in bases.iteritems():
old_peers = raw_services.keys()
if src_service not in old_peers:
continue
src_info = raw_services.pop(src_service)
if dest_service is None:
known = raw_services.keys()
services_found = self._conscience_poll(
service_type, known, [src_service], **kwargs)
if not services_found:
self.logger.warn(
"No destination service found %s (seq=%d)", cid, seq)
dest_service = services_found[0]
elif dest_service in old_peers:
continue
raw_services[dest_service] = src_info
moved.add(base)
self.raw_services_by_base[base] = old_peers
self.services_by_base[base] = raw_services
self.service_type_by_base[base] = service_type
self.args_by_base[base] = src_info['args']
if not moved:
raise ValueError(
"source service isn't used "
"or destination service is already used for this base")
return moved