# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import pkg_resources
from six.moves import configparser
# URI that ConfigLoader._pipeline_context resolves to obtain the handler
# placed at the end of every pipeline.
DEFAULT_HANDLER = 'egg:oio#default'
# Registry mapping a lowercase URI scheme to the function loadcontext()
# dispatches to for that scheme.
_loaders = {}
class _Type(object):
    """Base class of the object-type descriptors used by the loaders.

    ``name`` and ``egg_protocols`` are placeholders here; ``invoke`` is the
    hook called with a loader context to build the actual object.
    """

    egg_protocols = None
    name = None

    def invoke(self, ctx, **kwargs):
        """Build the object described by ``ctx``.

        The base implementation builds nothing and returns None.
        """
        return None
class _Handler(_Type):
    """Object type for event handlers.

    Handlers are looked up under the ``oio.event.handler_factory`` entry
    point group and under ``handler`` sections of a configuration file.
    """

    name = 'handler'
    egg_protocols = ['oio.event.handler_factory']
    config_prefixes = ['handler']

    def invoke(self, context, **kwargs):
        """Instantiate the handler described by ``context``.

        :param context: loader context providing ``protocol``, ``object``,
            ``global_conf`` and ``local_conf``.
        :keyword app: required, passed as first positional argument to
            the factory ``context.object``.
        :returns: whatever the factory returns.
        :raises AssertionError: if ``context.protocol`` is not
            ``oio.event.handler_factory``.
        :raises KeyError: if ``app`` is missing from ``kwargs``.
        """
        if context.protocol != 'oio.event.handler_factory':
            # Raised explicitly instead of through ``assert`` so that the
            # check is not stripped by ``python -O`` (which would make this
            # method silently return None). The exception type is unchanged.
            raise AssertionError('Protocol %r unknown' % context.protocol)
        app = kwargs.pop('app')
        return context.object(
            app, context.global_conf, **context.local_conf)
# Shared _Handler instance, passed as ``obj_type`` when loading a handler.
HANDLER = _Handler()
class _Filter(_Type):
    """Object type for event filters.

    Filters are looked up under the ``oio.event.filter_factory`` entry
    point group and under ``filter`` sections of a configuration file.
    """

    name = 'filter'
    egg_protocols = ['oio.event.filter_factory']
    config_prefixes = ['filter']

    def invoke(self, context, **kwargs):
        """Call the filter factory described by ``context``.

        :param context: loader context providing ``protocol``, ``object``,
            ``global_conf`` and ``local_conf``.
        :param kwargs: accepted for interface compatibility, not used.
        :returns: whatever the factory ``context.object`` returns.
        :raises AssertionError: if ``context.protocol`` is not
            ``oio.event.filter_factory``.
        """
        if context.protocol != 'oio.event.filter_factory':
            # Raised explicitly instead of through ``assert`` so that the
            # check is not stripped by ``python -O`` (which would make this
            # method silently return None). The exception type is unchanged.
            raise AssertionError('Protocol %r unknown' % context.protocol)
        return context.object(context.global_conf, **context.local_conf)
# Shared _Filter instance, passed as ``obj_type`` when loading a filter.
FILTER = _Filter()
class _Pipeline(_Type):
    """Object type for a handler wrapped by an ordered list of filters."""

    name = 'pipeline'

    def invoke(self, context, **kwargs):
        """Create the handler and wrap it with every filter of the pipeline.

        :param context: loader context carrying ``handler_context`` and
            ``filter_contexts``.
        :param kwargs: forwarded to the ``create`` method of the handler
            context and of each filter context.
        :returns: the handler wrapped by all the filters.
        """
        wrapped = context.handler_context.create(**kwargs)
        # Every filter is created (in declaration order) before any wrapping
        # takes place.
        built = [filter_ctx.create(**kwargs)
                 for filter_ctx in context.filter_contexts]
        # Wrap from the last filter to the first, so that the first filter
        # of the pipeline ends up being the outermost callable.
        for wrap in reversed(built):
            wrapped = wrap(wrapped)
        return wrapped
# Shared _Pipeline instance, used as the object type of pipeline contexts.
PIPELINE = _Pipeline()
def loadhandlers(path, global_conf=None, **kwargs):
    """Load a handler for every ``handler:<name>`` section of a file.

    :param path: path of the configuration file.
    :param global_conf: optional dict forwarded to ``loadhandler``.
    :param kwargs: forwarded to ``loadhandler`` for each section.
    :returns: dict mapping each ``<name>`` to the object returned by
        ``loadhandler`` for it.
    """
    loader = ConfigLoader(path)
    # Section names look like "handler:<name>": strip the prefix and colon.
    skip = len("handler") + 1
    return {
        section[skip:]: loadhandler(
            loader, section[skip:], global_conf, **kwargs)
        for section in loader.get_sections(prefix="handler")
    }
def loadhandler(loader, name, global_conf=None, **kwargs):
    """Load the handler called ``name`` through ``loader``.

    :param loader: object exposing ``get_context(obj_type, name, global_conf)``.
    :param name: name of the handler to load.
    :param global_conf: optional dict passed to ``loader.get_context``.
    :param kwargs: forwarded to the ``create`` method of the context.
    :returns: the object created by the context.
    """
    return loader.get_context(HANDLER, name, global_conf).create(**kwargs)
def loadcontext(obj_type, uri, name=None, global_conf=None):
    """Build a loader context from a ``scheme:path[#name]`` URI.

    :param obj_type: object type descriptor handed to the scheme loader.
    :param uri: URI to resolve; a ``#fragment`` is removed from it and used
        as ``name`` unless ``name`` is given explicitly.
    :param name: optional name overriding the URI fragment.
    :param global_conf: optional dict handed to the scheme loader.
    :returns: whatever the loader registered for the scheme returns.
    :raises LookupError: if the URI has no scheme or the scheme (compared
        in lowercase) is not registered in ``_loaders``.
    """
    base, hash_sep, fragment = uri.partition('#')
    if hash_sep:
        # The fragment never stays in the URI; it only becomes the name
        # when the caller did not provide one.
        uri = base
        if name is None:
            name = fragment
    scheme, colon, path = uri.partition(':')
    if not colon:
        raise LookupError("URI scheme invalid %r" % uri)
    scheme = scheme.lower()
    if scheme in _loaders:
        return _loaders[scheme](
            obj_type, uri, path, name=name, global_conf=global_conf)
    raise LookupError('URI scheme unknown: %r' % scheme)
def _loadegg(obj_type, uri, spec, name, global_conf):
    """Scheme loader for ``egg:`` URIs.

    :param obj_type: object type descriptor to look up.
    :param uri: full URI (not used by this loader).
    :param spec: part of the URI after the scheme, given to ``EggLoader``.
    :param name: name of the entry to load.
    :param global_conf: dict handed to ``EggLoader.get_context``.
    :returns: the context built by ``EggLoader.get_context``.
    """
    return EggLoader(spec).get_context(obj_type, name, global_conf)
# Register _loadegg as the loader for URIs whose scheme is ``egg``.
_loaders['egg'] = _loadegg
class _Loader(object):
    """Empty base class for loaders; it defines no behaviour of its own."""
class CustomConfigParser(configparser.ConfigParser):
    """``ConfigParser`` subclass that also records a ``filename`` attribute."""

    def __init__(self, filename, *args, **kwargs):
        """Initialise the parser and remember ``filename``.

        :param filename: stored as ``self.filename``; it is not read here.
        :param args: positional arguments for ``ConfigParser.__init__``.
        :param kwargs: keyword arguments for ``ConfigParser.__init__``.
        """
        self.filename = filename
        configparser.ConfigParser.__init__(self, *args, **kwargs)
class ConfigLoader(object):
    """Build loader contexts from the sections of a configuration file.

    Sections are named ``<prefix>`` or ``<prefix>:<name>``, where
    ``<prefix>`` is one of the ``config_prefixes`` of the requested object
    type (for instance ``handler:storage.content.new`` or ``filter:foo``).
    """

    # Matches names that start with a URI scheme, such as ``egg:``.
    _absolute_re = re.compile(r'^[a-zA-Z]+:')

    def __init__(self, filename):
        """Parse the configuration file ``filename``.

        :param filename: path of the file; surrounding whitespace is
            stripped before use.
        """
        self.filename = filename = filename.strip()
        # ``__file__`` is exposed as a default option of every section.
        defaults = {'__file__': os.path.abspath(filename)}
        self.parser = CustomConfigParser(filename, defaults=defaults)
        # Keep option names as written instead of lowercasing them.
        self.parser.optionxform = str
        # ``readfp`` was removed in Python 3.12: prefer ``read_file`` and
        # fall back to ``readfp`` only where it is all the parser offers.
        read_file = getattr(self.parser, 'read_file', None)
        if read_file is None:
            read_file = self.parser.readfp
        with open(filename) as f:
            read_file(f)

    def absolute_name(self, name):
        """Tell whether ``name`` starts with a URI scheme.

        :returns: False when ``name`` is None, otherwise the result of the
            regular expression search (a match object, or None).
        """
        if name is None:
            return False
        return self._absolute_re.search(name)

    def get_context(self, obj_type, name=None, global_conf=None):
        """Build the loader context of the object called ``name``.

        :param obj_type: object type descriptor (``HANDLER``, ``FILTER``).
        :param name: section name without its prefix, or a URI with a
            scheme, in which case the lookup is delegated to ``loadcontext``.
        :param global_conf: optional dict overriding the parser defaults.
        :returns: a loader context.
        :raises LookupError: if no section (or more than one) matches, or
            if a non-handler section has no ``use`` option.
        """
        if self.absolute_name(name):
            return loadcontext(obj_type, name, global_conf=global_conf)
        section = self.find_config_section(obj_type, name=name)
        defaults = self.parser.defaults()
        # The caller's global configuration takes precedence over defaults.
        _global_conf = defaults.copy()
        if global_conf is not None:
            _global_conf.update(global_conf)
        global_conf = _global_conf
        # Options whose name is also a default are left out of local_conf.
        local_conf = {
            option: self.parser.get(section, option)
            for option in self.parser.options(section)
            if option not in defaults}
        if obj_type == HANDLER:
            return self._pipeline_context(
                obj_type, section, name=name, global_conf=global_conf,
                local_conf=local_conf)
        if 'use' in local_conf:
            return self._context_from_use(
                obj_type, local_conf, global_conf, section)
        raise LookupError(
            "Invalid section config %r" % section)

    def _pipeline_context(self, obj_type, section, name, global_conf,
                          local_conf):
        """Build a pipeline context for a handler section.

        The ``pipeline`` option (whitespace-separated filter names) is
        removed from ``local_conf``; when absent the pipeline has no filter.
        ``obj_type``, ``section`` and ``name`` are currently unused.
        """
        pipeline = local_conf.pop('pipeline', '').split()
        context = LoaderContext(
            None, PIPELINE, None, global_conf, local_conf, self)
        # NOTE: the handler is always DEFAULT_HANDLER; a ``use`` option in
        # the section is not consulted here and just stays in local_conf.
        context.handler_context = loadcontext(
            HANDLER, DEFAULT_HANDLER, global_conf=global_conf)
        context.filter_contexts = [
            self.get_context(FILTER, filter_name, global_conf)
            for filter_name in pipeline]
        return context

    def _context_from_use(self, obj_type, local_conf, global_conf, section):
        """Build the context designated by the ``use`` option of a section.

        The remaining options of ``local_conf`` are merged into the local
        configuration of the resulting context, and this loader becomes
        the loader of that context.

        :raises LookupError: if ``use`` is missing or empty.
        """
        use = local_conf.pop('use', None)
        if not use:
            raise LookupError("Missing 'use' in section config %r" % section)
        context = self.get_context(
            obj_type, name=use, global_conf=global_conf)
        context.local_conf.update(local_conf)
        context.loader = self
        if context.protocol is None:
            # Derive the protocol from the section prefix. The previous
            # special case tested ``in ('handler')``, a substring test
            # against a plain string, so prefixes such as ``hand`` were
            # mapped to ``handler_factory``. The exact ``handler`` prefix
            # already yields ``handler_factory`` through this format.
            section_protocol = section.split(':', 1)[0]
            context.protocol = '%s_factory' % section_protocol
        return context

    def find_config_section(self, obj_type, name=None):
        """Return the name of the only section matching ``name``.

        Prefixes of ``obj_type.config_prefixes`` are tried in order and the
        search stops at the first prefix that yields a match.

        :raises LookupError: if no section matches, or if several do.
        """
        sections = []
        for prefix in obj_type.config_prefixes:
            sections = self._find_sections(
                self.parser.sections(), prefix, name)
            if sections:
                break
        if not sections:
            raise LookupError(
                'No section %r found in config %r' % (name, self.filename))
        if len(sections) > 1:
            raise LookupError(
                'Ambiguous section %r found in config %r'
                % (name, self.filename))
        return sections[0]

    def _find_sections(self, sections, prefix, name):
        """List the entries of ``sections`` designating ``prefix:name``.

        When ``name`` is None, a section called exactly ``prefix`` matches
        too, and ``name`` defaults to ``main``.
        """
        found = []
        if name is None:
            if prefix in sections:
                found.append(prefix)
            name = 'main'
        marker = prefix + ':'
        found.extend(
            section for section in sections
            if section.startswith(marker)
            and section[len(marker):].strip() == name)
        return found

    def get_sections(self, prefix=None):
        """List section names, optionally only those starting with
        ``prefix:``.

        With no (or an empty) ``prefix``, every section is returned.
        """
        return [section for section in self.parser.sections()
                if not prefix or section.startswith(prefix + ':')]
class EggLoader(_Loader):
    """Loader resolving objects through the entry points of a distribution."""

    def __init__(self, spec):
        """:param spec: distribution requirement, as given to
            ``pkg_resources``.
        """
        self.spec = spec

    def get_context(self, obj_type, name=None, global_conf=None):
        """Build the loader context of the entry point called ``name``.

        :param obj_type: object type descriptor whose ``egg_protocols``
            are searched.
        :param name: entry point name, ``main`` when None.
        :param global_conf: optional dict; an empty dict is used when it
            is None or empty.
        :returns: a ``LoaderContext`` with an empty local configuration.
        :raises LookupError: if no entry point matches.
        """
        entry_point, protocol, ep_name = self.find_egg_ep(obj_type, name=name)
        distribution = pkg_resources.get_distribution(self.spec)
        return LoaderContext(
            entry_point, obj_type, protocol, global_conf or {}, {}, self,
            distribution=distribution, ep_name=ep_name)

    def find_egg_ep(self, obj_type, name=None):
        """Find the first entry point called ``name`` in the distribution.

        The protocols of ``obj_type.egg_protocols`` are tried in order and
        the first match wins. The former "ambiguous entry" check could
        never trigger, because the search already stopped at the first
        match, so it has been dropped.

        :returns: tuple ``(loaded object, protocol, entry point name)``.
        :raises LookupError: if no protocol provides such an entry point.
        """
        if name is None:
            name = 'main'
        for protocol in obj_type.egg_protocols:
            pkg_resources.require(self.spec)
            entry = pkg_resources.get_entry_info(
                self.spec, protocol, name)
            if entry is not None:
                return entry.load(), protocol, entry.name
        raise LookupError(
            "Entry point %r not found in egg %r"
            % (name, self.spec))
class LoaderContext(object):
    """Everything needed to create one loaded object.

    Holds the factory (``object``), its type descriptor, the protocol it
    was found under, its global and local configuration, and the loader
    that produced the context.
    """

    def __init__(self, obj, obj_type, protocol, global_conf, local_conf,
                 loader, distribution=None, ep_name=None):
        """Store every argument as an attribute; ``obj`` is stored as
        ``self.object``.
        """
        self.object, self.obj_type, self.protocol = obj, obj_type, protocol
        self.global_conf, self.local_conf = global_conf, local_conf
        self.loader = loader
        self.distribution, self.ep_name = distribution, ep_name

    def create(self, **kwargs):
        """Build the object by delegating to ``self.obj_type.invoke``.

        :param kwargs: forwarded to ``invoke``.
        :returns: whatever ``invoke`` returns.
        """
        builder = self.obj_type.invoke
        return builder(self, **kwargs)

    def config(self):
        """Return the merged configuration of this context.

        :returns: an ``AttrDict`` holding the global configuration updated
            with the local one, with ``local_conf``, ``global_conf`` and
            ``context`` set as attributes.
        """
        merged = AttrDict(self.global_conf)
        merged.update(self.local_conf)
        extras = (
            ('local_conf', self.local_conf),
            ('global_conf', self.global_conf),
            ('context', self),
        )
        for attr, value in extras:
            setattr(merged, attr, value)
        return merged
class AttrDict(dict):
    """Plain ``dict`` subclass whose instances also accept attributes."""