Own plugin for Nautobot is not difficult at all

If someone is not in the know, Nautobot is a fork of Netbox, it is promoted by the NTC (Network to Code) network automation provider, widely known in narrow circles. Perhaps, rummaging through your memory, you will remember the not very beautiful story of the beginning of last year about overwriting the history of Netbox commits – it just concerned Nautobot and NTC. But this will not be about the rules of decency of the open source community, but about how to easily and naturally write your own plugin for someone who has chosen Nautobot, rather than Netbox, as a source of truth and automation platform. Although I am sure that the implementation of the plugin, which will be discussed next, could just as easily be done for Netbox.

Formulation of the problem

In our not very large network, there are ~ 50 branches, each of which has switches with a total number of ports from 300 to 500. In fact, there are more branches, but we will start with this number, because all of them have Cisco equipment installed. This is important, since our plugin will be single-vendor for now.

For each port of each switch, we need to know:

  1. when was the last time a connection was made to this port

  2. device with which MAC/IP address and in which VLAN is connected to this port

“Wait a minute, author,” you say, “I have already seen such functionality somewhere.” Absolutely, this is a switchmap (googled for the phrase cisco switchmap). Well, no one promised that there would be a completely new idea for the plugin in the article. But this idea fits very well with the concept of using Nautobot as an automation platform and SoT.

At first, I wanted to pay homage to the developers of switchmap and immortalize their creation in the name of my plugin. But in the end I settled on a more functional name – nautobot-porthistory-plugin (Sorry guys)

Stack

Well, everything is simple here. Since we are using Nautobot, we will write in Python and actively use Django modules. No external scripts, everything is inside Nautobot. We will collect data from network devices via SNMP. Switches and routers – Cisco, all set up in Nautobot, each device takes into account the primary address and all interfaces, for each branch the list of prefixes and VLANs is taken into account.

What about documentation?

Nautobot has great documentation. Still, because it is almost completely copied from Netbox. But all the changes and differences from Netbox are documented no less well, for this special thanks to the developers. The section on plugin development is quite large – https://nautobot.readthedocs.io/en/stable/plugins/development/

But only the first reading of the official tutorial did not give answers to many questions (however, as well as subsequent ones). Much knowledge had to be pulled out from the source code of Nautobot itself, as well as plugins, of which there are more and more.

So, let’s begin

In accordance with the task, we will divide the development of the plugin into two parts. In the first part, we will do something easier – we will teach the plugin to determine the last time the switch ports were used, store and display this information to users.

At the end of this step, our plugin is expected to do the following:

  • create a table in the database to store information about unused interfaces;

  • will create a job that will fill the created table with the frequency we need;

  • will change the switch information display page so that you can see which ports have not been used for more than the specified number of days.

While we act strictly according to the documentation

Let’s create a plugin skeleton on the server in the nautobot-plugin-porthistory directory (we work under the nautobot user).

Nautobot includes a command to help create the plugin directory: nautobot-server startplugin [app_name]

Team nautobot-server startplugin nautobot_porthistory_plugin will create the following set of folders and files:

.
└── nautobot_porthistory_plugin
    ├── __init__.py
    ├── migrations
    │   └── __init__.py
    ├── models.py
    ├── navigation.py
    ├── tests
    │   ├── __init__.py
    │   ├── test_models.py
    │   └── test_views.py
    ├── urls.py
    └── views.py

Well, there are files, let’s fill them with working code.

The package file __init__.py already contains most of the required options. All we need is to specify which config options are required, as well as the default plugin settings.

__init__.py
"""nautobot_porthistory_plugin Plugin Initilization."""

from nautobot.extras.plugins import PluginConfig

class NautobotPorthistoryPluginConfig(PluginConfig):
    """Plugin configuration for the nautobot_porthistory_plugin plugin."""

    name = "nautobot_porthistory_plugin"  # Raw plugin name; same as the plugin's source directory.
    verbose_name = "nautobot_porthistory_plugin"  # Human-friendly name for the plugin.
    base_url = "nautobot_porthistory_plugin"  # (Optional) Base path to use for plugin URLs. Defaulting to app_name.
    required_settings = []  # A list of any configuration parameters that must be defined by the user.
    min_version = "1.0.0"  # Minimum version of Nautobot with which the plugin is compatible.
    max_version = "1.999"  # Maximum version of Nautobot with which the plugin is compatible.
    default_settings = {}  # A dictionary of configuration parameters and their default values.
    caching_config = {}  # Plugin-specific cache configuration.

    # А вот и нужная нам конфигурация
    required_settings = ['switches_role_slug']
    default_settings = {
        'min_idle_days': 14,
        'snmp_community': 'public',
        'workers': 50,
    }

config = NautobotPorthistoryPluginConfig

Parameters that will be passed to the plugin:

  • switches_role_slug – The role by which we will filter the switches

  • min_idle_days – If the port is not used for less than a day, we are not interested in it

  • workers – the number of parallel asynchronous requests to the equipment

The model will be defined in models.py. Table UnusedPorts we will store the update time, the last time the interface was used, and the actual ID of the interface itself. As you can see, nothing complicated at all.

models.py
"""Model definition for nautobot_porthistory_plugin."""

from django.db import models

from nautobot.core.models import BaseModel
from nautobot.dcim.fields import MACAddressCharField

class UnusedPorts(BaseModel):
    # Дата/время последнего output на порту коммутатора 

    updated = models.DateTimeField(auto_now=True)
    last_output = models.DateTimeField()
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )
    
    def __str__(self):
        return f'{self.interface.name} - {self.last_output}'

Argument auto_now=True specifies that each time data is saved to the table, the field updated automatically updated to the current time

We don’t need the navigation.py, urls.py and views.py files for now, let’s leave them as they are. These three files will be needed in the next step, when we make a separate link to the plugin in the menu. But, as usual, there is a nuance. Imports in views.py “out of the box” refer to non-existent django modules, so in order not to catch an exception during installation, comment out the line from django.views.generic import views

So, we have a table in the database, but there is no data in it. Let’s create a job (job in Nautobot – like custom script and report in Netbox), which will collect information from the switches, process it and save it in our table.

Add a jobs.py file with a class (job) UnusedPortsUpdate

Its logic is simple:

  1. Generate a list of switches – use a filter on device_role.slug = switches_role_slug from config

  2. Asynchronously poll all switches via SNMP – device uptime

  3. Asynchronously poll all switches via SNMP – matching ifindex and port names

  4. Asynchronously poll all switches via SNMP – Last output on ports

  5. If Last output is negative, see if there is information on this port in the table. If missing – add the date of the last use equal to the date of loading (p. 2). If there is data, just update the check date

  6. If Last output converted to days is less than min_idle_days from the config, delete the port entry from the table. Otherwise, we create/update the record.

Just in case, we will add Site to the optional input parameters in order to be able to update information on a specific branch.

jobs.py code
jobs.py

from nautobot.dcim.models import Device, DeviceRole, Site, Interface
from nautobot.extras.jobs import Job, ObjectVar
from nautobot.extras.models import Status
from django.conf import settings

from nautobot_porthistory_plugin.models import UnusedPorts

import asyncio
import aiosnmp

from collections import defaultdict
from netutils.interface import canonical_interface_name
from datetime import datetime, timedelta

class UnusedPortsUpdate(Job):

    class Meta:
        name = "Обновление информации о неподключенных интерфейсах"

    site = ObjectVar(
        model=Site,
        label="БЮ",
        required=False
    )

    async def bulk_snmp(self, device, oid_list, community):
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                oid_bulk_result = {}
                for oid in oid_list:
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result

                return (device, oid_results)

        except Exception as error:
            return (device, error)
        return (device, None)

    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result

    def round_datetime(self, date):
        date_tuple = date.timetuple()
        return datetime(year=date_tuple.tm_year,
                        month=date_tuple.tm_mon,
                        day=date_tuple.tm_mon,
                        hour=date_tuple.tm_hour, minute=0, second=0, microsecond=0
                        )

    def run(self, data, commit):
        # запускать job могут только пользователи is_superuser
        if not self.request.user.is_superuser:
            self.log_info(message="Неавторизованный запуск")
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')

        # сгенерируем справочник устройств
        devices = [] #этот список передадим в модуль snmp
        device_dict = defaultdict(dict)
        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
        if data['site']:
            nb_devices = Device.objects.filter(site=data['site'], device_role__in=device_role, status=STATUS_ACTIVE)
        else:
            nb_devices = Device.objects.filter(device_role__in=device_role, status=STATUS_ACTIVE)
            
        for nb_device in nb_devices:
            if nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4:
                primary_ip = str(nb_device.primary_ip4).split('/')[0]
                devices.append(primary_ip)
                device_dict[primary_ip]['device'] = nb_device
                device_dict[primary_ip]['interfaces'] = {}
                device_dict[primary_ip]['ifindexes'] = {}
                device_interfaces = Interface.objects.filter(device_id=nb_device)
                for intf in device_interfaces:
                    device_dict[primary_ip]['interfaces'][intf.name] = [intf]

        # получим uptime оборудования по SNMP (в секундах)
        # и занесем эту информацию в справочник
        oid_list = ['.1.3.6.1.6.3.10.2.1.3']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=device_dict[device_ip]['device'],message=f'не удалось получить информацию по SNMP - {device_result}')
                continue
            for oid, oid_result in device_result.items():
                for uptime in oid_result.values():
                    device_dict[device_ip]['uptime'] = uptime
                    boottime = datetime.now() - timedelta(seconds=uptime)
                    device_dict[device_ip]['boottime'] = boottime
    
        # получим названия интерфейсов и их индексы с оборудования по SNMP
        # и занесем эту информацию в справочник
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in device_dict[device_ip]['interfaces']:
                        device_dict[device_ip]['ifindexes'][ifindex] = canonical_intf_name

        # получим время последнего output по SNMP
        oid_list = ['.1.3.6.1.4.1.9.2.2.1.1.4']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        output=""
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            nb_device = device_dict[device_ip]['device']
            boottime = device_dict[device_ip]['boottime']
            uptime = device_dict[device_ip]['uptime']
            output += f'{nb_device.name} - power on {boottime}\n'
            unused_port_count = 0
            for oid, oid_result in device_result.items():
                for index, time_from_last_output in oid_result.items():
                    ifindex = index.split('.')[-1]
                    if ifindex in device_dict[device_ip]['ifindexes']:
                        intf_name = device_dict[device_ip]['ifindexes'][ifindex]
                        nb_interface = device_dict[device_ip]['interfaces'][intf_name][0]
                        if time_from_last_output < 0 or time_from_last_output / 1000 > uptime - 300:
                            unused_port_count += 1
                            unused_port, created = UnusedPorts.objects.get_or_create(
                                interface=nb_interface,
                                defaults={
                                    'last_output': boottime
                                }
                            )
                            unused_port.save()
                        else:
                            last_output = datetime.now() - timedelta(seconds=round(time_from_last_output/1000))
                            if 1000 * 60 * 60 * 24 * MIN_IDLE_DAYS > time_from_last_output:
                                # прошло меньше MIN_IDLE_DAYS дней
                                UnusedPorts.objects.filter(interface=nb_interface).delete()
                            else:
                                unused_port_count += 1
                                unused_port, created = UnusedPorts.objects.get_or_create(
                                    interface=nb_interface,
                                    defaults={
                                        'last_output': last_output
                                    }
                                )
                                if not created:
                                    unused_port.last_output = last_output
                                    unused_port.save()
            output += f'неиспользуемых в течении {MIN_IDLE_DAYS} дн. портов - {unused_port_count}\n'

        return output

jobs = [UnusedPortsUpdate]

In the future, it will be possible to schedule the regular launch of the job in order to always have an up-to-date list of unused ports. Let me remind you that Nautobot has a built-in job scheduler for this. You just need to specify the frequency when starting the job.

Now let’s teach the plugin to display the received data on the switch page. To do this, add the template_content.py and templates/unused_ports.html files to the plugin root

The names speak for themselves – prepare the template content and render the HTML block. The rendered block will be placed on the Device page, on the right side.

template_content.py

from nautobot.extras.plugins import PluginTemplateExtension
from django.conf import settings

from .models import UnusedPorts

class DeviceUnusedPorts(PluginTemplateExtension):
    """Template extension to display unused ports on the right side of the page."""

    model="dcim.device"

    def right_page(self):
        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        device = self.context['object']

        if device.device_role.slug in SWITCHES_ROLE_SLUG:
            device_intefaces = device.interfaces.values_list("id", flat=True)
            unused_ports = UnusedPorts.objects.filter(interface_id__in=device_intefaces)
            unused_ports_with_delta = []
            for port in unused_ports:
                unused_ports_with_delta.append({
                    'interface_name': port.interface.name,
                    'last_output': port.last_output.strftime("%d.%m.%Y %H:%M"),
                    'updated': port.updated.strftime("%d.%m.%Y %H:%M"),
                    'delta': str(port.updated - port.last_output).split()[0]
                })
            return self.render('unused_ ports.html', extra_context={
                'unused_ports': unused_ports_with_delta,
                'min_idle_days': MIN_IDLE_DAYS
            })
        else:
            return ''

template_extensions = [DeviceUnusedPorts]
templates/unused_ports.html

<div class="panel panel-default">
	<div class="panel-heading"><strong>Неиспользуемые порты (в течение последних {{ min_idle_days }} дн.)</strong></div>
	<table class="table table-hover panel-body attr-table">
		<tr>
			<td>Порт</td>
			<td>Время последней активности</td>
			<td>Обновлено (UTC)</td>
		</tr>
		{% for item in unused_ports %}
		<tr>
			<td>{{ item.interface_name }}</td>
			<td>{{ item.last_output }} (прошло ~{{ item.delta }} дн.)</td>
			<td>{{ item.updated }}</td>
		</tr>
		{% endfor %}
	</table>
</div>

It remains to mix all the ingredients and try what happened:

  1. Let’s create migrations – a standard Django mechanism for ensuring database versioning in the Nautobot wrapper.
    nautobot-server makemigrations (run from plugin folder)

  2. add setup.py and MANIFEST.in files

  3. install plugin simple pip3 install . from plugin directory

  4. enable the plugin in the nautobot configuration file and specify the plugin parameters there
    PLUGINS = [
    'nautobot_porthistory_plugin',
    ]
    PLUGINS_CONFIG = {
    'nautobot_porthistory_plugin': {
    'switches_role_slug': ['Access-switch'],
    'min_idle_days': 14,
    }
    }

  5. run post-installation script nautobot-server post_upgrade. It will update the database and copy the HTML files to the appropriate folder.

  6. Let’s restart the services: sudo systemctl restart nautobot nautobot-worker

Ready! You can run the job and check the results of its execution:

Unused Switch Ports
Unused Switch Ports

The full plugin code created at this stage can be peeped at the link – https://github.com/iontzev/nautobot-porthistory-plugin/tree/part_1

Stage two – pumping the plugin

The second part of the implementation will be more difficult. The more interesting, because it will allow you to understand what else the plugins in Nautobot can do. First, let’s add another model to the database – a table in which we will store MAC and IP addresses associated with the interface. The way is already familiar – update models.py and create a new migration nautobot-server makemigrations

from nautobot.dcim.fields import MACAddressCharField

class MAConPorts(BaseModel):
    # MAC и IP на порту коммутатора 

    updated = models.DateTimeField(auto_now=True)
    mac = MACAddressCharField(blank=False, verbose_name="MAC Address")
    vlan = models.ForeignKey(
        to="ipam.VLAN",
        on_delete=models.CASCADE,
        blank=False,
    )
    ipaddress = models.ForeignKey(
        to="ipam.IPAddress",
        on_delete=models.SET_NULL,
        default=None,
        blank=True,
        null=True,
    )
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )
    device = models.ForeignKey(
        to="dcim.Device",
        on_delete=models.CASCADE,
        blank=False,
    )
    
    def __str__(self):
        return f'{self.intervace} - VLAN {seld.vlan.vid} MAC {self.mac}'

    class Meta:
        verbose_name_plural="MAC and IP on switches ports"

Let’s add one more class to jobs.py – it’s a job. The algorithm of its work is relatively simple:

  • asynchronously poll via SNMP all devices with the role device_role.slug = switches_role_slug from config – get mac address table and match MAC to port names.

  • asynchronously poll via SNMP all devices with the role device_role.slug = routers_role_slug from config – get ARP table. Let’s not forget that the config also needs to be corrected – it is located in __init__.py

  • try to get hostname by IP

  • if the IP address is not listed in Nautobot, add

  • update MAC and IP binding information to device interfaces

The update is designed in such a way that if the MAC on the port physically disappears, we can find it in the connection history if nothing else appeared on this switch port.

New job in jobs.py
class MAConPortsUpdate(Job):

    class Meta:
        name = "Обновление информации о подключенных устройствах"

    site = ObjectVar(
        model=Site,
        label="БЮ",
        required=False
    )

    async def bulk_snmp(self, device, oid_list, community):
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                oid_bulk_result = {}
                for oid in oid_list:
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result

                return (device, oid_results)

        except Exception as error:
            return (device, error)
        return (device, None)

    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result

    def run(self, data, commit):
        # запускать job могут только пользователи is_superuser
        if not self.request.user.is_superuser:
            self.log_info(message="Неавторизованный запуск")
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        ROUTERS_ROLE_SLUG = PLUGIN_CFG['routers_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')
        STATUS_STATIC = Status.objects.get(slug='static')
        STATUS_DHCP = Status.objects.get(slug='dhcp')

        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)

        devices = defaultdict(dict)
        devices_list = []
        vlans = defaultdict(list)

        # построим список всех связей, чтобы потом исключить из результатов линки между свичами
        cable_set = defaultdict(set)
        all_cables = Cable.objects.all()
        for cable in all_cables:
            if cable.termination_a_type == ContentType.objects.get(app_label="dcim", model="interface"):
                if not data['site'] or cable.termination_a.device.site == data['site']:
                    cable_set[cable.termination_a.device.name].add(cable.termination_a.name)
            if cable.termination_b_type == ContentType.objects.get(app_label="dcim", model="interface"):
                if not data['site'] or cable.termination_b.device.site == data['site']:
                    cable_set[cable.termination_b.device.name].add(cable.termination_b.name)

        # сгенерируем справочник вланов с разбивкой по сайтам
        vlans_by_site = defaultdict(list)
        if data['site']:
            nb_vlans = VLAN.objects.filter(site=data['site'], status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
        else:
            nb_vlans = VLAN.objects.filter(status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
        for nb_vlan in nb_vlans:
            vlans_by_site[nb_vlan.site.name].append(nb_vlan.vid)

        # сгенерируем справочник устройств
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id, 
                device_role__in=device_role, 
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and 
                            nb_device.platform.napalm_driver and 
                            nb_device.platform.napalm_driver == 'cisco_iosxe' and 
                            nb_device.primary_ip4):

                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    devices_list.append(primary_ip)
                    device = devices[primary_ip] = {}
                    device['device'] = nb_device
                    device['site'] = nb_device.site
                    device['interfaces'] = {}
                    device['ifindexes'] = {}
                    device['bridge_ports'] = {}
                    device['vlans'] = vlans_by_site[site]
                    for intf in Interface.objects.filter(device_id=nb_device):
                        device['interfaces'][intf.name] = intf
                    for vlan in vlans_by_site[site]:
                        vlans[vlan].append(primary_ip)

        # получим названия интерфейсов и их индексы с оборудования по SNMP
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=devices[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
                del devices[device_ip]
                devices_list.remove(device_ip)
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in devices[device_ip]['interfaces']:
                        devices[device_ip]['ifindexes'][ifindex] = canonical_intf_name

        # пройдемся по списку вланов и получим с устройства таблицу MAC адресов для каждого влана
        # MAC адреса в десятичном формате

        port_mac_relation = defaultdict(list)

        for vlan, devices_dict in vlans.items():
            self.log_info(message=f'Получаем информацию по VLAN {vlan}')
            community_with_vlan = f'{COMMUNITY}@{vlan}'
            devices_list = [device for device in devices_dict if device in devices_list]

            # получим bridge ports с оборудования по SNMP (зависит от VLAN)
            oid_list = ['.1.3.6.1.2.1.17.1.4.1.2']
            results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
            for device_ip, device_result in results:
                if type(device_result) != dict:
                    # скорее всего, такого VLAN нет на этом устройстве
                    continue
                for oid, oid_result in device_result.items():
                    for index, index_result in oid_result.items():
                        bridge_port = index.split('.')[-1]
                        ifindex = str(index_result)
                        if ifindex in devices[device_ip]['ifindexes']:
                            ifname = devices[device_ip]['ifindexes'][ifindex]
                            nb_interface = devices[device_ip]['interfaces'][ifname]
                            devices[device_ip]['bridge_ports'][bridge_port] = nb_interface
            else:
                oid_list = ['.1.3.6.1.2.1.17.4.3.1.2']

                results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
                for device_ip, device_result in results:
                    nb_device = devices[device_ip]['device']
                    nb_vlan = VLAN.objects.get(vid=vlan, site_id=nb_device.site.id)
                    if type(device_result) != dict:
                        continue
                    for oid, oid_result in device_result.items():
                        for mac_dec, bridge_port in oid_result.items():
                            if str(bridge_port) in devices[device_ip]['bridge_ports']:
                                if (devices[device_ip]['bridge_ports'][str(bridge_port)].name not in cable_set[nb_device.name]
                                        and not devices[device_ip]['bridge_ports'][str(bridge_port)]._custom_field_data.get('flag-ignore-mac')):
                                    # преобразуем MAC из десятичного формата в шестнадцатеричный
                                    mac_hex = ''.join(['{0:x}'.format(int(i)).zfill(2) for i in mac_dec.split('.')[-6:]]).upper()
                                    port_mac_relation[devices[device_ip]['bridge_ports'][str(bridge_port)].id].append({
                                        'vlan': nb_vlan,
                                        'mac': mac_hex,
                                        })

        # подготовим список L3 устройств
        routers = defaultdict(dict)
        routers_list = []
        device_role = DeviceRole.objects.filter(slug__in=ROUTERS_ROLE_SLUG)
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id, 
                device_role__in=device_role, 
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and 
                            nb_device.platform.napalm_driver and 
                            nb_device.platform.napalm_driver == 'cisco_iosxe' and 
                            nb_device.primary_ip4):

                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    routers_list.append(primary_ip)
                    router = routers[primary_ip] = {}
                    router['site'] = nb_device.site.name
                    router['device'] = nb_device

        arp = defaultdict(dict)
        # получим ARP-таблицу с оборудования по SNMP
        oid_list = ['.1.3.6.1.2.1.3.1.1.2']
        results = asyncio.run(self.async_bulk_snmp(routers_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            site = routers[device_ip]['site']
            arp[site] = defaultdict(list)
            if type(device_result) != dict:
                self.log_warning(obj=routers[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    snmp_address=".".join(index.split('.')[-4:])
                    snmp_mac="".join(["{0:x}".format(int(i)).zfill(2) for i in index_result]).upper()
                    arp[site][snmp_mac].append(snmp_address)

        output=""

        for device in devices.values():
            nb_device = device['device']
            site = nb_device.site.name
            output += f'device {nb_device} :'
            mac_on_device = ip_on_device = name_on_device = 0
            for intf in device['interfaces'].values():
                if len(port_mac_relation[intf.id]) > 0:
                    MAConPorts.objects.filter(interface=intf).delete()
                for vlan_and_mac in port_mac_relation[intf.id]:
                    mac_on_device += 1
                    nb_prefixes = Prefix.objects.filter(vlan_id=vlan_and_mac['vlan'].id)
                    addresses = arp[site].get(vlan_and_mac['mac'])
                    address_with_prefix = ''
                    if nb_prefixes and addresses:
                        for nb_prefix in nb_prefixes:
                            for address in addresses:
                                if IPv4Address(address) in IPv4Network(str(nb_prefix)):
                                    prefixlen = str(nb_prefix).split('/')[-1]
                                    address_with_prefix = f'{address}/{prefixlen}'
                                    break
                            else:
                                continue
                            break
                    if address_with_prefix:
                        ip_on_device += 1
                        try:
                            hostname, aliaslist, ipaddrlist  = socket.gethostbyaddr(address)
                            name_on_device += 1
                        except:
                            hostname=""
                        nb_address, created = IPAddress.objects.get_or_create(
                            address=address_with_prefix,
                            vrf=nb_prefix.vrf,
                            defaults={
                                'status': STATUS_STATIC,
                                'dns_name': hostname
                            }
                        )
                        if created:
                            self.log_success(obj=nb_address, message=f'Добавлен IP адрес {hostname}')
                        elif nb_address.status != STATUS_DHCP and hostname and nb_address.dns_name != hostname:
                            old_hostname = nb_address.dns_name
                            nb_address.dns_name = hostname
                            nb_address.save()
                            self.log_success(obj=nb_address, message=f'Обновлено DNS name "{old_hostname}" -> "{hostname}"')
                    else:
                        nb_address = None
                    mac, created = MAConPorts.objects.get_or_create(
                        vlan=vlan_and_mac['vlan'],
                        mac=vlan_and_mac['mac'],
                        defaults={
                            'interface': intf,
                            'device': nb_device,
                            'ipaddress': nb_address,
                        }
                    )
                    if not created:
                        updated = False
                        if nb_address and mac.ipaddress != nb_address:
                            self.log_info(obj=nb_address, message=f'Устройство с MAC {mac.mac} поменяло IP {mac.ipaddress} -> {nb_address}')
                            mac.ipaddress = nb_address
                            updated = True
                        if mac.interface != intf:
                            self.log_info(obj=intf, message=f'MAC {mac.mac} переехал с порта "{mac.interface}"')
                            mac.interface = intf
                            mac.device = nb_device
                            updated = True
                        if updated:
                            mac.save()

            output += f" MAC count - {mac_on_device}, IP count - {ip_on_device}, resolved to hostname - {name_on_device}\n"

        return output

Important! The peculiarity of obtaining a MAC table from Cisco switches via SNMP is that it is necessary to indicate in the community line the VLAN number in which MAC addresses live.

We also take into account that not all VLANs need to be known. For example, if there are wireless clients in a VLAN, it makes no sense to track the MAC in them. The same goes for ports. So we need a filtering mechanism for VLANs and switch ports. Custom_fields is great here, but using the names of the fields that the user needs to add in scripts means hoping for a small miracle.

So let’s be realistic, add custom_fileds when installing the plugin. The Django signals functionality will help us with this, again in the Nautobot wrapper.

Let’s add a signal call to __init__.py, and the signal itself to signals.py.

__init__.py
class NautobotPorthistoryPluginConfig(PluginConfig):
  ............
		def ready(self):
				super().ready()
    		nautobot_database_ready.connect(create_custom_fields_for_porthistory, sender=self)
signals.py
from nautobot.extras.choices import CustomFieldTypeChoices

def create_custom_fields_for_porthistory(sender, apps, **kwargs):
    """Create a custom field flag_porthistory for VLAN if it doesn't already exist."""
    # Use apps.get_model to look up Nautobot core models
    ContentType = apps.get_model("contenttypes", "ContentType")
    CustomField = apps.get_model("extras", "CustomField")
    VLAN = apps.get_model("ipam", "VLAN")
    Interface = apps.get_model("dcim", "Interface")

    # Create custom fields
    cf_for_vlan, created = CustomField.objects.update_or_create(
        name="flag-porthistory",
        defaults={
            "label": "Search MACs on ports in this VLAN",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_vlan.content_types.set([ContentType.objects.get_for_model(VLAN)])
    cf_for_interface, created = CustomField.objects.update_or_create(
        name="flag-ignore-mac",
        defaults={
            "label": "Ignore MACs on this port",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_interface.content_types.set([ContentType.objects.get_for_model(Interface)])

Now, when installing the plugin, new fields will be automatically added to the VLAN and Device.interface models. By checking the box on the VLAN editing page Search MACs on ports in this VLAN we thereby allow our job to look for MAC addresses in this VLAN.

So, we have a job and configured VLANs, which means we can fill the database with the necessary data. But how to see this data? The first way is already familiar to us – we use templates for interface pages and IP addresses. Thus, it is possible to extract information pointwise from the database and show it to the user:

MAC addresses on a specific switch interface
MAC addresses on a specific switch interface

The second way is to create a separate plugin page with the ability to display and search the received data. This path is more thorny than the first, but the results are more interesting.

6 components are involved in the preparation of the plugin page. They can be reduced in number by using HTML templates, but that’s a slightly different story.

So, in order:

  1. navigation.py – Adds a line to the Plugins dropdown menu

  2. urls.py – hangs on url (added line in step 1) handler

  3. views.py – contains handlers (View). The handler collects data, output parameters (tables), form and filtering algorithms into one whole

  4. tables.py – displays a table from the database

  5. forms.py – forms an HTML form for filtering data in a table

  6. filters.py – contains data filtering algorithms

navigation.py
from nautobot.extras.plugins import PluginMenuItem

menu_items = (
    PluginMenuItem(
        link = 'plugins:nautobot_porthistory_plugin:history',  # A reverse compatible link to follow.
        link_text="MAC and IP on switches ports",  # Text to display to user.
    ),
)

urls.py
from django.urls import path

from nautobot_porthistory_plugin import views

urlpatterns = [
    path('history/', views.PortHistoryView.as_view(), name="history"),
]
views.py
from django.shortcuts import render
from nautobot.core.views import generic

from nautobot_porthistory_plugin import models, tables, filters, forms

class PortHistoryView(generic.ObjectListView):
    """Показывает MAC и IP адреса на портах"""

    queryset = models.MAConPorts.objects.all()
    table = tables.PortHistoryTable
    filterset = filters.PortHistoryFilterSet
    filterset_form = forms.PortHistoryFilterForm
    action_buttons = ()
tables.py
import django_tables2 as tables
from django_tables2.utils import A
from nautobot.utilities.tables import BaseTable, ToggleColumn

from nautobot_porthistory_plugin import models

class PortHistoryTable(BaseTable):
    pk = ToggleColumn()
    device = tables.Column(linkify=True)
    interface = tables.LinkColumn(orderable=False)
    vlan = tables.LinkColumn()
    ipaddress = tables.Column(linkify=True, verbose_name="IPv4 Address")

    class Meta(BaseTable.Meta):  # pylint: disable=too-few-public-methods
        """Meta attributes."""

        model = models.MAConPorts
        fields = (
            'pk',
            'device',
            'interface',
            'vlan',
            'mac',
            'ipaddress',
            'updated',
        )
forms.py
from django import forms

from nautobot.dcim.models import Region, Site, Device
from nautobot.ipam.models import VLAN
from nautobot.utilities.forms import BootstrapMixin, DynamicModelMultipleChoiceField
from nautobot.extras.forms import CustomFieldFilterForm

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterForm(BootstrapMixin, forms.Form):
    """Filter form to filter searches for MAC."""

    model = MAConPorts
    field_order = ["q", "site", "device_id", "vlan"]
    q = forms.CharField(required=False, label="Search MAC")
    site = DynamicModelMultipleChoiceField(
        queryset=Site.objects.all(),
        to_field_name="slug",
        required=False,
    )
    device_id = DynamicModelMultipleChoiceField(
        queryset=Device.objects.all(),
        required=False,
        label="Device",
        query_params={"site": "$site"},
    )
    vlan = DynamicModelMultipleChoiceField(
        queryset=VLAN.objects.all(),
        required=False,
        label="VLAN",
        query_params={"site": "$site"},
    )
filters.py
import django_filters
from nautobot.dcim.models import Device
from nautobot.utilities.filters import BaseFilterSet, MultiValueCharFilter
from django.db.models import Q

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterSet(BaseFilterSet):
    """Filter for MAConPorts"""

    q = django_filters.CharFilter(method="search", label="Search MAC")

    site = MultiValueCharFilter(
        method="filter_site",
        field_name="pk",
        label="site",
    )
    device_id = MultiValueCharFilter(
        method="filter_device_id",
        field_name="pk",
        label="Device (ID)",
    )
    vlan = MultiValueCharFilter(
        method="filter_vlan",
        field_name="pk",
        label="VLAN",
    )

    class Meta:
        """Meta attributes for filter."""

        model = MAConPorts

        fields = [
            'vlan'
        ]

    def search(self, queryset, mac, value):
        if not value.strip():
            return queryset
        mac="".join(ch for ch in value if ch.isalnum())
        mac=":".join(mac[i:i+2] for i in range(0,len(mac),2))
        return queryset.filter(Q(mac__icontains=mac))

    def filter_site(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__site__slug__in=id_list) )

    def filter_device_id(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__id__in=id_list) )

    def filter_vlan(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(vlan__id__in=id_list) )

Here is the plugin page. You can filter by site, by device, by VLAN. You can search for the MAC part, and the input format in the search field is absolutely not important, even through underscores, even with caps through one.

What conclusions did I draw for myself

Writing your own plugin is not as difficult as it seemed to me at the very beginning, when I was just learning about Netbox and Nautobot. Nautobot/Netbox is a great automation platform. With a minimum of effort, you can achieve excellent results in adapting Nautobot / Netbox to the realities of your organization.

That’s all, thanks for your attention.

Well, for those who are really interested in this topic and who have read to the end, the link to the plugin code is https://github.com/iontzev/nautobot-porthistory-plugin

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *