Agent-side MIB implementations#
Implementing scalar MIB objects#
Listen and respond to SNMP GET/GETNEXT queries with the following options:
SNMPv1 or SNMPv2c
with SNMP community “public”
over IPv4/UDP, listening at 127.0.0.1:161
over IPv6/UDP, listening at [::1]:161
serving two Managed Objects Instances (sysDescr.0 and sysUptime.0)
Either of the following Net-SNMP commands will walk this Agent:
$ snmpwalk -v2c -c public 127.0.0.1 .1.3.6
$ snmpwalk -v2c -c public udp6:[::1] .1.3.6
The Command Receiver below uses two distinct transports for communication with Command Generators - UDP over IPv4 and UDP over IPv6.
from pysnmp.carrier.asyncio.dispatch import AsyncioDispatcher
from pysnmp.carrier.asyncio.dgram import udp, udp6, unix
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
import time, bisect
class SysDescr:
name = (1, 3, 6, 1, 2, 1, 1, 1, 0)
def __eq__(self, other):
return self.name == other
def __ne__(self, other):
return self.name != other
def __lt__(self, other):
return self.name < other
def __le__(self, other):
return self.name <= other
def __gt__(self, other):
return self.name > other
def __ge__(self, other):
return self.name >= other
def __call__(self, protoVer):
return api.protoModules[protoVer].OctetString(
"PySNMP example command responder"
)
class Uptime:
name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
birthday = time.time()
def __eq__(self, other):
return self.name == other
def __ne__(self, other):
return self.name != other
def __lt__(self, other):
return self.name < other
def __le__(self, other):
return self.name <= other
def __gt__(self, other):
return self.name > other
def __ge__(self, other):
return self.name >= other
def __call__(self, protoVer):
return api.protoModules[protoVer].TimeTicks((time.time() - self.birthday) * 100)
mibInstr = (SysDescr(), Uptime()) # sorted by object name
mibInstrIdx = {}
for mibVar in mibInstr:
mibInstrIdx[mibVar.name] = mibVar
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
while wholeMsg:
msgVer = api.decodeMessageVersion(wholeMsg)
if msgVer in api.protoModules:
pMod = api.protoModules[msgVer]
else:
print("Unsupported SNMP version %s" % msgVer)
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg,
asn1Spec=pMod.Message(),
)
rspMsg = pMod.apiMessage.getResponse(reqMsg)
rspPDU = pMod.apiMessage.getPDU(rspMsg)
reqPDU = pMod.apiMessage.getPDU(reqMsg)
varBinds = []
pendingErrors = []
errorIndex = 0
# GETNEXT PDU
if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):
# Produce response var-binds
for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
errorIndex = errorIndex + 1
# Search next OID to report
nextIdx = bisect.bisect(mibInstr, oid)
if nextIdx == len(mibInstr):
# Out of MIB
varBinds.append((oid, val))
pendingErrors.append((pMod.apiPDU.setEndOfMibError, errorIndex))
else:
# Report value if OID is found
varBinds.append((mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer)))
elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):
for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
if oid in mibInstrIdx:
varBinds.append((oid, mibInstrIdx[oid](msgVer)))
else:
# No such instance
varBinds.append((oid, val))
pendingErrors.append(
(pMod.apiPDU.setNoSuchInstanceError, errorIndex)
)
break
else:
# Report unsupported request type
pMod.apiPDU.setErrorStatus(rspPDU, "genErr")
pMod.apiPDU.setVarBinds(rspPDU, varBinds)
# Commit possible error indices to response PDU
for f, i in pendingErrors:
f(rspPDU, i)
transportDispatcher.sendMessage(
encoder.encode(rspMsg), transportDomain, transportAddress
)
return wholeMsg
transportDispatcher = AsyncioDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)
# UDP/IPv4
transportDispatcher.registerTransport(
udp.domainName, udp.UdpAsyncioTransport().openServerMode(("localhost", 161))
)
# UDP/IPv6
transportDispatcher.registerTransport(
udp6.domainName, udp6.Udp6AsyncioTransport().openServerMode(("::1", 161))
)
## Local domain socket
# transportDispatcher.registerTransport(
# unix.domainName, unix.UnixAsyncioTransport().openServerMode('/tmp/snmp-agent')
# )
transportDispatcher.jobStarted(1)
try:
# Dispatcher will never finish as job#1 never reaches zero
transportDispatcher.runDispatcher()
except:
transportDispatcher.closeDispatcher()
raise
Download
script.
See also: library reference.