Skip to content

NFAPI (Python API)

Utility class to implement Python API for interfacing with NorFab.

NorFab Python API Client initialization class

from norfab.core.nfapi import NorFab

nf = NorFab(inventory=inventory)
nf.start(start_broker=True, workers=["my-worker-1"])
NFCLIENT = nf.client

Parameters:

Name Type Description Default
inventory str

OS path to NorFab inventory YAML file

'./inventory.yaml'
log_level str

one or supported logging levels - CRITICAL, ERROR, WARNING, INFO, DEBUG

'WARNING'
Source code in norfab\core\nfapi.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def __init__(
    self, inventory: str = "./inventory.yaml", log_level: str = "WARNING"
) -> None:
    """
    NorFab Python API Client initialization class

    ```
    from norfab.core.nfapi import NorFab

    nf = NorFab(inventory=inventory)
    nf.start(start_broker=True, workers=["my-worker-1"])
    NFCLIENT = nf.client
    ```

    :param inventory: OS path to NorFab inventory YAML file
    :param log_level: one or supported logging levels - `CRITICAL`, `ERROR`, `WARNING`, `INFO`, `DEBUG`
    """
    setup_logging(log_level=log_level, filename="norfab.log")
    self.inventory = NorFabInventory(inventory)
    self.log_level = log_level
    self.broker_endpoint = self.inventory.get("broker", {}).get("endpoint")
    self.workers_init_timeout = self.inventory.topology.get(
        "workers_init_timeout", 300
    )
    self.broker_exit_event = Event()
    self.workers_exit_event = Event()
    self.clients_exit_event = Event()

    # start logger thread to log logs to a file
    self.log_queue = Queue(-1)
    self.logger_exit_event = Event()
    self.logger_thread = threading.Thread(
        target=logger_thread,
        daemon=True,
        name=f"{__name__}_logger_thread",
        args=(
            self.log_queue,
            self.logger_exit_event,
        ),
    )
    self.logger_thread.start()

start(start_broker=True, workers=True) ¤

Main entry method to start NorFab components.

Parameters:

Name Type Description Default
start_broker bool

if True, starts broker process as defined in inventory topology section

True
workers list

list of worker names to start processes for or boolean, if True starts all workers defined in inventory topology sections

True
client

If true return and instance of NorFab client

required
Source code in norfab\core\nfapi.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def start(
    self,
    start_broker: bool = True,
    workers: list = True,
):
    """
    Main entry method to start NorFab components.

    :param start_broker: if True, starts broker process as defined in inventory
        ``topology`` section
    :param workers: list of worker names to start processes for or boolean, if True
        starts all workers defined in inventory ``topology`` sections
    :param client: If true return and instance of NorFab client
    """
    # start the broker
    if start_broker is True and self.inventory.topology.get("broker") is True:
        self.start_broker()

    # decide on a set of workers to start
    if workers is False or workers is None:
        workers = []
    elif isinstance(workers, list) and workers:
        workers = workers
    # start workers defined in inventory
    elif workers is True:
        workers = self.inventory.topology.get("workers", [])

    # start worker processes
    if not workers:
        return

    # form a list of workers to start
    workers_to_start = set()
    for worker_name in workers:
        if isinstance(worker_name, dict):
            worker_name = tuple(worker_name)[0]
        workers_to_start.add(worker_name)

    while workers_to_start != set(self.workers_processes.keys()):
        for worker in workers:
            # extract worker name and data/params
            if isinstance(worker, dict):
                worker_name = tuple(worker)[0]
                worker_data = worker[worker_name]
            else:
                worker_name = worker
                worker_data = {}
            # verify if need to start this worker
            if worker_name not in workers_to_start:
                continue
            # start worker
            try:
                self.start_worker(worker_name, worker_data)
            # if failed to start remove from workers to start
            except KeyError:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, no inventory data found"
                )
            except FileNotFoundError as e:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, inventory file not found '{e}'"
                )
            except Exception as e:
                workers_to_start.discard(worker_name)
                log.error(f"'{worker_name}' - failed to start worker, error '{e}'")

        time.sleep(0.01)

    # wait for workers to initialize
    start_time = time.time()
    while self.workers_init_timeout > time.time() - start_time:
        if all(w["init_done"].is_set() for w in self.workers_processes.values()):
            break
    else:
        log.error(
            f"TimeoutError - {self.workers_init_timeout}s workers init timeout expired"
        )
        self.destroy()

run() ¤

Helper method to run the loop before CTRL+C called

Source code in norfab\core\nfapi.py
303
304
305
306
307
308
309
310
311
312
def run(self):
    """
    Helper method to run the loop before CTRL+C called
    """
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\nInterrupted by user...")
        self.destroy()

destroy() ¤

Stop NORFAB processes.

Source code in norfab\core\nfapi.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def destroy(self) -> None:
    """
    Stop NORFAB processes.
    """
    # stop client
    self.clients_exit_event.set()
    if self.client:
        self.client.destroy()
    # stop workers
    self.workers_exit_event.set()
    while self.workers_processes:
        _, w = self.workers_processes.popitem()
        w["process"].join()
    # stop broker
    self.broker_exit_event.set()
    if self.broker:
        self.broker.join()
    # stop logger thread
    self.logger_exit_event.set()

make_client(broker_endpoint=None) ¤

Make an instance of NorFab client

Parameters:

Name Type Description Default
broker_endpoint str

(str), Broker URL to connect with

None
Source code in norfab\core\nfapi.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def make_client(self, broker_endpoint: str = None) -> NFPClient:
    """
    Make an instance of NorFab client

    :param broker_endpoint: (str), Broker URL to connect with
    """

    if broker_endpoint or self.broker_endpoint:
        client = NFPClient(
            broker_endpoint or self.broker_endpoint,
            "NFPClient",
            self.log_level,
            self.clients_exit_event,
        )
        if self.client is None:  # own the first client
            self.client = client
        return client
    else:
        log.error("Failed to make client, no broker endpoint defined")
        return None