Skip to content

Worker

CUDOS¤

Inspired by Majordomo Protocol Worker API, ZeroMQ, Python version.

Original MDP/Worker spec

Location: http://rfc.zeromq.org/spec:7.

Author: Min RK benjaminrk@gmail.com

Based on Java example by Arkadiusz Orzechowski

WorkerWatchDog(worker) ¤

Bases: Thread

Class to monitor worker performance

Source code in norfab\core\worker.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self, worker):
    """
    Initialise the watchdog thread for ``worker``.

    :param worker: worker object to monitor; ``watchdog_interval``,
        ``memory_threshold_mbyte`` and ``memory_threshold_action`` are read
        from its ``inventory`` mapping, falling back to 30, 1000 and "log"
    """
    super().__init__()
    self.worker = worker
    # psutil handle on the current OS process (used for RAM measurements)
    self.worker_process = psutil.Process(os.getpid())

    # watchdog settings taken from the worker inventory, with defaults
    inventory = worker.inventory
    self.watchdog_interval = inventory.get("watchdog_interval", 30)
    self.memory_threshold_mbyte = inventory.get("memory_threshold_mbyte", 1000)
    self.memory_threshold_action = inventory.get("memory_threshold_action", "log")

    # run counter and watchdog task list both start empty
    self.runs = 0
    self.watchdog_tasks = []

get_ram_usage() ¤

Return RAM usage in Mbyte

Source code in norfab\core\worker.py
82
83
84
def get_ram_usage(self):
    """
    Return RAM usage in Mbyte

    Reports the resident set size (RSS) of the worker process, as given by
    ``psutil.Process.memory_info().rss`` (bytes), converted to Mbyte where
    1 Mbyte = 1024 * 1024 bytes.

    :return: RSS of the worker process in Mbyte, as a float
    """
    # 1024 * 1024 is the binary megabyte; 1024000 would mix binary and
    # decimal units and over-report usage by ~2.4%
    return self.worker_process.memory_info().rss / (1024 * 1024)

Result(result=None, failed=False, errors=None, task=None, messages=None) ¤

Result of running individual tasks.

Attributes/Arguments:

Parameters:

Name Type Description Default
changed

True if the task is changing the system (note: not accepted by the constructor signature shown above)

required
result Any

Result of the task execution, see task's documentation for details

None
failed bool

Whether the execution failed or not

False
severity_level logging.LEVEL

Severity level associated with the result of the execution (note: not accepted by the constructor signature shown above)

required
errors Optional[List[str]]

exception thrown during the execution of the task (if any)

None
task str

Task function name that produced the results

None
messages Optional[List[str]]

List of messages produced by the task (if any)

None
Source code in norfab\core\worker.py
128
129
130
131
132
133
134
135
136
137
138
139
140
def __init__(
    self,
    result: Any = None,
    failed: bool = False,
    errors: Optional[List[str]] = None,
    task: str = None,
    messages: Optional[List[str]] = None,
) -> None:
    """
    Store the outcome of running a single task.

    :param result: task execution result, see task's documentation
    :param failed: whether the execution failed
    :param errors: errors raised during execution; a falsy value becomes ``[]``
    :param task: name of the task function that produced the result
    :param messages: messages produced by the task; a falsy value becomes ``[]``
    """
    # caller-supplied values are kept as-is
    self.task, self.result, self.failed = task, result, failed
    # None / empty inputs are replaced with fresh lists (never shared defaults)
    self.errors = errors if errors else []
    self.messages = messages if messages else []

dictionary() ¤

Method to serialize result as a dictionary

Source code in norfab\core\worker.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def dictionary(self):
    """
    Serialise this result into a plain dictionary.

    ``errors`` and ``messages`` are coerced to lists first: a non-list value
    is wrapped in a single-element list. This coercion is written back to
    the attributes on ``self`` as well.

    :return: dict with ``task``, ``failed``, ``errors``, ``result`` and
        ``messages`` keys, in that order
    """
    for attr in ("errors", "messages"):
        value = getattr(self, attr)
        if not isinstance(value, list):
            setattr(self, attr, [value])

    keys = ("task", "failed", "errors", "result", "messages")
    return {key: getattr(self, key) for key in keys}

NFPWorker(broker, service, name, exit_event, log_level='WARNING', multiplier=6, keepalive=2500) ¤

Parameters:

Name Type Description Default
broker str

str, broker endpoint e.g. tcp://127.0.0.1:5555

required
service str

str, service name

required
name str

str, worker name

required
exit_event

obj, threading event; if set, signals the worker to stop

required
log_level str

str, logging level name, e.g. 'WARNING'

'WARNING'
multiplier int

int, number of keepalives lost before considering the other party dead

6
keepalive int

int, keepalive interval in milliseconds

2500
Source code in norfab\core\worker.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def __init__(
    self,
    broker: str,
    service: str,
    name: str,
    exit_event,
    log_level: str = "WARNING",
    multiplier: int = 6,
    keepalive: int = 2500,
):
    """
    Create a NorFab worker, connect it to the broker and start keepalives.

    :param broker: broker endpoint e.g. ``tcp://127.0.0.1:5555``
    :param service: service name this worker registers with the broker
    :param name: worker name; used as the broker socket identity and in the
        worker's local file paths
    :param exit_event: threading event, if set signals the worker to stop;
        also passed to the embedded ``NFPClient``
    :param log_level: logging level name, applied to ``log`` via ``setLevel``
    :param multiplier: number of keepalives lost before the other party is
        considered dead, passed to ``KeepAliver``
    :param keepalive: keepalive interval in milliseconds, passed to
        ``KeepAliver``
    """
    # NOTE(review): if ``log`` is a module-level logger, this changes the level
    # for every worker in the process - confirm that is intended
    log.setLevel(log_level.upper())
    self.log_level = log_level
    self.broker = broker
    self.service = service
    self.name = name
    self.exit_event = exit_event
    self.broker_socket = None
    self.socket_lock = (
        threading.Lock()
    )  # used for keepalives to protect socket object
    # per-worker local storage; job queue files live under base_dir_jobs
    self.base_dir = f"__norfab__/files/worker/{self.name}/"
    self.base_dir_jobs = os.path.join(self.base_dir, "jobs")

    self.ctx = zmq.Context()
    self.poller = zmq.Poller()
    # creates self.broker_socket, registers it with the poller and sends READY
    self.reconnect_to_broker()

    # set to stop the KeepAliver (passed below as its exit_event)
    self.destroy_event = threading.Event()
    # thread handles, None until assigned (presumably when threads are started
    # elsewhere - confirm)
    self.request_thread = None
    self.reply_thread = None
    self.close_thread = None
    self.recv_thread = None
    self.event_thread = None

    # unbounded queues (maxsize=0); recv() puts incoming POST / GET / DELETE
    # requests on the first three; event_queue use is not visible here
    self.post_queue = queue.Queue(maxsize=0)
    self.get_queue = queue.Queue(maxsize=0)
    self.delete_queue = queue.Queue(maxsize=0)
    self.event_queue = queue.Queue(maxsize=0)

    # create queue file
    # i.e. make sure the jobs directory exists and create empty queue and
    # queue-done files if they are missing (existing files are left untouched)
    os.makedirs(self.base_dir, exist_ok=True)
    os.makedirs(self.base_dir_jobs, exist_ok=True)
    self.queue_filename = os.path.join(self.base_dir_jobs, f"{self.name}.queue.txt")
    if not os.path.exists(self.queue_filename):
        with open(self.queue_filename, "w") as f:
            pass
    self.queue_done_filename = os.path.join(
        self.base_dir_jobs, f"{self.name}.queue.done.txt"
    )
    if not os.path.exists(self.queue_done_filename):
        with open(self.queue_done_filename, "w") as f:
            pass

    # NOTE(review): KeepAliver is handed the socket object that exists right
    # now; reconnect_to_broker() later replaces self.broker_socket, so confirm
    # KeepAliver picks up the new socket after a reconnect
    self.keepaliver = KeepAliver(
        address=None,
        socket=self.broker_socket,
        multiplier=multiplier,
        keepalive=keepalive,
        exit_event=self.destroy_event,
        service=self.service,
        whoami=NFP.WORKER,
        name=self.name,
        socket_lock=self.socket_lock,
        log_level=self.log_level,
    )
    self.keepaliver.start()
    # client used by this worker to talk to other services (e.g. inventory,
    # file sharing) through the same broker
    self.client = NFPClient(
        self.broker, name=f"{self.name}-NFPClient", exit_event=self.exit_event
    )

reconnect_to_broker() ¤

Connect or reconnect to broker

Source code in norfab\core\worker.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def reconnect_to_broker(self):
    """
    Connect or reconnect to broker

    If a broker socket already exists, a DISCONNECT is sent over it and the
    socket is unregistered from the poller and closed. A new DEALER socket is
    then created with the worker name as its identity, connected to
    ``self.broker``, registered with the poller for inbound messages, and a
    READY message is sent to register this worker's service.
    """
    if self.broker_socket:
        # tell the broker we are leaving before tearing the old socket down
        # NOTE(review): the unregister/close/recreate below is not done under
        # socket_lock, while keepalives may use the socket from another
        # thread - confirm this is safe
        self.send_to_broker(NFP.DISCONNECT)
        self.poller.unregister(self.broker_socket)
        self.broker_socket.close()

    self.broker_socket = self.ctx.socket(zmq.DEALER)
    # identity = worker name, presumably so the broker can route to this
    # worker by name
    self.broker_socket.setsockopt_unicode(zmq.IDENTITY, self.name, "utf8")
    # linger 0: do not block on close waiting for unsent messages
    self.broker_socket.linger = 0
    self.broker_socket.connect(self.broker)
    self.poller.register(self.broker_socket, zmq.POLLIN)

    # Register service with broker
    self.send_to_broker(NFP.READY)

    log.info(
        f"{self.name} - registered to broker at '{self.broker}', service '{self.service}'"
    )

send_to_broker(command, msg=None) ¤

Send message to broker.

If no msg is provided, creates one internally

Source code in norfab\core\worker.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
def send_to_broker(self, command, msg: list = None):
    """
    Send message to broker.

    If no msg is provided, creates one internally

    For READY and DISCONNECT the whole message is built here and ``msg`` is
    ignored. For RESPONSE and EVENT the worker header and command frames are
    prepended to ``msg``, which must then be a list of frames. Any other
    command is logged as an error and nothing is sent.

    :param command: NFP command to send
    :param msg: payload frames, used only for RESPONSE and EVENT
    """
    prefix = [b"", NFP.WORKER]
    if command == NFP.READY:
        frames = prefix + [NFP.READY, self.service]
    elif command == NFP.DISCONNECT:
        frames = prefix + [NFP.DISCONNECT, self.service]
    elif command == NFP.RESPONSE:
        frames = prefix + [NFP.RESPONSE] + msg
    elif command == NFP.EVENT:
        frames = prefix + [NFP.EVENT] + msg
    else:
        log.error(
            f"{self.name} - cannot send '{command}' to broker, command unsupported"
        )
        return

    log.debug(f"{self.name} - sending '{frames}'")

    # socket_lock serialises socket access shared with the keepalive sender
    with self.socket_lock:
        self.broker_socket.send_multipart(frames)

load_inventory() ¤

Function to load inventory from broker for this worker name.

Source code in norfab\core\worker.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def load_inventory(self):
    """
    Function to load inventory from broker for this worker name.

    Calls ``get_inventory`` on the ``sid.service.broker`` service with this
    worker's name and decodes the JSON document found in the reply's
    ``results`` field.

    :return: the decoded inventory, or an empty dict when ``results`` is
        empty/falsy
    """
    inventory_data = self.client.get(
        "sid.service.broker", "get_inventory", kwargs={"name": self.name}
    )

    log.debug(f"{self.name} - worker received inventory data {inventory_data}")

    results = inventory_data["results"]
    return json.loads(results) if results else {}

fetch_file(url, raise_on_fail=False, read=True) ¤

Function to download file from broker File Sharing Service

Parameters:

Name Type Description Default
url str

file location string in nf://<filepath> format

required
raise_on_fail bool

raise FileNotFoundError if download fails

False
read bool

if True, return file content; otherwise, return OS path to the saved file

True
Source code in norfab\core\worker.py
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
def fetch_file(
    self, url: str, raise_on_fail: bool = False, read: bool = True
) -> str:
    """
    Function to download file from broker File Sharing Service

    :param url: file location string in ``nf://<filepath>`` format
    :param raise_on_fail: raise FileNotFoundError if download fails
    :param read: if True, return file content; otherwise, return OS path to
        the saved file
    :return: file content or OS path on success (status "200"); ``None`` on
        failure when ``raise_on_fail`` is not ``True``
    :raises FileNotFoundError: on failure when ``raise_on_fail`` is ``True``
    """
    status, payload = self.client.fetch_file(url=url, read=read)
    error = f"{self.name} - worker '{url}' fetch file failed with status '{status}'"

    if status == "200":
        return payload

    # any other status is a failed download
    if raise_on_fail is True:
        raise FileNotFoundError(error)
    log.error(error)
    return None

fetch_jinja2(url) ¤

Helper function to recursively download Jinja2 template together with other templates referenced using "include" statements

Parameters:

Name Type Description Default
url str

nf://file/path like URL to download file

required
Source code in norfab\core\worker.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
def fetch_jinja2(self, url: str) -> str:
    """
    Helper function to recursively download Jinja2 template together with
    other templates referenced using "include" statements

    Included template names are resolved relative to the "directory" of the
    including template's URL. Each URL is downloaded at most once per call,
    so templates that include each other (or themselves, e.g. conditional
    recursive includes) do not cause infinite recursion. Includes whose
    target is not a literal string (e.g. ``{% include some_variable %}``)
    cannot be resolved statically and are skipped.

    :param url: ``nf://file/path`` like URL to download file
    :return: OS path to the downloaded top-level template file
    :raises FileNotFoundError: if the template or any statically included
        template cannot be downloaded
    :raises Exception: if a template cannot be parsed by Jinja2
    """
    visited = set()

    def _fetch(template_url: str) -> str:
        # download one template, then recurse into its literal includes
        visited.add(template_url)
        filepath = self.fetch_file(template_url, read=False)
        if filepath is None:
            msg = f"{self.name} - file download failed '{template_url}'"
            raise FileNotFoundError(msg)

        # download Jinja2 template content to find "include"-ed files
        content = self.fetch_file(template_url, read=True)
        # parsing needs no loader; ``loader`` expects a loader instance, so a
        # class-name string must not be passed here
        j2env = Environment()
        try:
            parsed_content = j2env.parse(content)
        except Exception as e:
            msg = f"{self.name} - Jinja2 template parsing failed '{template_url}', error: '{e}'"
            raise Exception(msg) from e

        # resolve includes against everything up to the last "/" of the URL;
        # plain string slicing keeps "/" separators on every OS (os.path.join
        # would insert "\\" on Windows) and keeps "nf://" intact for
        # templates located at the root
        base_url = template_url[: template_url.rfind("/") + 1]
        for node in parsed_content.find_all(Include):
            include_file = getattr(node.template, "value", None)
            if not isinstance(include_file, str):
                continue  # dynamic include target, cannot be fetched statically
            include_url = base_url + include_file
            if include_url not in visited:
                _fetch(include_url)

        return filepath

    return _fetch(url)

request_filename(suuid, base_dir_jobs) ¤

Returns the request file path for the given UUID (str or bytes)

Source code in norfab\core\worker.py
187
188
189
190
def request_filename(suuid: Union[str, bytes], base_dir_jobs: str):
    """
    Build the request file path for a job UUID.

    Only the path is computed; nothing is created on disk.

    :param suuid: job UUID as ``str`` or UTF-8 encoded ``bytes``
    :param base_dir_jobs: directory holding the worker's job files
    :return: path ``<base_dir_jobs>/<suuid>.req``
    """
    if isinstance(suuid, bytes):
        suuid = suuid.decode("utf-8")
    filename = f"{suuid}.req"
    return os.path.join(base_dir_jobs, filename)

reply_filename(suuid, base_dir_jobs) ¤

Returns the reply file path for the given UUID (str or bytes)

Source code in norfab\core\worker.py
193
194
195
196
def reply_filename(suuid: Union[str, bytes], base_dir_jobs: str):
    """
    Build the reply file path for a job UUID.

    Only the path is computed; nothing is created on disk.

    :param suuid: job UUID as ``str`` or UTF-8 encoded ``bytes``
    :param base_dir_jobs: directory holding the worker's job files
    :return: path ``<base_dir_jobs>/<suuid>.rep``
    """
    job_id = str(suuid, "utf-8") if isinstance(suuid, bytes) else suuid
    return os.path.join(base_dir_jobs, job_id + ".rep")

recv(worker, destroy_event) ¤

Thread to receive and process messages from the broker.

Source code in norfab\core\worker.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def recv(worker, destroy_event):
    """
    Thread to process receive messages from broker.

    Loops until ``destroy_event`` is set, polling the worker's poller with a
    one second timeout. Each received multipart message is expected to be
    ``[empty, header, command, *frames]``:

    - POST, DELETE and GET frames are put on the matching worker queue.
    - KEEPALIVE is handed to the keepaliver together with the header.
    - DISCONNECT triggers a reconnect to the broker.
    - Anything else is logged at debug level and dropped.

    Messages with fewer than three frames are logged and dropped instead of
    raising, so one malformed message cannot terminate this thread. After
    every poll cycle, the worker reconnects if the keepaliver reports the
    broker as not alive.

    :param worker: worker object providing ``poller``, ``broker_socket``, the
        request queues, ``keepaliver`` and ``reconnect_to_broker``
    :param destroy_event: threading event; the loop exits once it is set
    """
    while not destroy_event.is_set():
        # Poll socket for messages every second
        try:
            items = worker.poller.poll(1000)
        except KeyboardInterrupt:
            break  # Interrupted
        if items:
            msg = worker.broker_socket.recv_multipart()
            log.debug(f"{worker.name} - received '{msg}'")
            if len(msg) < 3:
                # popping the envelope frames below would raise IndexError
                # and silently kill this receive thread
                log.error(
                    f"{worker.name} - malformed message, expected at least 3 frames, received '{msg}'"
                )
            else:
                msg.pop(0)  # empty delimiter frame
                header = msg.pop(0)
                command = msg.pop(0)

                if command == NFP.POST:
                    worker.post_queue.put(msg)
                elif command == NFP.DELETE:
                    worker.delete_queue.put(msg)
                elif command == NFP.GET:
                    worker.get_queue.put(msg)
                elif command == NFP.KEEPALIVE:
                    worker.keepaliver.received_heartbeat([header] + msg)
                elif command == NFP.DISCONNECT:
                    worker.reconnect_to_broker()
                else:
                    log.debug(
                        f"{worker.name} - invalid input, header '{header}', command '{command}', message '{msg}'"
                    )

        # NOTE(review): if reconnecting does not make keepaliver.is_alive()
        # true again, this reconnects on every loop iteration - confirm
        # KeepAliver resets its state after a reconnect
        if not worker.keepaliver.is_alive():
            log.warning(f"{worker.name} - '{worker.broker}' broker keepalive expired")
            worker.reconnect_to_broker()