Skip to content

NFAPI (Python API)

Utility class to implement Python API for interfacing with NorFab.

NorFab Python API Client initialization class

from norfab.core.nfapi import NorFab

nf = NorFab(inventory="./inventory.yaml")
nf.start(start_broker=True, workers=["my-worker-1"])
NFCLIENT = nf.make_client()

or using dictionary inventory data

from norfab.core.nfapi import NorFab

data = {
    'broker': {'endpoint': 'tcp://127.0.0.1:5555'},
    'workers': {'my-worker-1': ['workers/common.yaml']},
}

nf = NorFab(inventory_data=data, base_dir="./")
nf.start(start_broker=True, workers=["my-worker-1"])
NFCLIENT = nf.make_client()

Args: inventory: OS path to NorFab inventory YAML file; inventory_data: dictionary with NorFab inventory; base_dir: OS path to base directory to anchor NorFab at; log_level: one of the supported logging levels - CRITICAL, ERROR, WARNING, INFO, DEBUG

Source code in norfab\core\nfapi.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def __init__(
    self,
    inventory: str = "./inventory.yaml",
    inventory_data: dict = None,
    base_dir: str = None,
    log_level: str = None,
) -> None:
    """
    Initialise the NorFab Python API object.

    Loads the inventory, prepares the exit events and the logs queue,
    creates the `__norfab__/files` and `__norfab__/logs` folders under the
    inventory base directory, configures logging, installs the CTRL-C
    (SIGINT) handler and registers worker plugins.

    Example using an inventory file:

    ```
    from norfab.core.nfapi import NorFab

    nf = NorFab(inventory="./inventory.yaml")
    nf.start(start_broker=True, workers=["my-worker-1"])
    NFCLIENT = nf.make_client()
    ```

    Example using dictionary inventory data:

    ```
    from norfab.core.nfapi import NorFab

    data = {
        'broker': {'endpoint': 'tcp://127.0.0.1:5555'},
        'workers': {'my-worker-1': ['workers/common.yaml']},
    }

    nf = NorFab(inventory_data=data, base_dir="./")
    nf.start(start_broker=True, workers=["my-worker-1"])
    NFCLIENT = nf.make_client()
    ```

    Args:
        inventory: OS path to NorFab inventory YAML file
        inventory_data: dictionary with NorFab inventory
        base_dir: OS path to base directory to anchor NorFab at
        log_level: one of the supported logging levels - `CRITICAL`, `ERROR`, `WARNING`, `INFO`, `DEBUG`
    """
    # becomes True once destroy() has started shutting NorFab down
    self.exiting = False
    self.inventory = NorFabInventory(
        path=inventory, data=inventory_data, base_dir=base_dir
    )
    # queue handed to broker and worker processes for their log records
    self.log_queue = Queue()
    self.log_level = log_level
    self.broker_endpoint = self.inventory.broker["endpoint"]
    # seconds start() waits for workers to report init done, 300 by default
    self.workers_init_timeout = self.inventory.topology.get(
        "workers_init_timeout", 300
    )
    # events used to ask broker, workers and clients to stop
    self.broker_exit_event = Event()
    self.workers_exit_event = Event()
    self.clients_exit_event = Event()

    # folders must exist before logging gets configured
    for folder in ("files", "logs"):
        os.makedirs(
            os.path.join(self.inventory.base_dir, "__norfab__", folder),
            exist_ok=True,
        )

    self.setup_logging()
    signal.signal(signal.SIGINT, self.handle_ctrl_c)

    # discover all worker plugins
    self.register_plugins()

register_plugins() ¤

Registers worker plugins by iterating through the entry points in the 'norfab.workers' group and registering each worker plugin.

This method registers each entry point, without loading it, using the register_worker_plugin method, and then registers the worker plugins defined in the inventory plugins section.

Raises: Any exceptions raised by the entry point loading or registration process.

Source code in norfab\core\nfapi.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def register_plugins(self) -> None:
    """
    Registers worker plugins by iterating through the entry points in the
    'norfab.workers' group and registering each worker plugin.

    This method loads each entry point and registers it using the
    `register_worker_plugin` method.

    Raises:
        Any exceptions raised by the entry point loading or registration process.
    """
    # register worker plugins from entrypoints
    for entry_point in entry_points(group="norfab.workers"):
        self.register_worker_plugin(entry_point.name, entry_point)

    # register worker plugins from inventory
    for service_name, service_data in self.inventory.plugins.items():
        if service_data.get("worker"):
            self.register_worker_plugin(service_name, service_data["worker"])

register_worker_plugin(service_name, worker_plugin) ¤

Registers a worker plugin for a given service.

This method registers a worker plugin under the specified service name. If a plugin is already registered under the same service name, the existing registration is kept and a debug message is logged.

Args: service_name (str): The name of the service to register the plugin for. worker_plugin (object): The worker plugin to be registered.

Raises: None. A service name that already has a plugin registered is reported with a debug log message instead of an exception.

Source code in norfab\core\nfapi.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def register_worker_plugin(
    self, service_name: str, worker_plugin: Union[EntryPoint, object]
) -> None:
    """
    Register a worker plugin for a given service.

    The first plugin registered under a service name wins. When the
    service name already has a plugin, the existing registration is left
    untouched and a debug message is logged; no exception is raised.

    Args:
        service_name (str): The name of the service to register the plugin for.
        worker_plugin (object): The worker plugin, or its entry point, to
            be registered.
    """
    existing_plugin = self.worker_plugins.get(service_name)

    # service name already taken - keep what is there and report it
    if existing_plugin is not None:
        log.debug(
            f"Worker plugin {worker_plugin} can't be registered for "
            f"service '{service_name}' because plugin '{existing_plugin}' "
            f"was already registered under this service."
        )
        return

    self.worker_plugins[service_name] = worker_plugin

handle_ctrl_c(signum, frame) ¤

Handle the CTRL-C signal (SIGINT) to gracefully exit the application.

This method is called when the user interrupts the program with a CTRL-C signal. It logs the interruption, performs necessary cleanup by calling self.destroy(), and then signals termination to the main process.

Args: signum (int): The signal number (should be SIGINT). frame (FrameType): The current stack frame.

Note: This method reassigns the SIGINT signal to the default handler and sends the SIGINT signal to the current process to ensure proper termination.

Source code in norfab\core\nfapi.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def handle_ctrl_c(self, signum, frame) -> None:
    """
    React to the CTRL-C signal (SIGINT) by shutting NorFab down.

    When NorFab is not exiting yet, the interruption is printed and
    logged, `self.destroy()` is called to clean up, and the signal is then
    passed on to the main process. When the `exiting` flag is anything
    other than False the call does nothing.

    Args:
        signum (int): The signal number (should be SIGINT).
        frame (FrameType): The current stack frame.

    Note:
        After the cleanup the SIGINT handler is reset to
        `signal.default_int_handler` and SIGINT is sent to the current
        process to ensure proper termination.
    """
    # shutdown is already in progress, ignore further CTRL-C
    if self.exiting is not False:
        return

    msg = "CTRL-C, NorFab exiting, interrupted by user..."
    print(f"\n{msg}")
    log.info(msg)
    self.destroy()

    # hand the signal over to the default handler of the main process
    signal.signal(signal.SIGINT, signal.default_int_handler)
    os.kill(os.getpid(), signal.SIGINT)

setup_logging() ¤

Sets up logging configuration and starts a log queue listener.

This method overrides the root logger and handler levels in the inventory logging configuration when a log level was provided, configures the logging system using that configuration, and starts a log queue listener to process logs from child processes.

Source code in norfab\core\nfapi.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def setup_logging(self) -> None:
    """
    Sets up logging configuration and starts a log queue listener.

    When `self.log_level` is set, it overrides the level of the root logger
    and of every handler in the inventory `logging` section. The section
    is then applied with `logging.config.dictConfig` and a queue listener
    thread is started that forwards records from `self.log_queue` to the
    root logger handlers.

    The started listener is stored in `self.log_listener`.
    """
    logging_config = self.inventory["logging"]

    # update logging levels for root logger and all handlers
    if self.log_level is not None:
        logging_config["root"]["level"] = self.log_level
        for handler in logging_config["handlers"].values():
            handler["level"] = self.log_level

    # configure logging
    logging.config.dictConfig(logging_config)

    # start logs queue listener thread to process logs from child processes;
    # getLogger() without a name always returns the root logger, while
    # getLogger("root") only does so starting with Python 3.9
    self.log_listener = logging.handlers.QueueListener(
        self.log_queue,
        *logging.getLogger().handlers,
        respect_handler_level=True,
    )
    self.log_listener.start()

start_broker() ¤

Starts the broker process if a broker endpoint is defined. This method initializes and starts a separate process for the broker using the provided broker endpoint. It waits for the broker to signal that it has fully initiated, with a timeout of 30 seconds. If the broker fails to start within this time, the method logs an error message and raises a SystemExit exception.

Raises: SystemExit: If the broker fails to start within 30 seconds.

Logs: Info: When the broker starts successfully. Error: If no broker endpoint is defined or if the broker fails to start.

Source code in norfab\core\nfapi.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def start_broker(self) -> None:
    """
    Starts the broker process if a broker endpoint is defined.

    The broker runs in a separate process stored in `self.broker`. The
    method blocks until the broker signals that it is fully initiated,
    for at most 30 seconds. If the broker does not signal in time, an
    error is logged and SystemExit is raised.

    Raises:
        SystemExit: If the broker fails to start within 30 seconds.

    Logs:
        Info: When the broker starts successfully.
        Error: If no broker endpoint is defined or if the broker fails to start.
    """
    if not self.broker_endpoint:
        log.error("Failed to start broker, no broker endpoint defined")
        return

    # passed to the broker process so it can signal that it is fully initiated
    init_done_event = Event()

    self.broker = Process(
        target=start_broker_process,
        args=(
            self.broker_endpoint,
            self.broker_exit_event,
            self.inventory,
            self.log_level,
            self.log_queue,
            init_done_event,
        ),
    )
    self.broker.start()

    # wait for broker to start; wait() returns False when the timeout expires
    if not init_done_event.wait(30):
        log.error(
            f"Broker failed to start in 30 seconds on '{self.broker_endpoint}'"
        )
        raise SystemExit()

    log.info(
        f"Started broker, broker listening for connections on '{self.broker_endpoint}'"
    )

start_worker(worker_name, worker_data) ¤

Starts a worker process if it is not already running.

Args: worker_name (str): The name of the worker to start. worker_data (dict): A dictionary containing data about the worker, including any dependencies.

Raises: RuntimeError: If a dependent process is not alive. norfab_exceptions.ServicePluginNotRegistered: If no worker plugin is registered for the worker's service.

Returns: None

Source code in norfab\core\nfapi.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def start_worker(self, worker_name, worker_data) -> None:
    """
    Start a worker process unless one is already recorded for this name.

    The worker is not started yet, and the call simply returns, while any
    of the workers listed in `worker_data["depends_on"]` has not signalled
    that its initialization is done.

    Args:
        worker_name (str): The name of the worker to start.
        worker_data (dict): A dictionary containing data about the worker, including any dependencies.

    Raises:
        RuntimeError: If a dependent process is not alive.
        norfab_exceptions.ServicePluginNotRegistered: If no worker plugin is registered for the worker's service.

    Returns:
        None
    """
    # worker was started by an earlier call, nothing to do
    if self.workers_processes.get(worker_name):
        return

    worker_inventory = self.inventory[worker_name]
    # passed to the worker process so it can signal that it is fully initiated
    init_done_event = Event()

    dependencies = worker_data.get("depends_on")
    if dependencies:
        # every dependency must still be running
        for w in dependencies:
            if not self.workers_processes[w]["process"].is_alive():
                raise RuntimeError(f"Dependent process is dead '{w}'")
        # postpone the start until all dependencies finished initializing
        for w in dependencies:
            if not self.workers_processes[w]["init_done"].is_set():
                return

    service = worker_inventory["service"]
    worker_plugin = self.worker_plugins.get(service)
    if not worker_plugin:
        raise norfab_exceptions.ServicePluginNotRegistered(
            f"No worker plugin registered for service '{worker_inventory['service']}'"
        )
    # entry points are loaded on first use and the loaded plugin is cached
    if isinstance(worker_plugin, EntryPoint):
        worker_plugin = worker_plugin.load()
        self.worker_plugins[service] = worker_plugin

    process = Process(
        target=start_worker_process,
        args=(
            worker_plugin,
            self.inventory,
            self.broker_endpoint,
            worker_name,
            self.workers_exit_event,
            self.log_level,
            self.log_queue,
            init_done_event,
        ),
    )
    # record the worker before launching its process
    self.workers_processes[worker_name] = {
        "process": process,
        "init_done": init_done_event,
    }
    process.start()

start(start_broker=True, workers=True) ¤

Starts the broker and specified workers.

Args: start_broker (bool): If True, starts the broker if it is defined in the inventory topology. workers (Union[bool, list]): Determines which workers to start. If True, starts all workers defined in the inventory topology. If False or None, no workers are started. If a list, starts the specified workers.

Returns: None

Errors: worker start failures are caught and logged, and the failing worker is skipped rather than the exception being raised to the caller. This covers KeyError (missing inventory data), FileNotFoundError (inventory file not found) and any other Exception.

Notes: - The method waits for all workers to initialize within a specified timeout period. - If the initialization timeout expires, an error is logged and the system is destroyed. - After starting the workers, any startup hooks defined in the inventory are executed.

Source code in norfab\core\nfapi.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
def start(
    self,
    start_broker: bool = True,
    workers: Union[bool, list] = True,
) -> None:
    """
    Starts the broker and specified workers.

    Args:
        start_broker (bool): If True, starts the broker if it is defined in the inventory topology.
        workers (Union[bool, list]): Determines which workers to start. If True, starts all workers defined in the inventory topology.
                                     If False or None, no workers are started. If a list, starts the specified workers; list items
                                     are worker names or single-key dictionaries of `{worker_name: worker_data}`.

    Returns:
        None

    Notes:
        - Worker start failures are logged and the failing worker is skipped, they are not raised:
          KeyError (no inventory data), FileNotFoundError (inventory file not found) or any other Exception.
        - Workers that were started by a previous call are left running and do not prevent this call from completing.
        - The method waits for all workers to initialize within a specified timeout period.
        - If the initialization timeout expires, an error is logged and the system is destroyed.
        - After starting the workers, any startup hooks defined in the inventory are executed.
    """

    workers_to_start = set()

    # start the broker
    if start_broker is True and self.inventory.topology.get("broker") is True:
        self.start_broker()

    # decide on a set of workers to start
    if workers is False or workers is None:
        workers = []
    elif isinstance(workers, list) and workers:
        # strip names only, dictionary items carry worker data and are kept as is
        workers = [w.strip() if isinstance(w, str) else w for w in workers]
        workers = [w for w in workers if w]
    # start workers defined in inventory
    elif workers is True:
        workers = self.inventory.topology.get("workers", [])

    # exit if no workers
    if not workers:
        return

    # form a set of worker names to start
    for worker_name in workers:
        if isinstance(worker_name, dict):
            worker_name = tuple(worker_name)[0]
        if worker_name:
            workers_to_start.add(worker_name)
        else:
            log.error(f"'{worker_name}' - worker name is bad, skipping..")
            continue

    # keep iterating until every requested worker has a process; workers
    # waiting for their dependencies are retried on the next pass. A subset
    # check is used so that workers started by an earlier call do not keep
    # this loop running forever.
    while not workers_to_start.issubset(self.workers_processes):
        for worker in workers:
            # extract worker name and data/params
            if isinstance(worker, dict):
                worker_name = tuple(worker)[0]
                worker_data = worker[worker_name]
            elif worker:
                worker_name = worker
                worker_data = {}
            else:
                continue
            # verify if need to start this worker
            if worker_name not in workers_to_start:
                continue
            # start worker
            try:
                self.start_worker(worker_name, worker_data)
            # if failed to start remove from workers to start
            except KeyError:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, no inventory data found"
                )
            except FileNotFoundError as e:
                workers_to_start.discard(worker_name)
                log.error(
                    f"'{worker_name}' - failed to start worker, inventory file not found '{e}'"
                )
            except Exception as e:
                workers_to_start.discard(worker_name)
                log.exception(
                    f"'{worker_name}' - failed to start worker, error '{e}'"
                )

        time.sleep(0.01)

    # wait for workers to initialize
    start_time = time.time()
    while self.workers_init_timeout > time.time() - start_time:
        if all(w["init_done"].is_set() for w in self.workers_processes.values()):
            break
        time.sleep(0.01)  # poll instead of spinning a CPU core
    else:
        log.error(
            f"TimeoutError - {self.workers_init_timeout}s workers init timeout expired"
        )
        self.destroy()

    # run startup hooks
    # NOTE(review): hooks also run after a timeout-triggered destroy() above -
    # confirm whether that is intended
    for f in self.inventory.hooks.get("startup", []):
        f["function"](self, *f.get("args", []), **f.get("kwargs", {}))

run() ¤

Runs the main loop until a termination signal (CTRL+C) is received. This method checks if there are any broker or worker processes running. If none are detected, it logs a critical message and exits. Otherwise, it enters a loop that continues to run until the exiting flag is set to True.

Source code in norfab\core\nfapi.py
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def run(self):
    """
    Block the caller until NorFab starts exiting.

    When neither a broker nor any worker process is present, a critical
    message is logged and the method returns immediately. Otherwise the
    method sleeps in a loop for as long as the `exiting` flag is False.
    """
    something_running = self.broker or self.workers_processes
    if not something_running:
        log.critical(
            "NorFab detected no broker or worker processes running, exiting.."
        )
        return

    # wake up periodically to check whether shutdown was requested
    while self.exiting is False:
        time.sleep(0.1)

destroy() ¤

Gracefully stop all NORFAB processes and clean up resources.

This method performs the following steps:

  1. Executes any registered exit hooks.
  2. Sets the exiting flag to indicate that NORFAB is shutting down.
  3. Stops all client processes.
  4. Stops all worker processes and waits for them to terminate.
  5. Stops the broker process and waits for it to terminate.
  6. Stops the logging queue listener.

Returns: None

Source code in norfab\core\nfapi.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
def destroy(self) -> None:
    """
    Gracefully stop all NORFAB processes and clean up resources.

    The method is idempotent: once the `exiting` flag is True, further
    calls return immediately without running exit hooks again.

    This method performs the following steps:

    1. Executes any registered exit hooks.
    2. Sets the `exiting` flag to indicate that NORFAB is shutting down.
    3. Stops all client processes.
    4. Stops all worker processes and waits for them to terminate.
    5. Stops the broker process and waits for it to terminate.
    6. Stops the logging queue listener.

    Returns:
        None
    """
    # shutdown already ran or is in progress, do not repeat it
    if self.exiting is True:
        return

    # run exit hooks
    for f in self.inventory.hooks.get("exit", []):
        f["function"](self, *f.get("args", []), **f.get("kwargs", {}))

    self.exiting = True  # indicate that NorFab already exiting

    # stop client
    log.info("NorFab is exiting, stopping clients")
    self.clients_exit_event.set()
    if self.client:
        self.client.destroy()

    # stop workers
    log.info("NorFab is exiting, stopping workers")
    self.workers_exit_event.set()
    while self.workers_processes:
        wname, w = self.workers_processes.popitem()
        w["process"].join()
        log.info(f"NorFab is exiting, stopped {wname} worker")

    # stop broker
    log.info("NorFab is exiting, stopping broker")
    self.broker_exit_event.set()
    if self.broker:
        self.broker.join()

    # stop logging thread, last, so that shutdown messages of the child
    # processes joined above are still processed
    log.info("NorFab is exiting, stopping logging queue listener")
    log_listener = getattr(self, "log_listener", None)
    if log_listener is not None:
        log_listener.stop()

make_client(broker_endpoint=None) ¤

Creates and returns an NFPClient instance.

Args: broker_endpoint (str, optional): The broker endpoint to connect to. If not provided, the instance's broker_endpoint attribute will be used.

Returns: NFPClient: The created client instance if a broker endpoint is defined. None: If no broker endpoint is defined.

Raises: None

Notes: If this is the first client being created, it will be assigned to the instance's client attribute.

Source code in norfab\core\nfapi.py
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def make_client(self, broker_endpoint: str = None) -> NFPClient:
    """
    Build an NFPClient connected to the broker.

    Args:
        broker_endpoint (str, optional): The broker endpoint to connect to.
            When omitted or empty, the instance's broker_endpoint attribute
            is used instead.

    Returns:
        NFPClient: The created client instance if a broker endpoint is defined.
        None: If no broker endpoint is defined; an error is logged in that case.

    Notes:
        The first client created is also stored in the instance's client
        attribute, later clients are only returned to the caller.
    """
    endpoint = broker_endpoint or self.broker_endpoint

    # cannot connect anywhere without an endpoint
    if not endpoint:
        log.error("Failed to make client, no broker endpoint defined")
        return None

    client = NFPClient(
        self.inventory,
        endpoint,
        "NFPClient",
        self.clients_exit_event,
    )
    # own the first client
    if self.client is None:
        self.client = client
    return client