Skip to content

Netbox Worker

Netbox Worker Inventory Reference¤

Sample Netbox Worker Inventory¤
# Sample inventory consumed by the Netbox worker.
# NOTE(review): `service` and `broker_endpoint` presumably map to the worker's
# `service` and `broker` constructor arguments - confirm against the loader.
service: netbox
broker_endpoint: "tcp://127.0.0.1:5555"
# `instances` is mandatory - the worker asserts that it is present and not empty.
# Optional top-level keys read by the worker: `netbox_connect_timeout`
# (default 10) and `netbox_read_timeout` (default 300).
instances:
  prod:
    # instance used when a call does not name one; if no instance is flagged
    # as default, the last instance listed is used instead
    default: True
    url: "http://192.168.4.130:8000/"
    # sent to Netbox in the "Authorization: Token <token>" request header
    token: "0123456789abcdef0123456789abcdef01234567"
    # TLS certificate verification, treated as True when the key is omitted
    ssl_verify: False
  dev:
    url: "http://192.168.4.131:8000/"
    token: "0123456789abcdef0123456789abcdef01234567"
    ssl_verify: False
  preprod:
    url: "http://192.168.4.132:8000/"
    token: "0123456789abcdef0123456789abcdef01234567"
    ssl_verify: False
Sample Nornir Worker Netbox Inventory¤
# Sample `netbox` section of a Nornir worker inventory.
# NOTE(review): most keys below match get_nornir_inventory argument names
# (instance, interfaces, connections, nbdata, primary_ip, devices, filters);
# presumably they are passed to it as is - confirm against the Nornir worker.
netbox:
  # NOTE(review): retry and retry_interval are not consumed by the code shown
  # on this page; presumably Nornir worker retry settings - confirm
  retry: 3
  retry_interval: 1
  # name of the Netbox instance defined in Netbox worker inventory
  instance: prod
  # sub-keys match get_interfaces arguments
  interfaces:
    ip_addresses: True
    inventory_items: True
  # sub-keys match get_connections arguments
  connections:
    cables: True
    circuits: True
  # if True, device data retrieved from Netbox is merged into host's data
  nbdata: True
  # "ip4"/"ipv4" or "ip6"/"ipv6" - primary IP family used for host's hostname
  primary_ip: "ipv4"
  # device names to retrieve from Netbox
  devices:
    - fceos4
    - fceos5
    - fceos8
    - ceos1
  # each list item is a separate devices query, results of all queries are merged
  filters: 
    - q: fceos3
    - manufacturer: cisco
      platform: cisco_xr

NetboxWorker(broker, service, worker_name, exit_event=None, init_done_event=None, log_level='WARNING', log_queue=None) ¤

Bases: NFPWorker

Parameters:

Name Type Description Default
broker

broker URL to connect to

required
service

name of the service this worker belongs to

required
worker_name

name of this worker

required
exit_event

if set, worker needs to stop/exit

None
init_done_event

event to set when worker done initializing

None
log_level

logging level of this worker

'WARNING'
Source code in norfab\workers\netbox_worker.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def __init__(
    self,
    broker,
    service,
    worker_name,
    exit_event=None,
    init_done_event=None,
    log_level="WARNING",
    log_queue: object = None,
):
    """
    Initialize Netbox worker: load inventory from broker, pick default
    Netbox instance and verify Netbox compatibility.

    :param broker: broker URL to connect to
    :param service: name of the service this worker belongs to
    :param worker_name: name of this worker
    :param exit_event: if set, worker needs to stop/exit
    :param init_done_event: event to set when worker done initializing,
        skipped if not provided
    :param log_level: logging level of this worker
    :param log_queue: handed over to the parent class initializer
    :raises AssertionError: if inventory has no Netbox instances defined
    """
    super().__init__(broker, service, worker_name, exit_event, log_level, log_queue)
    self.init_done_event = init_done_event

    # get inventory from broker
    self.inventory = self.load_inventory()
    if not self.inventory:
        log.critical(
            f"{self.name} - Broker {self.broker} returned no inventory for {self.name}, killing myself..."
        )
        self.destroy()

    instances = self.inventory.get("instances")
    assert instances, f"{self.name} - inventory has no Netbox instances"

    # extract parameters
    self.netbox_connect_timeout = self.inventory.get("netbox_connect_timeout", 10)
    self.netbox_read_timeout = self.inventory.get("netbox_read_timeout", 300)

    # find default instance
    for name, params in instances.items():
        if params.get("default") is True:
            self.default_instance = name
            break
    else:
        # no instance flagged as default - fall back to the last one listed
        self.default_instance = name

    # check Netbox compatibility
    self._verify_compatibility()

    # init_done_event is optional (defaults to None), only signal it if given
    if self.init_done_event is not None:
        self.init_done_event.set()
    log.info(f"{self.name} - Started")

graphql(instance=None, dry_run=False, obj=None, filters=None, fields=None, queries=None, query_string=None) ¤

Function to query Netbox GraphQL API (Netbox v3 and v4 query formats)

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
dry_run bool

only return query content, do not run it

False
Source code in norfab\workers\netbox_worker.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def graphql(
    self,
    instance: str = None,
    dry_run: bool = False,
    obj: str = None,
    filters: dict = None,
    fields: list = None,
    queries: dict = None,
    query_string: str = None,
) -> Result:
    """
    Function to query Netbox GraphQL API, Netbox v3 and v4 are supported.

    Query is formed out of one of these arguments, listed in order of
    precedence: ``queries``; ``obj`` together with ``filters`` and
    ``fields``; ``query_string``.

    :param instance: Netbox instance name
    :param dry_run: only return query content, do not run it
    :param obj: name of the object to query e.g. ``device_list``
    :param filters: filters dictionary for ``obj`` query
    :param fields: list of fields to retrieve for ``obj`` query
    :param queries: dictionary keyed by query alias, values are dictionaries
        of query forming arguments e.g. ``obj``, ``filters``, ``fields``
    :param query_string: complete GraphQL query string, sent as is
    :return: Result object, ``result`` is the content of reply ``data`` for
        ``queries`` and ``query_string`` or the content of ``data[obj]`` for
        ``obj`` query; GraphQL errors are added to ``errors`` list
    :raises RuntimeError: if no query arguments provided or Netbox major
        version is not 3 or 4
    :raises Exception: if Netbox replied with HTTP error status code
    """
    nb_params = self._get_instance_params(instance)
    ret = Result(task=f"{self.name}:graphql")
    # strip trailing slash so that URL does not end up with '//graphql/'
    graphql_url = f"{nb_params['url'].rstrip('/')}/graphql/"

    def pick_query_former():
        """Return query forming function matching Netbox major version."""
        nb_major = self.nb_version[0]
        if nb_major == 4:
            return _form_query_v4
        if nb_major == 3:
            return _form_query_v3
        raise RuntimeError(
            f"{self.name} - graphql method does not support Netbox version "
            f"'{self.nb_version}', supported major versions are 3 and 4"
        )

    # form graphql query(ies) payload
    if queries:
        form_query = pick_query_former()
        # merge alias into a copy to not modify dictionaries owned by the caller
        queries_strings = "    ".join(
            form_query(**{**query_data, "alias": alias})
            for alias, query_data in queries.items()
        )
        query = f"query {{{queries_strings}}}"
    elif obj and filters and fields:
        query = f"query {{{pick_query_former()(obj, filters, fields)}}}"
    elif query_string:
        query = query_string
    else:
        raise RuntimeError(
            f"{self.name} - graphql method expects quieries argument or obj, filters, "
            f"fields arguments or query_string argument provided"
        )
    payload = json.dumps({"query": query})

    # form and return dry run response
    if dry_run:
        ret.result = {
            "url": graphql_url,
            "data": payload,
            "verify": nb_params.get("ssl_verify", True),
            "headers": {
                "Content-Type": "application/json",
                "Accept": "application/json",
                # only expose token tail in dry run output
                "Authorization": f"Token ...{nb_params['token'][-6:]}",
            },
        }
        return ret

    # send request to Netbox GraphQL API
    log.debug(
        f"{self.name} - sending GraphQL query '{payload}' to URL '{graphql_url}'"
    )
    req = requests.post(
        url=graphql_url,
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Token {nb_params['token']}",
        },
        data=payload,
        verify=nb_params.get("ssl_verify", True),
        timeout=(self.netbox_connect_timeout, self.netbox_read_timeout),
    )
    try:
        req.raise_for_status()
    except Exception as e:
        # chain original error to keep its traceback for troubleshooting
        raise Exception(
            f"{self.name} -  Netbox GraphQL query failed, query '{query}', "
            f"URL '{req.url}', status-code '{req.status_code}', reason '{req.reason}', "
            f"response content '{req.text}'"
        ) from e

    # return results
    reply = req.json()
    if reply.get("errors"):
        msg = f"{self.name} - GrapQL query error '{reply['errors']}', query '{payload}'"
        log.error(msg)
        ret.errors.append(msg)
        if reply.get("data"):
            ret.result = reply["data"]  # at least return some data
    elif queries or query_string:
        ret.result = reply["data"]
    else:
        ret.result = reply["data"][obj]

    return ret

rest(instance=None, method='get', api='', **kwargs) ¤

Method to query Netbox REST API.

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
method str

requests method name e.g. get, post, put etc.

'get'
api str

api url to query e.g. "extras" or "dcim/interfaces" etc.

''
kwargs

any additional requests method's arguments

{}
Source code in norfab\workers\netbox_worker.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
def rest(
    self, instance: str = None, method: str = "get", api: str = "", **kwargs
) -> Union[dict, list]:
    """
    Method to query Netbox REST API.

    :param instance: Netbox instance name
    :param method: requests method name e.g. get, post, put etc.
    :param api: api url to query e.g. "extras" or "dcim/interfaces" etc.,
        leading and trailing slashes are ignored
    :param kwargs: any additional requests method's arguments, ``timeout``
        defaults to worker's Netbox connect and read timeouts if not given
    :return: JSON decoded response content
    :raises requests.HTTPError: if Netbox replied with HTTP error status code
    """
    params = self._get_instance_params(instance)

    # normalize slashes so that instance URL with trailing slash or api path
    # wrapped in slashes does not produce URL with '//' in it
    base_url = params["url"].rstrip("/")
    api_path = api.strip("/")
    url = f"{base_url}/api/{api_path}/" if api_path else f"{base_url}/api/"

    # requests waits forever by default, use same timeouts as GraphQL queries do
    kwargs.setdefault(
        "timeout", (self.netbox_connect_timeout, self.netbox_read_timeout)
    )

    # send request to Netbox REST API
    response = getattr(requests, method)(
        url=url,
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Token {params['token']}",
        },
        verify=params.get("ssl_verify", True),
        **kwargs,
    )

    response.raise_for_status()

    return response.json()

get_devices(filters=None, instance=None, dry_run=False, devices=None) ¤

Function to retrieve devices data from Netbox using GraphQL API.

Parameters:

Name Type Description Default
filters list

list of filters dictionaries to filter devices

None
instance str

Netbox instance name

None
dry_run bool

only return query content, do not run it

False
devices list

list of device names to query data for

None

Returns:

Type Description
Result

dictionary keyed by device name with device data

Source code in norfab\workers\netbox_worker.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def get_devices(
    self,
    filters: list = None,
    instance: str = None,
    dry_run: bool = False,
    devices: list = None,
) -> Result:
    """
    Function to retrieve devices data from Netbox using GraphQL API.

    Each item of ``filters`` and the ``devices`` list are sent as separate
    aliased queries in a single GraphQL request, devices returned by more
    than one query are included in the result once.

    :param filters: list of filters dictionaries to filter devices
    :param instance: Netbox instance name
    :param dry_run: only return query content, do not run it
    :param devices: list of device names to query data for
    :return: dictionary keyed by device name with device data
    :raises RuntimeError: raised by graphql method if neither filters nor
        devices provided
    :raises Exception: if GraphQL query returned errors
    """
    ret = Result(task=f"{self.name}:get_devices", result={})
    instance = instance or self.default_instance
    filters = filters or []

    device_fields = [
        "name",
        "last_updated",
        "custom_field_data",
        "tags {name}",
        "device_type {model}",
        "role {name}",
        "config_context",
        "tenant {name}",
        "platform {name}",
        "serial",
        "asset_tag",
        "site {name tags{name}}",
        "location {name}",
        "rack {name}",
        "status",
        "primary_ip4 {address}",
        "primary_ip6 {address}",
        "airflow",
        "position",
    ]

    # form queries dictionary out of filters
    queries = {
        f"devices_by_filter_{index}": {
            "obj": "device_list",
            "filters": filter_item,
            "fields": device_fields,
        }
        for index, filter_item in enumerate(filters)
    }

    # add devices list query
    if devices:
        if self.nb_version[0] == 4:
            # json.dumps renders same '["a", "b"]' literal as joining names by
            # hand, but also escapes quotes and backslashes in device names
            dlist = json.dumps(list(devices), ensure_ascii=False)
            filters_dict = {"name": f"{{in_list: {dlist}}}"}
        elif self.nb_version[0] == 3:
            filters_dict = {"name": devices}
        queries["devices_by_devices_list"] = {
            "obj": "device_list",
            "filters": filters_dict,
            "fields": device_fields,
        }

    # send queries
    query_result = self.graphql(queries=queries, instance=instance, dry_run=dry_run)

    # return dry run result
    if dry_run:
        return query_result

    # check for errors
    if query_result.errors:
        msg = f"{self.name} - get devices query failed with errors:\n{query_result.errors}"
        raise Exception(msg)

    # process devices, first occurrence of the device wins
    for devices_list in query_result.result.values():
        for device in devices_list:
            if device["name"] not in ret.result:
                ret.result[device.pop("name")] = device

    return ret

get_interfaces(instance=None, devices=None, ip_addresses=False, inventory_items=False, dry_run=False) ¤

Function to retrieve device interfaces from Netbox using GraphQL API.

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
devices list

list of devices to retrieve interfaces for

None
ip_addresses bool

if True, retrieves interface IPs

False
inventory_items bool

if True, retrieves interface inventory items

False
dry_run bool

only return query content, do not run it

False

Returns:

Type Description
Result

dictionary keyed by device name with interface details

Source code in norfab\workers\netbox_worker.py
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
def get_interfaces(
    self,
    instance: str = None,
    devices: list = None,
    ip_addresses: bool = False,
    inventory_items: bool = False,
    dry_run: bool = False,
) -> Result:
    """
    Function to retrieve device interfaces from Netbox using GraphQL API.

    :param instance: Netbox instance name
    :param devices: list of devices to retrieve interfaces for
    :param ip_addresses: if True, retrieves interface IPs
    :param inventory_items: if True, retrieves interface inventory items
    :param dry_run: only return query content, do not run it
    :return: dictionary keyed by device name with interface details
    :raises Exception: if Netbox returned no interfaces data
    """
    # devices argument is optional, treat missing value as an empty list
    # instead of failing on iterating over None
    devices = devices or []

    # form final result object
    ret = Result(
        task=f"{self.name}:get_interfaces", result={d: {} for d in devices}
    )
    intf_fields = [
        "name",
        "enabled",
        "description",
        "mtu",
        "parent {name}",
        "mac_address",
        "mode",
        "untagged_vlan {vid name}",
        "vrf {name}",
        "tagged_vlans {vid name}",
        "tags {name}",
        "custom_fields",
        "last_updated",
        "bridge {name}",
        "child_interfaces {name}",
        "bridge_interfaces {name}",
        "member_interfaces {name}",
        "wwn",
        "duplex",
        "speed",
        "id",
        "device {name}",
    ]

    # add IP addresses to interfaces fields
    if ip_addresses:
        intf_fields.append(
            "ip_addresses {address status role dns_name description custom_fields last_updated tenant {name} tags {name}}"
        )

    # form interfaces query dictionary
    queries = {
        "interfaces": {
            "obj": "interface_list",
            "filters": {"device": devices},
            "fields": intf_fields,
        }
    }

    # add query to retrieve inventory items
    if inventory_items:
        inv_filters = {"device": devices, "component_type": "dcim.interface"}
        inv_fields = [
            "name",
            "component {... on InterfaceType {id}}",
            "role {name}",
            "manufacturer {name}",
            "custom_fields",
            "label",
            "description",
            "tags {name}",
            "asset_tag",
            "serial",
            "part_id",
        ]
        queries["inventor_items"] = {
            "obj": "inventory_item_list",
            "filters": inv_filters,
            "fields": inv_fields,
        }

    query_result = self.graphql(instance=instance, queries=queries, dry_run=dry_run)

    # return dry run result
    if dry_run:
        return query_result

    interfaces_data = query_result.result

    # exit if no Interfaces returned, result can hold no data at all if
    # GraphQL query failed with errors
    if not interfaces_data or not interfaces_data.get("interfaces"):
        raise Exception(
            f"{self.name} - no interfaces data in '{interfaces_data}' returned by '{instance}' "
            f"for devices {', '.join(devices)}"
        )

    # process query results
    interfaces = interfaces_data.pop("interfaces")

    # process inventory items
    if inventory_items:
        inventory_items_list = interfaces_data.pop("inventor_items", None) or []
        # transform inventory items list to a dictionary keyed by intf_id
        inventory_items_dict = {}
        while inventory_items_list:
            inv_item = inventory_items_list.pop()
            # skip inventory items that are not assigned to components
            if inv_item.get("component") is None:
                continue
            intf_id = str(inv_item.pop("component").pop("id"))
            inventory_items_dict.setdefault(intf_id, []).append(inv_item)
        # iterate over interfaces and add inventory items
        for intf in interfaces:
            intf["inventory_items"] = inventory_items_dict.pop(intf["id"], [])

    # transform interfaces list to dictionary keyed by device and interfaces names
    while interfaces:
        intf = interfaces.pop()
        intf.pop("id")  # id is only needed to match inventory items
        device_name = intf.pop("device").pop("name")
        intf_name = intf.pop("name")
        if device_name in ret.result:  # Netbox issue #16299
            ret.result[device_name][intf_name] = intf

    return ret

get_connections(devices, instance=None, dry_run=False, cables=False, circuits=False) ¤

Function to retrieve device connections data from Netbox using GraphQL API.

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
devices list

list of devices to retrieve connections for

required
dry_run bool

only return query content, do not run it

False
cables bool

if True includes interfaces' directly attached cables details

False
circuits bool

if True includes interfaces' circuits termination details

False

Returns:

Type Description
Result

dictionary keyed by device name with connections data

Source code in norfab\workers\netbox_worker.py
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
def get_connections(
    self,
    devices: list,
    instance: str = None,
    dry_run: bool = False,
    cables: bool = False,
    circuits: bool = False,
) -> Result:
    """
    Function to retrieve device connections data from Netbox using GraphQL API.

    Interfaces, console ports and console server ports are queried, ports
    that have no connected endpoints are not included in the result.

    :param instance: Netbox instance name
    :param devices: list of devices to retrieve connections for
    :param dry_run: only return query content, do not run it
    :param cables: if True includes interfaces' directly attached cables details
    :param circuits: if True includes interfaces' circuits termination details
    :return: dictionary keyed by device name with connections data
    """
    # form final result dictionary
    ret = Result(
        task=f"{self.name}:get_connections", result={d: {} for d in devices}
    )

    # form lists of fields to request from netbox
    cable_fields = """
        cable {
            type
            status
            tenant {name}
            label
            tags {name}
            length
            length_unit
            custom_fields
        }
    """
    if self.nb_version[0] == 4:
        interfaces_fields = [
            "name",
            "device {name}",
            """connected_endpoints {
            __typename 
            ... on InterfaceType {name device {name}}
            ... on ProviderNetworkType {name}
            }""",
        ]
    elif self.nb_version[0] == 3:
        interfaces_fields = [
            "name",
            "device {name}",
            """connected_endpoints {
            __typename 
            ... on InterfaceType {name device {name}}
            }""",
        ]
    console_ports_fields = [
        "name",
        "device {name}",
        """connected_endpoints {
          __typename 
          ... on ConsoleServerPortType {name device {name}}
        }""",
        """link_peers {
          __typename
          ... on ConsoleServerPortType {name device {name}}
          ... on FrontPortType {name device {name}}
          ... on RearPortType {name device {name}}
        }""",
    ]
    console_server_ports_fields = [
        "name",
        "device {name}",
        """connected_endpoints {
          __typename 
          ... on ConsolePortType {name device {name}}
        }""",
        """link_peers {
          __typename
          ... on ConsolePortType {name device {name}}
          ... on FrontPortType {name device {name}}
          ... on RearPortType {name device {name}}
        }""",
    ]

    # add circuits info
    if circuits is True:
        interfaces_fields.append(
            """
            link_peers {
                __typename
                ... on InterfaceType {name device {name}}
                ... on FrontPortType {name device {name}}
                ... on RearPortType {name device {name}}
                ... on CircuitTerminationType {
                    circuit{
                        cid 
                        description 
                        tags{name} 
                        provider{name} 
                        status
                        custom_fields
                        commit_rate
                    }
                }
            }
        """
        )
    else:
        interfaces_fields.append(
            """
            link_peers {
                __typename
                ... on InterfaceType {name device {name}}
                ... on FrontPortType {name device {name}}
                ... on RearPortType {name device {name}}
            }
        """
        )

    # check if need to include cables info
    if cables is True:
        interfaces_fields.append(cable_fields)
        console_ports_fields.append(cable_fields)
        console_server_ports_fields.append(cable_fields)

    # form query dictionary with aliases to get data from Netbox
    queries = {
        "interface": {
            "obj": "interface_list",
            "filters": {"device": devices},
            "fields": interfaces_fields,
        },
        "consoleport": {
            "obj": "console_port_list",
            "filters": {"device": devices},
            "fields": console_ports_fields,
        },
        "consoleserverport": {
            "obj": "console_server_port_list",
            "filters": {"device": devices},
            "fields": console_server_ports_fields,
        },
    }

    # retrieve full list of devices interface with all cables
    query_result = self.graphql(queries=queries, instance=instance, dry_run=dry_run)

    # return dry run result
    if dry_run:
        return query_result

    all_ports = query_result.result

    # extract interfaces
    for port_type, ports in all_ports.items():
        for port in ports:
            endpoints = port["connected_endpoints"]
            # skip ports that have no remote device connected
            if not endpoints or not all(endpoints):
                continue

            # skip ports of devices that were not asked for, same guard as
            # get_interfaces applies with a reference to Netbox issue #16299
            device_name = port["device"]["name"]
            if device_name not in ret.result:
                continue

            # extract required parameters
            port_name = port["name"]
            link_peers = port.get("link_peers") or []
            first_peer = link_peers[0] if link_peers else {}
            remote_termination_type = endpoints[0]["__typename"].lower()
            remote_termination_type = remote_termination_type.replace("type", "")

            # form initial connection dictionary
            connection = {
                "breakout": len(endpoints) > 1,
                "remote_termination_type": remote_termination_type,
                "termination_type": port_type,
            }

            # add remote connection details
            if remote_termination_type == "providernetwork":
                connection["remote_device"] = None
                connection["remote_interface"] = None
                connection["provider"] = endpoints[0]["name"]
            else:
                remote_interface = endpoints[0]["name"]
                if len(endpoints) > 1:
                    remote_interface = [i["name"] for i in endpoints]
                connection["remote_interface"] = remote_interface
                connection["remote_device"] = endpoints[0]["device"]["name"]

            # handle circuits, add circuit connection details
            if circuits and "circuit" in first_peer:
                connection["circuit"] = first_peer["circuit"]

            # add cable and its peer details
            if cables:
                # cable can be null in Netbox reply, fall back to empty dict
                cable = port.get("cable") or {}
                peer_type = first_peer.get("__typename")
                cable["peer_termination_type"] = (
                    peer_type.lower().replace("type", "") if peer_type else None
                )
                cable["peer_device"] = (first_peer.get("device") or {}).get("name")
                cable["peer_interface"] = first_peer.get("name")
                if len(link_peers) > 1:  # handle breakout cable
                    cable["peer_interface"] = [i.get("name") for i in link_peers]
                connection["cable"] = cable

            ret.result[device_name][port_name] = connection

    return ret

get_circuits(devices, instance=None, dry_run=False, cid=None) ¤

Function to retrieve device circuits data from Netbox using GraphQL API.

Parameters:

Name Type Description Default
devices list

list of devices to retrieve circuits for

required
instance str

Netbox instance name

None
dry_run bool

only return query content, do not run it

False
cid list

list of circuit identifiers to retrieve data for

None

Returns:

Type Description
Result

dictionary keyed by device name with circuits data

Source code in norfab\workers\netbox_worker.py
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
def get_circuits(
    self,
    devices: list,
    instance: str = None,
    dry_run: bool = False,
    cid: list = None,
) -> Result:
    """
    Function to retrieve device circuits data from Netbox using GraphQL API.

    Circuits are retrieved for the sites of given devices using GraphQL API,
    each circuit termination path is then retrieved using REST API to map
    circuit to the devices connected at the path ends.

    :param devices: list of devices to retrieve circuits for
    :param instance: Netbox instance name
    :param dry_run: only return query content, do not run it
    :param cid: list of circuit identifiers to retrieve data for, only
        added to the query filters for Netbox v4
    :return: dictionary keyed by device name with circuits data
    """
    # form final result object
    ret = Result(task=f"{self.name}:get_circuits", result={d: {} for d in devices})

    device_sites_fields = ["site {slug}"]
    circuit_fields = [
        "cid",
        "tags {name}",
        "provider {name}",
        "commit_rate",
        "description",
        "status",
        "type {name}",
        "provider_account {name}",
        "tenant {name}",
        "termination_a {id}",
        "termination_z {id}",
        "custom_fields",
        "comments",
    ]

    # retrieve list of hosts' sites
    if self.nb_version[0] == 4:
        # json.dumps renders same '["a", "b"]' literal as joining names by
        # hand, but also escapes quotes and backslashes
        dlist = json.dumps(list(devices), ensure_ascii=False)
        device_filters_dict = {"name": f"{{in_list: {dlist}}}"}
    elif self.nb_version[0] == 3:
        device_filters_dict = {"name": devices}
    device_sites = self.graphql(
        obj="device_list",
        filters=device_filters_dict,
        fields=device_sites_fields,
        instance=instance,
    )
    sites = list({i["site"]["slug"] for i in device_sites.result})

    # retrieve all circuits for devices' sites
    circuits_filters_dict = {"site": sites}
    if cid and self.nb_version[0] == 4:
        cid_list = json.dumps(list(cid), ensure_ascii=False)
        circuits_filters_dict["cid"] = f"{{in_list: {cid_list}}}"

    query_result = self.graphql(
        obj="circuit_list",
        filters=circuits_filters_dict,
        fields=circuit_fields,
        dry_run=dry_run,
        instance=instance,
    )

    # return dry run result
    if dry_run is True:
        return query_result

    all_circuits = query_result.result

    # iterate over circuits and map them to devices
    for circuit in all_circuits:
        circuit_id = circuit.pop("cid")
        circuit["tags"] = [i["name"] for i in circuit["tags"]]
        circuit["type"] = circuit["type"]["name"]
        circuit["provider"] = circuit["provider"]["name"]
        circuit["tenant"] = circuit["tenant"]["name"] if circuit["tenant"] else None
        circuit["provider_account"] = (
            circuit["provider_account"]["name"]
            if circuit["provider_account"]
            else None
        )
        termination_a = circuit.pop("termination_a")
        termination_z = circuit.pop("termination_z")
        termination_a = termination_a["id"] if termination_a else None
        termination_z = termination_z["id"] if termination_z else None

        # retrieve A or Z termination path using Netbox REST API, A end is
        # preferred, circuits without terminations are skipped
        termination_id = termination_a if termination_a is not None else termination_z
        if termination_id is None:
            continue
        # rest method adds slashes around api path on its own
        circuit_path = self.rest(
            instance=instance,
            method="get",
            api=f"circuits/circuit-terminations/{termination_id}/paths",
        )

        # check if circuit ends connect to device or provider network
        if not circuit_path:
            continue
        path_start = circuit_path[0]["path"][0][0]
        path_end = circuit_path[0]["path"][-1][-1]
        if "name" not in path_start or "name" not in path_end:
            continue

        # form A and Z connection endpoints
        end_a = {
            "device": path_start.get("device", {}).get("name", False),
            "provider_network": "provider-network" in path_start["url"],
            "name": path_start["name"],
        }
        end_z = {
            "device": path_end.get("device", {}).get("name", False),
            "provider_network": "provider-network" in path_end["url"],
            "name": path_end["name"],
        }
        circuit["is_active"] = circuit_path[0]["is_active"]

        # map path ends to devices, each device gets its own copy of circuit data
        for local, remote in ((end_a, end_z), (end_z, end_a)):
            if not local["device"] or local["device"] not in devices:
                continue
            device_circuit = copy.deepcopy(circuit)
            device_circuit["interface"] = local["name"]
            if remote["device"]:
                device_circuit["remote_device"] = remote["device"]
                device_circuit["remote_interface"] = remote["name"]
            elif remote["provider_network"]:
                device_circuit["provider_network"] = remote["name"]
            ret.result[local["device"]][circuit_id] = device_circuit

    return ret

get_nornir_inventory(filters=None, devices=None, instance=None, interfaces=False, connections=False, circuits=False, nbdata=False, primary_ip='ip4') ¤

Method to query Netbox and return devices data in Nornir inventory format.

Source code in norfab\workers\netbox_worker.py
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
def get_nornir_inventory(
    self,
    filters: list = None,
    devices: list = None,
    instance: str = None,
    interfaces: Union[dict, bool] = False,
    connections: Union[dict, bool] = False,
    circuits: Union[dict, bool] = False,
    nbdata: bool = False,
    primary_ip: str = "ip4",
) -> Result:
    """
    Method to query Netbox and return devices data in Nornir inventory format.

    Each device's ``nornir`` config context section is used as the base of
    the Nornir host definition; platform and hostname are filled in from
    Netbox device data when config context does not provide them.

    :param filters: filters passed to ``get_devices`` as is
    :param devices: devices passed to ``get_devices`` as is
    :param instance: Netbox instance name, default instance is used if None
    :param interfaces: if evaluates to True, add ``interfaces`` to hosts' data;
        if dictionary, it is passed as keyword arguments to ``get_interfaces``
    :param connections: if evaluates to True, add ``connections`` to hosts' data;
        if dictionary, it is passed as keyword arguments to ``get_connections``
    :param circuits: if evaluates to True, add ``circuits`` to hosts' data;
        if dictionary, it is passed as keyword arguments to ``get_circuits``
    :param nbdata: if True, merge Netbox device data into host's data
    :param primary_ip: ``ip4``/``ipv4`` or ``ip6``/``ipv6``, selects which
        primary IP of the device to use as host's hostname; host name is
        used as hostname if selected primary IP is not set
    :return: Result object with ``{"hosts": {...}}`` dictionary as a result
    """
    hosts = {}
    inventory = {"hosts": hosts}
    ret = Result(task=f"{self.name}:get_nornir_inventory", result=inventory)

    # check Netbox status
    netbox_status = self.get_netbox_status(instance=instance)
    if netbox_status.result[instance or self.default_instance]["status"] is False:
        return ret

    # retrieve devices data
    nb_devices = self.get_devices(
        filters=filters, devices=devices, instance=instance
    )

    # Netbox device name -> Nornir host name; config context may rename
    # the host, but Netbox must still be queried using device names
    host_names = {}

    # form Nornir hosts inventory
    for device_name, device in nb_devices.result.items():
        host = device["config_context"].pop("nornir", {})
        host.setdefault("data", {})
        name = host.pop("name", device_name)
        hosts[name] = host
        host_names[device_name] = name
        # add platform if not provided in device config context
        if not host.get("platform"):
            if device["platform"]:
                host["platform"] = device["platform"]["name"]
            else:
                log.warning(f"{self.name} - no platform found for '{name}' device")
        # add hostname if not provided in config context
        if not host.get("hostname"):
            if device["primary_ip4"] and primary_ip in ["ip4", "ipv4"]:
                host["hostname"] = device["primary_ip4"]["address"].split("/")[0]
            elif device["primary_ip6"] and primary_ip in ["ip6", "ipv6"]:
                host["hostname"] = device["primary_ip6"]["address"].split("/")[0]
            else:
                host["hostname"] = name
        # add netbox data to host's data
        if nbdata is True:
            host["data"].update(device)

    # return if no hosts found for provided parameters
    if not hosts:
        log.warning(f"{self.name} - no viable hosts returned by Netbox")
        return ret

    nb_device_names = list(host_names)

    # add interfaces, connections and circuits data, in that order
    extras = (
        ("interfaces", interfaces, self.get_interfaces),
        ("connections", connections, self.get_connections),
        ("circuits", circuits, self.get_circuits),
    )
    for data_key, option, getter in extras:
        if not option:
            continue
        # dictionary option carries extra arguments for the getter
        kwargs = option if isinstance(option, dict) else {}
        # make sure all hosts have the key even if Netbox returns no data
        for host in hosts.values():
            host["data"].setdefault(data_key, {})
        # query data from Netbox using Netbox device names
        nb_data = getter(devices=nb_device_names, instance=instance, **kwargs)
        # move data into hosts' inventory, emptying getter results on the go
        while nb_data.result:
            device, device_data = nb_data.result.popitem()
            host_name = host_names.get(device, device)
            hosts[host_name]["data"][data_key] = device_data

    return ret

update_device_facts(instance=None, dry_run=False, datasource='nornir', timeout=60, devices=None, **kwargs) ¤

Function to update device facts in Netbox using information provided by NAPALM get_facts getter:

  • serial number

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
dry_run bool

return information that would be pushed to Netbox but do not push it

False
datasource str

name of the service used to retrieve devices' data; defaults to the nornir service parse task

'nornir'
timeout int

seconds to wait before timing out the data retrieval job

60
kwargs

any additional arguments to send to service for device data retrieval

{}
Source code in norfab\workers\netbox_worker.py
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
def update_device_facts(
    self,
    instance: str = None,
    dry_run: bool = False,
    datasource: str = "nornir",
    timeout: int = 60,
    devices: list = None,
    **kwargs,
):
    """
    Update device facts in Netbox from the data returned by
    NAPALM get_facts getter. Facts updated:

    - serial number

    :param instance: Netbox instance name
    :param dry_run: report what would be pushed to Netbox without saving it
    :param datasource: name of the service to source devices' data from,
        only ``nornir`` is supported, its ``parse`` task is used
    :param timeout: data retrieval job timeout in seconds
    :param devices: if not empty, sent to the service as ``FL`` argument
    :param kwargs: any additional arguments to send to service for device data retrieval
    :return: Result object with per-host dictionary of updated facts
    :raises UnsupportedServiceError: if ``datasource`` is not ``nornir``
    :raises Exception: if host is not found in Netbox by its name
    """
    result = {}
    ret = Result(task=f"{self.name}:update_device_facts", result=result)
    nb = self._get_pynetbox(instance)
    kwargs["add_details"] = True

    if datasource != "nornir":
        raise UnsupportedServiceError(f"'{datasource}' service not supported")

    if devices:
        kwargs["FL"] = devices
    job_results = self.client.run_job(
        "nornir",
        "parse",
        kwargs=kwargs,
        workers="all",
        timeout=timeout,
    )

    # results are reported under a different key for dry runs
    if dry_run:
        report_key = "update_device_facts_dry_run"
    else:
        report_key = "update_device_facts"

    for worker_results in job_results.values():
        for host, host_data in worker_results["result"].items():
            napalm_get = host_data["napalm_get"]
            # skip hosts that failed to return facts
            if napalm_get["failed"]:
                log.error(
                    f"{host} - facts update failed: '{napalm_get['exception']}'"
                )
                self.event(f"{host} - facts update failed")
                continue
            nb_device = nb.dcim.devices.get(name=host)
            if not nb_device:
                raise Exception(f"'{host}' does not exist in Netbox")
            serial_number = napalm_get["result"]["get_facts"]["serial_number"]
            # update serial number
            nb_device.serial = serial_number
            if not dry_run:
                nb_device.save()
            result[host] = {report_key: {"serial": serial_number}}
            self.event(f"{host} - facts updated")

    return ret

update_device_interfaces(instance=None, dry_run=False, datasource='nornir', timeout=60, devices=None, create=True, **kwargs) ¤

Function to update device interfaces in Netbox using information provided by NAPALM get_interfaces getter:

  • interface name
  • interface description
  • mtu
  • mac address
  • admin status
  • speed

Parameters:

Name Type Description Default
instance str

Netbox instance name

None
dry_run bool

return information that would be pushed to Netbox but do not push it

False
datasource str

name of the service used to retrieve devices' data; defaults to the nornir service parse task

'nornir'
timeout int

seconds to wait before timing out the data retrieval job

60
create bool

create missing interfaces

True
kwargs

any additional arguments to send to service for device data retrieval

{}
Source code in norfab\workers\netbox_worker.py
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
def update_device_interfaces(
    self,
    instance: str = None,
    dry_run: bool = False,
    datasource: str = "nornir",
    timeout: int = 60,
    devices: list = None,
    create: bool = True,
    **kwargs,
):
    """
    Function to update device interfaces in Netbox using information
    provided by NAPALM get_interfaces getter:

    - interface name
    - interface description
    - mtu
    - mac address
    - admin status
    - speed

    :param instance: Netbox instance name
    :param dry_run: return information that would be pushed to Netbox but do
        not push it, neither existing interfaces are saved nor missing
        interfaces are created
    :param datasource: service name to use to retrieve devices' data, only
        ``nornir`` is supported, its ``parse`` task is used
    :param timeout: seconds to wait before timing out data retrieval job
    :param devices: if not empty, sent to the service as ``FL`` argument
    :param create: create missing interfaces
    :param kwargs: any additional arguments to send to service for device data retrieval
    :return: Result object with per-host dictionary of interfaces data
    :raises UnsupportedServiceError: if ``datasource`` is not ``nornir``
    :raises Exception: if host is not found in Netbox by its name
    """
    result = {}
    ret = Result(task=f"{self.name}:update_device_interfaces", result=result)
    nb = self._get_pynetbox(instance)

    if datasource != "nornir":
        raise UnsupportedServiceError(f"'{datasource}' service not supported")

    def apply_attributes(nb_interface, interface):
        """Copy getter's interface attributes onto Netbox interface record."""
        nb_interface.description = interface["description"]
        nb_interface.mtu = interface["mtu"]
        # getter speed is multiplied by 1000 to match Netbox speed units
        nb_interface.speed = interface["speed"] * 1000
        nb_interface.mac_address = interface["mac_address"]
        nb_interface.enabled = interface["is_enabled"]

    if devices:
        kwargs["FL"] = devices
    kwargs["getters"] = "get_interfaces"
    data = self.client.run_job(
        "nornir",
        "parse",
        kwargs=kwargs,
        workers="all",
        timeout=timeout,
    )

    # results are reported under a different key for dry runs
    if dry_run:
        report_key = "update_device_interfaces_dry_run"
    else:
        report_key = "update_device_interfaces"

    for results in data.values():
        for host, host_data in results["result"].items():
            updated = {}
            result[host] = {report_key: updated}
            interfaces = host_data["napalm_get"]["get_interfaces"]
            nb_device = nb.dcim.devices.get(name=host)
            if not nb_device:
                raise Exception(f"'{host}' does not exist in Netbox")
            nb_interfaces = nb.dcim.interfaces.filter(device_id=nb_device.id)
            # update existing interfaces
            for nb_interface in nb_interfaces:
                if nb_interface.name not in interfaces:
                    continue
                # pop so that only missing interfaces are left to create
                interface = interfaces.pop(nb_interface.name)
                apply_attributes(nb_interface, interface)
                if not dry_run:
                    nb_interface.save()
                updated[nb_interface.name] = interface
                self.event(f"{host} - updated interface {nb_interface.name}")
            # create new interfaces
            if create is not True:
                continue
            for interface_name, interface in interfaces.items():
                # create() pushes to Netbox straight away, hence it must
                # not be called at all during dry run
                if not dry_run:
                    nb_interface = nb.dcim.interfaces.create(
                        name=interface_name,
                        device={"name": nb_device.name},
                        type="other",
                    )
                    apply_attributes(nb_interface, interface)
                    nb_interface.save()
                updated[interface_name] = interface
                # event is emitted for dry run too, same as for updates above
                self.event(f"{host} - created interface {interface_name}")

    return ret

get_next_ip(subnet, description=None, device=None, interface=None, vrf=None, tags=None, dns_name=None, tenant=None, comments=None, instance=None, dry_run=False) ¤

Method to retrieve existing or allocate new IP address in Netbox.

Parameters:

Name Type Description Default
subnet str

IPv4 or IPv6 subnet e.g. 10.0.0.0/24 to allocate next available IP Address from

required
description str

IP address description to record in Netbox database

None
device str

device name to find interface for and link IP address with

None
interface str

interface name to link IP address with; the device argument must also be provided

None
Source code in norfab\workers\netbox_worker.py
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
def get_next_ip(
    self,
    subnet: str,
    description: str = None,
    device: str = None,
    interface: str = None,
    vrf: str = None,
    tags: list = None,
    dns_name: str = None,
    tenant: str = None,
    comments: str = None,
    instance: str = None,
    dry_run: bool = False,
):
    """
    Method to allocate next available IP address from Netbox prefix.

    NOTE(review): ``device``, ``interface``, ``tags``, ``dns_name``,
    ``tenant`` and ``comments`` arguments are accepted but not used by
    this method yet.

    :param subnet: IPv4 or IPv6 subnet e.g. ``10.0.0.0/24`` to allocate next
        available IP Address from
    :param description: IP address description to record in Netbox database
    :param device: device name to find interface for and link IP address with
    :param interface: interface name to link IP address with, ``device`` attribute
        also must be provided
    :param vrf: VRF value used together with ``subnet`` to look up the prefix
    :param instance: Netbox instance name
    :param dry_run: if True, return next available IP address without
        allocating it in Netbox
    :return: Result object with IP address string as a result
    :raises Exception: if prefix is not found in Netbox or, for dry run,
        if prefix has no available IP addresses
    """
    nb = self._get_pynetbox(instance)
    nb_prefix = nb.ipam.prefixes.get(prefix=subnet, vrf=vrf)
    if nb_prefix is None:
        raise Exception(f"'{subnet}' prefix does not exist in Netbox")

    # dry run - only look up what would be allocated, do not create anything
    if dry_run:
        candidate = next(iter(nb_prefix.available_ips.list()), None)
        if candidate is None:
            raise Exception(f"'{subnet}' prefix has no available IP addresses")
        return Result(result=str(candidate))

    nb_ip = nb_prefix.available_ips.create()
    if description is not None:
        nb_ip.description = description
    nb_ip.save()

    return Result(result=str(nb_ip))