521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
def get_devices(
    self,
    filters: list = None,
    instance: str = None,
    dry_run: bool = False,
    devices: list = None,
    cache: Union[bool, str] = None,
) -> dict:
    """
    Function to retrieve devices data from Netbox using GraphQL API.

    :param filters: list of filters dictionaries to filter devices
    :param instance: Netbox instance name, defaults to ``self.default_instance``
    :param dry_run: only return query content, do not run it
    :param devices: list of device names to query data for, the list passed
        by the caller is never modified
    :param cache: if `True` use data stored in cache if it is up to date
        refresh it otherwise, `False` do not use cache do not update cache,
        `refresh` ignore data in cache and replace it with data fetched
        from Netbox, `force` use data in cache without checking if it is up
        to date, `None` falls back to ``self.cache_use``
    :return: ``Result`` object, its ``result`` attribute is a dictionary keyed
        by device name with device data; for dry run it holds the query
        content under ``get_devices_dry_run`` key
    :raises RuntimeError: if device names need to be queried and the Netbox
        major version is neither 3 nor 4
    :raises Exception: if devices data query returned errors
    """
    ret = Result(task=f"{self.name}:get_devices", result={})
    cache = self.cache_use if cache is None else cache
    instance = instance or self.default_instance
    filters = filters or []
    # work on a copy - names are added to and removed from this list below
    # and the caller's list must stay intact
    devices = list(devices) if devices else []
    queries = {}  # devices queries
    device_fields = [
        "name",
        "last_updated",
        "custom_field_data",
        "tags {name}",
        "device_type {model}",
        "role {name}",
        "config_context",
        "tenant {name}",
        "platform {name}",
        "serial",
        "asset_tag",
        "site {name slug tags{name} }",
        "location {name}",
        "rack {name}",
        "status",
        "primary_ip4 {address}",
        "primary_ip6 {address}",
        "airflow",
        "position",
    ]

    def name_filter(names: list) -> dict:
        """Build ``name`` filter matching any of ``names`` for this Netbox version."""
        major = self.nb_version[0]
        if major == 4:
            quoted = '", "'.join(names)
            return {"name": f'{{in_list: ["{quoted}"]}}'}
        if major == 3:
            return {"name": names}
        # fail with a clear message instead of an unbound variable error
        raise RuntimeError(
            f"{self.name} - unsupported Netbox version {self.nb_version}, "
            f"cannot build devices name filter"
        )

    if cache in (True, "force"):
        # retrieve last updated data from Netbox for devices
        last_updated_query = {
            f"devices_by_filter_{index}": {
                "obj": "device_list",
                "filters": filter_item,
                "fields": ["name", "last_updated"],
            }
            for index, filter_item in enumerate(filters)
        }
        # use cache data without checking if it is up to date for cached devices
        if devices and cache == "force":
            for device_name in list(devices):
                device_cache_key = f"get_devices::{device_name}"
                if device_cache_key in self.cache:
                    devices.remove(device_name)
                    ret.result[device_name] = self.cache[device_cache_key]
        # query netbox last updated data for devices, skipped if all of them
        # were already served from cache to not send an empty names filter
        if devices:
            last_updated_query["devices_by_devices_list"] = {
                "obj": "device_list",
                "filters": name_filter(devices),
                "fields": ["name", "last_updated"],
            }
        # nothing to ask Netbox about if there are no filters and no devices left
        if last_updated_query:
            last_updated = self.graphql(
                queries=last_updated_query, instance=instance, dry_run=dry_run
            )
            last_updated.raise_for_status(f"{self.name} - get devices query failed")
            # return dry run result
            if dry_run:
                ret.result["get_devices_dry_run"] = last_updated.result
                return ret
            # try to retrieve device data from cache
            self.cache.expire()  # remove expired items from cache
            for devices_list in last_updated.result.values():
                for device in devices_list:
                    device_cache_key = f"get_devices::{device['name']}"
                    # check if cache is up to date and use it if so
                    if device_cache_key in self.cache and (
                        self.cache[device_cache_key]["last_updated"]
                        == device["last_updated"]
                        or cache == "force"
                    ):
                        ret.result[device["name"]] = self.cache[device_cache_key]
                        # remove device from list of devices to retrieve
                        if device["name"] in devices:
                            devices.remove(device["name"])
                    # cache old or no cache, fetch device data
                    elif device["name"] not in devices:
                        devices.append(device["name"])
    # ignore cache data, fetch data from netbox
    elif cache in (False, "refresh"):
        queries = {
            f"devices_by_filter_{index}": {
                "obj": "device_list",
                "filters": filter_item,
                "fields": device_fields,
            }
            for index, filter_item in enumerate(filters)
        }

    # fetch devices data from Netbox
    if devices or queries:
        if devices:
            queries["devices_by_devices_list"] = {
                "obj": "device_list",
                "filters": name_filter(devices),
                "fields": device_fields,
            }
        # send queries
        query_result = self.graphql(
            queries=queries, instance=instance, dry_run=dry_run
        )
        # check for errors
        if query_result.errors:
            msg = f"{self.name} - get devices query failed with errors:\n{query_result.errors}"
            raise Exception(msg)
        # return dry run result
        if dry_run:
            ret.result["get_devices_dry_run"] = query_result.result
            return ret
        # process devices data
        devices_data = query_result.result
        for devices_list in devices_data.values():
            for device in devices_list:
                # data already taken from cache or from another query wins
                if device["name"] not in ret.result:
                    device_name = device.pop("name")
                    # cache device data; kept as an equality test to
                    # preserve existing behaviour
                    if cache != False:  # noqa: E712
                        cache_key = f"get_devices::{device_name}"
                        self.cache.set(cache_key, device, expire=self.cache_ttl)
                    # add device data to return result
                    ret.result[device_name] = device

    return ret
|