# Source code for oio.directory.meta0

# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


"""Meta0 client and meta1 balancing operations"""
import random

from oio.directory.meta import MetaMapping
from oio.common.client import ProxyClient
from oio.common.exceptions import ConfigurationException, OioException
from oio.common.json import json


class Meta0Client(ProxyClient):
    """Meta0 administration client.

    Every request goes through the proxy, below the "/admin" prefix.
    """

    def __init__(self, conf, **kwargs):
        prefix = "/admin"
        super(Meta0Client, self).__init__(
            conf, request_prefix=prefix, **kwargs)

    def force(self, mapping, **kwargs):
        """
        Force the meta0 prefix mapping.

        :param mapping: the mapping sent as the request body. It may be
            partial, in which case only the listed prefixes are forced.
        :param kwargs: extra arguments for the underlying request
        """
        route = "/meta0_force"
        self._request('POST', route, data=mapping, **kwargs)

    def list(self, **kwargs):
        """
        Get the meta0 prefix mapping.

        :returns: the body of the proxy response
        """
        route = "/meta0_list"
        _resp, body = self._request('GET', route, **kwargs)
        return body
class Meta0PrefixMapping(MetaMapping):
    """Represents the content of the meta0 database"""

    def __init__(self, meta0_client, replicas=3, digits=None, min_dist=1,
                 **kwargs):
        """
        :param meta0_client: client exposing `conf`, `force()` and `list()`
            (e.g. `Meta0Client`)
        :param replicas: number of services to allocate to manage a base
        :type replicas: strictly positive `int`
        :param digits: number of digits used to name the database files
        :type digits: `int` between 0 and 4 (inclusive)
        :param min_dist: minimum distance (see `dist_between`) between the
            locations of two services managing the same base
        :type min_dist: `int`
        :param kwargs: passed to the `MetaMapping` constructor
        :raises ConfigurationException: if `replicas` or `digits` is out
            of range
        """
        super(Meta0PrefixMapping, self).__init__(
            meta0_client.conf, ['meta1'], **kwargs)
        self.m0 = meta0_client
        self.replicas = int(replicas)
        if self.replicas < 1:
            raise ConfigurationException("replicas must be >= 1")
        if digits is None:
            digits = 4
        self.digits = int(digits)
        if self.digits < 0:
            raise ConfigurationException("meta_digits must be >= 0")
        if self.digits > 4:
            raise ConfigurationException("meta_digits must be <= 4")
        # Convert like the other numeric settings. Configuration values
        # often arrive as strings, and comparing an int distance to a
        # string either fails (Python 3) or silently rejects every
        # candidate service (Python 2).
        self.min_dist = int(min_dist)

    @property
    def services(self):
        """Known meta1 services, as a dict indexed by service address."""
        return self.services_by_service_type['meta1']

    # The following hooks are presumably called by `MetaMapping`
    # (defined outside this file) -- confirm against the base class.

    def _get_old_peers_by_base(self, base):
        """Get the raw service addresses recorded by `load` for `base`."""
        return self.raw_services_by_base.get(base, list())

    def _get_peers_by_base(self, base):
        """Get the addresses of the services currently assigned to `base`."""
        return [v['addr'] for v in self.services_by_base.get(base, list())]

    def _get_service_type_by_base(self, base):
        """All bases of the meta0 mapping are managed by meta1 services."""
        return 'meta1'

    def _apply_link_services(self, moved_ok, **kwargs):
        """Force the mapping of the `moved_ok` bases into meta0.

        Failures are logged, not raised.
        """
        try:
            self.m0.force(self.to_json(moved_ok).strip(), **kwargs)
        except OioException as exc:
            self.logger.warning(
                "Failed to link services for meta0: %s", exc)

    def __nonzero__(self):
        return bool(self.services_by_base)

    # Python 3 name of the truthiness hook.
    __bool__ = __nonzero__

    def _as_service(self, svc):
        """
        Resolve a service address into its service description.

        Unknown addresses give a minimal ``{"addr": svc}`` description.
        Non-string arguments are returned unchanged.
        """
        if isinstance(svc, basestring):
            return self.services.get(svc, {"addr": svc})
        return svc

    def num_bases(self):
        """Get total the number of bases according to `self.digits`"""
        return 1 << (4 * self.digits)

    def get_loc(self, svc, default=None):
        """
        Get the location of a service (address or description).

        The location is read from the "tag.loc" tag. If it is not
        defined, return:
        - the service IP address if `default` is None or "addr";
        - `default` for any other value.
        """
        svc = self._as_service(svc)
        loc = svc.get("tags", {}).get("tag.loc", default)
        if not loc or loc == "addr":
            # Strip the port from the "ip:port" address
            loc = svc["addr"].rsplit(":", 1)[0]
        return str(loc)

    @staticmethod
    def dist_between(loc1, loc2):
        """
        Compute the distance between two dot-separated locations.

        Only the first 4 levels are considered (deeper levels stay glued
        to the 4th). The distance is the number of levels of the longest
        location minus the number of leading levels both have in common.
        """
        parts1 = loc1.split('.', 3)
        parts2 = loc2.split('.', 3)
        in_common = 0
        for part1, part2 in zip(parts1, parts2):
            if part1 != part2:
                break
            in_common += 1
        return max(len(parts1), len(parts2)) - in_common

    def get_score(self, svc):
        """Get the score of a service, or 0 if it is unknown"""
        return int(self._as_service(svc).get("score", 0))

    def get_managed_bases(self, svc):
        """Get the set of bases managed by the service"""
        return self._as_service(svc).get('bases', set())

    def prefix_to_base(self, pfx):
        """
        Get the name of the base the prefix will be saved in.
        When `self.digits` is 4, the name of the base is `pfx`.
        """
        return str(pfx[:self.digits]).ljust(4, '0')

    def prefix_siblings(self, pfx):
        """
        Get the list of prefixes that share the same base as `pfx`.
        """
        min_base = self.prefix_to_base(pfx)
        max_base = str(min_base[:self.digits]).ljust(4, 'F')
        return ["%04X" % base
                for base in xrange(int(min_base, 16), int(max_base, 16) + 1)]

    def _extend(self, bases=None):
        """
        Extend the mapping to all meta1 prefixes, if `self.digits`
        is less than 4.

        :param bases: if set, only extend the mapping for the specified bases
        :returns: a dict mapping prefixes (or bases) to service lists
        """
        if not bases:
            svc_by_base = self.services_by_base
        else:
            svc_by_base = {base: self.services_by_base[base]
                           for base in bases}
        if self.digits == 4:
            # nothing to extend: there is one base for each prefix
            return svc_by_base
        extended = dict()
        for base, services in svc_by_base.iteritems():
            for pfx in self.prefix_siblings(base):
                extended[pfx] = services
        return extended

    def to_json(self, bases=None):
        """
        Serialize the mapping to a JSON string suitable as input
        for 'meta0_force' request.
        """
        simplified = {pfx: [x['addr'] for x in services]
                      for pfx, services in self._extend(bases).iteritems()}
        return json.dumps(simplified)

    def load(self, json_mapping=None, swap_bytes=True, **kwargs):
        """
        Load the mapping from the cluster, from a JSON string or from a
        dictionary.

        :param json_mapping: JSON string or dict mapping prefixes to lists
            of service addresses. If neither, the mapping is fetched
            through `self.m0.list(**kwargs)`.
        :param swap_bytes: take the base name from the end of each prefix
            instead of its beginning
        """
        if isinstance(json_mapping, basestring):
            raw_mapping = json.loads(json_mapping)
        elif isinstance(json_mapping, dict):
            raw_mapping = json_mapping
        else:
            raw_mapping = self.m0.list(**kwargs)

        # pylint: disable=no-member
        for pfx, services_addrs in raw_mapping.iteritems():
            # FIXME: this is REALLY annoying
            # self.prefix_to_base() takes the beginning of the prefix,
            # but here we have to take the end, because meta0 does
            # some byte swapping.
            if swap_bytes:
                base = pfx[4 - self.digits:]
            else:
                base = pfx[:self.digits]
            services = [self.services.get(addr, {"addr": addr})
                        for addr in services_addrs]
            self.assign_services(base, services)
            # Keep our own copy of the raw addresses
            self.raw_services_by_base[base] = [str(x)
                                               for x in services_addrs]

    def _find_services(self, known=None, lookup=None, max_lookup=50):
        """
        Call `lookup` to find `self.replicas` different services.

        :param known: an iterable of already known services (not modified)
        :param lookup: a function taking the list of services selected so
            far and returning an iterable of candidates (a falsy value
            stops the search)
        :param max_lookup: maximum number of calls to `lookup`
        :returns: a new list made of the services of `known` followed by
            the selected ones
        """
        # Work on a copy: callers pass lists they still own
        # (e.g. the current assignation of a base).
        services = list(known) if known else list()
        known_locations = {self.get_loc(svc) for svc in services}
        iterations = 0
        while len(services) < self.replicas and iterations < max_lookup:
            iterations += 1
            svcs = lookup(services)
            if not svcs:
                break
            for svc in svcs:
                if svc in services:
                    # Never pick the same service twice, even when
                    # min_dist allows co-located services.
                    continue
                loc = self.get_loc(svc)
                if all(self.dist_between(loc, loc1) >= self.min_dist
                       for loc1 in known_locations):
                    known_locations.add(loc)
                    services.append(svc)
                    if len(services) >= self.replicas:
                        break
        return services

    def find_services_random(self, known=None, **_kwargs):
        """Find `replicas` services, including the ones of `known`,
        by picking known meta1 services at random."""
        addrs = list(self.services)
        return self._find_services(
            known, lambda _selected: (self.services[random.choice(addrs)],))

    def find_services_less_bases(self, known=None, min_score=1, **_kwargs):
        """Find `replicas` services, including the ones of `known`,
        preferring services (with score >= `min_score`) that manage
        the fewest bases."""
        # Sort in reverse order so the service managing the fewest bases
        # can be popped cheaply from the end of the list.
        filtered = sorted(
            (x for x in self.services.itervalues()
             if self.get_score(x) >= min_score),
            key=lambda x: len(self.get_managed_bases(x)),
            reverse=True)

        def _lookup(selected):
            while filtered:
                svc = filtered.pop()
                if svc not in selected:
                    return (svc,)
            return None

        # Each lookup consumes at least one candidate.
        return self._find_services(known, _lookup, len(filtered))

    def find_services_m1_pool(self, known=None, **_kwargs):
        """
        Find `replicas` services, including the ones of `known`,
        by calling the proxy's load balancer.
        """
        def _lookup(selected):
            res = self.conscience.poll("meta1", known=selected)
            # Like load(), fall back to a minimal description for services
            # we do not know yet: get_loc() cannot handle None.
            return (self.services.get(svc['addr'], {'addr': svc['addr']})
                    for svc in res)
        return self._find_services(known, _lookup)

    def assign_services(self, base, services, fail_if_already_set=False):
        """
        Assign `services` to manage `base`.

        :param fail_if_already_set: raise ValueError if `base` is already
            managed by some services
        """
        if fail_if_already_set and base in self.services_by_base:
            raise ValueError("Base %s already managed" % base)
        for svc in services:
            base_set = svc.get('bases') or set()
            base_set.add(base)
            svc['bases'] = base_set
        self.services_by_base[base] = services

    def bootstrap(self, strategy=None):
        """
        Build `self.num_bases()` assignations from scratch, using
        `strategy` (called without arguments) to find new services.
        """
        # reset() is defined outside this file; presumably it clears the
        # current mapping -- confirm against MetaMapping.
        self.reset()
        if not strategy:
            strategy = self.find_services_random
        num_bases = self.num_bases()
        last_decile = 0
        for base_int in xrange(num_bases):
            base = "%0*X" % (self.digits, base_int)
            services = strategy()
            self.assign_services(base, services, fail_if_already_set=True)
            if self.logger:
                progress = ((base_int + 1) * 100) // num_bases
                # Log at most once every 10%
                if progress // 10 > last_decile:
                    last_decile = progress // 10
                    self.logger.info("%d%%", progress)

    def count_pfx_by_svc(self):
        """
        Build a dict with service addresses as keys and the number
        of managed bases as values.
        """
        return {svc["addr"]: len(self.get_managed_bases(svc))
                for svc in self.services.itervalues()}

    def check_replicas(self):
        """
        Check that all bases have the right number of replicas.

        Too many replicas only triggers a warning.

        :returns: False if some base has fewer than `self.replicas`
            services, True otherwise
        """
        error = False
        grand_total = 0
        for base, services in self.services_by_base.iteritems():
            if len(services) < self.replicas:
                if self.logger:
                    self.logger.error(
                        "Base %s is managed by %d services, %d required",
                        base, len(services), self.replicas)
                    self.logger.error("%s", [x["addr"] for x in services])
                error = True
            elif len(services) > self.replicas:
                if self.logger:
                    self.logger.warning(
                        "Base %s is managed by %d services, %d expected",
                        base, len(services), self.replicas)
                    self.logger.warning("%s", [x["addr"] for x in services])
            grand_total += len(services)
        if self.logger:
            self.logger.info("Grand total: %d (expected: %d)",
                             grand_total, self.num_bases() * self.replicas)
        return not error

    def decommission(self, svc, bases_to_remove=None, strategy=None):
        """
        Unassign all bases of `bases_to_remove` from `svc`, and assign
        them to other services using `strategy`.

        :param svc: service description or address (must be known)
        :param bases_to_remove: bases to move, defaults to all the bases
            of `svc`
        :param strategy: one of the `find_services_*` methods, defaults
            to `find_services_less_bases`
        :returns: the set of processed bases
        """
        if isinstance(svc, basestring):
            svc = self.services[svc]
        saved_score = svc["score"]
        # A null score keeps score-aware strategies from choosing
        # this service again.
        svc["score"] = 0
        try:
            if not bases_to_remove:
                bases_to_remove = list(svc.get("bases", list()))
            if not strategy:
                strategy = self.find_services_less_bases
            for base in bases_to_remove:
                try:
                    self.services_by_base[base].remove(svc)
                except ValueError:
                    pass
                try:
                    svc["bases"].remove(base)
                except KeyError:
                    pass
                new_svcs = strategy(known=self.services_by_base[base])
                self.assign_services(base, new_svcs)
        finally:
            # Restore the score even if the strategy failed
            svc["score"] = saved_score
        return set(bases_to_remove)

    def rebalance(self, max_loops=65536):
        """
        Reassign bases from the services which manage the most.

        :param max_loops: upper bound on the number of move attempts
        :returns: the set of moved bases, or None if no rebalancing is
            possible (0 digits, or no service with a positive score)
        """
        if self.digits == 0:
            if self.logger:
                self.logger.info("No equilibration possible when "
                                 "meta1_digits is set to 0")
            return None
        scored = [x for x in self.services.itervalues()
                  if self.get_score(x) > 0]
        if not scored:
            # Avoid a ZeroDivisionError when computing the ideal count
            if self.logger:
                self.logger.warning(
                    "No meta1 service scored positively, cannot rebalance")
            return None
        loops = 0
        moved_bases = set()
        ideal_bases_by_svc = self.num_bases() * self.replicas // len(scored)
        upper_limit = ideal_bases_by_svc + 1
        if self.logger:
            self.logger.info("META1 Digits = %d", self.digits)
            self.logger.info("Replicas = %d", self.replicas)
            self.logger.info("Scored positively = %d", len(scored))
            self.logger.info(
                "Ideal number of bases per meta1: %d, limit: %d",
                ideal_bases_by_svc, upper_limit)
        while loops < max_loops:
            candidates = sorted(self.services.values(),
                                key=lambda x: len(self.get_managed_bases(x)))
            already_balanced = 0
            while candidates:
                svc = candidates.pop()  # service with most bases
                svc_bases = self.get_managed_bases(svc)
                if len(svc_bases) <= upper_limit:
                    already_balanced += 1
                    continue
                if self.logger:
                    self.logger.info("meta1 %s has %d bases, moving some",
                                     svc['addr'], len(svc_bases))
                # svc_bases is the service's own set, which decommission()
                # shrinks: the loop condition eventually becomes false.
                while len(svc_bases) > upper_limit and loops < max_loops:
                    sample = random.sample(list(svc_bases),
                                           len(svc_bases) - upper_limit)
                    bases = {base for base in sample
                             if base not in moved_bases}
                    if bases:
                        moved_bases.update(self.decommission(svc, bases))
                    loops += 1  # also a safeguard against infinite loops
            if already_balanced >= len(self.services):
                break
        if self.logger:
            self.logger.info("%s bases moved", len(moved_bases))
            for svc in sorted(self.services.values(),
                              key=lambda x: x['addr']):
                svc_bases = self.get_managed_bases(svc)
                self.logger.info("meta1 %s has %d bases",
                                 svc['addr'], len(svc_bases))
        return moved_bases
def count_prefixes(digits):
    """
    Return the number of real prefixes in meta0/meta1.

    :param digits: number of hexadecimal digits used to name the bases
    :returns: 16 ** `digits`
    :raises ValueError: if `digits` is not between 0 and 4 (inclusive)
    """
    # Negative values used to slip through and produce a float.
    if 0 <= digits <= 4:
        return 16 ** digits
    raise ValueError('Invalid number of digits')
def generate_short_prefixes(digits):
    """
    Generate all the upper-case hexadecimal strings of `digits` digits.

    :param digits: number of hexadecimal digits, between 0 and 4
    :returns: ``('',)`` when `digits` is 0, otherwise a generator of
        strings in ascending order
    :raises ValueError: if `digits` is not between 0 and 4 (inclusive)
    """
    from itertools import product
    # Reject invalid values explicitly instead of returning None.
    if not 0 <= digits <= 4:
        raise ValueError('Invalid number of digits')
    if digits == 0:
        return ('',)
    hexa = "0123456789ABCDEF"
    return (''.join(pfx) for pfx in product(hexa, repeat=digits))
def generate_prefixes(digits):
    """
    Lazily yield every 4-character prefix whose first `digits` characters
    vary. Each prefix is right-padded with '0' up to 4 characters.
    """
    for short in generate_short_prefixes(digits):
        padding = '0' * (4 - len(short))
        yield short + padding