"""Implement fields here."""
from __future__ import absolute_import
import math
from datetime import timedelta
from netaddr import EUI, IPAddress
from pyasn1.type.univ import Null, ObjectIdentifier
from pyasn1.type.base import Asn1ItemBase
from pysnmp.proto import rfc1902
from six import itervalues, integer_types, string_types
from snmp_orm.utils import str_to_oid
class Mapper(object):
    """Base converter from plain Python values to pyasn1 objects."""

    def toAsn1(self, var):
        """Convert a base Python object to pyasn1 format.

        ``None`` becomes an ASN.1 NULL value object; every other input is
        returned unchanged so that subclasses can apply their own conversion.
        """
        if var is None:
            # Return a Null *value*, not the Null class itself: subclasses
            # test ``isinstance(var, Asn1ItemBase)`` to decide whether the
            # value is already converted, and a bare class fails that test.
            return Null('')
        return var
class Field(Mapper):
    """Descriptor base class that ties a model attribute to an OID."""

    def __init__(self, oid):
        # Keep the OID in whatever form ``str_to_oid`` returns.
        self.oid = str_to_oid(oid)

    def __get__(self, instance, owner):
        """Return the field on class access, else the instance's value.

        Instance access is delegated to ``instance._get(self)``.
        """
        if instance is not None:
            return instance._get(self)
        return self

    def load(self, adapter):
        """Read the value through *adapter*; must be overridden."""
        raise NotImplementedError()

    def set(self, adapter, value):
        """Write *value* through *adapter*; must be overridden."""
        raise NotImplementedError()

    def prepare(self, vars):
        """Pass *vars* through ``self.form`` and return the result.

        NOTE(review): ``form`` is not defined on this class in the visible
        code; it is expected to be supplied elsewhere.
        """
        prepared = self.form(vars)
        return prepared
class Group(object):
    """Named collection of ``Field`` objects with an optional OID prefix.

    Fields are passed as keyword arguments and are afterwards reachable as
    attributes of the group. The reserved keyword ``prefix`` is not treated
    as a field: it is stored as ``self.prefix`` and is meant for bulk
    loading (``None`` when not given).

    Raises:
        TypeError: if any keyword value other than ``prefix`` is not a
            ``Field`` instance.
    """

    def __init__(self, **fields):
        # Pull the bulk-loading prefix out so it is not validated as a field.
        self.prefix = fields.pop("prefix", None)
        # Every remaining keyword argument must be a Field.
        for field in fields.values():
            if not isinstance(field, Field):
                # Wrap in a tuple so a tuple-valued argument is formatted
                # as one object instead of being unpacked by ``%``.
                raise TypeError(
                    "Object %r must be instance of Field" % (field,))
        self.fields = fields

    def __getattr__(self, name):
        """Resolve unknown attributes as field names.

        Raises:
            AttributeError: if *name* is not a registered field.
        """
        # Go through ``__dict__`` directly: if ``fields`` has not been set
        # yet (e.g. on a bare instance made by copy/pickle), ``self.fields``
        # would re-enter ``__getattr__`` and recurse without end.
        try:
            return self.__dict__["fields"][name]
        except KeyError:
            raise AttributeError(
                "%r object has no attribute %r"
                % (type(self).__name__, name))
class TableField(object):
    """Mixin adding table (multi-row) operations to a field.

    Relies on the host class for ``self.oid``, ``self.toAsn1`` and
    ``self.form``.
    """

    def load_many(self, adapter):
        """Return the result of walking this field's OID with *adapter*."""
        rows = adapter.walk(self.oid)
        return rows

    def load_one(self, adapter, key):
        """Fetch the single entry addressed by the OID plus formatted *key*."""
        suffix = format_key(key)
        return adapter.get_one(self.oid + suffix)

    def set_one(self, adapter, key, value):
        """Convert *value* with ``toAsn1`` and write it to the entry at *key*."""
        suffix = format_key(key)
        asn1_value = self.toAsn1(value)
        return adapter.set(self.oid + suffix, asn1_value)

    def prepare_many(self, vars):
        """Apply ``self.form`` to the value of every ``(oid, value)`` pair."""
        prepared = []
        for oid, value in vars:
            prepared.append((oid, self.form(value)))
        return prepared
class SingleValueField(Field):
    """Field that reads and writes exactly one OID."""

    def load(self, adapter):
        """Return ``adapter.get_one`` for this field's OID."""
        return adapter.get_one(self.oid)

    def set(self, adapter, value):
        """Convert *value* with ``toAsn1`` and write it to this field's OID."""
        asn1_value = self.toAsn1(value)
        return adapter.set(self.oid, asn1_value)
class TableValueField(Field, TableField):
    """Plain ``Field`` extended with the ``TableField`` table operations."""
class FromDictMapper(Mapper):
    """Mapper for enumerated values described by an ``{index: value}`` dict.

    The host class must provide the mapping as ``self.d``.
    """

    def toAsn1(self, var):
        """Convert an index or a mapped value to ``rfc1902.Integer``.

        An integer that is not one of the dict's values is taken to be an
        index already. Anything else is treated as a value and replaced by
        the first key that maps to it.

        Raises:
            ValueError: if *var* has to be looked up as a value but is not
                present in the dict.
        """
        var = super(FromDictMapper, self).toAsn1(var)
        values = list(self.d.values())
        is_index = isinstance(var, integer_types) and var not in values
        if not is_index:
            # var is a value, not an index: reverse-lookup its key. Dict
            # views cannot be indexed on Python 3, so scan the items
            # instead of using ``keys()[values().index(var)]``.
            for key, value in self.d.items():
                if value == var:
                    var = key
                    break
            else:
                raise ValueError("%r is not in list" % (var,))
        return rfc1902.Integer(var)
class FromDictField(SingleValueField, FromDictMapper):
    """Scalar field whose integer codes are described by a lookup dict."""

    def __init__(self, oid, d, conv_to=None):
        """Store the OID, the lookup dict *d* and an optional converter.

        When *conv_to* is falsy an identity function is stored instead.
        """
        super(FromDictField, self).__init__(oid)
        self.d = d
        if conv_to:
            self.conv_to = conv_to
        else:
            self.conv_to = lambda x: x
class FromDictTableField(FromDictField, TableField):
    """``FromDictField`` extended with the ``TableField`` table operations."""
class UnicodeMapper(Mapper):
    """Mapper that sends values as ``rfc1902.OctetString``."""

    def toAsn1(self, var):
        """Wrap *var* in ``rfc1902.OctetString`` after the base conversion."""
        base_value = super(UnicodeMapper, self).toAsn1(var)
        return rfc1902.OctetString(base_value)
class UnicodeField(SingleValueField, UnicodeMapper):
    """Scalar text field; outgoing values are converted by ``UnicodeMapper``."""
class UnicodeTableField(UnicodeField, TableField):
    """``UnicodeField`` extended with the ``TableField`` table operations."""
class OIDMapper(Mapper):
    """Mapper that sends values as pyasn1 ``ObjectIdentifier``."""

    def toAsn1(self, var):
        """Convert a dotted string or a sequence to ``ObjectIdentifier``.

        Values that are already pyasn1 objects are returned untouched.
        Strings are split on ``'.'`` and each part is parsed as an integer.
        """
        var = super(OIDMapper, self).toAsn1(var)
        if isinstance(var, Asn1ItemBase):
            return var
        if isinstance(var, string_types):
            var = tuple(int(part) for part in var.split('.'))
        return ObjectIdentifier(var)
class OIDField(SingleValueField, OIDMapper):
    """Scalar OID field; outgoing values are converted by ``OIDMapper``."""
class OIDTableField(OIDField, TableField):
    """``OIDField`` extended with the ``TableField`` table operations."""
class IntegerMapper(Mapper):
    """Mapper that sends values as ``rfc1902.Integer``."""

    def toAsn1(self, var):
        """Wrap *var* in ``rfc1902.Integer`` unless it is already pyasn1."""
        converted = super(IntegerMapper, self).toAsn1(var)
        if isinstance(converted, Asn1ItemBase):
            return converted
        return rfc1902.Integer(converted)
class IntegerField(SingleValueField, IntegerMapper):
    """Scalar integer field; outgoing values are converted by ``IntegerMapper``."""
class IntegerTableField(IntegerField, TableField):
    """``IntegerField`` extended with the ``TableField`` table operations."""
class LongIntegerMapper(Mapper):
    """Mapper for long integer values, currently sent as ``rfc1902.Integer``."""

    def toAsn1(self, var):
        """Wrap *var* in ``rfc1902.Integer`` unless it is already pyasn1."""
        # FIXME: should the LongInteger type be equal to Integer?
        # super() must name this class: LongIntegerMapper does not derive
        # from IntegerMapper, so ``super(IntegerMapper, self)`` raised
        # TypeError on every call.
        var = super(LongIntegerMapper, self).toAsn1(var)
        if not isinstance(var, Asn1ItemBase):
            var = rfc1902.Integer(var)
        return var
class LongIntegerField(SingleValueField, LongIntegerMapper):
    """Scalar long-integer field using ``LongIntegerMapper`` for outgoing values."""
class LongIntegerTableField(LongIntegerField, TableField):
    """``LongIntegerField`` extended with the ``TableField`` table operations."""
class TimeTickMapper(IntegerMapper):
    """Mapper that sends durations as SNMP ``TimeTicks``."""

    def toAsn1(self, var):
        """Convert a ``timedelta`` or a raw tick count to ``rfc1902.TimeTicks``.

        A ``timedelta`` is expressed in hundredths of a second, the TimeTicks
        unit, with sub-tick microseconds truncated. ``None`` and values that
        are already pyasn1 objects are left to the parent conversion.

        NOTE(review): TimeTicks is unsigned, so a negative duration is
        expected to be rejected by pyasn1 -- confirm against callers.
        """
        if isinstance(var, timedelta):
            # Integer arithmetic avoids float rounding of total seconds.
            whole_seconds = var.days * 86400 + var.seconds
            var = whole_seconds * 100 + var.microseconds // 10000
        if var is not None and not isinstance(var, Asn1ItemBase):
            var = rfc1902.TimeTicks(var)
        return super(TimeTickMapper, self).toAsn1(var)
class TimeTickField(SingleValueField, TimeTickMapper):
    """Scalar time-tick field; outgoing conversion is left to ``TimeTickMapper``."""
class TimeTickTableField(TimeTickField, TableField):
    """``TimeTickField`` extended with the ``TableField`` table operations."""
class MacMapper(Mapper):
    """Mapper that sends MAC addresses as ``rfc1902.OctetString``."""

    def toAsn1(self, var):
        """Convert a MAC address to an ``OctetString`` of its packed bytes.

        A string whose length is not 6 is parsed with ``netaddr.EUI``; a
        6-character string is passed on as is (presumably it is already
        the packed address). ``EUI`` objects are replaced by their
        ``packed`` form. Values that are already pyasn1 objects are
        returned unchanged.
        """
        # super() must name this class: MacMapper does not derive from
        # IntegerMapper, so ``super(IntegerMapper, self)`` raised TypeError
        # on every call.
        var = super(MacMapper, self).toAsn1(var)
        if isinstance(var, string_types) and len(var) != 6:
            var = EUI(var)
        if isinstance(var, EUI):
            var = var.packed
        if not isinstance(var, Asn1ItemBase):
            var = rfc1902.OctetString(var)
        return var
class MacField(SingleValueField, MacMapper):
    """Scalar MAC address field; outgoing values are converted by ``MacMapper``."""
class MacTableField(MacField, TableField):
    """``MacField`` extended with the ``TableField`` table operations."""
class IPAddressMapper(Mapper):
    """Mapper that sends IP addresses as ``rfc1902.IpAddress``."""

    def toAsn1(self, var):
        """Convert an IP address to ``rfc1902.IpAddress``.

        A string whose length is not 4 is parsed with ``netaddr.IPAddress``;
        ``IPAddress`` objects are replaced by their ``packed`` form. Values
        that are already pyasn1 objects are returned unchanged.
        """
        address = super(IPAddressMapper, self).toAsn1(var)
        needs_parsing = isinstance(address, string_types) and len(address) != 4
        if needs_parsing:
            address = IPAddress(address)
        if isinstance(address, IPAddress):
            address = address.packed
        if isinstance(address, Asn1ItemBase):
            return address
        return rfc1902.IpAddress(address)
class IPAddressField(SingleValueField, IPAddressMapper):
    """Scalar IP address field; outgoing values go through ``IPAddressMapper``."""
class IPAddressTableField(IPAddressField, TableField):
    """``IPAddressField`` extended with the ``TableField`` table operations."""