Writing your own plugin for Nautobot is not difficult at all
In case you are not in the know: Nautobot is a fork of Netbox promoted by NTC (Network to Code), a network automation provider widely known in narrow circles. Rummaging through your memory, you may recall the rather ugly story from the beginning of last year about the rewriting of the Netbox commit history; it concerned exactly Nautobot and NTC. But this article is not about the rules of decency in the open source community. It is about how to easily and naturally write your own plugin if you have chosen Nautobot, rather than Netbox, as your source of truth (SoT) and automation platform. That said, I am sure the plugin discussed below could just as easily be implemented for Netbox.
Formulation of the problem
Our not-so-large network has about 50 branches, each with switches totaling 300 to 500 ports. There are actually more branches, but we will start with these because they all run Cisco equipment. This matters, since our plugin will be single-vendor for now.
For each port of each switch, we need to know:
when a device was last connected to this port
which device (by MAC/IP address) is connected to this port, and in which VLAN
“Wait a minute, author,” you say, “I have already seen this functionality somewhere.” Absolutely: this is switchmap (search for the phrase cisco switchmap). Well, no one promised that the article would offer a completely new idea for a plugin. But this idea fits very well with the concept of using Nautobot as an automation platform and SoT.
At first, I wanted to pay homage to the developers of switchmap and immortalize their creation in the name of my plugin. But in the end I settled on a more functional name: nautobot-porthistory-plugin (sorry, guys).
Stack
Everything is simple here. Since we are using Nautobot, we will write in Python and make active use of Django modules. No external scripts; everything lives inside Nautobot. We will collect data from network devices via SNMP. The switches and routers are Cisco and are all registered in Nautobot: each device has its primary address and all interfaces recorded, and each branch has its list of prefixes and VLANs.
What about documentation?
Nautobot has great documentation. No wonder, since it is almost completely copied from Netbox. But all the changes and differences from Netbox are documented just as well, for which special thanks to the developers. The section on plugin development is quite large: https://nautobot.readthedocs.io/en/stable/plugins/development/
Still, the first reading of the official tutorial left many questions unanswered (as did the subsequent ones). A lot of knowledge had to be dug out of the source code of Nautobot itself and of its plugins, of which there are more and more.
So, let’s begin
In accordance with the task, we will split the development of the plugin into two parts. In the first part we will do the easier thing: teach the plugin to determine when the switch ports were last used, and to store and display this information to users.
At the end of this step, our plugin is expected to:
create a table in the database to store information about unused interfaces;
create a job that fills this table at the frequency we need;
change the switch page so that you can see which ports have not been used for more than the specified number of days.
For now we act strictly according to the documentation
Let’s create a plugin skeleton on the server in the nautobot-plugin-porthistory directory (we work as the nautobot user).
Nautobot includes a command that helps create the plugin directory: nautobot-server startplugin [app_name]
The command nautobot-server startplugin nautobot_porthistory_plugin
will create the following set of folders and files:
.
└── nautobot_porthistory_plugin
    ├── __init__.py
    ├── migrations
    │   └── __init__.py
    ├── models.py
    ├── navigation.py
    ├── tests
    │   ├── __init__.py
    │   ├── test_models.py
    │   └── test_views.py
    ├── urls.py
    └── views.py
Well, the files are there; let’s fill them with working code.
The package file __init__.py already contains most of the required options. All we need to do is specify which config options are required and set the plugin's default settings.
__init__.py
"""nautobot_porthistory_plugin Plugin Initialization."""

from nautobot.extras.plugins import PluginConfig


class NautobotPorthistoryPluginConfig(PluginConfig):
    """Plugin configuration for the nautobot_porthistory_plugin plugin."""

    name = "nautobot_porthistory_plugin"  # Raw plugin name; same as the plugin's source directory.
    verbose_name = "nautobot_porthistory_plugin"  # Human-friendly name for the plugin.
    base_url = "nautobot_porthistory_plugin"  # (Optional) Base path to use for plugin URLs. Defaulting to app_name.
    min_version = "1.0.0"  # Minimum version of Nautobot with which the plugin is compatible.
    max_version = "1.999"  # Maximum version of Nautobot with which the plugin is compatible.
    caching_config = {}  # Plugin-specific cache configuration.

    # And here is the configuration we actually need
    # (it replaces the empty required_settings / default_settings from the skeleton)
    required_settings = ['switches_role_slug']  # Parameters that must be defined by the user.
    default_settings = {  # Configuration parameters and their default values.
        'min_idle_days': 14,
        'snmp_community': 'public',
        'workers': 50,
    }


config = NautobotPorthistoryPluginConfig
Parameters that will be passed to the plugin:
switches_role_slug – the list of role slugs by which we filter the switches
min_idle_days – if a port has been idle for fewer than this many days, we are not interested in it
workers – the number of parallel asynchronous requests to the equipment
The model is defined in models.py. In the UnusedPorts table we will store the time of the update, the last time the interface was used, and the ID of the interface itself. As you can see, nothing complicated at all.
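To make the interplay of required and default settings concrete, here is a minimal plain-Python sketch. It only imitates the general idea (user values override defaults, a missing required key is an error); resolve_settings is a hypothetical helper written for this illustration and is not part of the Nautobot API.

```python
# Hypothetical illustration only: imitates how required and default
# plugin settings combine. This is not Nautobot's own implementation.
DEFAULT_SETTINGS = {"min_idle_days": 14, "snmp_community": "public", "workers": 50}
REQUIRED_SETTINGS = ["switches_role_slug"]


def resolve_settings(user_config, defaults=DEFAULT_SETTINGS, required=REQUIRED_SETTINGS):
    """Merge user-supplied values over the defaults and check the required keys."""
    missing = [key for key in required if key not in user_config]
    if missing:
        raise ValueError(f"missing required plugin settings: {', '.join(missing)}")
    merged = dict(defaults)      # start from the defaults
    merged.update(user_config)   # user values win
    return merged


cfg = resolve_settings({"switches_role_slug": ["Access-switch"], "min_idle_days": 30})
print(cfg["min_idle_days"], cfg["workers"])
```

In this sketch min_idle_days comes from the user config while workers falls back to the default, which is the behavior the job code later relies on when it reads PLUGINS_CONFIG.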
models.py
"""Model definition for nautobot_porthistory_plugin."""

from django.db import models
from nautobot.core.models import BaseModel
from nautobot.dcim.fields import MACAddressCharField


class UnusedPorts(BaseModel):
    # Date/time of the last output on the switch port
    updated = models.DateTimeField(auto_now=True)
    last_output = models.DateTimeField()
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )

    def __str__(self):
        return f'{self.interface.name} - {self.last_output}'
The auto_now=True argument specifies that each time a record is saved, the updated field is automatically set to the current time.
We don’t need the navigation.py, urls.py and views.py files for now, so let’s leave them as they are. These three files will be needed in the next step, when we add a separate menu link for the plugin. But, as usual, there is a nuance. Out of the box, the imports in views.py refer to a non-existent Django module, so to avoid an exception during installation, comment out the line from django.views.generic import views
So, we have a table in the database, but there is no data in it. Let’s create a job (a job in Nautobot is like a custom script or report in Netbox) that will collect information from the switches, process it and save it in our table.
Add a jobs.py file with a job class, UnusedPortsUpdate
Its logic is simple:
Generate a list of switches, filtering by device_role.slug = switches_role_slug from the config
Asynchronously poll all switches via SNMP for the device uptime
Asynchronously poll all switches via SNMP for the mapping between ifIndex and port names
Asynchronously poll all switches via SNMP for the time since the last output on each port
If the last output value is negative, check whether the table already has a record for this port. If not, add one with the last-use date set to the device boot time (derived from the uptime in step 2). If there is a record, just update the check date
If the time since the last output, converted to days, is less than min_idle_days from the config, delete the port's record from the table. Otherwise, create or update the record.
Just in case, we will add Site as an optional input parameter, so that the information can be updated for a specific branch.
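The decision made for each port in the last two steps can be sketched as a small pure function, without any database or SNMP. This is a simplified illustration: classify_port and its return values are hypothetical names, and the 300-second margin mirrors the threshold used in the job code.

```python
from datetime import datetime, timedelta


def classify_port(ms_since_output, uptime_s, min_idle_days, now):
    """Return (action, last_output) for one port.

    action is "keep" when the port belongs in the unused-ports table
    and "delete" when the port was active recently.
    """
    boottime = now - timedelta(seconds=uptime_s)
    # Negative counter, or no output since (roughly) boot:
    # the best we can say is "idle since the device booted".
    if ms_since_output < 0 or ms_since_output / 1000 > uptime_s - 300:
        return "keep", boottime
    last_output = now - timedelta(milliseconds=ms_since_output)
    if ms_since_output < min_idle_days * 24 * 60 * 60 * 1000:
        return "delete", last_output   # used within the last min_idle_days
    return "keep", last_output


now = datetime(2022, 3, 1, 12, 0, 0)
day_ms = 24 * 60 * 60 * 1000
uptime = 100 * 86400  # the device has been up for 100 days
print(classify_port(2 * day_ms, uptime, 14, now))
print(classify_port(20 * day_ms, uptime, 14, now))
print(classify_port(-1, uptime, 14, now))
```

Unlike this sketch, the real job keeps an existing last-use date for "idle since boot" ports and only refreshes the check date.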
jobs.py
from nautobot.dcim.models import Device, DeviceRole, Site, Interface
from nautobot.extras.jobs import Job, ObjectVar
from nautobot.extras.models import Status
from django.conf import settings
from nautobot_porthistory_plugin.models import UnusedPorts
import asyncio
import aiosnmp
from collections import defaultdict
from netutils.interface import canonical_interface_name
from datetime import datetime, timedelta


class UnusedPortsUpdate(Job):

    class Meta:
        name = "Update information about unused interfaces"

    # Optional input: limit the run to a single site (branch)
    site = ObjectVar(
        model=Site,
        label="Business unit",
        required=False
    )
    async def bulk_snmp(self, device, oid_list, community):
        """Walk every OID in oid_list on one device; return (device, results or error)."""
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                for oid in oid_list:
                    # a fresh dict per OID, so results of different OIDs do not mix
                    oid_bulk_result = {}
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result
                return (device, oid_results)
        except Exception as error:
            return (device, error)
        return (device, None)
    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result
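    def round_datetime(self, date):
        """Round a datetime down to the whole hour."""
        date_tuple = date.timetuple()
        return datetime(year=date_tuple.tm_year,
                        month=date_tuple.tm_mon,
                        day=date_tuple.tm_mday,  # day of month (the original used tm_mon here by mistake)
                        hour=date_tuple.tm_hour, minute=0, second=0, microsecond=0
                        )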
    def run(self, data, commit):
        # only is_superuser users are allowed to run the job
        if not self.request.user.is_superuser:
            self.log_info(message="Unauthorized launch")
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')

        # build the device dictionary
        devices = []  # this list is passed to the SNMP helpers
        device_dict = defaultdict(dict)
        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
        if data['site']:
            nb_devices = Device.objects.filter(site=data['site'], device_role__in=device_role, status=STATUS_ACTIVE)
        else:
            nb_devices = Device.objects.filter(device_role__in=device_role, status=STATUS_ACTIVE)

        for nb_device in nb_devices:
            if (nb_device.platform and
                    nb_device.platform.napalm_driver and
                    nb_device.platform.napalm_driver == 'cisco_iosxe' and
                    nb_device.primary_ip4):
                primary_ip = str(nb_device.primary_ip4).split('/')[0]
                devices.append(primary_ip)
                device_dict[primary_ip]['device'] = nb_device
                device_dict[primary_ip]['interfaces'] = {}
                device_dict[primary_ip]['ifindexes'] = {}
                device_interfaces = Interface.objects.filter(device_id=nb_device)
                for intf in device_interfaces:
                    device_dict[primary_ip]['interfaces'][intf.name] = [intf]
        # get the device uptime via SNMP (in seconds)
        # and store it in the dictionary
        oid_list = ['.1.3.6.1.6.3.10.2.1.3']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=device_dict[device_ip]['device'], message=f'failed to get information via SNMP - {device_result}')
                continue
            for oid, oid_result in device_result.items():
                for uptime in oid_result.values():
                    device_dict[device_ip]['uptime'] = uptime
                    boottime = datetime.now() - timedelta(seconds=uptime)
                    device_dict[device_ip]['boottime'] = boottime

        # get the interface names and their indexes via SNMP
        # and store them in the dictionary
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in device_dict[device_ip]['interfaces']:
                        device_dict[device_ip]['ifindexes'][ifindex] = canonical_intf_name
        # get the time since the last output via SNMP (in milliseconds)
        oid_list = ['.1.3.6.1.4.1.9.2.2.1.1.4']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        output = ""
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            nb_device = device_dict[device_ip]['device']
            boottime = device_dict[device_ip]['boottime']
            uptime = device_dict[device_ip]['uptime']
            output += f'{nb_device.name} - power on {boottime}\n'
            unused_port_count = 0
            for oid, oid_result in device_result.items():
                for index, time_from_last_output in oid_result.items():
                    ifindex = index.split('.')[-1]
                    if ifindex in device_dict[device_ip]['ifindexes']:
                        intf_name = device_dict[device_ip]['ifindexes'][ifindex]
                        nb_interface = device_dict[device_ip]['interfaces'][intf_name][0]
                        if time_from_last_output < 0 or time_from_last_output / 1000 > uptime - 300:
                            # no output since boot: remember the boot time on first sight,
                            # otherwise only refresh the check date
                            unused_port_count += 1
                            unused_port, created = UnusedPorts.objects.get_or_create(
                                interface=nb_interface,
                                defaults={
                                    'last_output': boottime
                                }
                            )
                            unused_port.save()
                        else:
                            last_output = datetime.now() - timedelta(seconds=round(time_from_last_output / 1000))
                            if 1000 * 60 * 60 * 24 * MIN_IDLE_DAYS > time_from_last_output:
                                # fewer than MIN_IDLE_DAYS days have passed
                                UnusedPorts.objects.filter(interface=nb_interface).delete()
                            else:
                                unused_port_count += 1
                                unused_port, created = UnusedPorts.objects.get_or_create(
                                    interface=nb_interface,
                                    defaults={
                                        'last_output': last_output
                                    }
                                )
                                if not created:
                                    unused_port.last_output = last_output
                                    unused_port.save()
            output += f'ports unused for {MIN_IDLE_DAYS} days - {unused_port_count}\n'
        return output


jobs = [UnusedPortsUpdate]
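The bounded-concurrency pattern behind async_bulk_snmp (a semaphore sized by workers plus asyncio.as_completed) can be tried in isolation, without any network access. In the sketch below, fake_poll is a hypothetical stand-in for the SNMP request, and the addresses come from a documentation range.

```python
import asyncio


async def fake_poll(host, delay, state):
    """Stand-in for an SNMP request: tracks how many polls run at once."""
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(delay)
    state["running"] -= 1
    return host, "ok"


async def with_semaphore(semaphore, function, *args):
    # at most `workers` coroutines get past this point at the same time
    async with semaphore:
        return await function(*args)


async def poll_all(hosts, workers):
    semaphore = asyncio.Semaphore(workers)
    state = {"running": 0, "peak": 0}
    coroutines = [with_semaphore(semaphore, fake_poll, host, 0.01, state) for host in hosts]
    results = []
    # collect results in completion order, not in submission order
    for future in asyncio.as_completed(coroutines):
        results.append(await future)
    return results, state["peak"]


hosts = [f"192.0.2.{i}" for i in range(1, 11)]
results, peak = asyncio.run(poll_all(hosts, workers=3))
print(len(results), "hosts polled, peak concurrency", peak)
```

The workers setting therefore caps how many devices are queried simultaneously, no matter how many switches are in the list.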
Later on, the job can be scheduled to run regularly so that the list of unused ports is always up to date. As a reminder, Nautobot has a built-in job scheduler for this; you only need to specify the frequency when starting the job.
Now let’s teach the plugin to display the collected data on the switch page. To do this, add the files template_content.py and templates/unused_ports.html to the plugin root
The names speak for themselves: one prepares the template content, the other renders the HTML block. The rendered block will be placed on the right side of the Device page.
template_content.py
from nautobot.extras.plugins import PluginTemplateExtension
from django.conf import settings
from .models import UnusedPorts


class DeviceUnusedPorts(PluginTemplateExtension):
    """Template extension to display unused ports on the right side of the page."""

    model = "dcim.device"

    def right_page(self):
        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        device = self.context['object']
        # show the panel only for devices that have one of the switch roles
        if device.device_role.slug in SWITCHES_ROLE_SLUG:
            device_interfaces = device.interfaces.values_list("id", flat=True)
            unused_ports = UnusedPorts.objects.filter(interface_id__in=device_interfaces)
            unused_ports_with_delta = []
            for port in unused_ports:
                unused_ports_with_delta.append({
                    'interface_name': port.interface.name,
                    'last_output': port.last_output.strftime("%d.%m.%Y %H:%M"),
                    'updated': port.updated.strftime("%d.%m.%Y %H:%M"),
                    'delta': str(port.updated - port.last_output).split()[0]
                })
            # the template name must match templates/unused_ports.html exactly
            return self.render('unused_ports.html', extra_context={
                'unused_ports': unused_ports_with_delta,
                'min_idle_days': MIN_IDLE_DAYS
            })
        else:
            return ''


template_extensions = [DeviceUnusedPorts]
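A note on the delta value: it relies on the string form of a timedelta, which starts with the number of days only when at least one full day has passed. A small standalone check shows the difference; idle_days is a hypothetical helper, offered as one possible alternative rather than as the plugin's code.

```python
from datetime import datetime


def idle_days(updated, last_output):
    """Whole days between the last output and the last check."""
    return (updated - last_output).days


updated = datetime(2022, 3, 1, 12, 0)
long_idle = datetime(2022, 2, 9, 9, 30)    # 20 days and 2.5 hours earlier
short_idle = datetime(2022, 3, 1, 9, 30)   # 2.5 hours earlier

# String-based approach, as in template_content.py
print(str(updated - long_idle).split()[0])    # the day count
print(str(updated - short_idle).split()[0])   # a clock value, not a day count

# Attribute-based approach
print(idle_days(updated, long_idle), idle_days(updated, short_idle))
```

For ports that have been idle for weeks both approaches give the same number; they differ only for intervals shorter than a day, for example right after a switch reboot.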
templates/unused_ports.html
<div class="panel panel-default">
    <div class="panel-heading"><strong>Unused ports (over the last {{ min_idle_days }} days)</strong></div>
    <table class="table table-hover panel-body attr-table">
        <tr>
            <td>Port</td>
            <td>Last activity</td>
            <td>Updated (UTC)</td>
        </tr>
        {% for item in unused_ports %}
        <tr>
            <td>{{ item.interface_name }}</td>
            <td>{{ item.last_output }} (~{{ item.delta }} days ago)</td>
            <td>{{ item.updated }}</td>
        </tr>
        {% endfor %}
    </table>
</div>
It remains to mix all the ingredients and taste the result:
Create the migrations. This is the standard Django mechanism for database versioning, in a Nautobot wrapper (run from the plugin folder):
nautobot-server makemigrations
Add the setup.py and MANIFEST.in files
Install the plugin with a simple command, run from the plugin directory:
pip3 install .
Enable the plugin in the Nautobot configuration file and specify the plugin parameters there:
PLUGINS = [
    'nautobot_porthistory_plugin',
]
PLUGINS_CONFIG = {
    'nautobot_porthistory_plugin': {
        'switches_role_slug': ['Access-switch'],
        'min_idle_days': 14,
    }
}
Run the post-installation script. It will update the database and copy the HTML files to the appropriate folder:
nautobot-server post_upgrade
Restart the services:
sudo systemctl restart nautobot nautobot-worker
Done! You can now run the job and check the results of its execution.
The full plugin code created at this stage can be viewed at https://github.com/iontzev/nautobot-porthistory-plugin/tree/part_1
Stage two: leveling up the plugin
The second part of the implementation will be harder, and all the more interesting, because it shows what else Nautobot plugins can do. First, let’s add another model to the database: a table that stores the MAC and IP addresses associated with an interface. The path is already familiar: update models.py and create a new migration with nautobot-server makemigrations
from nautobot.dcim.fields import MACAddressCharField


class MAConPorts(BaseModel):
    # MAC and IP on the switch port
    updated = models.DateTimeField(auto_now=True)
    mac = MACAddressCharField(blank=False, verbose_name="MAC Address")
    vlan = models.ForeignKey(
        to="ipam.VLAN",
        on_delete=models.CASCADE,
        blank=False,
    )
    ipaddress = models.ForeignKey(
        to="ipam.IPAddress",
        on_delete=models.SET_NULL,
        default=None,
        blank=True,
        null=True,
    )
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )
    device = models.ForeignKey(
        to="dcim.Device",
        on_delete=models.CASCADE,
        blank=False,
    )

    def __str__(self):
        return f'{self.interface} - VLAN {self.vlan.vid} MAC {self.mac}'

    class Meta:
        verbose_name_plural = "MAC and IP on switches ports"
Let’s add one more class to jobs.py: a second job. Its algorithm is relatively simple:
asynchronously poll via SNMP all devices whose role matches device_role.slug = switches_role_slug from the config: get the MAC address table and map MACs to port names.
asynchronously poll via SNMP all devices whose role matches device_role.slug = routers_role_slug from the config: get the ARP table. Don’t forget that the config, which lives in __init__.py, also needs to be updated with the new setting
try to resolve the hostname from the IP
if the IP address is not yet in Nautobot, add it
update the MAC and IP bindings on the device interfaces
The update is designed so that if a MAC physically disappears from a port, we can still find it in the connection history, as long as nothing else has appeared on that switch port.
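One step of this algorithm deserves a closer look: a single MAC may have several ARP entries, and only an address that falls inside a prefix bound to the port's VLAN is useful. A minimal standalone sketch of that matching uses the standard ipaddress module; pick_address is a hypothetical helper, and the sample addresses are made up.

```python
from ipaddress import IPv4Address, IPv4Network


def pick_address(addresses, prefixes):
    """Return 'ip/prefixlen' for the first ARP address inside one of the prefixes, else ''."""
    for prefix in prefixes:
        network = IPv4Network(prefix)
        for address in addresses:
            if IPv4Address(address) in network:
                return f"{address}/{network.prefixlen}"
    return ""


arp_for_mac = ["10.20.30.40", "192.168.1.15"]   # ARP entries seen for one MAC
vlan_prefixes = ["192.168.1.0/24"]              # prefixes assigned to the VLAN
print(pick_address(arp_for_mac, vlan_prefixes))
```

The job does the same with nested loops and a for/else construct; returning from a function is just a more compact way to stop at the first match.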
New job in jobs.py
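# Additional imports this job needs at the top of jobs.py
import socket
from ipaddress import IPv4Address, IPv4Network
from django.contrib.contenttypes.models import ContentType
from nautobot.dcim.models import Cable
from nautobot.ipam.models import VLAN, Prefix, IPAddress
from nautobot_porthistory_plugin.models import MAConPorts


class MAConPortsUpdate(Job):

    class Meta:
        name = "Update information about connected devices"

    # Optional input: limit the run to a single site (branch)
    site = ObjectVar(
        model=Site,
        label="Business unit",
        required=False
    )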
    async def bulk_snmp(self, device, oid_list, community):
        """Walk every OID in oid_list on one device; return (device, results or error)."""
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                for oid in oid_list:
                    # a fresh dict per OID, so results of different OIDs do not mix
                    oid_bulk_result = {}
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result
                return (device, oid_results)
        except Exception as error:
            return (device, error)
        return (device, None)

    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result
    def run(self, data, commit):
        # only is_superuser users are allowed to run the job
        if not self.request.user.is_superuser:
            self.log_info(message="Unauthorized launch")
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        ROUTERS_ROLE_SLUG = PLUGIN_CFG['routers_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')
        STATUS_STATIC = Status.objects.get(slug='static')
        STATUS_DHCP = Status.objects.get(slug='dhcp')

        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
        devices = defaultdict(dict)
        devices_list = []
        vlans = defaultdict(list)
        # build the set of all cabled interfaces so that links between switches
        # can later be excluded from the results
        cable_set = defaultdict(set)
        interface_type = ContentType.objects.get(app_label="dcim", model="interface")
        all_cables = Cable.objects.all()
        for cable in all_cables:
            if cable.termination_a_type == interface_type:
                if not data['site'] or cable.termination_a.device.site == data['site']:
                    cable_set[cable.termination_a.device.name].add(cable.termination_a.name)
            if cable.termination_b_type == interface_type:
                if not data['site'] or cable.termination_b.device.site == data['site']:
                    cable_set[cable.termination_b.device.name].add(cable.termination_b.name)

        # build the dictionary of VLANs grouped by site
        vlans_by_site = defaultdict(list)
        if data['site']:
            nb_vlans = VLAN.objects.filter(site=data['site'], status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory': True})
        else:
            nb_vlans = VLAN.objects.filter(status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory': True})
        for nb_vlan in nb_vlans:
            vlans_by_site[nb_vlan.site.name].append(nb_vlan.vid)
        # build the device dictionary
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id,
                device_role__in=device_role,
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and
                        nb_device.platform.napalm_driver and
                        nb_device.platform.napalm_driver == 'cisco_iosxe' and
                        nb_device.primary_ip4):
                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    devices_list.append(primary_ip)
                    device = devices[primary_ip] = {}
                    device['device'] = nb_device
                    device['site'] = nb_device.site
                    device['interfaces'] = {}
                    device['ifindexes'] = {}
                    device['bridge_ports'] = {}
                    device['vlans'] = vlans_by_site[site]
                    for intf in Interface.objects.filter(device_id=nb_device):
                        device['interfaces'][intf.name] = intf
                    for vlan in vlans_by_site[site]:
                        vlans[vlan].append(primary_ip)
        # get the interface names and their indexes via SNMP
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=devices[device_ip]['device'], message='failed to get information via SNMP')
                # unreachable devices are dropped from all further polling
                del devices[device_ip]
                devices_list.remove(device_ip)
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in devices[device_ip]['interfaces']:
                        devices[device_ip]['ifindexes'][ifindex] = canonical_intf_name
        # walk through the VLANs and get the MAC address table from the devices for each VLAN
        # (the MAC addresses arrive in decimal form)
        port_mac_relation = defaultdict(list)
        for vlan, devices_dict in vlans.items():
            self.log_info(message=f'Getting information for VLAN {vlan}')
            community_with_vlan = f'{COMMUNITY}@{vlan}'
            # a separate name is used here so that devices_list is not narrowed
            # further on every VLAN iteration
            vlan_devices = [device for device in devices_dict if device in devices_list]

            # get the bridge ports via SNMP (they depend on the VLAN)
            oid_list = ['.1.3.6.1.2.1.17.1.4.1.2']
            results = asyncio.run(self.async_bulk_snmp(vlan_devices, oid_list, community_with_vlan, WORKERS))
            for device_ip, device_result in results:
                if type(device_result) != dict:
                    # most likely this VLAN does not exist on this device
                    continue
                for oid, oid_result in device_result.items():
                    for index, index_result in oid_result.items():
                        bridge_port = index.split('.')[-1]
                        ifindex = str(index_result)
                        if ifindex in devices[device_ip]['ifindexes']:
                            ifname = devices[device_ip]['ifindexes'][ifindex]
                            nb_interface = devices[device_ip]['interfaces'][ifname]
                            devices[device_ip]['bridge_ports'][bridge_port] = nb_interface

            # get the MAC table: MAC (as the OID suffix) -> bridge port
            # (the original attached this block to the loop above with for/else;
            # without a break in that loop it always runs, so it is simply placed after it)
            oid_list = ['.1.3.6.1.2.1.17.4.3.1.2']
            results = asyncio.run(self.async_bulk_snmp(vlan_devices, oid_list, community_with_vlan, WORKERS))
            for device_ip, device_result in results:
                nb_device = devices[device_ip]['device']
                nb_vlan = VLAN.objects.get(vid=vlan, site_id=nb_device.site.id)
                if type(device_result) != dict:
                    continue
                for oid, oid_result in device_result.items():
                    for mac_dec, bridge_port in oid_result.items():
                        if str(bridge_port) in devices[device_ip]['bridge_ports']:
                            port = devices[device_ip]['bridge_ports'][str(bridge_port)]
                            # skip cabled (switch-to-switch) ports and ports flagged to be ignored
                            if (port.name not in cable_set[nb_device.name]
                                    and not port._custom_field_data.get('flag-ignore-mac')):
                                # convert the MAC from decimal to hexadecimal form
                                mac_hex = ''.join(['{0:x}'.format(int(i)).zfill(2) for i in mac_dec.split('.')[-6:]]).upper()
                                port_mac_relation[port.id].append({
                                    'vlan': nb_vlan,
                                    'mac': mac_hex,
                                })
        # prepare the list of L3 devices
        routers = defaultdict(dict)
        routers_list = []
        device_role = DeviceRole.objects.filter(slug__in=ROUTERS_ROLE_SLUG)
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id,
                device_role__in=device_role,
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and
                        nb_device.platform.napalm_driver and
                        nb_device.platform.napalm_driver == 'cisco_iosxe' and
                        nb_device.primary_ip4):
                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    routers_list.append(primary_ip)
                    router = routers[primary_ip] = {}
                    router['site'] = nb_device.site.name
                    router['device'] = nb_device

        arp = defaultdict(dict)
        # get the ARP table from the routers via SNMP
        oid_list = ['.1.3.6.1.2.1.3.1.1.2']
        results = asyncio.run(self.async_bulk_snmp(routers_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            site = routers[device_ip]['site']
            arp[site] = defaultdict(list)
            if type(device_result) != dict:
                self.log_warning(obj=routers[device_ip]['device'], message='failed to get information via SNMP')
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    # the last four OID components are the IP address, the value is the MAC in bytes
                    snmp_address = ".".join(index.split('.')[-4:])
                    snmp_mac = "".join(["{0:x}".format(int(i)).zfill(2) for i in index_result]).upper()
                    arp[site][snmp_mac].append(snmp_address)
        output = ""
        for device in devices.values():
            nb_device = device['device']
            site = nb_device.site.name
            output += f'device {nb_device} :'
            mac_on_device = ip_on_device = name_on_device = 0
            for intf in device['interfaces'].values():
                if len(port_mac_relation[intf.id]) > 0:
                    # something is visible on the port now, so the old history is replaced
                    MAConPorts.objects.filter(interface=intf).delete()
                for vlan_and_mac in port_mac_relation[intf.id]:
                    mac_on_device += 1
                    nb_prefixes = Prefix.objects.filter(vlan_id=vlan_and_mac['vlan'].id)
                    addresses = arp[site].get(vlan_and_mac['mac'])
                    address_with_prefix = ''
                    if nb_prefixes and addresses:
                        # find the first ARP address that belongs to one of the VLAN prefixes
                        for nb_prefix in nb_prefixes:
                            for address in addresses:
                                if IPv4Address(address) in IPv4Network(str(nb_prefix)):
                                    prefixlen = str(nb_prefix).split('/')[-1]
                                    address_with_prefix = f'{address}/{prefixlen}'
                                    break
                            else:
                                continue
                            break
                    if address_with_prefix:
                        ip_on_device += 1
                        try:
                            hostname, aliaslist, ipaddrlist = socket.gethostbyaddr(address)
                            name_on_device += 1
                        except OSError:
                            # no reverse DNS record for this address
                            hostname = ""
                        nb_address, created = IPAddress.objects.get_or_create(
                            address=address_with_prefix,
                            vrf=nb_prefix.vrf,
                            defaults={
                                'status': STATUS_STATIC,
                                'dns_name': hostname
                            }
                        )
                        if created:
                            self.log_success(obj=nb_address, message=f'Added IP address {hostname}')
                        elif nb_address.status != STATUS_DHCP and hostname and nb_address.dns_name != hostname:
                            old_hostname = nb_address.dns_name
                            nb_address.dns_name = hostname
                            nb_address.save()
                            self.log_success(obj=nb_address, message=f'Updated DNS name "{old_hostname}" -> "{hostname}"')
                    else:
                        nb_address = None
                    mac, created = MAConPorts.objects.get_or_create(
                        vlan=vlan_and_mac['vlan'],
                        mac=vlan_and_mac['mac'],
                        defaults={
                            'interface': intf,
                            'device': nb_device,
                            'ipaddress': nb_address,
                        }
                    )
                    if not created:
                        updated = False
                        if nb_address and mac.ipaddress != nb_address:
                            self.log_info(obj=nb_address, message=f'Device with MAC {mac.mac} changed IP {mac.ipaddress} -> {nb_address}')
                            mac.ipaddress = nb_address
                            updated = True
                        if mac.interface != intf:
                            self.log_info(obj=intf, message=f'MAC {mac.mac} moved from port "{mac.interface}"')
                            mac.interface = intf
                            mac.device = nb_device
                            updated = True
                        if updated:
                            mac.save()
            output += f" MAC count - {mac_on_device}, IP count - {ip_on_device}, resolved to hostname - {name_on_device}\n"
        return output


# Register both jobs (this replaces the earlier jobs = [UnusedPortsUpdate])
jobs = [UnusedPortsUpdate, MAConPortsUpdate]
Important! A peculiarity of retrieving the MAC table from Cisco switches via SNMP is that the community string must include the number of the VLAN in which the MAC addresses live (in the form community@vlan).
We also take into account that not every VLAN needs to be tracked. For example, if a VLAN carries wireless clients, there is no point in tracking MACs in it. The same goes for ports. So we need a filtering mechanism for VLANs and switch ports. Custom fields are a great fit here, but relying in scripts on field names that the user has to create by hand means hoping for a small miracle.
So let’s be realistic and add the custom fields when the plugin is installed. The Django signals functionality, again in a Nautobot wrapper, will help us with this.
Let’s add the signal connection to __init__.py and the signal handler itself to signals.py.
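Two small conversions from the job are easy to check on their own: building the per-VLAN community string and turning a MAC address from its decimal SNMP form into hexadecimal. The helper names below are hypothetical, and the sample OID is made up for the illustration.

```python
def vlan_community(community, vlan_id):
    """Per-VLAN community string in the 'community@vlan' form."""
    return f"{community}@{vlan_id}"


def mac_from_oid(oid):
    """Turn the last six decimal components of a MAC-table OID into a hex MAC string."""
    octets = oid.split(".")[-6:]
    return "".join(f"{int(octet):02X}" for octet in octets)


def mac_from_bytes(raw):
    """Turn a 6-byte value (as returned for an ARP entry) into the same hex form."""
    return "".join(f"{byte:02X}" for byte in raw)


sample_oid = ".1.3.6.1.2.1.17.4.3.1.2.0.28.127.10.11.255"
print(vlan_community("public", 120))
print(mac_from_oid(sample_oid))
print(mac_from_bytes(bytes([0, 28, 127, 10, 11, 255])))
```

Because both sources end up in the same uppercase hex form, a MAC from the switch table can be used directly as a key into the ARP data collected from the routers.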
__init__.py
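from nautobot.core.signals import nautobot_database_ready
from nautobot.extras.plugins import PluginConfig

from .signals import create_custom_fields_for_porthistory


class NautobotPorthistoryPluginConfig(PluginConfig):
    # ............ (the settings shown earlier)

    def ready(self):
        super().ready()
        # create the custom fields once the Nautobot database is ready
        nautobot_database_ready.connect(create_custom_fields_for_porthistory, sender=self)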
signals.py
from nautobot.extras.choices import CustomFieldTypeChoices


def create_custom_fields_for_porthistory(sender, apps, **kwargs):
    """Create the custom fields flag-porthistory (VLAN) and flag-ignore-mac (Interface) if they don't already exist."""
    # Use apps.get_model to look up Nautobot core models
    ContentType = apps.get_model("contenttypes", "ContentType")
    CustomField = apps.get_model("extras", "CustomField")
    VLAN = apps.get_model("ipam", "VLAN")
    Interface = apps.get_model("dcim", "Interface")

    # Create custom fields
    cf_for_vlan, created = CustomField.objects.update_or_create(
        name="flag-porthistory",
        defaults={
            "label": "Search MACs on ports in this VLAN",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_vlan.content_types.set([ContentType.objects.get_for_model(VLAN)])

    cf_for_interface, created = CustomField.objects.update_or_create(
        name="flag-ignore-mac",
        defaults={
            "label": "Ignore MACs on this port",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_interface.content_types.set([ContentType.objects.get_for_model(Interface)])
Now, when installing the plugin, new fields will be automatically added to the VLAN and Device.interface models. By checking the box on the VLAN editing page Search MACs on ports in this VLAN we thereby allow our job to look for MAC addresses in this VLAN.
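How the job combines the flag-ignore-mac flag with the cable information can be shown with plain dictionaries. This is a simplified sketch: should_record is a hypothetical helper, and the device and port names are invented for the example.

```python
def should_record(port_name, device_name, cable_set, custom_fields):
    """Decide whether MACs seen on a port are worth recording."""
    # ports with a cable documented in Nautobot are treated as links between devices
    if port_name in cable_set.get(device_name, set()):
        return False
    # ports explicitly flagged by the user are skipped as well
    return not custom_fields.get("flag-ignore-mac")


cable_set = {"sw-branch-01": {"GigabitEthernet1/0/48"}}
print(should_record("GigabitEthernet1/0/48", "sw-branch-01", cable_set, {}))
print(should_record("GigabitEthernet1/0/5", "sw-branch-01", cable_set, {"flag-ignore-mac": True}))
print(should_record("GigabitEthernet1/0/6", "sw-branch-01", cable_set, {}))
```

Only the last port in the example would be recorded: it has no documented cable and carries no ignore flag.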
So, we have a job and configured VLANs, which means we can fill the database with the data we need. But how do we view this data? The first way is already familiar: we use templates for the interface and IP address pages. This lets us pull individual records from the database and show them to the user.
The second way is to create a separate plugin page where the collected data can be displayed and searched. This path is thornier than the first, but the results are more interesting.
Six components are involved in building the plugin page. Their number can be reduced by using HTML templates, but that’s a slightly different story.
So, in order:
navigation.py – Adds a line to the Plugins dropdown menu
urls.py – binds a handler to the URL behind the menu item added in step 1
views.py – contains the handlers (views). A handler ties together the data, the output format (table), the form, and the filtering logic
tables.py – describes the table used to display data from the database
forms.py – builds the HTML form for filtering the data in the table
filters.py – contains data filtering algorithms
navigation.py
from nautobot.extras.plugins import PluginMenuItem

menu_items = (
    PluginMenuItem(
        link="plugins:nautobot_porthistory_plugin:history",  # A reverse compatible link to follow.
        link_text="MAC and IP on switches ports",  # Text to display to user.
    ),
)
urls.py
from django.urls import path

from nautobot_porthistory_plugin import views

urlpatterns = [
    path('history/', views.PortHistoryView.as_view(), name="history"),
]
views.py
from django.shortcuts import render
from nautobot.core.views import generic

from nautobot_porthistory_plugin import models, tables, filters, forms

class PortHistoryView(generic.ObjectListView):
    """Shows MAC and IP addresses on ports"""
    queryset = models.MAConPorts.objects.all()
    table = tables.PortHistoryTable
    filterset = filters.PortHistoryFilterSet
    filterset_form = forms.PortHistoryFilterForm
    action_buttons = ()
tables.py
import django_tables2 as tables
from django_tables2.utils import A
from nautobot.utilities.tables import BaseTable, ToggleColumn

from nautobot_porthistory_plugin import models

class PortHistoryTable(BaseTable):
    pk = ToggleColumn()
    device = tables.Column(linkify=True)
    interface = tables.LinkColumn(orderable=False)
    vlan = tables.LinkColumn()
    ipaddress = tables.Column(linkify=True, verbose_name="IPv4 Address")

    class Meta(BaseTable.Meta):  # pylint: disable=too-few-public-methods
        """Meta attributes."""
        model = models.MAConPorts
        fields = (
            'pk',
            'device',
            'interface',
            'vlan',
            'mac',
            'ipaddress',
            'updated',
        )
forms.py
from django import forms
from nautobot.dcim.models import Region, Site, Device
from nautobot.ipam.models import VLAN
from nautobot.utilities.forms import BootstrapMixin, DynamicModelMultipleChoiceField
from nautobot.extras.forms import CustomFieldFilterForm

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterForm(BootstrapMixin, forms.Form):
    """Filter form to filter searches for MAC."""
    model = MAConPorts
    field_order = ["q", "site", "device_id", "vlan"]
    q = forms.CharField(required=False, label="Search MAC")
    site = DynamicModelMultipleChoiceField(
        queryset=Site.objects.all(),
        to_field_name="slug",
        required=False,
    )
    device_id = DynamicModelMultipleChoiceField(
        queryset=Device.objects.all(),
        required=False,
        label="Device",
        query_params={"site": "$site"},
    )
    vlan = DynamicModelMultipleChoiceField(
        queryset=VLAN.objects.all(),
        required=False,
        label="VLAN",
        query_params={"site": "$site"},
    )
filters.py
import django_filters
from nautobot.dcim.models import Device
from nautobot.utilities.filters import BaseFilterSet, MultiValueCharFilter
from django.db.models import Q

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterSet(BaseFilterSet):
    """Filter for MAConPorts"""
    q = django_filters.CharFilter(method="search", label="Search MAC")
    site = MultiValueCharFilter(
        method="filter_site",
        field_name="pk",
        label="site",
    )
    device_id = MultiValueCharFilter(
        method="filter_device_id",
        field_name="pk",
        label="Device (ID)",
    )
    vlan = MultiValueCharFilter(
        method="filter_vlan",
        field_name="pk",
        label="VLAN",
    )

    class Meta:
        """Meta attributes for filter."""
        model = MAConPorts
        fields = [
            'vlan'
        ]

    def search(self, queryset, mac, value):
        if not value.strip():
            return queryset
        mac = "".join(ch for ch in value if ch.isalnum())
        mac = ":".join(mac[i:i+2] for i in range(0, len(mac), 2))
        return queryset.filter(Q(mac__icontains=mac))

    def filter_site(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__site__slug__in=id_list))

    def filter_device_id(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__id__in=id_list))

    def filter_vlan(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(vlan__id__in=id_list))
Here is the plugin page. You can filter by site, device, and VLAN. You can also search by part of a MAC address, and the input format in the search field does not matter at all: underscores as separators work, and so does capitalizing every other letter.
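The format-agnostic search comes down to the two lines in the search method of filters.py. The same normalization can be tried on its own, outside Nautobot. The function name normalize_mac_fragment below is made up for illustration; the logic mirrors the method above.

```python
def normalize_mac_fragment(value: str) -> str:
    """Strip separators, then regroup the remaining characters in colon-separated pairs."""
    chars = "".join(ch for ch in value if ch.isalnum())
    return ":".join(chars[i:i + 2] for i in range(0, len(chars), 2))

for raw in ("00_1a_2B", "001A.2B3c", "00-1A-2b-3C-4d-5E"):
    print(raw, "->", normalize_mac_fragment(raw))
```

Letter case is left untouched here because the filter compares with icontains, which ignores case. One consequence of regrouping from the first character is that a fragment is expected to start on an octet boundary: "01A" becomes "01:A", which would not match a stored value such as "00:1A:2B".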
What conclusions did I draw for myself
Writing your own plugin is not as difficult as it seemed to me at the very beginning, when I was just getting to know Netbox and Nautobot. Nautobot/Netbox is a great automation platform. With minimal effort, you can achieve excellent results in adapting Nautobot/Netbox to the realities of your organization.
That’s all, thanks for your attention.
Well, for those who are really interested in this topic and who have read to the end, the link to the plugin code is https://github.com/iontzev/nautobot-porthistory-plugin