Source code for snmp_orm.tests.agent

from __future__ import absolute_import

import time
import bisect
import select
from threading import Thread

from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.carrier.asynsock.dgram import udp


[docs]class Instr(object): """Abstract MIB instruction.""" @property
[docs] def name(self): raise NotImplementedError()
def __cmp__(self, other): return cmp(self.name, other)
[docs] def execute(self, module, *args, **kwargs): raise NotImplementedError()
def __call__(self, protoVer, *args, **kwargs): return self.execute(api.protoModules[protoVer], *args, **kwargs)
[docs]class SysDescr(Instr): name = (1, 3, 6, 1, 2, 1, 1, 1, 0)
[docs] def execute(self, module): return module.OctetString( 'PySNMP example command responder at %s' % __file__ )
[docs]class Uptime(Instr): name = (1, 3, 6, 1, 2, 1, 1, 3, 0) birthday = time.time()
[docs] def execute(self, module): return module.TimeTicks( (time.time() - self.birthday) * 100 )
[docs]class Agent(object): def __init__(self, host, port): self._mibInstr = [] self._mibInstrIdx = {} self._transportDispatcher = AsynsockDispatcher() self.host = host self.port = port
[docs] def prepare(self): address = (self.host, self.port) transportDispatcher = self._transportDispatcher transportDispatcher.registerTransport( udp.domainName, udp.UdpSocketTransport().openServerMode(address) ) transportDispatcher.registerRecvCbFun(self.cbFun) transportDispatcher.jobStarted(1)
[docs] def start(self): try: self._transportDispatcher.runDispatcher() except select.error: pass
[docs] def stop(self): self._transportDispatcher.closeDispatcher()
[docs] def registerInstr(self, instr): assert callable(instr) self._mibInstr.append(instr) self._mibInstrIdx[instr.name] = instr
[docs] def cbFun(self, transportDispatcher, transportDomain, transportAddress, wholeMsg): mibInstr = self._mibInstr mibInstrIdx = self._mibInstrIdx while wholeMsg: msgVer = api.decodeMessageVersion(wholeMsg) if msgVer in api.protoModules: pMod = api.protoModules[msgVer] else: return reqMsg, wholeMsg = decoder.decode( wholeMsg, asn1Spec=pMod.Message(), ) rspMsg = pMod.apiMessage.getResponse(reqMsg) rspPDU = pMod.apiMessage.getPDU(rspMsg) reqPDU = pMod.apiMessage.getPDU(reqMsg) varBinds = [] errorIndex = -1 # GETNEXT PDU if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()): # Produce response var-binds errorIndex = -1 for oid, val in pMod.apiPDU.getVarBinds(reqPDU): errorIndex = errorIndex + 1 # Search next OID to report nextIdx = bisect.bisect(mibInstr, oid) if nextIdx == len(mibInstr): # Out of MIB pMod.apiPDU.setEndOfMibError(rspPDU, errorIndex) else: # Report value if OID is found varBinds.append( (mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer)) ) elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()): for oid, val in pMod.apiPDU.getVarBinds(reqPDU): if oid in mibInstrIdx: varBinds.append( (oid, mibInstrIdx[oid](msgVer)) ) else: # No such instance try: pMod.apiPDU.setNoSuchInstanceError(rspPDU, errorIndex) except IndexError: pass varBinds = pMod.apiPDU.getVarBinds(reqPDU) break else: # Report unsupported request type pMod.apiPDU.setErrorStatus(rspPDU, 'genErr') pMod.apiPDU.setVarBinds(rspPDU, varBinds) transportDispatcher.sendMessage( encoder.encode(rspMsg), transportDomain, transportAddress ) return wholeMsg
[docs]class BackgroundAgent(Agent): Container = Thread def __init__(self, host, port): super(BackgroundAgent, self).__init__(host, port) self.container = None
[docs] def start(self): assert self.container is None self.prepare() container = self.container = self.Container( target=super(BackgroundAgent, self).start ) container.daemon = True container.start()
[docs] def stop(self): assert self.container is not None super(BackgroundAgent, self).stop() self.container.join() self.container = None

Project Versions

This Page