Skip to content

PICLE APIs Reference

PicleConfig

Each Pydantic model can have PicleConfig subclass defined with model configuration parameters:

  • ruler - The character used to draw separator lines under the help-message headers. If empty, no ruler line is drawn, defaults is empty
  • intro - A string to issue as an intro or banner
  • prompt - command line shell prompt
  • newline - newline character to use while printing output, default is \r\n
  • completekey - is the readline name of a completion key, defaults to tab
  • pipe - reference to Pydantic model class to use with | (pipe) to process the results with various functions, special value pipe = "self" instruct to use current model for piping results through.
  • processors - list of functions to run results of first command through one by one
  • outputter - function to output results, by default results written to stdout
  • outputter_kwargs - dictionary containing any additional argument to use with outputter

Sample PicleConfig definition:

from picle.models import PipeFunctionsModel, Formatters, Outputters

class ShellModel(BaseModel):
    """ define command attributes here """
    <...>

    class PicleConfig:
        prompt = "picle#"
        ruler = ""
        intro = "PICLE Sample app"
        newline = "\r\n"
        completekey = "tab"
        pipe = PipeFunctionsModel
        processors = [Formatters.formatter_json]
        outputter = Outputters.outputter_rich_print
        outputter_kwargs = {"any": "extra_argument"}

Field json_schema_extra

PICLE supports reading additional parameters from model Field's json_schema_extra definition to control PICLE behavior.

json_schema_extra PICLE parameters:

  • function - refers to @staticmethod of the model to call with command arguments
  • presence - command argument set to presence value if command given
  • processors - list of functions to run results of each command through one by one
  • outputter - function to output results, by default results written to stdout. Field's outputter overrides PicleConfig's outputter
  • outputter_kwargs - dictionary containing any additional argument to use with outputter
  • multiline - True/False, indicates if multi line input mode is enabled for this field
  • root_model - True/False, if True reference to PICLE App's root model passed on to the run method or to the function inside root_model argument
  • picle_app - True/False, if True reference to PICLE App passed on to the run method or to the function inside picle_app argument, useful if need to modify PICLE App in a runtime, for example mount or remove models

Field processors

Processors allow to pass command execution results through a list of arbitrary functions. Results returned by processor function passed on as input to next processor function in the list and so on.

In example below results returned by produce_structured_data function passed through pprint formatter Formatters.formatter_pprint function to produce pretty formatted string.

from picle.models import Formatters

class model_show(BaseModel):
    data_pprint: Callable = Field(
        "produce_structured_data",
        description="Show data using pprint formatter",
        json_schema_extra={
            "processors": [
                    Formatters.formatter_pprint
                ]
            }
    )

    @staticmethod
    def produce_structured_data():
        return {"some": {"dictionary": {"data": None}}, "more": {"dictionary": ["data"]}, "even": {"more": {"dictionary": "data"}}}

Multi Line Input

Multi line input allows to read multiple lines of text into field value if json_schema_extra multiline argument is set to True. To use it need to specify input as a field value on the command line, that will trigger multi line input collection when hit return:

Sample model that has multi line input enabled:

class model_TestMultilineInput(BaseModel):
    data: StrictStr = Field(
        None,
        description="Multi line string",
        json_schema_extra={"multiline": True}
    )
    arg: Any = Field(None, description="Some field")

    @staticmethod
    def run(**kwargs):
        return kwargs

This is how help will look like for data field:

picle#test_multiline_input data ?
 <'data' value>    Multi line string
 input             Collect value using multi line input mode
picle#

Tab completion for input value also works. On hitting enter, multi line input mode will be invoked:

picle#test_multiline_input data input arg foo
Enter lines and hit Ctrl+D to finish multi line input
I'am
Multi Line
Input
<ctrl+D hit>

Result Specific Outputters

Sometimes having outputter defined per model is not enough and depending on produced result different outputter need to be used, in that case result specific outputter can be provided in return to run function call by returning a tuple of (result, outputter function, outputter kwargs,), where outputter kwargs is optional.

Example:

from picle.models import Outputters

class model_ResultSpecificOutputter(BaseModel):
    data: StrictStr = Field(None, description="Multi line string")
    arg: Any = Field(None, description="Some field")

    class PicleConfig:
        outputter = Outputters.outputter_rich_print
        outputter_kwargs = {"any": "extra_argument"}

    @staticmethod
    def run(**kwargs):
        if kwargs.get("args") == "json":
            return kwargs["data"], Outputters.outputter_rich_json, {}
        elif kwargs.get("args") == "table":
            return kwargs["data"], Outputters.outputter_rich_table
        else:
            return kwargs

In addition to PicleConfig outputter, depending on arguments provided run function returns outputter function to use to output the result with optional outputter_kwargs as a third argument. By default, if return result is not a tuple, outputter specified in PicleConfig is used.

Note

Result specific outputters supported starting with PICLE version 0.7.0

Mounting Models at a Runtime

Sometimes it is needed to dynamically add new shell commands to the app, for that PICLE App has model_mount and model_remove methods.

Example how to mount Pydantic model to PICLE App at given path in a runtime.

from picle import App
from pydantic import BaseModel, StrictStr

class my_mount_model(BaseModel):
    param: StrictStr = Field(None, description="Param string")

    @staticmethod
    def run(**kwargs):
        return kwargs

# create PICLE Root model
class Root(BaseModel):
    command: StrictStr = Field(None, description="Some command string")

# instantiate PICLE App shell
shell = App(Root)

# mount model
shell.model_mount(my_mount_model, ["another_command"])

# remove model
shell.model_remove(["another_command"])

shell.close()

PICLE App

picle.App(root, stdin=None, stdout=None)

Bases: Cmd

PICLE App class to construct shell.

Parameters:

  • root

    Root/Top Pydantic model

Source code in picle\picle.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(self, root, stdin=None, stdout=None):
    self.root = root
    self.shell = self.root.model_construct()
    self.shell_defaults = {}
    self.shells = [self.shell]

    # extract configuration from shell model
    if hasattr(self.shell, "PicleConfig"):
        self.ruler = getattr(self.shell.PicleConfig, "ruler", self.ruler)
        self.intro = getattr(self.shell.PicleConfig, "intro", self.intro)
        self.prompt = getattr(self.shell.PicleConfig, "prompt", self.prompt)
        self.newline = getattr(self.shell.PicleConfig, "newline", self.newline)
        self.completekey = getattr(
            self.shell.PicleConfig, "completekey", self.completekey
        )

    # mount override methods
    if hasattr(self.shell.PicleConfig, "methods_override"):
        for (
            method_name,
            override,
        ) in self.shell.PicleConfig.methods_override.items():
            setattr(self, method_name, getattr(self.shell, override))

    # mount models
    self.model_mount(MAN, ["man"], "Manual/documentation functions")

    super(App, self).__init__(stdin=stdin, stdout=stdout)

picle.App.completedefault(text, line, begidx, endidx)

This method called for every command parameter on complete key hit except for the very first one.

Source code in picle\picle.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
def completedefault(self, text, line, begidx, endidx):
    """
    This method called for every  command parameter on
    complete key hit except for the very first one.
    """
    fieldnames = []
    try:
        command_models = self.parse_command(line, is_help=True)
        last_model = command_models[-1][-1]["model"]
        # check if last model has fields collected
        if command_models[-1][-1]["fields"]:
            last_field_name = command_models[-1][-1]["fields"][-1]["name"]
            last_field = last_model.model_fields[last_field_name]
            last_field_value = command_models[-1][-1]["fields"][-1]["values"]
            fparam = self._get_field_params(last_field)
            if isinstance(last_field_value, list):
                last_field_value = last_field_value[-1]
            elif last_field_value == ...:
                last_field_value = ""
            # check if need to extract enum values
            if isinstance(last_field.annotation, enum.EnumMeta):
                fieldnames = [
                    i.value
                    for i in last_field.annotation
                    if i.value.startswith(last_field_value)
                ]
            # check if model has method to source field choices
            elif hasattr(last_model, f"source_{last_field_name}"):
                fieldnames = getattr(last_model, f"source_{last_field_name}")()
                fieldnames = [
                    i for i in fieldnames if i.startswith(last_field_value)
                ]
            # auto complete 'input' for multi-line input mode
            elif fparam.get("multiline") is True:
                if (
                    "input".startswith(last_field_value)
                    and last_field_value != "input"
                ):
                    fieldnames = ["input"]
        # return a list of all model fields
        else:
            for name, f in last_model.model_fields.items():
                if f.alias:
                    fieldnames.append(f.alias)
                else:
                    fieldnames.append(name)
    except FieldLooseMatchOnly as e:
        model, parameter = e.args
        for name, f in model["model"].model_fields.items():
            # skip fields with already collected values from complete prompt
            if any(
                collected_field["name"] == name
                for collected_field in model["fields"]
                if collected_field["values"] is not ...
            ):
                continue
            elif f.alias and f.alias.startswith(parameter):
                fieldnames.append(f.alias)
            elif name.startswith(parameter):
                fieldnames.append(name)
    except FieldKeyError:
        pass
    except:
        tb = traceback.format_exc()
        self.write(tb)

    return sorted(fieldnames)

picle.App.completenames(text, line, begidx, endidx)

This method only called for the very first command parameter on complete key hit.

Source code in picle\picle.py
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
def completenames(self, text, line, begidx, endidx):
    """
    This method only called for the very first command parameter on
    complete key hit.
    """
    fieldnames = []
    # collect global methods
    for method_name in dir(self):
        if method_name.startswith("do_"):
            name = method_name.replace("do_", "")
            if name.startswith(line):
                fieldnames.append(name)
    # collect model arguments
    try:
        command_models = self.parse_command(line, is_help=True)
        fieldnames.extend(command_models[-1][-1]["model"].model_fields)
    # collect arguments that startswith last parameter
    except FieldLooseMatchOnly as e:
        model, parameter = e.args
        for name, f in model["model"].model_fields.items():
            if f.alias and f.alias.startswith(parameter):
                fieldnames.append(f.alias)
            elif name.startswith(parameter):
                fieldnames.append(name)
    # raised if no model fields matched last parameter
    except FieldKeyError as e:
        log.debug(f"No model fields matched last parameter - {e}")
        pass
    return sorted(fieldnames)

picle.App.default(line: str)

Method called if no do_xyz methods found

Source code in picle\picle.py
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
@run_print_exception
def default(self, line: str):
    """Method called if no do_xyz methods found"""
    ret = False
    outputter = None

    if line.strip().endswith("?"):
        try:
            command_models = self.parse_command(
                line.strip().rstrip("?"), is_help=True
            )
        except FieldLooseMatchOnly as e:
            model, parameter = e.args
            self.print_model_help(
                [[model]],
                verbose=True if line.strip().endswith("??") else False,
                match=parameter,
            )
        except FieldKeyError as e:
            model, parameter = e.args
            model_name = (
                model["model"].__name__
                if hasattr(model["model"], "__name__")
                else model["model"].__repr_name__()
            )
            self.write(
                f"Incorrect command, '{parameter}' not part of '{model_name}' model fields"
            )
        else:
            self.print_model_help(
                command_models,
                verbose=True if line.strip().endswith("??") else False,
            )
    else:
        try:
            command_models = self.parse_command(line, collect_multiline=True)
        except FieldLooseMatchOnly as e:
            model, parameter = e.args
            # filter fields to return message for
            fields = [
                f.alias or name
                for name, f in model["model"].model_fields.items()
                if name.startswith(parameter)
            ]
            self.write(
                f"Incomplete command, possible completions: " f"{', '.join(fields)}"
            )
        except FieldKeyError as e:
            model, parameter = e.args
            model_name = (
                model["model"].__name__
                if hasattr(model["model"], "__name__")
                else model["model"].__repr_name__()
            )
            self.write(
                f"Incorrect command, '{parameter}' not part of '{model_name}' model fields"
            )
        except ValidationError as e:
            self.write(e)
        else:
            # go over collected commands separated by pipe
            for index, command in enumerate(command_models):
                # collect arguments
                command_arguments = {
                    f["name"]: f["values"]
                    for model in command
                    for f in model["fields"]
                    if f["values"] is not ...
                }
                # collect command defaults
                command_defaults = {}
                for model in command:
                    command_defaults.update(model.get("defaults", {}))
                model = command[-1]["model"]
                # check if model has subshell
                if (
                    not command_arguments
                    and hasattr(model, "PicleConfig")
                    and getattr(model.PicleConfig, "subshell", None) is True
                ):
                    # collect parent shells and defaults
                    for item in command[:-1]:
                        m = item["model"]
                        self.defaults_update(m)  # store shell defaults
                        if (
                            hasattr(m, "PicleConfig")
                            and getattr(m.PicleConfig, "subshell", None) is True
                        ):
                            if m not in self.shells:
                                self.shells.append(m)
                    # update prompt value
                    self.prompt = getattr(model.PicleConfig, "prompt", self.prompt)
                    self.shell = model
                    self.shells.append(self.shell)
                # run model "run" function if it exits
                elif hasattr(model, "run"):
                    # validate command argument values
                    self._validate_values(command)
                    # call first command using collected arguments only
                    if index == 0:
                        kw = {
                            **self.shell_defaults,
                            **command_defaults,
                            **command_arguments,
                        }
                        ret = model.run(**kw)
                    # pipe results through subsequent commands
                    else:
                        kw = {
                            **command_defaults,
                            **command_arguments,
                        }
                        ret = model.run(ret, **kw)
                    # run processors from PicleConfig if any for first command only
                    if index == 0:
                        if hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "processors"
                        ):
                            for processor in model.PicleConfig.processors:
                                if callable(processor):
                                    ret = processor(ret)
                    # extract outputter from PicleConfig
                    if index == 0:
                        # check if outputter returned together with results
                        if isinstance(ret, tuple):
                            if len(ret) == 2:
                                ret, outputter = ret
                                outputter_kwargs = {}
                            elif len(ret) == 3:
                                ret, outputter, outputter_kwargs = ret
                        elif hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "outputter"
                        ):
                            outputter = model.PicleConfig.outputter
                            outputter_kwargs = getattr(
                                model.PicleConfig, "outputter_kwargs", {}
                            )
                # run command using Callable or json_schema_extra["function"]
                elif command[-1]["fields"]:
                    # validate command argument values
                    self._validate_values(command)
                    # extract last field
                    last_field_name = command[-1]["fields"][-1]["name"]
                    last_field = model.model_fields[last_field_name]
                    json_schema_extra = (
                        getattr(last_field, "json_schema_extra") or {}
                    )
                    # check if last field refers to callable e.g. function
                    if last_field.annotation is Callable:
                        method_name = last_field.get_default()
                        if method_name and hasattr(model, method_name):
                            # call first command using collected arguments only
                            if index == 0:
                                kw = {
                                    **self.shell_defaults,
                                    **command_defaults,
                                    **command_arguments,
                                }
                                # check if need to give root model as an argument
                                if json_schema_extra.get("root_model"):
                                    kw["root_model"] = self.root
                                # check if need to give PICLE App as an argument
                                if json_schema_extra.get("picle_app"):
                                    kw["picle_app"] = self
                                ret = getattr(model, method_name)(**kw)
                            # pipe results through subsequent commands
                            else:
                                kw = {
                                    **command_defaults,
                                    **command_arguments,
                                }
                                ret = getattr(model, method_name)(ret, **kw)
                        else:
                            self.write(
                                f"Model '{model.__name__}' has no '{method_name}' "
                                f"method defined for '{last_field_name}' Callable field"
                            )
                    # check if last field has `function` parameter defined
                    elif json_schema_extra.get("function"):
                        method_name = json_schema_extra["function"]
                        if hasattr(model, method_name):
                            # call first command using collected arguments only
                            if index == 0:
                                kw = {
                                    **self.shell_defaults,
                                    **command_defaults,
                                    **command_arguments,
                                }
                                # check if need to give root model as an argument
                                if json_schema_extra.get("root_model"):
                                    kw["root_model"] = self.root
                                # check if need to give PICLE App as an argument
                                if json_schema_extra.get("picle_app"):
                                    kw["picle_app"] = self
                                ret = getattr(model, method_name)(**kw)
                            # pipe results through subsequent commands
                            else:
                                kw = {
                                    **command_defaults,
                                    **command_arguments,
                                }
                                ret = getattr(model, method_name)(ret, **kw)
                        else:
                            self.write(
                                f"Model '{model.__name__}' has no '{method_name}' "
                                f"method defined for '{last_field_name}' function"
                            )
                    else:
                        self.write(
                            f"Model '{model.__name__}' has no 'run' method defined"
                        )
                    # use processors from Field definition if any
                    if json_schema_extra.get("processors"):
                        for processor in json_schema_extra["processors"]:
                            if callable(processor):
                                ret = processor(ret)
                    # run processors from PicleConfig if any for first command only
                    if index == 0:
                        if hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "processors"
                        ):
                            for processor in model.PicleConfig.processors:
                                if callable(processor):
                                    ret = processor(ret)
                    # extract outputter from first command
                    if index == 0:
                        # check if outputter returned together with results
                        if isinstance(ret, tuple):
                            if len(ret) == 2:
                                ret, outputter = ret
                                outputter_kwargs = {}
                            elif len(ret) == 3:
                                ret, outputter, outputter_kwargs = ret
                        # use outputter from Field definition
                        elif json_schema_extra.get("outputter"):
                            outputter = json_schema_extra["outputter"]
                            outputter_kwargs = json_schema_extra.get(
                                "outputter_kwargs", {}
                            )
                        # use PicleConfig outputter
                        elif hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "outputter"
                        ):
                            outputter = model.PicleConfig.outputter
                            outputter_kwargs = getattr(
                                model.PicleConfig, "outputter_kwargs", {}
                            )
                else:
                    self.defaults_pop(model)
                    ret = f"Incorrect command, provide more arguments for '{model}' model"
                    break

    # returning True will end the shell - exit
    if ret is True:
        return True

    if ret:
        # use specified outputter to output results
        if callable(outputter):
            outputter(ret, **outputter_kwargs)
        # write to stdout by default
        else:
            self.write(ret)

picle.App.do_cls(arg)

Clear shell Screen

Source code in picle\picle.py
764
765
766
767
768
769
770
771
772
def do_cls(self, arg):
    """Clear shell Screen"""
    if "?" in arg:
        self.write(" cls    Clear shell Screen")
    else:
        if "LINUX" in platform.system().upper():
            os.system("clear")
        elif "WINDOWS" in platform.system().upper():
            os.system("cls")

picle.App.do_end(arg)

Exit application

Source code in picle\picle.py
747
748
749
750
751
752
def do_end(self, arg):
    """Exit application"""
    if "?" in arg:
        self.write(" end    Exit application")
    else:
        return True

picle.App.do_exit(arg)

Exit current shell

Source code in picle\picle.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
def do_exit(self, arg):
    """Exit current shell"""
    if "?" in arg:
        self.write(" exit    Exit current shell")
    else:
        # delete defaults for closing shell
        self.defaults_pop(self.shells[-1])
        _ = self.shells.pop(-1)
        if self.shells:
            self.shell = self.shells[-1]
            self.prompt = self.shell.PicleConfig.prompt
            if len(self.shells) == 1:  # check if reached top shell
                self.defaults_set(self.shell)
        else:
            return True

picle.App.do_help(arg)

Print help message

Source code in picle\picle.py
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
def do_help(self, arg):
    """Print help message"""
    command_models = self.parse_command(arg.strip("?"), is_help=True)
    help_msg, width = self.print_model_help(
        command_models,
        verbose=True if arg.strip().endswith("?") else False,
        print_help=False,
    )
    # print help for global top commands
    if len(arg.strip().split(" ")) == 1:
        lines = {}  # dict of {cmd: cmd_help}
        for method_name in dir(self):
            if method_name.startswith("do_"):
                name = method_name.replace("do_", "")
                lines[name] = getattr(self, method_name).__doc__
                width = max(width, len(name))
        # form help lines
        if lines:
            for k, v in lines.items():
                padding = " " * (width - len(k)) + (" " * 4)
                help_msg.append(f" {k}{padding}{v}")
            # print help message
            self.write(self.newline.join(help_msg))

picle.App.do_pwd(arg)

Print current shell path

Source code in picle\picle.py
754
755
756
757
758
759
760
761
762
def do_pwd(self, arg):
    """Print current shell path"""
    if "?" in arg:
        self.write(" pwd    Print current shell path")
    else:
        path = ["Root"]
        for shell in self.shells[1:]:
            path.append(shell.__name__)
        self.write("->".join(path))

picle.App.do_top(arg)

Exit to top shell

Source code in picle\picle.py
734
735
736
737
738
739
740
741
742
743
744
745
def do_top(self, arg):
    """Exit to top shell"""
    if "?" in arg:
        self.write(" top    Exit to top shell")
    else:
        self.shell = self.shells[0]
        self.prompt = self.shell.PicleConfig.prompt
        while self.shells:
            _ = self.shells.pop()
        self.shells.append(self.shell)
        # set shell defaults
        self.defaults_set(self.shell)

picle.App.emptyline() -> None

Override empty line method to not run last command

Source code in picle\picle.py
88
89
90
def emptyline(self) -> None:
    """Override empty line method to not run last command"""
    return None

picle.App.model_mount(model: ModelMetaclass, path: Union[str, list[str]], description: str = None, default=None, **kwargs: dict) -> None

Method to mount pydantic model at provided path in relation to the root model.

Parameters:

  • model (ModelMetaclass) –

    Pydantic model to mount.

  • path (Union[str, list[str]]) –

    List of path segments to mount the model.

  • description (str, default: None ) –

    Description of the model.

  • default

    Default value for the model.

  • kwargs (dict, default: {} ) –

    Additional keyword arguments for the FieldInfo.

Source code in picle\picle.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def model_mount(
    self,
    model: ModelMetaclass,
    path: Union[str, list[str]],
    description: str = None,
    default=None,
    **kwargs: dict,
) -> None:
    """
    Method to mount pydantic model at provided path in relation to the root model.

    :param model: Pydantic model to mount.
    :param path: List of path segments to mount the model.
    :param description: Description of the model.
    :param default: Default value for the model.
    :param kwargs: Additional keyword arguments for the FieldInfo.
    """
    if isinstance(path, str):
        path = [path.strip()]
    parent_model = self.root
    while path:
        mount_name = path.pop(0)
        if mount_name in parent_model.model_fields:
            parent_model = parent_model.model_fields[mount_name].annotation
        else:
            # handle when not all path items before last one are in models tree
            if len(path) > 0:
                raise KeyError(
                    f"'{mount_name}' not part of '{parent_model}' model fields, but remaining path still not empty - {path}"
                )
            parent_model.model_fields[mount_name] = FieldInfo(
                annotation=model,
                required=False,
                description=description,
                default=default,
                **kwargs,
            )
            break

picle.App.model_remove(path: list[str]) -> None

Method to remove pydantic model at provided path in relation to the root model.

Parameters:

  • path (list[str]) –

    List of path segments to remove the model.

Source code in picle\picle.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def model_remove(self, path: list[str]) -> None:
    """
    Method to remove pydantic model at provided path in relation to the root model.

    :param path: List of path segments to remove the model.
    """
    if isinstance(path, str):
        path = [path.strip()]
    parent_model = self.root
    while path:
        mount_name = path.pop(0)
        if mount_name in parent_model.model_fields:
            if len(path) == 0:
                parent_model = parent_model.model_fields.pop(mount_name)
            else:
                parent_model = parent_model.model_fields[mount_name].annotation
        else:
            raise KeyError(
                f"Failed to remove model at path '{mount_name}', parent model: '{parent_model}'"
            )

picle.App.parse_command(command: str, collect_multiline: bool = False, is_help: bool = False) -> list

Function to parse command string and construct list of model references and fields values.

Parameters:

  • command (str) –

    command string to parse through

  • is_help (bool, default: False ) –

    indicates that parsing help command or tab completion command, if set to True disables presence argument handling for last field

  • collect_multiline (bool, default: False ) –

    enables multiple input collection for fields

Returns:

  • list

    returns a list of lists of dictionaries with collected models details each dictionary containing model, fields and parameter keys.

Source code in picle\picle.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def parse_command(
    self, command: str, collect_multiline: bool = False, is_help: bool = False
) -> list:
    """
    Function to parse command string and construct list of model
    references and fields values.

    :param command: command string to parse through
    :param is_help: indicates that parsing help command or tab completion command,
        if set to True disables ``presence`` argument handling for last field
    :param collect_multiline: enables multiple input collection for fields
    :return: returns a list of lists of dictionaries with collected models details
        each dictionary containing ``model``, ``fields`` and ``parameter`` keys.
    """
    current_model = {
        "model": self.shell,
        "fields": [],
        "parameter": ...,
        "defaults": self.extract_model_defaults(self.shell),
    }
    current_field = {}
    models = [current_model]
    parameters = [i for i in command.split(" ") if i.strip()]
    ret = [models]

    # iterate over command parameters and decide if its a reference
    # to a model or model's field value
    while parameters:
        parameter = parameters.pop(0)
        # handle pipe - "|"
        if parameter == "|":
            # check if current model has pipe defined
            if hasattr(current_model["model"], "PicleConfig") and getattr(
                current_model["model"].PicleConfig, "pipe", None
            ):
                if current_model["model"].PicleConfig.pipe == "self":
                    # reference pipe model to current model
                    current_model = {
                        "model": current_model["model"],
                        "fields": [],
                        "parameter": parameter,
                    }
                else:
                    # goto pipe model
                    current_model = {
                        "model": current_model["model"].PicleConfig.pipe,
                        "fields": [],
                        "parameter": parameter,
                    }
                models = [current_model]
                ret.append(models)
            else:
                log.error(
                    f"'{current_model['model'].__name__}' does not support pipe handling"
                )
                break
        # collect json dictionary string
        elif parameter.strip().startswith("{") and current_field:
            value_items = [parameter]
            # collect further values
            while parameters:
                parameter = parameters.pop(0)
                value_items.append(parameter)
                if parameter.strip().endswith("}"):
                    break
            value = " ".join(value_items)  # form value string
            self._save_collected_value(current_field, value)
        # collect json list string
        elif parameter.strip().startswith("[") and current_field:
            value_items = [parameter]
            # collect further values
            while parameters:
                parameter = parameters.pop(0)
                value_items.append(parameter)
                if parameter.strip().endswith("]"):
                    break
            value = " ".join(value_items)  # form value string
            self._save_collected_value(current_field, value)
        # collect double quoted field value
        elif '"' in parameter and current_field:
            value_items = [parameter.replace('"', "")]
            # collect further values if first parameter not double quoted value e.g. "nrp1"
            if parameter.count('"') != 2:
                while parameters:
                    parameter = parameters.pop(0)
                    value_items.append(parameter.replace('"', ""))
                    if '"' in parameter:
                        break
            value = " ".join(value_items)  # form value string
            self._save_collected_value(current_field, value)
        # collect single quoted field value
        elif "'" in parameter and current_field:
            value_items = [parameter.replace("'", "")]
            # collect further values if first parameter not double quoted value e.g. 'nrp1'
            if parameter.count("'") != 2:
                while parameters:
                    parameter = parameters.pop(0)
                    value_items.append(parameter.replace("'", ""))
                    if "'" in parameter:
                        break
            value = " ".join(value_items)  # form value string
            self._save_collected_value(current_field, value)
        # handle reference to model
        elif current_model["model"].model_fields.get(parameter) or any(
            parameter == f.alias
            for f in current_model["model"].model_fields.values()
        ):
            # source field by name
            if current_model["model"].model_fields.get(parameter):
                field = current_model["model"].model_fields[parameter]
            else:
                # source field by alias
                for f_name, field in current_model["model"].model_fields.items():
                    if parameter == field.alias:
                        parameter = f_name  # use actual field name
                        break
            # handle next level model reference
            if isinstance(field.annotation, ModelMetaclass):
                # check need to record field presence before going to next model
                if (
                    current_field.get("values") is ...
                    and current_field["field"].json_schema_extra is not None
                    and "presence" in current_field["field"].json_schema_extra
                ):
                    value = current_field["field"].json_schema_extra["presence"]
                    self._save_collected_value(current_field, value)
                # goto next model
                current_model = {
                    "model": field.annotation,
                    "fields": [],
                    "parameter": parameter,
                }
                models.append(current_model)
                current_field = {}  # empty current field
                # extract first command default values from current model
                if len(ret) == 1:
                    current_model["defaults"] = self.extract_model_defaults(
                        field.annotation
                    )
            # handle actual field reference
            elif isinstance(field, FieldInfo):
                # check need to record field presence before going to next field
                if (
                    current_field.get("values") is ...
                    and current_field["field"].json_schema_extra is not None
                    and "presence" in current_field["field"].json_schema_extra
                ):
                    value = current_field["field"].json_schema_extra["presence"]
                    self._save_collected_value(current_field, value)
                # goto next field
                current_field = {"name": parameter, "values": ..., "field": field}
                # find and replace default value if present
                for index, field in enumerate(current_model["fields"]):
                    if field["name"] == current_field["name"]:
                        current_model["fields"][index] = current_field
                        break
                else:
                    current_model["fields"].append(current_field)
            else:
                raise TypeError(
                    f"Unsupported pydantic field type: '{type(field.annotation)}', "
                    f"parameter: '{parameter}', command: '{command}', current model: "
                    f"'{current_model['model']}'"
                )
        # check if parameter value partially matches any of the model fields
        elif any(
            field_name.startswith(parameter)
            for field_name in current_model["model"].model_fields
        ):
            raise FieldLooseMatchOnly(current_model, parameter)
        # check if parameter value partially matches any of the model fields' aliases
        elif any(
            field.alias.startswith(parameter)
            for field in current_model["model"].model_fields.values()
            if field.alias is not None
        ):
            raise FieldLooseMatchOnly(current_model, parameter)
        # parameter is a value, save it to current model
        elif current_field:
            self._save_collected_value(current_field, parameter)
        else:
            raise FieldKeyError(current_model, parameter)
    # check presence for last parameter is not is_help
    if (
        is_help is False
        and current_field.get("values") is ...
        and current_field["field"].json_schema_extra is not None
        and "presence" in current_field["field"].json_schema_extra
    ):
        value = current_field["field"].json_schema_extra["presence"]
        self._save_collected_value(current_field, value)

    # iterate over collected models and fields to see
    # if need to collect multi-line input
    if collect_multiline:
        for command_models in ret:
            for model in command_models:
                for field in model["fields"]:
                    self._collect_multiline(field)

    return ret

picle.App.print_model_help(models: list, verbose: bool = False, match: str = None, print_help: bool = True) -> None

Function to form and print help message for model fields.

Parameters:

  • match (str, default: None ) –

    only collect help for fields that start with match string

  • print_help (bool, default: True ) –

    if true prints help, return tuple of help lines list and width of longest line

Source code in picle\picle.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def print_model_help(
    self,
    models: list,
    verbose: bool = False,
    match: str = None,
    print_help: bool = True,
) -> None:
    """
    Function to form and print help message for model fields.

    :param match: only collect help for fields that start with ``match`` string
    :param print_help: if true prints help, return tuple of help lines
        list and width of longest line
    """
    model = models[-1][-1]  # get last model
    last_field = model["fields"][-1] if model["fields"] else None
    fparam = self._get_field_params(last_field)
    lines = {}  # dict of {cmd: cmd_help}
    width = 0  # record longest command width for padding
    # print help message only for last collected field
    if last_field and last_field["values"] == ...:
        field = model["model"].model_fields[last_field["name"]]
        json_schema_extra = getattr(field, "json_schema_extra") or {}
        name = f"<'{last_field['name']}' value>"
        # check if field is callable
        if field.annotation is Callable:
            name = "<ENTER>"
            lines[name] = "Execute command"
            width = max(width, len(name))
        # check if field referencing function
        elif json_schema_extra.get("function"):
            lines[name] = f"{field.description}"
            name = "<ENTER>"
            lines[name] = "Execute command"
            width = max(width, len(name))
        # add options for enumerations
        elif isinstance(field.annotation, enum.EnumMeta):
            options = [i.value for i in field.annotation]
            lines[name] = ", ".join(options)
        # check if model has method to source field choices
        elif hasattr(model["model"], f"source_{last_field['name']}"):
            options = getattr(model["model"], f"source_{last_field['name']}")()
            lines[name] = ", ".join(options)
        else:
            lines[name] = f"{field.description}"
            # check if field supports multiline input
            if fparam.get("multiline") is True:
                lines["input"] = "Collect value using multi line input mode"
            if verbose:
                lines[name] += (
                    f"; default '{field.get_default()}', type '{str(field.annotation)}', "
                    f"is required - {field.is_required()}"
                )
    # collect help message for all fields of this model
    else:
        # check if model supports subshell
        if (
            hasattr(model["model"], "PicleConfig")
            and getattr(model["model"].PicleConfig, "subshell", None) is True
            # exclude <ENTER> if already in model's shell
            and not self.shells[-1] == model["model"]
        ):
            name = "<ENTER>"
            lines[name] = "Enter command subshell"
            width = max(width, len(name))
        # iterate over model fields
        for name, field in model["model"].model_fields.items():
            # skip fields that already have values
            if any(f["name"] == name for f in model["fields"]):
                continue
            # check if field has alias
            if field.alias:
                name = field.alias
            # filter fields
            if match and not name.startswith(match):
                continue
            lines[name] = f"{field.description}"
            if verbose:
                lines[name] += (
                    f"; default '{field.get_default()}', type '{str(field.annotation)}', "
                    f"is required - {field.is_required()}"
                )
            width = max(width, len(name))
    # check if model has pipe defined
    if hasattr(model["model"], "PicleConfig") and getattr(
        model["model"].PicleConfig, "pipe", None
    ):
        name = "|"
        lines[name] = "Execute pipe command"
        width = max(width, len(name))
    width = max(width, len(name))
    # form help lines
    help_msg = []
    for k in sorted(lines.keys()):
        padding = " " * (width - len(k)) + (" " * 4)
        help_msg.append(f" {k}{padding}{lines[k]}")

    if print_help:  # print help message
        self.write(self.newline.join(help_msg))
    else:
        return help_msg, width

picle.App.write(text: str) -> None

Method to write output to stdout

Parameters:

  • text (str) –

    text output

Source code in picle\picle.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def write(self, text: str) -> None:
    """
    Method to write output to stdout

    :param text: text output
    """
    if not isinstance(text, str):
        text = str(text)
    if not text.endswith(self.newline):
        self.stdout.write(text + self.newline)
    else:
        self.stdout.write(text)

PICLE Build In Models

picle.models.Filters

Bases: BaseModel

picle.models.Filters.filter_exclude(data: Any, exclude: Any = None) -> str staticmethod

Filter data line by line using provided pattern. Returns only lines that does not contains requested exclude pattern.

Parameters:

  • data (Any) –

    data to filter

  • exclude (Any, default: None ) –

    pattern to filter data

Source code in picle\models.py
50
51
52
53
54
55
56
57
58
59
60
61
62
@staticmethod
def filter_exclude(data: Any, exclude: Any = None) -> str:
    """
    Filter data line by line using provided pattern. Returns
    only lines that does not contains requested ``exclude`` pattern.

    :param data: data to filter
    :param exclude: pattern to filter data
    """
    exclude = str(exclude)
    return "\n".join(
        [line for line in str(data).splitlines() if exclude not in line]
    )

picle.models.Filters.filter_include(data: Any, include: Any = None) -> str staticmethod

Filter data line by line using provided pattern. Returns only lines that contains requested include pattern.

Parameters:

  • data (Any) –

    data to filter

  • include (Any, default: None ) –

    pattern to filter data

Source code in picle\models.py
38
39
40
41
42
43
44
45
46
47
48
@staticmethod
def filter_include(data: Any, include: Any = None) -> str:
    """
    Filter data line by line using provided pattern. Returns
    only lines that contains requested ``include`` pattern.

    :param data: data to filter
    :param include: pattern to filter data
    """
    include = str(include)
    return "\n".join([line for line in str(data).splitlines() if include in line])

picle.models.Formatters

Bases: BaseModel

picle.models.Formatters.formatter_json(data: Any) -> str staticmethod

Function to transform results into JSON string

Parameters:

  • data (Any) –

    any data to convert

Source code in picle\models.py
 97
 98
 99
100
101
102
103
104
@staticmethod
def formatter_json(data: Any) -> str:
    """
    Function to transform results into JSON string

    :param data: any data to convert
    """
    return json.dumps(data, indent=4, sort_keys=True)

picle.models.Formatters.formatter_kv(data: dict) -> str staticmethod

Function to format dictionary result as a key: value output

Parameters:

  • data (dict) –

    dictionary to format

Source code in picle\models.py
118
119
120
121
122
123
124
125
@staticmethod
def formatter_kv(data: dict) -> str:
    """
    Function to format dictionary result as a key: value output

    :param data: dictionary to format
    """
    return "\n".join([f" {k}: {v}" for k, v in data.items()])

picle.models.Formatters.formatter_pprint(data: Any) -> str staticmethod

Function to pretty print results using python pprint module

Parameters:

  • data (Any) –

    any data to pretty print

Source code in picle\models.py
88
89
90
91
92
93
94
95
@staticmethod
def formatter_pprint(data: Any) -> str:
    """
    Function to pretty print results using python ``pprint`` module

    :param data: any data to pretty print
    """
    return pprint.pformat(data, indent=4)

picle.models.Formatters.formatter_yaml(data: Any) -> str staticmethod

Function to transform results into YAML string

Parameters:

  • data (Any) –

    any data to convert

Source code in picle\models.py
106
107
108
109
110
111
112
113
114
115
116
@staticmethod
def formatter_yaml(data: Any) -> str:
    """
    Function to transform results into YAML string

    :param data: any data to convert
    """
    if HAS_YAML:
        return yaml_dump(data, default_flow_style=False)
    else:
        return data

picle.models.Outputters

Bases: BaseModel

picle.models.Outputters.outputter_rich_json(data: Union[dict, list]) -> None staticmethod

Function to pretty print JSON string using Rich library

Parameters:

  • data (Union[dict, list]) –

    any data to print

Source code in picle\models.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@staticmethod
def outputter_rich_json(data: Union[dict, list]) -> None:
    """
    Function to pretty print JSON string using Rich library

    :param data: any data to print
    """
    if isinstance(data, bytes):
        data = data.decode("utf-8")

    if not isinstance(data, str):
        data = json.dumps(data)

    # data should be a json string
    try:
        if HAS_RICH:
            RICHCONSOLE.print_json(data, sort_keys=True, indent=4)
        else:
            print(data)
    except Exception as e:
        print(f"ERROR: Data is not a valid JSON string:\n{data}\n\nError: '{e}'")

picle.models.Outputters.outputter_rich_print(data: Any) -> None staticmethod

Function to pretty print output using Rich library

Parameters:

  • data (Any) –

    any data to print

Source code in picle\models.py
167
168
169
170
171
172
173
174
175
176
177
@staticmethod
def outputter_rich_print(data: Any) -> None:
    """
    Function to pretty print output using Rich library

    :param data: any data to print
    """
    if HAS_RICH:
        RICHCONSOLE.print(data)
    else:
        print(data)

picle.models.Outputters.outputter_rich_table(data: list[dict], headers: list = None, title: str = None, sortby: str = None) staticmethod

Function to pretty print output in table format using Rich library

Parameters:

  • data (list[dict]) –

    list of dictionaries to print

Source code in picle\models.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@staticmethod
def outputter_rich_table(
    data: list[dict], headers: list = None, title: str = None, sortby: str = None
):
    """
    Function to pretty print output in table format using Rich library

    :param data: list of dictionaries to print
    """
    if not HAS_RICH or not isinstance(data, list):
        print(data)
        return

    headers = headers or list(data[0].keys())
    table = RICHTABLE(title=title, box=False)

    # add table columns
    for h in headers:
        table.add_column(h, justify="left", no_wrap=True)

    # sort the table
    if sortby:
        # form dictionary keyed by sortby value and index
        items_to_sortby = {i[sortby]: index for index, i in enumerate(data)}
        # form a list of sorted sortby values
        sorted_keys = sorted(items_to_sortby.keys())
        # for sorted data list
        sorted_data = [data[items_to_sortby[key]] for key in sorted_keys]
    else:
        sorted_data = data

    # add table rows
    for item in sorted_data:
        cells = [item.get(h, "") for h in headers]
        table.add_row(*cells)

    RICHCONSOLE.print(table)

picle.models.PipeFunctionsModel

Bases: Filters, Formatters, Outputters

Collection of common pipe functions to use in PICLE shell models

picle.models.MAN

Bases: BaseModel

Model with manual/documentation related functions

picle.models.MAN.print_model_tree(root_model, **kwargs) -> None staticmethod

Method to print model tree for shell model specified by dot separated path e.g. model.shell.command

Parameters:

  • root_model

    PICLE App root model to print tree for

Source code in picle\models.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@staticmethod
def print_model_tree(root_model, **kwargs) -> None:
    """
    Method to print model tree for shell model specified by dot separated path e.g. model.shell.command

    :param root_model: PICLE App root model to print tree for
    """
    path = kwargs["tree"].split(".") if kwargs.get("tree") else []
    rich_tree = RICHTREE("[bold]root[/bold]")
    RICHCONSOLE.print(
        MAN._construct_model_tree(
            model=root_model.model_construct(), tree=rich_tree, path=path
        )
    )