Skip to content

Broker

Majordomo Protocol broker. A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8

Author: Min RK benjaminrk@gmail.com Based on Java example by Arkadiusz Orzechowski

NFPService(name) ¤

Bases: object

A single NFP Service

Source code in norfab\core\broker.py
41
42
43
def __init__(self, name: str):
    """Create a service record that starts out with no workers."""
    # workers known to provide this service; filled in as they register
    self.workers = []
    # service name
    self.name = name

NFPWorker(address, socket, socket_lock, multiplier, keepalive, service=None, log_level='WARNING') ¤

Bases: object

An NFP Worker convenience class

Source code in norfab\core\broker.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    address: str,
    socket,
    socket_lock,
    multiplier: int,  # e.g. 6 times
    keepalive: int,  # e.g. 5000 ms
    service: NFPService = None,
    log_level: str = "WARNING",
):
    """
    Record the routing address, shared socket handles and keepalive
    settings of a single worker.

    :param address: address that messages for this worker are routed to
    :param socket: socket object kept for later sends
    :param socket_lock: lock kept alongside ``socket``
    :param multiplier: keepalive multiplier, stored as given
    :param keepalive: keepalive interval, stored as given
    :param service: NFPService the worker belongs to, ``None`` until assigned
    :param log_level: logging level name, stored as given
    """
    # identity and service membership
    self.address = address
    self.service = service
    self.ready = False  # a new worker always starts as not ready

    # transport objects handed in by the caller
    self.socket = socket
    self.socket_lock = socket_lock

    # keepalive tuning and shutdown signalling
    self.keepalive = keepalive
    self.multiplier = multiplier
    self.exit_event = threading.Event()

    self.log_level = log_level

is_ready() ¤

True if worker signaled W.READY

Source code in norfab\core\broker.py
84
85
86
def is_ready(self):
    """Return True only when a service is assigned and ``ready`` is True."""
    if self.service is None:
        # no service attached yet, so the worker cannot be ready
        return False
    return self.ready is True

destroy(disconnect=False) ¤

Clean up routine

Source code in norfab\core\broker.py
88
89
90
91
92
93
94
95
96
97
def destroy(self, disconnect=False):
    """
    Clean up routine.

    Sets the exit event, stops the keepaliver if there is one, removes
    the worker from its service and optionally sends a DISCONNECT
    message to the worker address.

    The routine tolerates a worker that never became ready (no service
    assigned, keepalives never started) so that such a worker can still
    be deleted without raising.

    :param disconnect: when ``True`` send a DISCONNECT message to the worker
    """
    self.exit_event.set()

    # keepaliver is not created by __init__; presumably start_keepalives
    # sets it, so it can be missing for a worker that was never ready
    keepaliver = getattr(self, "keepaliver", None)
    if keepaliver is not None:
        keepaliver.stop()

    # service stays None until the worker is attached to one
    service = self.service
    if service is not None and self in service.workers:
        service.workers.remove(self)

    if disconnect is True:
        # NOTE(review): an empty service frame is sent when no service was
        # ever assigned - confirm the worker side accepts it
        service_name = service.name if service is not None else b""
        msg = [self.address, b"", NFP.WORKER, service_name, NFP.DISCONNECT]
        with self.socket_lock:
            self.socket.send_multipart(msg)

NFPBroker(endpoint, exit_event, inventory, log_level='WARNING', log_queue=None, multiplier=6, keepalive=2500, init_done_event=None) ¤

NORFAB Protocol broker

Initialize broker state.

Source code in norfab\core\broker.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def __init__(
    self,
    endpoint: str,
    exit_event: Event,
    inventory: NorFabInventory,
    log_level: str = "WARNING",
    log_queue: object = None,
    multiplier: int = 6,
    keepalive: int = 2500,
    init_done_event: Event = None,
):
    """
    Initialize broker state.

    Configures logging, prepares the broker files directory under the
    current working directory, calls ``generate_certificates`` and loads
    the broker key pair, starts a ``ThreadAuthenticator`` and binds a
    CURVE-enabled ROUTER socket to ``endpoint``.

    :param endpoint: address passed to ``socket.bind``
    :param exit_event: event stored on the broker; ``mediate`` checks it
        to decide when to stop
    :param inventory: NorFabInventory object, stored as ``self.inventory``
    :param log_level: logging level passed to ``setup_logging`` and stored
        for workers created later
    :param log_queue: queue object passed to ``setup_logging``
    :param multiplier: keepalive multiplier, stored and handed to workers
    :param keepalive: keepalive interval, stored, handed to workers and
        used as the poll timeout in ``mediate`` (presumably milliseconds)
    :param init_done_event: event set once initialization is finished; a
        new ``Event`` is created when omitted
    """
    setup_logging(queue=log_queue, log_level=log_level)
    self.log_level = log_level
    self.keepalive = keepalive
    self.multiplier = multiplier
    # fall back to a private event so the set() call at the end always works
    init_done_event = init_done_event or Event()

    self.services = {}  # service name -> service object
    self.workers = {}  # worker address -> worker object
    self.exit_event = exit_event
    self.inventory = inventory

    # broker files live under <cwd>/__norfab__/files/broker/
    self.base_dir = os.getcwd()
    self.broker_base_dir = f"{self.base_dir}/__norfab__/files/broker/"
    # base_dir is the current working directory, so this is expected to be a no-op
    os.makedirs(self.base_dir, exist_ok=True)
    os.makedirs(self.broker_base_dir, exist_ok=True)

    # generate certificates, create directories and load certs
    generate_certificates(self.broker_base_dir, cert_name="broker")
    secret_keys_dir = os.path.join(self.broker_base_dir, "private_keys")
    server_secret_file = os.path.join(secret_keys_dir, "broker.key_secret")
    server_public, server_secret = zmq.auth.load_certificate(server_secret_file)

    self.ctx = zmq.Context()

    # Start an authenticator for this context.
    self.auth = ThreadAuthenticator(self.ctx)
    self.auth.start()
    self.auth.allow("127.0.0.1")
    # Tell the authenticator how to handle CURVE requests
    self.auth.configure_curve(domain="*", location=zmq.auth.CURVE_ALLOW_ANY)

    # ROUTER socket acting as the CURVE server side
    self.socket = self.ctx.socket(zmq.ROUTER)
    self.socket.curve_secretkey = server_secret
    self.socket.curve_publickey = server_public
    self.socket.curve_server = True  # must come before bind
    self.socket.linger = 0
    # poller is what mediate() waits on for incoming messages
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    self.socket.bind(endpoint)
    self.socket_lock = (
        threading.Lock()
    )  # used for keepalives to protect socket object

    init_done_event.set()  # signal finished initializing broker
    log.debug(f"NFPBroker - is ready and listening on {endpoint}")

mediate() ¤

Main broker work happens here

Client send messages of this frame format:

Source code in norfab\core\broker.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def mediate(self):
    """
    Main broker work happens here.

    Polls the socket with ``self.keepalive`` as timeout, routes each
    received message to ``process_client`` or ``process_worker`` based
    on its header frame, purges expired workers and stops once
    ``self.exit_event`` is set.

    Every received message starts with this envelope::

        [sender, empty, header, ...]

    Client messages continue with
    ``command, service, target, uuid, data``; worker messages continue
    with ``command`` followed by command specific frames.

    Messages with fewer than three frames or with an unknown header are
    logged and dropped instead of raising out of the loop.
    """
    while True:
        try:
            items = self.poller.poll(self.keepalive)
        except KeyboardInterrupt:
            break  # Interrupted

        if items:
            msg = self.socket.recv_multipart()
            log.debug(f"NFPBroker - received '{msg}'")

            if len(msg) < 3:
                # envelope incomplete, nothing can be routed
                log.error(
                    f"NFPBroker - malformed message, expected at least 3 frames: '{msg}'"
                )
            else:
                sender, _, header = msg[:3]
                payload = msg[3:]
                if header == NFP.CLIENT:
                    self.process_client(sender, payload)
                elif header == NFP.WORKER:
                    self.process_worker(sender, payload)
                else:
                    log.error(
                        f"NFPBroker - unsupported header '{header}' from '{sender}'"
                    )

        self.purge_workers()

        # check if need to stop
        if self.exit_event.is_set():
            self.destroy()
            break

destroy() ¤

Disconnect all workers, destroy context.

Source code in norfab\core\broker.py
198
199
200
201
202
203
204
205
206
def destroy(self):
    """Disconnect every registered worker, then stop auth and destroy the context."""
    log.info(f"NFPBroker - interrupt received, killing broker")
    # iterate over a snapshot of the addresses: delete_worker mutates
    # self.workers, and an entry may already be gone when we reach it
    for address in tuple(self.workers):
        worker = self.workers.get(address)
        if worker:
            self.delete_worker(worker, True)
    self.auth.stop()
    self.ctx.destroy(0)

delete_worker(worker, disconnect) ¤

Deletes worker from all data structures, and deletes worker.

Source code in norfab\core\broker.py
208
209
210
211
def delete_worker(self, worker, disconnect):
    """
    Destroy the worker and drop it from the broker's worker registry.

    :param worker: worker object to delete
    :param disconnect: passed through to ``worker.destroy``
    """
    worker.destroy(disconnect)
    # a missing registry entry is not an error
    registry = self.workers
    registry.pop(worker.address, None)

purge_workers() ¤

Look for & delete expired workers.

Source code in norfab\core\broker.py
213
214
215
216
217
218
219
220
def purge_workers(self):
    """
    Look for & delete expired workers.

    A worker counts as expired when its keepaliver reports that it is
    not alive; such workers are deleted without sending a disconnect.
    """
    # iterate over a snapshot of the keys, delete_worker mutates the dict
    for name in list(self.workers.keys()):
        w = self.workers.get(name)
        # in case worker self destroyed while we iterating - skip it, so
        # that a worker from the previous iteration is never re-checked
        if not w:
            continue
        if not w.keepaliver.is_alive():
            self.delete_worker(w, False)

send_to_worker(worker, command, sender, uuid, data) ¤

Send message to worker. If message is provided, sends that message.

Source code in norfab\core\broker.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def send_to_worker(
    self, worker: NFPWorker, command: bytes, sender: bytes, uuid: bytes, data: bytes
):
    """
    Send a POST or GET request to a worker.

    Any other command is logged as an error and nothing is sent.

    :param worker: worker object, its ``address`` is used for routing
    :param command: ``NFP.POST`` or ``NFP.GET``
    :param sender: address of the client that issued the request
    :param uuid: request identifier
    :param data: request payload
    """
    if command == NFP.POST:
        verb = NFP.POST
    elif command == NFP.GET:
        verb = NFP.GET
    else:
        log.error(f"NFPBroker - invalid worker command: {command}")
        return

    # routing envelope first, then protocol header, then the request itself
    frames = [worker.address, b"", NFP.WORKER, verb, sender, b"", uuid, data]
    with self.socket_lock:
        log.debug(f"NFPBroker - sending to worker '{frames}'")
        self.socket.send_multipart(frames)

send_to_client(client, command, service, message) ¤

Send message to client.

Source code in norfab\core\broker.py
238
239
240
241
242
243
244
245
246
247
248
249
250
def send_to_client(self, client: str, command: str, service: str, message: list):
    """
    Send a RESPONSE or EVENT message to a client.

    Any other command is logged as an error and nothing is sent.

    :param client: client address used for routing
    :param command: ``NFP.RESPONSE`` or ``NFP.EVENT``
    :param service: service name frame
    :param message: list of frames appended after the envelope
    """
    if command == NFP.RESPONSE:
        kind = NFP.RESPONSE
    elif command == NFP.EVENT:
        kind = NFP.EVENT
    else:
        log.error(f"NFPBroker - invalid client command: {command}")
        return

    # routing and protocol envelope goes in front of the payload frames
    frames = [client, b"", NFP.CLIENT, kind, service] + message
    with self.socket_lock:
        log.debug(f"NFPBroker - sending to client '{frames}'")
        self.socket.send_multipart(frames)

process_worker(sender, msg) ¤

Process message received from worker.

Source code in norfab\core\broker.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def process_worker(self, sender, msg):
    """
    Handle one message received from a worker.

    The first frame of ``msg`` is the command; the worker object is
    looked up (or created) from ``sender``. Depending on the command
    the worker is registered with a service, its reply or event is
    forwarded to a client, a heartbeat is recorded, or the worker is
    deleted.

    :param sender: worker address
    :param msg: list of frames following the message header
    """
    command = msg.pop(0)
    worker = self.require_worker(sender)

    if command == NFP.READY and not worker.is_ready():
        # frame after READY names the service to attach the worker to
        service_name = msg.pop(0)
        worker.service = self.require_service(service_name)
        worker.ready = True
        worker.start_keepalives()
        worker.service.workers.append(worker)
        return

    if command == NFP.RESPONSE and worker.is_ready():
        client = msg.pop(0)
        msg.pop(0)  # drop the frame that follows the client address
        self.send_to_client(client, NFP.RESPONSE, worker.service.name, msg)
        return

    if command == NFP.KEEPALIVE:
        worker.keepaliver.received_heartbeat([worker.address] + msg)
        return

    if command == NFP.DISCONNECT and worker.is_ready():
        self.delete_worker(worker, False)
        return

    if command == NFP.EVENT and worker.is_ready():
        client = msg.pop(0)
        msg.pop(0)  # drop the frame that follows the client address
        self.send_to_client(client, NFP.EVENT, worker.service.name, msg)
        return

    if not worker.is_ready():
        # anything else from a worker that is not ready: drop the worker
        self.delete_worker(worker, disconnect=True)
        return

    log.error(f"NFPBroker - invalid message: {msg}")

require_worker(address) ¤

Finds the worker, creates if necessary.

Source code in norfab\core\broker.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def require_worker(self, address):
    """
    Return the worker registered under ``address``, creating and
    registering a new one with the broker's socket and keepalive
    settings when none exists.

    :param address: worker address
    """
    known = self.workers.get(address)
    if known:
        return known

    created = NFPWorker(
        address=address,
        socket=self.socket,
        multiplier=self.multiplier,
        keepalive=self.keepalive,
        socket_lock=self.socket_lock,
        log_level=self.log_level,
    )
    self.workers[address] = created
    log.info(f"NFPBroker - registered new worker {address}")
    return created

require_service(name) ¤

Locates the service (creates if necessary).

Source code in norfab\core\broker.py
295
296
297
298
299
300
301
302
def require_service(self, name):
    """
    Return the service registered under ``name``, creating and
    registering a new one when none exists.

    :param name: service name
    """
    existing = self.services.get(name)
    if existing:
        return existing

    created = NFPService(name)
    self.services[name] = created
    log.debug(f"NFPBroker - registered new service {name}")
    return created

process_client(sender, msg) ¤

Process a request coming from a client.

Source code in norfab\core\broker.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def process_client(self, sender, msg):
    """
    Handle one request received from a client.

    The first five frames of ``msg`` are consumed as command, service,
    target, uuid and data. Requests for the broker's own services are
    handled locally, everything else is dispatched to workers of the
    requested service.

    :param sender: client address
    :param msg: list of frames following the message header
    """
    # consume the five request frames in order
    command, service, target, uuid, data = (msg.pop(0) for _ in range(5))

    # check if valid command from client
    if command not in NFP.client_commands:
        message = f"NFPBroker - Unsupported client command '{command}'"
        log.error(message)
        self.send_to_client(
            sender, NFP.RESPONSE, service, [message.encode("utf-8")]
        )
        return

    # services implemented by the broker itself
    if service == b"mmi.service.broker":
        self.mmi_service(sender, command, target, uuid, data)
        return
    if service == b"sid.service.broker":
        self.inventory_service(sender, command, target, uuid, data)
        return
    if service == b"fss.service.broker":
        self.file_sharing_service(sender, command, target, uuid, data)
        return

    # everything else goes to the workers of the named service
    service_obj = self.require_service(service)
    self.dispatch(sender, command, service_obj, target, uuid, data)

filter_workers(target, service) ¤

Helper function to filter workers

Parameters:

Name Type Description Default
target bytes

byte string, workers target

required
service NFPService

NFPService object

required
Source code in norfab\core\broker.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def filter_workers(self, target: bytes, service: NFPService) -> list:
    """
    Helper function to filter workers

    ``target`` is interpreted as one of:

    - ``b"any"`` - one randomly chosen worker of the service
    - ``b"all"`` - all workers of the service
    - a worker address registered with the broker
    - a JSON encoded list of worker addresses; unknown addresses are
      skipped and duplicates are removed keeping first-seen order

    :param target: byte string, workers target
    :param service: NFPService object
    :return: new list of worker objects, empty when nothing matched or
        the service has no workers
    """
    if not service.workers:
        log.warning(
            f"NFPBroker - '{service.name}' has no active workers registered, try later"
        )
        return []
    if target == b"any":
        return [random.choice(service.workers)]
    if target == b"all":
        # copy, so callers never hold the service's own list
        return list(service.workers)
    # NOTE(review): explicit addresses are resolved against all broker
    # workers, not only workers of `service` - confirm this is intended
    if target in self.workers:  # single worker
        return [self.workers[target]]

    # target list of workers; dict keys give order-preserving dedup
    selected = {}
    try:
        names = json.loads(target)
        if isinstance(names, list):
            for name in names:
                worker = self.workers.get(name.encode("utf-8"))
                if worker is not None:
                    selected[worker] = None
    except Exception as e:
        # best effort: log and return whatever was resolved so far
        log.error(f"NFPBroker - Failed to load target '{target}' with error '{e}'")
    return list(selected)

dispatch(sender, command, service, target, uuid, data) ¤

Dispatch requests to waiting workers as possible

Parameters:

Name Type Description Default
service

service object

required
target

string indicating workers addresses to dispatch to

required
msg

string with work request content

required
Source code in norfab\core\broker.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def dispatch(self, sender, command, service, target, uuid, data):
    """
    Send a client request to the workers selected by ``target``.

    Expired workers are purged first. When no worker matches, the client
    gets a ``400`` response; otherwise the client gets a ``202``
    response listing the selected workers and the request is sent to
    each of them.

    :param sender: client address the responses are sent to
    :param command: client command forwarded to the workers
    :param service: service object
    :param target: byte string indicating workers addresses to dispatch to
    :param uuid: request identifier
    :param data: request payload forwarded to the workers
    """
    log.debug(
        f"NFPBroker - dispatching request to workers: sender '{sender}', "
        f"command '{command}', service '{service.name}', target '{target}'"
        f"data '{data}', uuid '{uuid}'"
    )
    self.purge_workers()
    selected = self.filter_workers(target, service)

    # nothing to dispatch to - tell the client and stop
    if not selected:
        message = f"NFPBroker - {service.name} service failed to target workers '{target}'"
        log.error(message)
        failure = [uuid, b"400", message.encode("utf-8")]
        self.send_to_client(sender, NFP.RESPONSE, service.name, failure)
        return

    # inform client that JOB dispatched
    receipt = {
        "workers": [w.address.decode("utf-8") for w in selected],
        "uuid": uuid.decode("utf-8"),
        "target": target.decode("utf-8"),
        "status": "DISPATCHED",
        "service": service.name.decode("utf-8"),
    }
    accepted = [uuid, b"202", json.dumps(receipt).encode("utf-8")]
    self.send_to_client(sender, NFP.RESPONSE, service.name, accepted)

    # send job to workers
    for worker in selected:
        self.send_to_worker(worker, command, sender, uuid, data)

mmi_service(sender, command, target, uuid, data) ¤

Handle internal service according to 8/MMI specification

Source code in norfab\core\broker.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def mmi_service(self, sender, command, target, uuid, data):
    """
    Handle internal service according to 8/MMI specification

    ``data`` must be a JSON object with a ``task`` name and optional
    ``kwargs`` object. Supported tasks:

    - ``show_workers`` - list registered workers, optionally filtered by
      ``kwargs["service"]`` and ``kwargs["status"]`` (``alive``/``dead``)
    - ``show_broker`` - broker endpoint, settings and counters

    Any other task gets an ``Unsupported task`` string in reply. A
    payload that is not a valid JSON object is answered with a ``400``
    response instead of raising into the caller.

    :param sender: client address the reply is sent to
    :param command: client command, only logged here
    :param target: request target, only logged here
    :param uuid: request identifier echoed back in the reply
    :param data: JSON encoded request payload
    """
    log.debug(
        f"mmi.service.broker - processing request: sender '{sender}', "
        f"command '{command}', target '{target}', "
        f"data '{data}', uuid '{uuid}'"
    )
    # the payload is client supplied - validate it before use
    try:
        data = json.loads(data)
        if not isinstance(data, dict):
            raise ValueError("request data must be a JSON object")
        kwargs = data.get("kwargs") or {}
        if not isinstance(kwargs, dict):
            raise ValueError("'kwargs' must be a JSON object")
    except ValueError as e:  # includes JSON and unicode decode errors
        message = f"mmi.service.broker - failed to load request data, error '{e}'"
        log.error(message)
        self.send_to_client(
            sender,
            NFP.RESPONSE,
            b"mmi.service.broker",
            [uuid, b"400", json.dumps(message).encode("utf-8")],
        )
        return

    task = data.get("task")
    # single blank row returned when there is nothing to show
    no_workers = [{"name": "", "service": "", "status": ""}]
    ret = f"Unsupported task '{task}'"
    if task == "show_workers":
        ret = [
            {
                "name": w.address.decode("utf-8"),
                "service": w.service.name.decode("utf-8"),
                "status": "alive" if w.keepaliver.is_alive() else "dead",
                "holdtime": str(w.keepaliver.show_holdtime()),
                "keepalives tx/rx": f"{w.keepaliver.keepalives_send} / {w.keepaliver.keepalives_received}",
                "alive (s)": str(w.keepaliver.show_alive_for()),
            }
            for w in self.workers.values()
        ]
        # filter reply
        service = kwargs.get("service")
        status = kwargs.get("status")
        if service and service != "all":
            ret = [w for w in ret if w["service"] == service]
        if status in ["alive", "dead"]:
            ret = [w for w in ret if w["status"] == status]
        if not ret:
            ret = no_workers
    elif task == "show_broker":
        ret = {
            "address": self.socket.getsockopt_string(zmq.LAST_ENDPOINT),
            "status": "active",
            "multiplier": self.multiplier,
            "keepalive": self.keepalive,
            "workers count": len(self.workers),
            "services count": len(self.services),
            "base_dir": self.base_dir,
        }
    reply = json.dumps(ret).encode("utf-8")
    self.send_to_client(
        sender, NFP.RESPONSE, b"mmi.service.broker", [uuid, b"200", reply]
    )