Source code for oio.directory.meta2

# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from oio.conscience.client import ConscienceClient
from oio.common.decorators import ensure_request_id2
from oio.common.logger import get_logger
from oio.directory.admin import AdminClient
from oio.directory.client import DirectoryClient
from oio.rdir.client import RdirClient


[docs]class Meta2Database(object): """ Execute maintenance operations on meta2 databases (or compatible services). """ def __init__(self, conf, logger=None, admin_client=None, conscience_client=None, rdir_client=None, directory_client=None, service_type='meta2'): self.conf = conf self.service_type = service_type self.logger = logger or get_logger(self.conf) self._admin = admin_client self._conscience = conscience_client self._rdir = rdir_client self._directory = directory_client self.services_by_base = dict() self.all_service_ids = list() self.reload_all_services() @property def admin(self): if not self._admin: self._admin = AdminClient(self.conf) return self._admin @property def conscience(self): if not self._conscience: self._conscience = ConscienceClient(self.conf) return self._conscience @property def rdir(self): if not self._rdir: self._rdir = RdirClient(self.conf) return self._rdir @property def directory(self): if not self._directory: self._directory = DirectoryClient(self.conf) return self._directory
[docs] def reset_peers(self): """ Reset the base allocations and reload the services from Conscience. """ self.services_by_base.clear()
[docs] def reload_all_services(self): """ Load the list of all services of type `self.service_type`. """ self.all_service_ids = [ s['id'] for s in self.conscience.all_services(self.service_type)]
[docs] @staticmethod def get_cid_and_seq(base): len_base = len(base) if len_base > 64: try: if base[64] != '.': raise ValueError() seq = int(base[65:]) return (base[:64], seq) except ValueError: raise ValueError( "Bad format for the base name (base=%s)" % base) else: return (base.ljust(64, '0'), None)
def _get_bases_seq(self, base, **kwargs): base_seqs = dict() cid, seq = self.get_cid_and_seq(base) linked_services = self.directory.list(cid=cid, **kwargs) for service in linked_services['srv']: if service['type'] != self.service_type: continue if seq is not None and seq != service['seq']: continue bseq = cid + "." + str(service['seq']) services = base_seqs.get(bseq, dict()) services[service.pop('host')] = service base_seqs[bseq] = services for bseq, services in base_seqs.items(): self.services_by_base[bseq] = services # FIXME(adu): Check nb of peers yield bseq def _get_peers(self, bseq): return self.services_by_base[bseq].keys() def _set_peers(self, bseq, src, dst, **kwargs): cid, _ = self.get_cid_and_seq(bseq) current_peers = self._get_peers(bseq) new_peers = [v for v in current_peers if v != src] new_peers.append(dst) if len(current_peers) != len(new_peers): raise ValueError("Not the same number of peers " "(current_peers=%s new_peers=%s)" % ( current_peers, new_peers)) self.logger.debug( "Setting peers (base=%s new_peers=%s)", bseq, new_peers) self.admin.set_peers(self.service_type, cid=cid, peers=new_peers, **kwargs) self.services_by_base[bseq][dst] = self.services_by_base[bseq].pop(src) def _get_args(self, bseq): for _host, service in self.services_by_base[bseq].items(): return service['args'] def _check_src_service(self, bseq, src): peers = self._get_peers(bseq) if src not in peers: raise ValueError( "Source service isn't used (peers=%s)" % peers) def _get_service_full_id(self, service): return self.conf['namespace'] + "|" + self.service_type + "|" + service def _check_dst_service(self, bseq, src, dst, **kwargs): peers = self._get_peers(bseq) if dst is None: known = [self._get_service_full_id(v) for v in peers if v != src] avoid = [self._get_service_full_id(src)] try: services_found = self.conscience.poll( self.service_type, known=known, avoid=avoid, **kwargs) dst = services_found[0].get('tags', dict()).get( 'tag.service_id', None) if dst is None: dst = services_found[0]['addr'] except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to poll service (base=%s src=%s peers=%s known=%s " "avoid=%s): %s", bseq, peers, src, known, avoid, exc) raise if dst in peers: raise ValueError( "Destination service is already used (peers=%s)" % peers) if dst not in self.all_service_ids: raise ValueError( "Destination service must be a %s service" % self.service_type) return dst def _has_base(self, bseq, **kwargs): """ Check which of the old peers actually host the database. """ cid, _ = self.get_cid_and_seq(bseq) peers_to_copy_from = list() master = None has = self.admin.has_base(self.service_type, cid=cid, **kwargs) for service, status in has.items(): if status['status']['status'] == 200: peers_to_copy_from.append(service) continue self.logger.warn( "Missing base (base=%s service=%s status=%s)", bseq, service, status) if not peers_to_copy_from: raise ValueError("No base to copy") try: election = self.admin.election_status(self.service_type, cid=cid, **kwargs) for service, status in election['peers'].items(): if status['status']['status'] == 200: master = service break except Exception as exc: # pylint: disable=broad-except self.logger.warn( "Failed to get election status (base=%s): %s", bseq, exc) if master is None: self.logger.warn("No master") elif master not in peers_to_copy_from: self.logger.warn( "Master service %s for %s does not host the base!", master, bseq) else: # Prefer to copy the master peers_to_copy_from.remove(master) peers_to_copy_from.append(master) peers_to_copy_from.reverse() return peers_to_copy_from def _copy_base(self, bseq, dst, peers_to_copy_from, **kwargs): cid, _ = self.get_cid_and_seq(bseq) for true_src in peers_to_copy_from: self.logger.debug( "Copying base (base=%s true_src=%s dst=%s)", bseq, true_src, dst) try: self.admin.copy_base_from( self.service_type, cid=cid, svc_from=true_src, svc_to=dst, **kwargs) break except Exception as exc: # pylint: disable=broad-except self.logger.warn( "Failed to copy base (base=%s true_src=%s dst=%s): %s", bseq, true_src, dst, exc) if true_src == peers_to_copy_from[-1]: raise def _link_service(self, bseq, src, dst, **kwargs): cid, seq = self.get_cid_and_seq(bseq) peers = self._get_peers(bseq) args = self._get_args(bseq) self.directory.force( cid=cid, replace=True, service_type=self.service_type, services=dict(type=self.service_type, host=','.join(peers), args=args, seq=seq), **kwargs) # FIXME(ABO): This part can be removed when, either: # - meta1 sends the removed services bundled with the # account.services events. # - meta2 sends a storage.container.deleted event when the # sqliterepo layer is the one that notifies the deletion of # the databases. if self.service_type == 'meta2': self.rdir.meta2_index_delete(volume_id=src, container_id=cid, **kwargs) def _reset_election(self, bseq, src, dst, **kwargs): """ Reset the election, try to remove `base` from its old host, then trigger an election with the new peers. """ cid, _ = self.get_cid_and_seq(bseq) peers = self._get_peers(bseq) if src in peers: raise ValueError("Source service is already among the peers") self.logger.debug( "Resetting election (base=%s src=%s dst=%s)", bseq, src, dst) self.admin.election_leave(self.service_type, cid=cid, **kwargs) try: self.admin.remove_base(self.service_type, cid=cid, service_id=src, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.warn( "Failed to remove source base (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) try: election = self.admin.election_status(self.service_type, cid=cid, **kwargs) for service, status in election['peers'].items(): if status['status']['status'] in (200, 303): continue self.logger.warn( "Election not started (base=%s src=%s dst=%s service=%s " "status=%s)", bseq, src, dst, service, status) except Exception as exc: # pylint: disable=broad-except self.logger.warn( "Failed to get election status (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) def _safe_move(self, bseq, src, dst, **kwargs): err = None try: self._check_src_service(bseq, src) dst = self._check_dst_service(bseq, src, dst, **kwargs) self.logger.debug( "Moving base (base=%s src=%s dst=%s)", bseq, src, dst) try: self.logger.debug("Step 1: check available bases.") peers_to_copy_from = self._has_base(bseq, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to check if each peer exists (base=%s src=%s " "dst=%s): %s", bseq, src, dst, exc) raise try: self.logger.debug("Step 2: set the new peers in the base.") self._set_peers(bseq, src, dst, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to set new peers (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) raise try: self.logger.debug("Step 3: copy the database.") self._copy_base(bseq, dst, peers_to_copy_from, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to copy base (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) raise try: self.logger.debug("Step 4: set the peers in meta1.") self._link_service(bseq, src, dst, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to link service (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) raise try: self.logger.debug("Step 5: reset the election.") self._reset_election(bseq, src, dst, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to reset election (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) raise except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to move base (base=%s src=%s dst=%s): %s", bseq, src, dst, exc) err = exc return dst, err
[docs] @ensure_request_id2(prefix='m2mov-') def move(self, base, src, dst=None, **kwargs): """ Move a database from `src` to `dst`. If `dst` is None, find one automatically. """ self.reset_peers() try: bases_seq = self._get_bases_seq(base, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to load peers (base=%s src=%s dst=%s): %s", base, src, dst, exc) yield {'base': base, 'src': src, 'dst': dst, 'err': exc} return for bseq in bases_seq: _dst, err = self._safe_move(bseq, src, dst, **kwargs) yield {'base': bseq, 'src': src, 'dst': _dst, 'err': err}
def _safe_rebuild(self, bseq, **kwargs): err = None try: self.logger.debug("Rebuilding base (base=%s)") try: self.logger.debug("Step 1: check available bases.") available_bases = self._has_base(bseq, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to check if each peer exists (base=%s): %s", bseq, exc) raise exceptions = list() missing_bases = [x for x in self._get_peers(bseq) if x not in available_bases] if missing_bases: self.logger.debug("Step 2: copy the database.") for dst in missing_bases: try: self._copy_base(bseq, dst, available_bases, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to copy base (base=%s dst=%s): %s", bseq, dst, exc) exceptions.append(exc) if exceptions: raise Exception(exceptions) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to rebuild base (base=%s): %s", bseq, exc) err = exc return err
[docs] @ensure_request_id2(prefix='m2reb-') def rebuild(self, base, **kwargs): """ Rebuild a database. """ self.reset_peers() try: bases_seq = self._get_bases_seq(base, **kwargs) except Exception as exc: # pylint: disable=broad-except self.logger.error( "Failed to load peers (base=%s): %s", base, exc) yield {'base': base, 'err': exc} return for bseq in bases_seq: err = self._safe_rebuild(bseq, **kwargs) yield {'base': bseq, 'err': err}