Skip to content

NFAPI (Python API)

Utility class to implement Python API for interfacing with NorFab.

NorFab Python API Client initialization class

from norfab.core.nfapi import NorFab

nf = NorFab(inventory=inventory)
nf.start(start_broker=True, workers=["my-worker-1"])
NFCLIENT = nf.client

Parameters:

Name Type Description Default
inventory str

OS path to NorFab inventory YAML file

'./inventory.yaml'
log_level str

one of the supported logging levels - CRITICAL, ERROR, WARNING, INFO, DEBUG

'WARNING'
Source code in norfab\core\nfapi.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def __init__(
    self, inventory: str = "./inventory.yaml", log_level: str = "WARNING"
) -> None:
    """
    NorFab Python API Client initialization class

    ```
    from norfab.core.nfapi import NorFab

    nf = NorFab(inventory=inventory)
    nf.start(start_broker=True, workers=["my-worker-1"])
    NFCLIENT = nf.client
    ```

    Loads the inventory, records the broker endpoint and the workers
    initialization timeout found in it, and creates the exit events
    used to signal broker, workers and clients.

    :param inventory: OS path to NorFab inventory YAML file
    :param log_level: one of the supported logging levels - `CRITICAL`, `ERROR`, `WARNING`, `INFO`, `DEBUG`
    """
    loaded_inventory = NorFabInventory(inventory)
    self.inventory = loaded_inventory
    self.log_level = log_level

    # broker endpoint is optional, resolves to None when the inventory
    # has no "broker" section or the section has no "endpoint" key
    broker_section = loaded_inventory.get("broker", {})
    self.broker_endpoint = broker_section.get("endpoint")

    # seconds to wait for workers to initialize, 300 unless topology overrides it
    topology = loaded_inventory.topology
    self.workers_init_timeout = topology.get("workers_init_timeout", 300)

    # one exit event per component kind
    self.broker_exit_event = Event()
    self.workers_exit_event = Event()
    self.clients_exit_event = Event()

start(start_broker=None, workers=None) ¤

Main entry method to start NorFab components.

Parameters:

Name Type Description Default
start_broker bool

if True, starts broker process

None
workers list

list of worker names to start processes for

None
Source code in norfab\core\nfapi.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def start(
    self,
    start_broker: bool = None,
    workers: list = None,
):
    """
    Main entry method to start NorFab components.

    Starts the broker when requested, starts every requested worker, then
    waits up to ``self.workers_init_timeout`` seconds for all entries in
    ``self.workers_processes`` to report ``init_done`` and finally makes
    the API client. If the timeout expires, an error is logged, NorFab is
    destroyed and no client is created.

    :param start_broker: if True, starts broker process; if None, the value
        of the inventory topology ``broker`` key is used (False if absent)
    :param workers: list of worker names to start processes for; an item may
        also be a single-key dictionary mapping worker name to worker data;
        if None, the inventory topology ``workers`` list is used
    """
    if workers is None:
        workers = self.inventory.topology.get("workers", [])
    if start_broker is None:
        start_broker = self.inventory.topology.get("broker", False)

    # form a set of worker names to start, for dictionary items the
    # first key is the worker name
    workers_to_start = {
        tuple(worker)[0] if isinstance(worker, dict) else worker
        for worker in workers
    }

    # start the broker
    if start_broker is True:
        self.start_broker()

    # start all the workers; a subset test is used instead of equality so
    # that workers already present in workers_processes but not requested
    # in this call cannot make the loop spin forever
    while not workers_to_start.issubset(self.workers_processes.keys()):
        for worker in workers:
            # extract worker name and data/params
            if isinstance(worker, dict):
                worker_name = tuple(worker)[0]
                worker_data = worker[worker_name]
            else:
                worker_name = worker
                worker_data = {}
            # verify if need to start this worker
            if worker_name not in workers_to_start:
                continue
            # start worker
            try:
                self.start_worker(worker_name, worker_data)
            # if failed to start remove from workers to start
            except KeyError:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, no inventory data found"
                )
            except FileNotFoundError as e:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, inventory file not found '{e}'"
                )
            except Exception as e:
                workers_to_start.discard(worker_name)
                log.error(f"'{worker_name}' - failed to start worker, error '{e}'")

        time.sleep(0.01)

    # wait for workers to initialize
    start_time = time.time()
    while self.workers_init_timeout > time.time() - start_time:
        if all(w["init_done"].is_set() for w in self.workers_processes.values()):
            break
        # yield the CPU between polls instead of busy-spinning
        time.sleep(0.01)
    else:
        log.error(
            f"TimeoutError - {self.workers_init_timeout}s "
            f"workers init timeout expired"
        )
        self.destroy()
        # everything was just torn down, a client would have nothing to talk to
        return

    # make the API client
    self.make_client()

destroy() ¤

Stop NORFAB processes.

Source code in norfab\core\nfapi.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def destroy(self) -> None:
    """
    Stop NORFAB processes.

    Components are stopped in order - client, workers, broker. For each
    of them the matching exit event is set first, then the client is
    destroyed or the process is joined.
    """
    # client goes first
    self.clients_exit_event.set()
    api_client = self.client
    if api_client:
        api_client.destroy()

    # workers next, entries are removed from the registry as they are joined
    self.workers_exit_event.set()
    while self.workers_processes:
        worker = self.workers_processes.popitem()[1]
        worker["process"].join()

    # broker goes last
    self.broker_exit_event.set()
    broker_process = self.broker
    if broker_process:
        broker_process.join()

make_client(broker_endpoint=None) ¤

Make an instance of NorFab client

Parameters:

Name Type Description Default
broker_endpoint str

(str), Broker URL to connect with

None
Source code in norfab\core\nfapi.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def make_client(self, broker_endpoint: str = None) -> NFPClient:
    """
    Make an instance of NorFab client

    The endpoint given as an argument takes precedence over the endpoint
    recorded on this object. The first client made is also stored in
    ``self.client``. If no endpoint is available, an error is logged
    and None is returned.

    :param broker_endpoint: (str), Broker URL to connect with
    """
    endpoint = broker_endpoint or self.broker_endpoint

    # nothing to connect to
    if not endpoint:
        log.error("Failed to make client, no broker endpoint defined")
        return None

    new_client = NFPClient(
        endpoint,
        "NFPClient",
        self.log_level,
        self.clients_exit_event,
    )
    # own the first client
    if self.client is None:
        self.client = new_client
    return new_client