Source code for snmp_orm.adapters.abstract_adapter

"""Abstract adapter class."""
from __future__ import absolute_import

import logging

from pprint import pformat
from functools import wraps

from six import Iterator
from pyasn1.type.univ import Null

from snmp_orm.config import DEBUG
from snmp_orm.settings import SnmpV2Settings, SnmpV3Settings
from snmp_orm.utils import str_to_oid

logger = logging.getLogger(__name__)


[docs]class AbstractException(Exception): pass
[docs]def log(f): @wraps(f) def inner_wrapper(self, *args): logger.debug("[%s] Call %s%s" % (self.host, f.__name__, pformat(args))) result = f(self, *args) logger.debug("[%s] %s return %s" % (self.host, f.__name__, pformat(result))) return result if DEBUG: return inner_wrapper else: return f
[docs]class Walker(Iterator): """SNMP walker class""" def __init__(self, agent, baseoid, use_bulk=True, bulk_rows=None): self.baseoid = baseoid self.baseoid_len = len(baseoid) self.lastoid = baseoid self.agent = agent self.use_bulk = use_bulk self.bulk_rows = bulk_rows self.raise_stop = False def __iter__(self): return self def __next__(self): if self.raise_stop: raise StopIteration() if self.use_bulk: rows = self.agent.getbulk(self.bulk_rows, self.lastoid) else: rows = self.agent.getnext(self.lastoid) if not rows: raise StopIteration() if self.use_bulk: slice = 0 for oid, _ in reversed(rows): diff = self.baseoid_len - len(oid) if (diff == 0 and oid[:-1] == self.baseoid[:-1]) or \ (diff != 0 and oid[:diff] == self.baseoid): break else: slice += 1 if slice > 0: rows = rows[:0 - slice] self.raise_stop = True if not rows: raise StopIteration() self.lastoid = rows[-1][0] return rows
[docs]class AbstractAdapter(object): def __init__(self, settings_read, settings_write=None): settings_write = settings_write or settings_read.__class__() assert settings_write.__class__ == settings_read.__class__ _settings_write = settings_write.__class__() _settings_write.update(settings_read) _settings_write.update(settings_write) # Create session for host if isinstance(settings_read, SnmpV3Settings): session_getter = self.get_snmp_v3_session elif isinstance(settings_read, SnmpV2Settings): session_getter = self.get_snmp_v2_session else: raise TypeError self.host = settings_read["host"] self.settings_read = settings_read self.settings_write = _settings_write self.session_read = session_getter(**settings_read.prepare_kwargs()) if settings_read == _settings_write: self.session_write = self.session_read else: self.session_write = session_getter(**_settings_write.prepare_kwargs())
[docs] def get_snmp_v2_session(self, host, port, version, community, **kwargs): raise NotImplementedError()
[docs] def get_snmp_v3_session(self, host, port, version, sec_name=None, sec_level=None, auth_protocol=None, auth_passphrase=None, priv_protocol=None, priv_passphrase=None, **kwargs): raise NotImplementedError()
@log
[docs] def get(self, *args): """Return tuple of pairs: ((1, 3, 6, 1, 2, 1, 1, 1, 0), OctetString('DGS-3100-24 Gigabit stackable L2 Managed Switch')) """ return self.session_read.get(*map(str_to_oid, args))
[docs] def get_one(self, oid): """Return oid value""" variables = self.get(oid) if variables: result = variables[0][1] if not isinstance(result, Null): return result return None
@log
[docs] def getnext(self, *args): """Return table: [ ((1, 3, 6, 1, 2, 1, 1, 1, 0), OctetString('DGS-3100-24 Gigabit stackable L2 Managed Switch')), ((1, 3, 6, 1, 2, 1, 1, 2, 0), ObjectIdentifier('1.3.6.1.4.1.171.10.94.1')), ((1, 3, 6, 1, 2, 1, 1, 3, 0), TimeTicks('512281800')), ((1, 3, 6, 1, 2, 1, 1, 4, 0), OctetString('')) ] """ return self.session_read.getnext(*map(str_to_oid, args))
@log
[docs] def getbulk(self, rows=None, *args): """Return same as getnext method, but use rows number""" if rows is None: rows = self.settings_read["bulk_rows"] return self.session_read.getbulk(rows, *map(str_to_oid, args))
@log
[docs] def set(self, *args): #TODO: set more than one values return self.session_write.set(args)
[docs] def walk(self, oid): """Collect all rows in given OID""" oid = str_to_oid(oid) result = [] walker = Walker(self, oid, use_bulk=self.settings_read["use_bulk"], bulk_rows=self.settings_read["bulk_rows"]) for rows in walker: result.extend(rows) return result

Project Versions

This Page