Source code for snmp_orm.adapters.pysnmp

"""Abstract adapter class."""
from __future__ import absolute_import

from pysnmp import error as pysnmp_error
from pysnmp.entity.rfc3413.oneliner.cmdgen import CommunityData, UsmUserData, \
    UdpTransportTarget, CommandGenerator

from snmp_orm.utils import str_to_oid
from snmp_orm.adapters.base import AbstractAdapter, AbstractException


[docs]class PySNMPError(AbstractException): pass
[docs]class AbstractSession(object): def __init__(self, host, port=None): self.transportTarget = UdpTransportTarget((host, port)) self.authData = None self.generator = CommandGenerator()
[docs] def format_varBinds(self, varBinds): return [(str_to_oid(oid), value) for oid, value in varBinds]
[docs] def format_varBindTable(self, varBindTable): result = [] for varBinds in varBindTable: result.extend(self.format_varBinds(varBinds)) return result
[docs] def handle_error(self, errorIndication, errorStatus, errorIndex, varBinds=None, varBindTable=None): if errorIndication: raise PySNMPError(errorIndication) elif errorStatus: variables = varBinds or varBindTable[-1] text = errorStatus.prettyPrint() position = errorIndex and variables[int(errorIndex) - 1] or '?' raise PySNMPError("%s at %s" % (text, position))
[docs] def get(self, *args): try: errorIndication, errorStatus, \ errorIndex, varBinds = self.generator.getCmd( self.authData, self.transportTarget, *args) except pysnmp_error.PySnmpError as e: # handle origin PySNMPError from pysnmp module. errorIndication = e errorStatus, errorIndex, varBinds = None, None, [] self.handle_error(errorIndication, errorStatus, errorIndex, varBinds) return self.format_varBinds(varBinds)
[docs] def set(self, *args): errorIndication, errorStatus, \ errorIndex, varBinds = self.generator.setCmd( self.authData, self.transportTarget, *args) self.handle_error(errorIndication, errorStatus, errorIndex, varBinds) return self.format_varBinds(varBinds)
[docs] def getnext(self, *args): errorIndication, errorStatus, errorIndex, \ varBindTable = self.generator.nextCmd( self.authData, self.transportTarget, *args) self.handle_error( errorIndication, errorStatus, errorIndex, None, varBindTable) return self.format_varBindTable(varBindTable)
[docs] def getbulk(self, rows, *args): errorIndication, errorStatus, errorIndex, \ varBindTable = self.generator.bulkCmd( self.authData, self.transportTarget, 0, rows, *args) self.handle_error( errorIndication, errorStatus, errorIndex, None, varBindTable) return self.format_varBindTable(varBindTable)
[docs]class Session(AbstractSession): def __init__(self, host, port, version, community): super(Session, self).__init__(host, port) self.authData = CommunityData( 'agent', community, None if version == 2 else 0)
[docs]class UsmSession(AbstractSession): def __init__(self, host, port=None, sec_name=None, sec_level=None, auth_protocol=None, auth_passphrase=None, priv_protocol=None, priv_passphrase=None): super(UsmSession, self).__init__(host, port) self.authData = UsmUserData(sec_name, auth_passphrase, priv_passphrase)
[docs]class Adapter(AbstractAdapter):
[docs] def get_snmp_v2_session(self, host, port, version, community, **kwargs): if community is None: raise TypeError("community can`t be None") return Session(host, port, version, community)
[docs] def get_snmp_v3_session(self, host, port, version, sec_name, sec_level, auth_protocol, auth_passphrase, priv_protocol, priv_passphrase, **kwargs): if sec_name is None: raise TypeError("sec_name can`t be None") if auth_passphrase is None: raise TypeError("auth_passphrase can`t be None") if priv_passphrase is None: raise TypeError("priv_passphrase can`t be None") return UsmSession(host, port, sec_name, sec_level, auth_protocol, auth_passphrase, priv_protocol, priv_passphrase)

Project Versions

This Page