
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


"""Meta0 client and meta1 balancing operations"""
import random
from math import ceil

from oio.directory.meta import MetaMapping
from oio.common.client import ProxyClient
from oio.common.exceptions import \
        ConfigurationException, OioException, PreconditionFailed
from oio.common.json import json


_LEVELS = {'site': 0, 'rack': 1, 'host': 2, 'volume': 3}


def _loc(svc):
    """Extract the location of a service and ensure it is well formed."""
    loc = svc.get("tags", {}).get("tag.loc", None)
    if not loc:
        raise ConfigurationException("No location on {0}".format(svc))
    tokens = loc.split('.')
    if len(tokens) > 4:
        raise ConfigurationException("Malformed location {0} for {1}"
                                     .format(loc, svc))
    return tokens


def _loc_full(svc):
    """
    Extract the location of the given service and ensure it is well formed
    and that it is composed of exactly 4 tokens
    """
    tokens = _loc(svc)
    tokens = list(reversed(tokens))
    while len(tokens) < 4:
        tokens.append(None)
    return tuple(reversed(tokens))
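
# Illustration (not part of the original module): a location shorter than
# 4 tokens is padded with None on the most significant side, so sorting and
# masking always operate on 4-tuples. The service dict below is made up.
#
# >>> _loc_full({"tags": {"tag.loc": "host1.vol1"}})
# (None, None, 'host1', 'vol1')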


def _slice_services(allsrv, level):
    """
    Generate slices of services, where a slice is a set of services sharing
    the same level of location.
    :param allsrv: A sorted sequence of services to extract the slices of.
    :type allsrv: a sequence of dictionaries representing services.
    :param level: the position of the location token with the least
                 significance. Tokens below that position will be ignored.
    :type level: a positive integer
    :yield: 2-tuples meaning <start, end>
    """
    assert(level >= 0 and level <= 3)
    assert(len(allsrv) > 0)
    masks = (lambda loc: (loc[0], None, None, None),
             lambda loc: (loc[0], loc[1], None, None),
             lambda loc: (loc[0], loc[1], loc[2], None),
             lambda loc: loc)
    mask_func = masks[level]
    last_index, last = 0, mask_func(_loc_full(allsrv[0]))
    for idx, srv in enumerate(allsrv[1:], start=1):
        current = mask_func(_loc_full(srv))
        if current != last:
            yield last_index, idx
            last_index, last = idx, current
    yield last_index, len(allsrv)
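
# Illustration (not part of the original module): slicing a sorted list of
# services at the 'site' level (the locations below are made up).
#
# >>> srvs = _prepare_for_bootstrap([
# ...     {"tags": {"tag.loc": "paris.r1.h1.v1"}},
# ...     {"tags": {"tag.loc": "paris.r1.h2.v1"}},
# ...     {"tags": {"tag.loc": "tokyo.r1.h1.v1"}}])
# >>> list(_slice_services(srvs, _LEVELS['site']))
# [(0, 2), (2, 3)]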


def _patch_service(srv):
    s = dict()
    s.update(srv)
    s['bases'] = set()
    return s


def _prepare_for_bootstrap(allsrv):
    return sorted((_patch_service(s) for s in allsrv), key=_loc_full)


def _bootstrap(allsrv, allgroups, replicas, level, degradation=0):
    """
    Produce a list of services (dicts), with a field named 'bases' in each
    service containing the bases managed by that service.
    :param allsrv: an iterable of services
    :param allgroups: an iterable of the prefixes to spread over the services
    :param replicas: how many replicas are expected for each base
    :param level: the position of the location token with the least
                  significance. Less significant tokens are ignored.
    :param degradation: how many 'location levels' (as a whole) may be lost.
    """
    assert(degradation >= 0)
    assert(level >= 0 and level <= 3)

    # Ensure we work on a sorted sequence of services
    allsrv = tuple(_prepare_for_bootstrap(allsrv))

    # Extract the slices of services corresponding to the location levels
    # to be perfectly balanced.
    allslices = tuple(((s, e, set()) for s, e in
                       _slice_services(allsrv, level)))

    # Check the resilience conditions are satisfied
    quorum = int(replicas / 2) + 1
    worst_load = ceil(float(replicas) / len(allslices))
    if replicas - (worst_load * degradation) < quorum:
        fmt = "Balancing not satisfiable, {0} replicas wanted on {1}" + \
              " site(s), and an acceptable degradation of {2} site(s)."
        raise PreconditionFailed(
                fmt.format(replicas, len(allslices), degradation))
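
    # Worked example (illustrative): with replicas=3 spread over 3 sites,
    # worst_load = ceil(3/3) = 1 and quorum = int(3/2) + 1 = 2; losing
    # degradation=1 site leaves 3 - 1*1 = 2 >= 2 replicas, so the check
    # passes. With only 2 sites, worst_load = ceil(3/2) = 2 and
    # 3 - 2*1 = 1 < 2: the balancing is rejected.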

    # First, assign to each slice a set of short prefixes
    for idx, group in enumerate(allgroups):
        current_slices = _modulo_peek(allslices, idx, replicas)
        for start, end, groups in current_slices:
            groups.add(group)

    # In each slice, spread the prefixes among the services of the slice.
    for start, end, groups in allslices:
        slen = end - start
        assert(slen > 0)
        for idx, group in enumerate(groups):
            current_srv = allsrv[start + (idx % slen)]
            current_srv['bases'].add(group)

    return allsrv


def _modulo_peek(tab, offset, num):
    """
    Extract <num> subsequent items from <tab>, starting at <offset>, restarting
    at 0 when the end of the array is reached.
    :param tab: a non-empty sequence supporting random access by index.
    :param offset: a positive natural number
    :param num: a strictly positive natural number
    :return: a new tuple of <num> elements from <tab>, starting at <offset>
    """
    result, tablen = list(), len(tab)
    assert(tablen > 0)
    assert(num > 0)
    assert(offset >= 0)
    for i in range(offset, offset+num):
        result.append(tab[i % tablen])
    return tuple(result)
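
# Illustration (not part of the original module): _modulo_peek wraps around
# the end of the sequence.
#
# >>> _modulo_peek(('a', 'b', 'c'), 2, 3)
# ('c', 'a', 'b')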


class Meta0Client(ProxyClient):
    """Meta0 administration client"""

    def __init__(self, conf, **kwargs):
        super(Meta0Client, self).__init__(
            conf, request_prefix="/admin", **kwargs)

    def force(self, mapping, **kwargs):
        """
        Force the meta0 prefix mapping.
        The mapping may be partial to force only a subset of the prefixes.
        """
        self._request('POST', "/meta0_force", data=mapping, **kwargs)

    def list(self, **kwargs):
        """Get the meta0 prefix mapping"""
        _, obody = self._request('GET', "/meta0_list", **kwargs)
        return obody
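
# Minimal usage sketch (not part of the original module). The configuration
# key is an assumption: ProxyClient typically needs at least the namespace
# to locate the proxy.
#
# >>> m0 = Meta0Client({'namespace': 'OPENIO'})
# >>> m0.list()  # {'0000': ['127.0.0.1:6001', ...], ...}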


class Meta0PrefixMapping(MetaMapping):
    """Represents the content of the meta0 database"""

    def __init__(self, meta0_client, replicas=3, digits=None, min_dist=1,
                 **kwargs):
        """
        :param replicas: number of services to allocate to manage a base
        :type replicas: strictly positive `int`
        :param digits: number of digits used to name the database files
        :type digits: `int` between 0 and 4 (inclusive)
        """
        self.m0 = meta0_client
        self.replicas = int(replicas)
        if self.replicas < 1:
            raise ConfigurationException("replicas must be >= 1")
        if digits is None:
            digits = 4
        self.digits = int(digits)
        if self.digits < 0:
            raise ConfigurationException("meta_digits must be >= 0")
        if self.digits > 4:
            raise ConfigurationException("meta_digits must be <= 4")
        self.min_dist = min_dist
        if self.min_dist < 1:
            raise ConfigurationException("min_dist must be >= 1")
        if self.min_dist > 4:
            raise ConfigurationException("min_dist must be <= 4")
        super(Meta0PrefixMapping, self).__init__(
            meta0_client.conf, ['meta1'], **kwargs)

    def reset(self):
        super(Meta0PrefixMapping, self).reset()
        for svc_id, svc in self.services.items():
            location = self.get_loc(svc)
            location_parts = location.rsplit('.', self.min_dist - 1)
            location = '.'.join(
                [location_parts[0]] + ['*'] * len(location_parts[1:]))
            svc['location'] = location
            svc['upper_limit'] = 0
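
    # Worked example (illustrative): with min_dist=2, a location such as
    # 'site.rack.host.vol' is masked by reset() to 'site.rack.host.*',
    # because rsplit('.', 1) keeps everything but the last token intact.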

    def _compute_upper_limit(self):
        """
        Compute the right number of bases each service should host,
        based on the total number of bases, the total number of service
        locations, and the number of services per location.
        """
        available_svcs_by_location = dict()
        for svc_id, svc in self.services.items():
            if self.get_score(svc) <= 0:
                continue
            location = svc['location']
            available_svcs_by_location[location] = \
                available_svcs_by_location.get(location, 0) + 1
        if len(available_svcs_by_location) < self.replicas:
            raise ValueError(
                "Fewer than %s locations have a positive score, "
                "we won't rebalance (currently: %s)"
                % (self.replicas, len(available_svcs_by_location)))

        upper_limit_by_location = dict()
        for location, available_services \
                in available_svcs_by_location.items():
            ideal_bases = (float(self.num_bases()) * self.replicas
                           / len(available_svcs_by_location)
                           / available_services)
            upper_limit = int(ceil(ideal_bases))
            upper_limit_by_location[location] = upper_limit
            self.logger.info(
                "Scored positively in %s = %d",
                location, available_services)
            self.logger.info(
                "Ideal number of bases per meta1 in %s: %f, limit: %d",
                location, ideal_bases, upper_limit)

        for svc in sorted(self.services.values(), key=lambda x: x['addr']):
            if self.get_score(svc) <= 0:
                svc['upper_limit'] = 0
            else:
                svc['upper_limit'] = upper_limit_by_location[svc['location']]
            self.logger.info(
                "meta1 %s belongs to location %s "
                "and must host a maximum of %d bases",
                svc['addr'], svc['location'], svc['upper_limit'])

    @property
    def services(self):
        return self.services_by_service_type['meta1']

    def _get_old_peers_by_base(self, base):
        return set(self.raw_services_by_base.get(base))

    def _get_peers_by_base(self, base):
        return {v['addr'] for v in self.services_by_base.get(base, list())}

    def _get_service_type_by_base(self, base):
        return 'meta1'

    def _apply_link_services(self, moved_ok, **kwargs):
        try:
            self.m0.force(self.to_json(moved_ok).strip(), **kwargs)
        except OioException as exc:
            self.logger.warn(
                "Failed to link services for meta0: %s", exc)

    def __nonzero__(self):
        return bool(self.services_by_base)

    def num_bases(self):
        """Get the total number of bases, according to `self.digits`"""
        return 1 << (4 * self.digits)

    def get_loc(self, svc, default=None):
        """
        Get the location of a service.
        If the location is not defined, return:
        - the service IP address if `default` is None or "addr",
        - `default` for any other value.
        """
        if isinstance(svc, basestring):
            svc = self.services.get(svc, {"addr": svc})
        loc = svc.get("tags", {}).get("tag.loc", default)
        if not loc or loc == "addr":
            loc = svc["addr"].rsplit(":", 1)[0]
        return str(loc)

    @staticmethod
    def dist_between(loc1, loc2):
        """
        Compute the distance between two dotted locations:
        the number of trailing location levels that differ.
        """
        loc1_parts = loc1.split('.', 3)
        loc2_parts = loc2.split('.', 3)
        max_dist = max(len(loc1_parts), len(loc2_parts))
        in_common = 0
        loc1_part = loc1_parts.pop(0)
        loc2_part = loc2_parts.pop(0)
        try:
            while loc1_part == loc2_part:
                in_common += 1
                loc1_part = loc1_parts.pop(0)
                loc2_part = loc2_parts.pop(0)
        except IndexError:
            pass
        return max_dist - in_common
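
    # Illustration (not part of the original module): the distance is the
    # number of trailing location levels that differ.
    #
    # >>> Meta0PrefixMapping.dist_between('s.r.h1.v1', 's.r.h2.v1')
    # 2
    # >>> Meta0PrefixMapping.dist_between('s.r.h1.v1', 's.r.h1.v1')
    # 0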

    def get_score(self, svc):
        """Get the score of a service, or 0 if it is unknown"""
        if isinstance(svc, basestring):
            svc = self.services.get(svc, {'addr': svc})
        score = int(svc.get("score", 0))
        return score

    def get_managed_bases(self, svc):
        """Get the set of bases managed by the service"""
        if isinstance(svc, basestring):
            svc = self.services.get(svc, {'addr': svc})
        return svc.get('bases', set())

    def prefix_to_base(self, pfx):
        """
        Get the name of the base the prefix will be saved in.
        When `self.digits` is 4, the name of the base is `pfx` itself.
        """
        return str(pfx[:self.digits]).ljust(4, '0')

    def prefix_siblings(self, pfx):
        """
        Get the list of prefixes that share the same base as `pfx`.
        """
        min_base = self.prefix_to_base(pfx)
        max_base = str(min_base[:self.digits]).ljust(4, 'F')
        return ["%04X" % base
                for base in xrange(int(min_base, 16), int(max_base, 16) + 1)]
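
    # Worked example (illustrative): with digits=2, prefix '1234' is stored
    # in base '1200', which covers the 16**(4-2) == 256 sibling prefixes
    # '1200' through '12FF'; num_bases() returns 1 << (4*2) == 256 bases
    # in total.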

    def _extend(self, bases=None):
        """
        Extend the mapping to all meta1 prefixes, if `self.digits` is
        less than 4.

        :param bases: if set, only extend the mapping for the specified bases
        """
        extended = dict()
        if not bases:
            svc_by_base = self.services_by_base
        else:
            svc_by_base = {base: self.services_by_base[base]
                           for base in bases}
        if self.digits == 4:
            # Nothing to extend: there is one base for each prefix
            return svc_by_base
        for base, services in svc_by_base.iteritems():
            for pfx in self.prefix_siblings(base):
                extended[pfx] = services
        return extended

    def to_json(self, bases=None):
        """
        Serialize the mapping to a JSON string suitable
        as input for the 'meta0_force' request.
        """
        simplified = dict()
        for pfx, services in self._extend(bases).iteritems():
            simplified[pfx] = [x['addr'] for x in services]
        return json.dumps(simplified)

    def _learn(self, base, addrs):
        services = list()
        for svc_addr in addrs:
            svc = self.services.get(svc_addr, {"addr": svc_addr})
            services.append(svc)
        self.assign_services(base, services)
        # Deep copy the list of addresses
        self.raw_services_by_base[base] = [str(x) for x in addrs]

    def load_json(self, json_mapping, **kwargs):
        """
        Load the mapping from a JSON string.
        """
        if isinstance(json_mapping, basestring):
            raw_mapping = json.loads(json_mapping)
        elif isinstance(json_mapping, dict):
            raw_mapping = json_mapping
        else:
            raw_mapping = self.m0.list(**kwargs)  # pylint: disable=no-member

        for pfx, services_addrs in raw_mapping.iteritems():
            base = pfx[:self.digits]
            self._learn(base, services_addrs)

    def load_meta0(self, json_mapping=None, **kwargs):
        """
        Load the mapping from the meta0 service of the cluster.
        """
        raw_mapping = self.m0.list(**kwargs)  # pylint: disable=no-member

        for pfx, services_addrs in raw_mapping.iteritems():
            base = pfx[:self.digits]
            self._learn(base, services_addrs)

    def _find_services(self, known=None, lookup=None, max_lookup=50):
        """
        Call `lookup` to find `self.replicas` different services.

        :param known: an iterable of already known services
        :param lookup: a function that returns an iterable of services
        """
        services = known if known else list()
        known_locations = {self.get_loc(svc) for svc in services}
        iterations = 0
        while len(services) < self.replicas and iterations < max_lookup:
            iterations += 1
            svcs = lookup(services)
            if not svcs:
                break
            for svc in svcs:
                loc = self.get_loc(svc)
                if all(self.dist_between(loc, loc1) >= self.min_dist
                       for loc1 in known_locations):
                    known_locations.add(loc)
                    services.append(svc)
                if len(services) >= self.replicas:
                    break
        return services

    def find_services_less_bases(self, known=None, min_score=1, **_kwargs):
        """Find `replicas` services, including the ones of `known`"""
        if known is None:
            known = list()
        filtered = [x for x in self.services.values()
                    if self.get_score(x) >= min_score]
        # Reverse the sort so we can quickly pop the service
        # with the fewest managed bases
        filtered.sort(key=(lambda x: len(self.get_managed_bases(x))),
                      reverse=True)

        def _lookup(known2):
            while filtered:
                svc = filtered.pop()
                if svc not in known2:
                    return (svc,)
            return None

        return self._find_services(known, _lookup, len(filtered))

    def find_services_more_availability(self, known=None, min_score=1,
                                        **_kwargs):
        """Find `replicas` services, including the ones of `known`"""
        if known is None:
            known = list()
        filtered = [x for x in self.services.values()
                    if self.get_score(x) >= min_score]
        filtered.sort(key=(lambda svc: svc['upper_limit']
                           - len(self.get_managed_bases(svc))))

        def _lookup(known2):
            while filtered:
                svc = filtered.pop()
                if svc not in known2:
                    return (svc,)
            return None

        return self._find_services(known, _lookup, len(filtered))

    def assign_services(self, base, services, fail_if_already_set=False):
        """
        Assign `services` to manage `base`.

        :param fail_if_already_set: raise ValueError if `base` is already
            managed by some services
        """
        if fail_if_already_set and base in self.services_by_base:
            raise ValueError("Base %s already managed" % base)
        for svc in services:
            base_set = svc.get('bases') or set()
            base_set.add(base)
            svc['bases'] = base_set
        self.services_by_base[base] = services

    def bootstrap(self, level='volume', degradation=0):
        """
        Spread the prefixes to balance them well across sites.
        By default, the token describing the site is the one at position 0.
        Put one short prefix into each slice (corresponding to a location
        level) and balance them well within each level. At the end, expand
        each short prefix into its complete prefixes.

        :param level: the kind of location token describing the level
                      of fair balancing.
        :param degradation: how many location slices of the given level
                            we should tolerate losing.
        :return: None
        """
        assert(degradation >= 0)
        assert(level in _LEVELS)
        level = _LEVELS[level]

        # Perform a first bootstrap, producing a <srv, prefixes> mapping
        allgroups = tuple(generate_short_prefixes(self.digits))
        allsrv = _bootstrap(self.services.values(), allgroups,
                            self.replicas, level, degradation=degradation)

        # Compute the reverse <base, srv> mapping (mandatory book-keeping)
        bases = dict((pfx, set()) for pfx in allgroups)
        for srv in allsrv:
            for prefix in srv['bases']:
                bases[prefix].add(srv['addr'])
        for base, srv in bases.iteritems():
            self._learn(base, srv)
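
    # Minimal usage sketch (not part of the original module), assuming a
    # configured Meta0Client `m0`:
    #
    # >>> mapping = Meta0PrefixMapping(m0, replicas=3, digits=2)
    # >>> mapping.bootstrap(level='host')
    # >>> mapping.check_replicas()
    # True
    # >>> m0.force(mapping.to_json())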

    def count_pfx_by_svc(self):
        """
        Build a dict with service addresses as keys and the number
        of managed bases as values.
        """
        pfx_by_svc = dict()
        for svc in self.services.itervalues():
            addr = svc["addr"]
            pfx_by_svc[addr] = len(self.get_managed_bases(svc))
        return pfx_by_svc

    def check_replicas(self):
        """Check that all bases have the right number of replicas"""
        error = False
        grand_total = 0
        for base, services in self.services_by_base.iteritems():
            if len(services) < self.replicas:
                self.logger.error(
                    "Base %s is managed by %d services, %d required",
                    base, len(services), self.replicas)
                self.logger.error("%s", [x["addr"] for x in services])
                error = True
            elif len(services) > self.replicas:
                self.logger.warn(
                    "Base %s is managed by %d services, %d expected",
                    base, len(services), self.replicas)
                self.logger.warn("%s", [x["addr"] for x in services])
            grand_total += len(services)
        self.logger.info("Grand total: %d (expected: %d)",
                         grand_total, self.num_bases() * self.replicas)
        return not error

    def decommission(self, svc, bases_to_remove=None, strategy=None,
                     try_=False, compute_upper_limit=True):
        """
        Unassign all bases of `bases_to_remove` from `svc`,
        and assign them to other services using `strategy`.

        :param svc: service to decommission
        :type svc: `str` or `dict`
        :param bases_to_remove: bases to remove from the service
        :type bases_to_remove: `list`
        :param strategy: strategy used to move the bases
        :type strategy: `function`
        :param try_: if the strategy does not find anything better, keep
            the bases on this service instead of removing them anyway
        :type try_: `bool`
        :param compute_upper_limit: compute the right number of bases
            each service should host
        :type compute_upper_limit: `bool`
        """
        if isinstance(svc, basestring):
            svc = self.services[svc]
        if not try_:
            saved_score = svc["score"]
            svc["score"] = 0
        if compute_upper_limit:
            self._compute_upper_limit()

        bases_to_remove_checked = set()
        if bases_to_remove:
            # Remove extra digits and duplicates
            bases_to_remove = {b[:self.digits] for b in bases_to_remove}
        else:
            bases_to_remove = set(svc.get('bases', list()))
        if not strategy:
            strategy = self.find_services_more_availability
        for base in bases_to_remove:
            try:
                self.services_by_base[base].remove(svc)
            except ValueError:
                self.logger.warn('Base %s was not managed by %s',
                                 base, svc['addr'])
                continue
            try:
                svc["bases"].remove(base)
            except KeyError:
                pass
            new_svcs = strategy(known=self.services_by_base[base])
            self.assign_services(base, new_svcs)
            bases_to_remove_checked.add(base)

        if not try_:
            svc["score"] = saved_score
        return bases_to_remove_checked
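
    # Usage sketch (not part of the original module): empty a meta1 before
    # removing it from the cluster. The address below is made up.
    #
    # >>> mapping.load_meta0()
    # >>> mapping.decommission('192.168.1.2:6001')
    # >>> m0.force(mapping.to_json())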

    def rebalance(self, max_loops=4096):
        """Reassign bases from the services which manage the most"""
        moved_bases = dict()

        def _decommission(svc, bases, try_=False):
            moved = self.decommission(svc, bases,
                                      compute_upper_limit=False, try_=try_)
            nb_really_moved = 0
            for base in moved:
                peers = self._get_peers_by_base(base)
                if try_ and svc['addr'] in peers:
                    continue
                old_peers = self._get_old_peers_by_base(base)
                new_peers = [v for v in peers if v not in old_peers]
                if len(new_peers) != 1:
                    raise ValueError(
                        'Missing new peer or too many new peers for base %s'
                        % base)
                moved_bases[base] = new_peers[0]
                nb_really_moved += 1
            return nb_really_moved

        def _log_future_state():
            self.logger.info("%s bases will move", len(moved_bases))
            for svc in sorted(self.services.values(),
                              key=lambda x: x['addr']):
                svc_bases = self.get_managed_bases(svc)
                more_info = ''
                bases_too_many = len(svc_bases) - svc['upper_limit']
                if bases_too_many > 0:
                    more_info = ' (still %d bases in excess)' \
                        % bases_too_many
                self.logger.info("meta1 %s will host %d bases%s",
                                 svc['addr'], len(svc_bases), more_info)

        if self.digits == 0:
            self.logger.warn("No rebalancing possible when "
                             "meta1_digits is set to 0")
            return None

        self.logger.info("Meta1 digits = %d", self.digits)
        self.logger.info("Replicas = %d", self.replicas)
        self.logger.info("Minimum distance = %d", self.min_dist)

        self._compute_upper_limit()

        # First, respect the minimum distance
        self.logger.info('First, find the bases '
                         'that do not respect the minimum distance')
        for base, addrs in self.raw_services_by_base.items():
            addrs_by_location = dict()
            for addr in addrs:
                addrs_by_location.setdefault(
                    self.services[addr]['location'], list()).append(addr)
            if len(addrs_by_location) == self.replicas:
                continue
            for location, svc_addrs in addrs_by_location.items():
                if len(svc_addrs) < 2:
                    continue
                # Decommission the base from the fullest service
                svcs = [self.services.get(svc_addr, {'addr': svc_addr})
                        for svc_addr in svc_addrs]
                svcs.sort(key=(lambda svc: svc['upper_limit']
                               - len(self.get_managed_bases(svc))))
                _decommission(svcs[0], [base])
                # Move a single base on all replicas
                break
        _log_future_state()

        # Second, rebalance the number of bases on each meta1
        self.logger.info('Second, rebalance the bases in each location')
        loops = 0
        rebalance_again = True
        while loops < max_loops and rebalance_again:
            loops += 1  # safeguard against infinite loops
            self.logger.info("Loop %d", loops)
            candidates = self.services.values()
            candidates.sort(key=(lambda x: len(self.get_managed_bases(x))))
            rebalance_again = False
            while candidates:
                svc = candidates.pop()  # service with the most bases
                svc_bases = self.get_managed_bases(svc)
                upper_limit = svc['upper_limit']
                if len(svc_bases) <= upper_limit:
                    self.logger.info(
                        "meta1 %s is already balanced with %d bases",
                        svc['addr'], len(svc_bases))
                    continue
                self.logger.info("meta1 %s has %d bases, moving some",
                                 svc['addr'], len(svc_bases))

                bases_never_moved = set()
                bases_already_moved = set()
                for base in svc_bases:
                    # Move a single base on all replicas
                    svc_addr = moved_bases.get(base)
                    if svc_addr is None:
                        bases_never_moved.add(base)
                    elif svc_addr == svc['addr']:
                        bases_already_moved.add(base)
                if not bases_never_moved and not bases_already_moved:
                    self.logger.info(
                        "meta1 %s has no base available to move",
                        svc['addr'])
                    continue
                rebalance_again = True

                nb_bases_to_move = len(svc_bases) - upper_limit
                while nb_bases_to_move > 0 \
                        and (bases_already_moved or bases_never_moved):
                    # Try moving the already moved bases first
                    # to minimize the total number of bases to move
                    bases_to_move = set()
                    for base in random.sample(
                            bases_already_moved,
                            min(len(bases_already_moved),
                                nb_bases_to_move)):
                        bases_to_move.add(base)
                        bases_already_moved.remove(base)
                    for base in random.sample(
                            bases_never_moved,
                            min(len(bases_never_moved),
                                nb_bases_to_move - len(bases_to_move))):
                        bases_to_move.add(base)
                        bases_never_moved.remove(base)
                    nb_bases_to_move -= _decommission(svc, bases_to_move,
                                                      try_=True)
        _log_future_state()
        return moved_bases.keys()
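
# Usage sketch (not part of the original module): rebalance an existing
# mapping and push the result back, assuming a configured Meta0Client `m0`.
#
# >>> mapping = Meta0PrefixMapping(m0)
# >>> mapping.load_meta0()
# >>> moved = mapping.rebalance()
# >>> m0.force(mapping.to_json(moved))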


def count_prefixes(digits):
    """
    Return the number of real prefixes in meta0/meta1.
    Raise an exception if the number of digits is not acceptable.
    """
    if digits <= 4:
        return 16**digits
    raise ValueError('Invalid number of digits')


def generate_short_prefixes(digits):
    """Generate the short prefixes of `digits` hexadecimal digits."""
    if digits == 0:
        return ('',)
    elif digits <= 4:
        from itertools import product
        hexa = "0123456789ABCDEF"
        return (''.join(pfx) for pfx in product(hexa, repeat=digits))
    raise ValueError('Invalid number of digits')


def generate_prefixes(digits):
    """Generate the full 4-character prefixes from the short prefixes."""
    for pfx in generate_short_prefixes(digits):
        yield pfx.ljust(4, '0')
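
# Illustration (not part of the original module): short prefixes are padded
# with '0' up to the 4 characters of a real meta1 prefix.
#
# >>> list(generate_prefixes(1))[:3]
# ['0000', '1000', '2000']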