Source code for snmp_orm.utils

"""Some useful tools."""
from __future__ import absolute_import

import sys
import inspect
import pkgutil
import importlib

from six import string_types, binary_type, b, reraise
from pyasn1.type.univ import ObjectIdentifier


[docs]def get_all_parents(cls): parents = [] parents.extend(cls.__bases__) left_bases = [] left_bases.extend(cls.__bases__) while left_bases: for base in left_bases[:]: left_bases.extend(base.__bases__) parents.extend(base.__bases__) left_bases.remove(base) parents.reverse() return tuple(parents)
[docs]def str_to_oid(s): if isinstance(s, string_types + (binary_type, )): return tuple(int(val) for val in s.split(".")) elif isinstance(s, ObjectIdentifier): return tuple(s) else: return s
[docs]def oid_to_str(t): try: return ".".join(str(val) for val in t) except TypeError: return t
[docs]def symbol_by_name(name, aliases={}, imp=None, package=None, sep='.', default=None, **kwargs): """Get symbol by qualified name. The name should be the full dot-separated path to the class:: modulename.ClassName Example:: celery.concurrency.processes.TaskPool ^- class name or using ':' to separate module and symbol:: celery.concurrency.processes:TaskPool If `aliases` is provided, a dict containing short name/long name mappings, the name is looked up in the aliases first. Examples: >>> symbol_by_name("celery.concurrency.processes.TaskPool") <class 'celery.concurrency.processes.TaskPool'> >>> symbol_by_name("default", { ... "default": "celery.concurrency.processes.TaskPool"}) <class 'celery.concurrency.processes.TaskPool'> # Does not try to look up non-string names. >>> from celery.concurrency.processes import TaskPool >>> symbol_by_name(TaskPool) is TaskPool True """ if imp is None: imp = importlib.import_module if not isinstance(name, string_types): return name # already a class name = aliases.get(name) or name sep = ':' if ':' in name else sep module_name, _, cls_name = name.rpartition(sep) if not module_name: cls_name, module_name = None, package if package else cls_name try: try: module = imp(module_name, package=package, **kwargs) except ValueError as exc: exc = ValueError("Couldn't import %r: %s" % (name, exc)) reraise(ValueError, exc, sys.exc_info()[2]) return getattr(module, cls_name) if cls_name else module except (ImportError, AttributeError): if default is None: raise return default
[docs]def load_modules(packages): """Find all attributes of given modules.""" for package in packages: for _, name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + '.'): yield importlib.import_module(name) raise StopIteration()
[docs]def find_classes(cls, packages): """Find all subclass of given class.""" entities = set() for module in load_modules(packages): for attr in dir(module): entity = getattr(module, attr) if inspect.isclass(entity) and \ issubclass(entity, cls) and \ entity is not cls: if entity not in entities: yield entity entities.add(entity) raise StopIteration()

Project Versions

This Page