Agent-side MIB implementations¶
Implementing scalar MIB objects¶
Listen and respond to SNMP GET/GETNEXT queries with the following options:
SNMPv1 or SNMPv2c
with SNMP community “public”
over IPv4/UDP, listening at 127.0.0.1:161
over IPv6/UDP, listening at [::1]:161
serving two Managed Objects Instances (sysDescr.0 and sysUptime.0)
Either of the following Net-SNMP commands will walk this Agent:
$ snmpwalk -v2c -c public 127.0.0.1 .1.3.6
$ snmpwalk -v2c -c public udp6:[::1] .1.3.6
The Command Receiver below uses two distinct transports for communication with Command Generators - UDP over IPv4 and UDP over IPv6.
from pysnmp.carrier.asyncio.dispatch import AsyncioDispatcher
from pysnmp.carrier.asyncio.dgram import udp, udp6
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
import time, bisect
class SysDescr:
name = (1, 3, 6, 1, 2, 1, 1, 1, 0)
def __eq__(self, other):
return self.name == other
def __ne__(self, other):
return self.name != other
def __lt__(self, other):
return self.name < other
def __le__(self, other):
return self.name <= other
def __gt__(self, other):
return self.name > other
def __ge__(self, other):
return self.name >= other
def __call__(self, protoVer):
return api.PROTOCOL_MODULES[protoVer].OctetString(
"PySNMP example command responder"
)
class Uptime:
name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
birthday = time.time()
def __eq__(self, other):
return self.name == other
def __ne__(self, other):
return self.name != other
def __lt__(self, other):
return self.name < other
def __le__(self, other):
return self.name <= other
def __gt__(self, other):
return self.name > other
def __ge__(self, other):
return self.name >= other
def __call__(self, protoVer):
return api.PROTOCOL_MODULES[protoVer].TimeTicks(
(time.time() - self.birthday) * 100
)
mibInstr = (SysDescr(), Uptime()) # sorted by object name
mibInstrIdx = {}
for mibVar in mibInstr:
mibInstrIdx[mibVar.name] = mibVar
def __callback(transportDispatcher, transportDomain, transportAddress, wholeMsg):
while wholeMsg:
msgVer = api.decodeMessageVersion(wholeMsg)
if msgVer in api.PROTOCOL_MODULES:
pMod = api.PROTOCOL_MODULES[msgVer]
else:
print("Unsupported SNMP version %s" % msgVer)
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg,
asn1Spec=pMod.Message(),
)
rspMsg = pMod.apiMessage.get_response(reqMsg)
rspPDU = pMod.apiMessage.get_pdu(rspMsg)
reqPDU = pMod.apiMessage.get_pdu(reqMsg)
varBinds = []
pendingErrors = []
errorIndex = 0
# GETNEXT PDU
if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):
# Produce response var-binds
for oid, val in pMod.apiPDU.get_varbinds(reqPDU):
errorIndex = errorIndex + 1
# Search next OID to report
nextIdx = bisect.bisect(mibInstr, oid)
if nextIdx == len(mibInstr):
# Out of MIB
varBinds.append((oid, val))
pendingErrors.append((pMod.apiPDU.set_end_of_mib_error, errorIndex))
else:
# Report value if OID is found
varBinds.append((mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer)))
elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):
for oid, val in pMod.apiPDU.get_varbinds(reqPDU):
if oid in mibInstrIdx:
varBinds.append((oid, mibInstrIdx[oid](msgVer)))
else:
# No such instance
varBinds.append((oid, val))
pendingErrors.append(
(pMod.apiPDU.set_no_such_instance_error, errorIndex)
)
break
else:
# Report unsupported request type
pMod.apiPDU.set_error_status(rspPDU, "genErr")
pMod.apiPDU.set_varbinds(rspPDU, varBinds)
# Commit possible error indices to response PDU
for f, i in pendingErrors:
f(rspPDU, i)
transportDispatcher.sendMessage(
encoder.encode(rspMsg), transportDomain, transportAddress
)
return wholeMsg
transportDispatcher = AsyncioDispatcher()
transportDispatcher.register_recv_callback(__callback)
# UDP/IPv4
transportDispatcher.register_transport(
udp.DOMAIN_NAME, udp.UdpAsyncioTransport().open_server_mode(("localhost", 161))
)
# UDP/IPv6
transportDispatcher.register_transport(
udp6.DOMAIN_NAME, udp6.Udp6AsyncioTransport().open_server_mode(("::1", 161))
)
transportDispatcher.job_started(1)
try:
print("This program needs to run as root/administrator to monitor port 161.")
print("Started. Press Ctrl-C to stop")
# Dispatcher will never finish as job#1 never reaches zero
transportDispatcher.run_dispatcher()
except KeyboardInterrupt:
print("Shutting down...")
finally:
transportDispatcher.close_dispatcher()
Download
script.
See also: library reference.