Source code for snmp_orm.device

"""Manager for devices."""
from __future__ import absolute_import

import logging

from six import b, u, string_types, binary_type
from pyasn1.type.univ import ObjectIdentifier

from snmp_orm import devices
from snmp_orm.utils import find_classes, oid_to_str, symbol_by_name
from snmp_orm.adapter import get_adapter
from snmp_orm.config import OID_OBJECT_ID

logger = logging.getLogger(__name__)


[docs]def maybe_oid_to_str(objectId): """Convert given OID to string if needed.""" if objectId is None: return None elif isinstance(objectId, binary_type): return objectId elif isinstance(objectId, string_types): return b(objectId) elif isinstance(objectId, ObjectIdentifier): return b(objectId) elif isinstance(objectId, (tuple, list)): return oid_to_str(objectId) else: raise ValueError('Unknown OID %r' % objectId)
[docs]class DeviceClassRegistry(dict): """Store mapping of OID to device class. Arguments: - **packages**, iterable, contains list of packages with defined :class:`devices.Device`; - **default**, default class, that should returned if no class found; """ #: Abstract device class, used to find other devices in package. AbstractDevice = devices.AbstractDevice def __init__(self, packages=None, default=None): self.packages = tuple(symbol_by_name(maybe_path) for maybe_path in (packages or [devices])) self.default_device = symbol_by_name(default or devices.DefaultDevice) for cls in self.find_devices(): self.add(cls)
[docs] def find_devices(self): """Find existed device classes.""" for cls in find_classes(self.AbstractDevice, self.packages): yield cls
[docs] def add(self, cls): """Add class to registry.""" assert issubclass(cls, self.AbstractDevice), 'wrong cls given' objectIds = cls.classId if not isinstance(objectIds, (list, tuple, set)): objectIds = (objectIds, ) for objectId in objectIds: objectId = maybe_oid_to_str(objectId) if objectId and objectId not in self: self[objectId] = cls
def __getitem__(self, key): """Get class by it's OID or return default.""" objectId = maybe_oid_to_str(key) try: return super(DeviceClassRegistry, self).__getitem__(objectId) except KeyError: return self.default_device get_class = __getitem__
[docs]class DeviceManager(object): """Used to get class of given host.""" Registry = DeviceClassRegistry def __init__(self, registry=None): self._cache = {} self.registry = registry or self.Registry() def _get_device_id(self, host, **kwargs): host = u(host).lower() logger.debug("Get device class for %s" % host) try: oid = self._cache[host] except KeyError: oid = self._cache[host] = self._autodetect_id(host, **kwargs) return oid def _autodetect_id(self, host, **kwargs): logger.debug("Trying to detect objectId for %s" % host) adapter = get_adapter(host, **kwargs) id_ = oid_to_str(adapter.get_one(OID_OBJECT_ID)) assert id_ is not None, 'empty OID returned' logger.debug("objectId for %s is %s" % (host, id_)) return id_
[docs] def get_class(self, host, **kwargs): registry = kwargs.pop('registry', None) registry = self.registry if registry is None else registry cls = registry[self._get_device_id(host, **kwargs)] logger.debug("Use %r for %s" % (cls, host)) return cls #: Default device manager instance.
default_manager = DeviceManager()
[docs]def get_device(host, **kwargs): """Return `Device` instance for specified host. Arguments for device (for write access use `write_` prefix): - **host** -- IP or hostname if snmp agent; - **port** -- agent UDP port, default 161; - **version** -- SNMP version, 1, 2 or 3; - **registry** -- custom registry class for class lookup; - **class_name** -- adapter class; - **community** -- SNMP community; - **sec_name** -- security name; - **sec_level** -- security level; - **auth_protocol** -- auth protocol; - **auth_passphrase** -- auth passphrase; - **priv_protocol** -- priv protocol; - **priv_passphrase** -- priv passphrase. Other arguments: - **manager** custom instance of manager class; - **device_cls** custom device class, will be used instead of autolookup. """ manager = symbol_by_name(kwargs.pop('manager', default_manager)) cls = symbol_by_name(kwargs.pop('device_cls', None)) cls = manager.get_class(host=host, **kwargs) if cls is None else cls return cls(host, **kwargs)

Project Versions

This Page