PICLE APIs Reference
PicleConfig
Each Pydantic model can define a nested PicleConfig class that holds model configuration parameters:
ruler - the character used to draw separator lines under help-message headers; if empty, no ruler line is drawn. Defaults to an empty string
intro - a string to print as an intro or banner
prompt - command line shell prompt
newline - newline character to use when printing output, default is \r\n
completekey - the readline name of a completion key, defaults to tab
pipe - reference to the Pydantic model class to use with | (pipe) to process results with various functions; the special value pipe = "self" instructs PICLE to use the current model for piping results through
processors - list of functions to run the results of the first command through, one by one
outputter - function used to output results; by default results are written to stdout
outputter_kwargs - dictionary of additional arguments to pass to the outputter
Sample PicleConfig definition:
from pydantic import BaseModel
from picle.models import PipeFunctionsModel, Formatters, Outputters

class ShellModel(BaseModel):
    """ define command attributes here """
    <...>

    class PicleConfig:
        prompt = "picle#"
        ruler = ""
        intro = "PICLE Sample app"
        newline = "\r\n"
        completekey = "tab"
        pipe = PipeFunctionsModel
        processors = [Formatters.formatter_json]
        outputter = Outputters.outputter_rich_print
        outputter_kwargs = {"any": "extra_argument"}
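The App constructor listing later in this reference reads these settings with getattr lookups that fall back to an existing value when an attribute is absent. The following is a minimal sketch of that idea using plain classes. The `resolve_config` helper and the `DEFAULTS` mapping are hypothetical names introduced for illustration, and the default values shown are not PICLE's actual defaults.

```python
# Illustrative defaults only; they are not PICLE's actual default values.
DEFAULTS = {"prompt": "app#", "ruler": "", "newline": "\r\n", "completekey": "tab"}


class ShellModel:
    """Stand-in for a Pydantic model; only the nested config class matters here."""

    class PicleConfig:
        prompt = "picle#"
        intro = "PICLE Sample app"


def resolve_config(model, defaults):
    """Return settings from model.PicleConfig, falling back to defaults."""
    config = getattr(model, "PicleConfig", None)
    return {name: getattr(config, name, value) for name, value in defaults.items()}


settings = resolve_config(ShellModel, DEFAULTS)
print(settings["prompt"])         # value taken from PicleConfig
print(repr(settings["newline"]))  # value taken from the defaults
```

A model without a PicleConfig class simply yields the defaults unchanged in this sketch.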
PICLE supports reading additional parameters from a model Field's json_schema_extra definition to control PICLE behavior.
json_schema_extra PICLE parameters:
function - name of the model's @staticmethod to call with command arguments
presence - value assigned to the command argument when the argument is given on the command line without a value
processors - list of functions to run the results of each command through, one by one
outputter - function used to output results; by default results are written to stdout. A Field's outputter overrides the PicleConfig outputter
outputter_kwargs - dictionary of additional arguments to pass to the outputter
multiline - True/False, indicates whether multi line input mode is enabled for this field
root_model - True/False; if True, a reference to the PICLE App's root model is passed to the run method or to the function in the root_model argument
picle_app - True/False; if True, a reference to the PICLE App is passed to the run method or to the function in the picle_app argument; useful when the PICLE App needs to be modified at runtime, for example to mount or remove models
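The presence parameter can be illustrated with a small standalone sketch. It assumes, as the parse_command listing later in this reference suggests, that a field given without a value is marked with Ellipsis until a value is resolved. The `apply_presence` helper and the sample data are hypothetical and are not part of PICLE.

```python
def apply_presence(fields, extras):
    """Fill in the presence value for fields that were given without a value."""
    resolved = {}
    for name, value in fields.items():
        # Ellipsis marks "field named on the command line, no value collected"
        if value is ... and "presence" in extras.get(name, {}):
            value = extras[name]["presence"]
        resolved[name] = value
    return resolved


fields = {"verbose": ..., "host": "r1"}
extras = {"verbose": {"presence": True}}
print(apply_presence(fields, extras))
```

Fields that already carry a value, such as host above, are left untouched.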
Field processors
Processors pass command execution results through a list of arbitrary functions. The result returned by one processor function is passed as input to the next processor function in the list, and so on.
In the example below, results returned by the produce_structured_data function are passed through the pprint formatter function Formatters.formatter_pprint to produce a pretty-formatted string.
from typing import Callable
from pydantic import BaseModel, Field
from picle.models import Formatters

class model_show(BaseModel):
    data_pprint: Callable = Field(
        "produce_structured_data",
        description="Show data using pprint formatter",
        json_schema_extra={
            "processors": [
                Formatters.formatter_pprint
            ]
        }
    )

    @staticmethod
    def produce_structured_data():
        return {
            "some": {"dictionary": {"data": None}},
            "more": {"dictionary": ["data"]},
            "even": {"more": {"dictionary": "data"}},
        }
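The chaining behavior described above can be sketched without PICLE at all. In the snippet below, `run_processors` and `sort_keys` are hypothetical helpers written for this illustration; only `json.dumps` is a real library function.

```python
import json


def run_processors(result, processors):
    """Pass result through each callable in order; non-callables are skipped."""
    for processor in processors:
        if callable(processor):
            result = processor(result)
    return result


def sort_keys(data):
    """Hypothetical processor: return a copy of the dictionary with sorted keys."""
    return dict(sorted(data.items()))


data = {"b": 2, "a": 1}
text = run_processors(data, [sort_keys, json.dumps])
print(text)
```

The order of the list matters: here the dictionary is sorted first and only then serialized to a string.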
Multi Line Input
Multi line input reads multiple lines of text into a field value when the json_schema_extra multiline argument is set to True. To use it, specify input as the field value on the command line; this triggers multi line input collection when return is hit.
Sample model that has multi line input enabled:
from typing import Any
from pydantic import BaseModel, Field, StrictStr

class model_TestMultilineInput(BaseModel):
    data: StrictStr = Field(
        None,
        description="Multi line string",
        json_schema_extra={"multiline": True}
    )
    arg: Any = Field(None, description="Some field")

    @staticmethod
    def run(**kwargs):
        return kwargs
This is how help looks for the data field:
picle#test_multiline_input data ?
<'data' value> Multi line string
input Collect value using multi line input mode
picle#
Tab completion for the input value also works. On hitting enter, multi line input mode is invoked:
picle#test_multiline_input data input arg foo
Enter lines and hit Ctrl+D to finish multi line input
I'am
Multi Line
Input
<ctrl+D hit>
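The collection step can be pictured as reading lines until end of input, which is what Ctrl+D signals on a Unix terminal. The sketch below is a simplified stand-in, not PICLE's implementation: `collect_multiline` is a hypothetical helper, and `io.StringIO` plays the role of the terminal.

```python
import io


def collect_multiline(stream):
    """Read lines from stream until end of input and join them into one string."""
    lines = []
    for line in stream:
        lines.append(line.rstrip("\r\n"))
    return "\n".join(lines)


# the end of the buffer plays the role of Ctrl+D
value = collect_multiline(io.StringIO("I am\nMulti Line\nInput\n"))
print(value)
```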
Result Specific Outputters
Sometimes having an outputter defined per model is not enough, and a different outputter needs to be used depending on the produced result. In that case, a result specific outputter can be provided by returning from the run function a tuple of (result, outputter function, outputter kwargs), where outputter kwargs is optional.
Example:
from typing import Any
from pydantic import BaseModel, Field, StrictStr
from picle.models import Outputters

class model_ResultSpecificOutputter(BaseModel):
    data: StrictStr = Field(None, description="Multi line string")
    arg: Any = Field(None, description="Some field")

    class PicleConfig:
        outputter = Outputters.outputter_rich_print
        outputter_kwargs = {"any": "extra_argument"}

    @staticmethod
    def run(**kwargs):
        # the field is named "arg", so that is the key present in kwargs
        if kwargs.get("arg") == "json":
            return kwargs["data"], Outputters.outputter_rich_json, {}
        elif kwargs.get("arg") == "table":
            return kwargs["data"], Outputters.outputter_rich_table
        else:
            return kwargs
In addition to the PicleConfig outputter, the run function returns, depending on the arguments provided, an outputter function to use for the result, with optional outputter_kwargs as the third tuple item. By default, if the returned result is not a tuple, the outputter specified in PicleConfig is used.
Note
Result specific outputters are supported starting with PICLE version 0.7.0
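The tuple handling can be sketched as a small standalone function. This is a simplified illustration, not PICLE's code: `resolve_outputter` and `as_upper` are hypothetical names, and tuples of any other length are treated here as ordinary results.

```python
def resolve_outputter(ret, default_outputter=None, default_kwargs=None):
    """Split a run() return value into (result, outputter, outputter_kwargs)."""
    if isinstance(ret, tuple) and len(ret) == 2:
        result, outputter = ret
        return result, outputter, {}
    if isinstance(ret, tuple) and len(ret) == 3:
        return ret
    # not a recognized tuple: fall back to the model level outputter
    return ret, default_outputter, dict(default_kwargs or {})


def as_upper(result, **kwargs):
    """Hypothetical outputter used for the demonstration."""
    print(str(result).upper())


result, outputter, kwargs = resolve_outputter(("done", as_upper))
outputter(result, **kwargs)
```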
Mounting Models at Runtime
Sometimes new shell commands need to be added to the app dynamically; for that, the PICLE App has model_mount and model_remove methods.
Example of how to mount a Pydantic model to the PICLE App at a given path at runtime:
from picle import App
from pydantic import BaseModel, Field, StrictStr

class my_mount_model(BaseModel):
    param: StrictStr = Field(None, description="Param string")

    @staticmethod
    def run(**kwargs):
        return kwargs

# create PICLE Root model
class Root(BaseModel):
    command: StrictStr = Field(None, description="Some command string")

# instantiate PICLE App shell
shell = App(Root)
# mount model
shell.model_mount(my_mount_model, ["another_command"])
# remove model
shell.model_remove(["another_command"])
shell.close()
PICLE App
picle.App(root, stdin=None, stdout=None)
Bases: Cmd
PICLE App class to construct shell.
Parameters:
Source code in picle\picle.py
def __init__(self, root, stdin=None, stdout=None):
    self.root = root
    self.shell = self.root.model_construct()
    self.shell_defaults = {}
    self.shells = [self.shell]
    # extract configuration from shell model
    if hasattr(self.shell, "PicleConfig"):
        self.ruler = getattr(self.shell.PicleConfig, "ruler", self.ruler)
        self.intro = getattr(self.shell.PicleConfig, "intro", self.intro)
        self.prompt = getattr(self.shell.PicleConfig, "prompt", self.prompt)
        self.newline = getattr(self.shell.PicleConfig, "newline", self.newline)
        self.completekey = getattr(
            self.shell.PicleConfig, "completekey", self.completekey
        )
        # mount override methods
        if hasattr(self.shell.PicleConfig, "methods_override"):
            for (
                method_name,
                override,
            ) in self.shell.PicleConfig.methods_override.items():
                setattr(self, method_name, getattr(self.shell, override))
    # mount models
    self.model_mount(MAN, ["man"], "Manual/documentation functions")
    super(App, self).__init__(stdin=stdin, stdout=stdout)
picle.App.completedefault(text, line, begidx, endidx)
This method is called for every command parameter on complete key hit, except for the very first one.
Source code in picle\picle.py
def completedefault(self, text, line, begidx, endidx):
    """
    This method called for every command parameter on
    complete key hit except for the very first one.
    """
    fieldnames = []
    try:
        command_models = self.parse_command(line, is_help=True)
        last_model = command_models[-1][-1]["model"]
        # check if last model has fields collected
        if command_models[-1][-1]["fields"]:
            last_field_name = command_models[-1][-1]["fields"][-1]["name"]
            last_field = last_model.model_fields[last_field_name]
            last_field_value = command_models[-1][-1]["fields"][-1]["values"]
            fparam = self._get_field_params(last_field)
            if isinstance(last_field_value, list):
                last_field_value = last_field_value[-1]
            elif last_field_value == ...:
                last_field_value = ""
            # check if need to extract enum values
            if isinstance(last_field.annotation, enum.EnumMeta):
                fieldnames = [
                    i.value
                    for i in last_field.annotation
                    if i.value.startswith(last_field_value)
                ]
            # check if model has method to source field choices
            elif hasattr(last_model, f"source_{last_field_name}"):
                fieldnames = getattr(last_model, f"source_{last_field_name}")()
                fieldnames = [
                    i for i in fieldnames if i.startswith(last_field_value)
                ]
            # auto complete 'input' for multi-line input mode
            elif fparam.get("multiline") is True:
                if (
                    "input".startswith(last_field_value)
                    and last_field_value != "input"
                ):
                    fieldnames = ["input"]
        # return a list of all model fields
        else:
            for name, f in last_model.model_fields.items():
                if f.alias:
                    fieldnames.append(f.alias)
                else:
                    fieldnames.append(name)
    except FieldLooseMatchOnly as e:
        model, parameter = e.args
        for name, f in model["model"].model_fields.items():
            # skip fields with already collected values from complete prompt
            if any(
                collected_field["name"] == name
                for collected_field in model["fields"]
                if collected_field["values"] is not ...
            ):
                continue
            elif f.alias and f.alias.startswith(parameter):
                fieldnames.append(f.alias)
            elif name.startswith(parameter):
                fieldnames.append(name)
    except FieldKeyError:
        pass
    except:
        tb = traceback.format_exc()
        self.write(tb)
    return sorted(fieldnames)
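The enum branch of the completion logic can be tried in isolation. The sketch below assumes a hypothetical `Color` enumeration standing in for a field annotation; `complete_enum` is an illustrative helper, not a PICLE method.

```python
import enum


class Color(enum.Enum):
    """Hypothetical enumeration standing in for a field annotation."""

    RED = "red"
    GREEN = "green"
    GREY = "grey"


def complete_enum(annotation, typed):
    """Return sorted enum values that start with the text typed so far."""
    if not isinstance(annotation, enum.EnumMeta):
        return []
    return sorted(i.value for i in annotation if i.value.startswith(typed))


print(complete_enum(Color, "gr"))
```

An empty prefix matches every value, which corresponds to hitting the complete key before typing anything.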
picle.App.completenames(text, line, begidx, endidx)
This method is only called for the very first command parameter on complete key hit.
Source code in picle\picle.py
def completenames(self, text, line, begidx, endidx):
    """
    This method only called for the very first command parameter on
    complete key hit.
    """
    fieldnames = []
    # collect global methods
    for method_name in dir(self):
        if method_name.startswith("do_"):
            name = method_name.replace("do_", "")
            if name.startswith(line):
                fieldnames.append(name)
    # collect model arguments
    try:
        command_models = self.parse_command(line, is_help=True)
        fieldnames.extend(command_models[-1][-1]["model"].model_fields)
    # collect arguments that startswith last parameter
    except FieldLooseMatchOnly as e:
        model, parameter = e.args
        for name, f in model["model"].model_fields.items():
            if f.alias and f.alias.startswith(parameter):
                fieldnames.append(f.alias)
            elif name.startswith(parameter):
                fieldnames.append(name)
    # raised if no model fields matched last parameter
    except FieldKeyError as e:
        log.debug(f"No model fields matched last parameter - {e}")
        pass
    return sorted(fieldnames)
picle.App.default(line: str)
Method called if no do_xyz methods found
Source code in picle\picle.py
@run_print_exception
def default(self, line: str):
    """Method called if no do_xyz methods found"""
    ret = False
    outputter = None
    if line.strip().endswith("?"):
        try:
            command_models = self.parse_command(
                line.strip().rstrip("?"), is_help=True
            )
        except FieldLooseMatchOnly as e:
            model, parameter = e.args
            self.print_model_help(
                [[model]],
                verbose=True if line.strip().endswith("??") else False,
                match=parameter,
            )
        except FieldKeyError as e:
            model, parameter = e.args
            model_name = (
                model["model"].__name__
                if hasattr(model["model"], "__name__")
                else model["model"].__repr_name__()
            )
            self.write(
                f"Incorrect command, '{parameter}' not part of '{model_name}' model fields"
            )
        else:
            self.print_model_help(
                command_models,
                verbose=True if line.strip().endswith("??") else False,
            )
    else:
        try:
            command_models = self.parse_command(line, collect_multiline=True)
        except FieldLooseMatchOnly as e:
            model, parameter = e.args
            # filter fields to return message for
            fields = [
                f.alias or name
                for name, f in model["model"].model_fields.items()
                if name.startswith(parameter)
            ]
            self.write(
                f"Incomplete command, possible completions: " f"{', '.join(fields)}"
            )
        except FieldKeyError as e:
            model, parameter = e.args
            model_name = (
                model["model"].__name__
                if hasattr(model["model"], "__name__")
                else model["model"].__repr_name__()
            )
            self.write(
                f"Incorrect command, '{parameter}' not part of '{model_name}' model fields"
            )
        except ValidationError as e:
            self.write(e)
        else:
            # go over collected commands separated by pipe
            for index, command in enumerate(command_models):
                # collect arguments
                command_arguments = {
                    f["name"]: f["values"]
                    for model in command
                    for f in model["fields"]
                    if f["values"] is not ...
                }
                # collect command defaults
                command_defaults = {}
                for model in command:
                    command_defaults.update(model.get("defaults", {}))
                model = command[-1]["model"]
                # check if model has subshell
                if (
                    not command_arguments
                    and hasattr(model, "PicleConfig")
                    and getattr(model.PicleConfig, "subshell", None) is True
                ):
                    # collect parent shells and defaults
                    for item in command[:-1]:
                        m = item["model"]
                        self.defaults_update(m)  # store shell defaults
                        if (
                            hasattr(m, "PicleConfig")
                            and getattr(m.PicleConfig, "subshell", None) is True
                        ):
                            if m not in self.shells:
                                self.shells.append(m)
                    # update prompt value
                    self.prompt = getattr(model.PicleConfig, "prompt", self.prompt)
                    self.shell = model
                    self.shells.append(self.shell)
                # run model "run" function if it exits
                elif hasattr(model, "run"):
                    # validate command argument values
                    self._validate_values(command)
                    # call first command using collected arguments only
                    if index == 0:
                        kw = {
                            **self.shell_defaults,
                            **command_defaults,
                            **command_arguments,
                        }
                        ret = model.run(**kw)
                    # pipe results through subsequent commands
                    else:
                        kw = {
                            **command_defaults,
                            **command_arguments,
                        }
                        ret = model.run(ret, **kw)
                    # run processors from PicleConfig if any for first command only
                    if index == 0:
                        if hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "processors"
                        ):
                            for processor in model.PicleConfig.processors:
                                if callable(processor):
                                    ret = processor(ret)
                    # extract outputter from PicleConfig
                    if index == 0:
                        # check if outputter returned together with results
                        if isinstance(ret, tuple):
                            if len(ret) == 2:
                                ret, outputter = ret
                                outputter_kwargs = {}
                            elif len(ret) == 3:
                                ret, outputter, outputter_kwargs = ret
                        elif hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "outputter"
                        ):
                            outputter = model.PicleConfig.outputter
                            outputter_kwargs = getattr(
                                model.PicleConfig, "outputter_kwargs", {}
                            )
                # run command using Callable or json_schema_extra["function"]
                elif command[-1]["fields"]:
                    # validate command argument values
                    self._validate_values(command)
                    # extract last field
                    last_field_name = command[-1]["fields"][-1]["name"]
                    last_field = model.model_fields[last_field_name]
                    json_schema_extra = (
                        getattr(last_field, "json_schema_extra") or {}
                    )
                    # check if last field refers to callable e.g. function
                    if last_field.annotation is Callable:
                        method_name = last_field.get_default()
                        if method_name and hasattr(model, method_name):
                            # call first command using collected arguments only
                            if index == 0:
                                kw = {
                                    **self.shell_defaults,
                                    **command_defaults,
                                    **command_arguments,
                                }
                                # check if need to give root model as an argument
                                if json_schema_extra.get("root_model"):
                                    kw["root_model"] = self.root
                                # check if need to give PICLE App as an argument
                                if json_schema_extra.get("picle_app"):
                                    kw["picle_app"] = self
                                ret = getattr(model, method_name)(**kw)
                            # pipe results through subsequent commands
                            else:
                                kw = {
                                    **command_defaults,
                                    **command_arguments,
                                }
                                ret = getattr(model, method_name)(ret, **kw)
                        else:
                            self.write(
                                f"Model '{model.__name__}' has no '{method_name}' "
                                f"method defined for '{last_field_name}' Callable field"
                            )
                    # check if last field has `function` parameter defined
                    elif json_schema_extra.get("function"):
                        method_name = json_schema_extra["function"]
                        if hasattr(model, method_name):
                            # call first command using collected arguments only
                            if index == 0:
                                kw = {
                                    **self.shell_defaults,
                                    **command_defaults,
                                    **command_arguments,
                                }
                                # check if need to give root model as an argument
                                if json_schema_extra.get("root_model"):
                                    kw["root_model"] = self.root
                                # check if need to give PICLE App as an argument
                                if json_schema_extra.get("picle_app"):
                                    kw["picle_app"] = self
                                ret = getattr(model, method_name)(**kw)
                            # pipe results through subsequent commands
                            else:
                                kw = {
                                    **command_defaults,
                                    **command_arguments,
                                }
                                ret = getattr(model, method_name)(ret, **kw)
                        else:
                            self.write(
                                f"Model '{model.__name__}' has no '{method_name}' "
                                f"method defined for '{last_field_name}' function"
                            )
                    else:
                        self.write(
                            f"Model '{model.__name__}' has no 'run' method defined"
                        )
                    # use processors from Field definition if any
                    if json_schema_extra.get("processors"):
                        for processor in json_schema_extra["processors"]:
                            if callable(processor):
                                ret = processor(ret)
                    # run processors from PicleConfig if any for first command only
                    if index == 0:
                        if hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "processors"
                        ):
                            for processor in model.PicleConfig.processors:
                                if callable(processor):
                                    ret = processor(ret)
                    # extract outputter from first command
                    if index == 0:
                        # check if outputter returned together with results
                        if isinstance(ret, tuple):
                            if len(ret) == 2:
                                ret, outputter = ret
                                outputter_kwargs = {}
                            elif len(ret) == 3:
                                ret, outputter, outputter_kwargs = ret
                        # use outputter from Field definition
                        elif json_schema_extra.get("outputter"):
                            outputter = json_schema_extra["outputter"]
                            outputter_kwargs = json_schema_extra.get(
                                "outputter_kwargs", {}
                            )
                        # use PicleConfig outputter
                        elif hasattr(model, "PicleConfig") and hasattr(
                            model.PicleConfig, "outputter"
                        ):
                            outputter = model.PicleConfig.outputter
                            outputter_kwargs = getattr(
                                model.PicleConfig, "outputter_kwargs", {}
                            )
                else:
                    self.defaults_pop(model)
                    ret = f"Incorrect command, provide more arguments for '{model}' model"
                    break
            # returning True will end the shell - exit
            if ret is True:
                return True
            if ret:
                # use specified outputter to output results
                if callable(outputter):
                    outputter(ret, **outputter_kwargs)
                # write to stdout by default
                else:
                    self.write(ret)
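For the first command in a pipeline, the listing above builds the keyword arguments by unpacking three dictionaries in sequence. A short standalone example shows the resulting precedence; the dictionary contents are made-up sample values.

```python
shell_defaults = {"timeout": 10, "format": "text"}
command_defaults = {"format": "json"}
command_arguments = {"timeout": 30}

# later mappings win when keys collide, so explicit command
# arguments override command defaults, which override shell defaults
kw = {**shell_defaults, **command_defaults, **command_arguments}
print(kw)
```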
picle.App.do_cls(arg)
Clear shell Screen
Source code in picle\picle.py
def do_cls(self, arg):
    """Clear shell Screen"""
    if "?" in arg:
        self.write(" cls Clear shell Screen")
    else:
        if "LINUX" in platform.system().upper():
            os.system("clear")
        elif "WINDOWS" in platform.system().upper():
            os.system("cls")
picle.App.do_end(arg)
Exit application
Source code in picle\picle.py
def do_end(self, arg):
    """Exit application"""
    if "?" in arg:
        self.write(" end Exit application")
    else:
        return True
picle.App.do_exit(arg)
Exit current shell
Source code in picle\picle.py
def do_exit(self, arg):
    """Exit current shell"""
    if "?" in arg:
        self.write(" exit Exit current shell")
    else:
        # delete defaults for closing shell
        self.defaults_pop(self.shells[-1])
        _ = self.shells.pop(-1)
        if self.shells:
            self.shell = self.shells[-1]
            self.prompt = self.shell.PicleConfig.prompt
            if len(self.shells) == 1:  # check if reached top shell
                self.defaults_set(self.shell)
        else:
            return True
picle.App.do_help(arg)
Print help message
Source code in picle\picle.py
def do_help(self, arg):
    """Print help message"""
    command_models = self.parse_command(arg.strip("?"), is_help=True)
    help_msg, width = self.print_model_help(
        command_models,
        verbose=True if arg.strip().endswith("?") else False,
        print_help=False,
    )
    # print help for global top commands
    if len(arg.strip().split(" ")) == 1:
        lines = {}  # dict of {cmd: cmd_help}
        for method_name in dir(self):
            if method_name.startswith("do_"):
                name = method_name.replace("do_", "")
                lines[name] = getattr(self, method_name).__doc__
                width = max(width, len(name))
        # form help lines
        if lines:
            for k, v in lines.items():
                padding = " " * (width - len(k)) + (" " * 4)
                help_msg.append(f" {k}{padding}{v}")
    # print help message
    self.write(self.newline.join(help_msg))
picle.App.do_pwd(arg)
Print current shell path
Source code in picle\picle.py
def do_pwd(self, arg):
    """Print current shell path"""
    if "?" in arg:
        self.write(" pwd Print current shell path")
    else:
        path = ["Root"]
        for shell in self.shells[1:]:
            path.append(shell.__name__)
        self.write("->".join(path))
picle.App.do_top(arg)
Exit to top shell
Source code in picle\picle.py
def do_top(self, arg):
    """Exit to top shell"""
    if "?" in arg:
        self.write(" top Exit to top shell")
    else:
        self.shell = self.shells[0]
        self.prompt = self.shell.PicleConfig.prompt
        while self.shells:
            _ = self.shells.pop()
        self.shells.append(self.shell)
        # set shell defaults
        self.defaults_set(self.shell)
picle.App.emptyline() -> None
Override empty line method to not run last command
Source code in picle\picle.py
def emptyline(self) -> None:
    """Override empty line method to not run last command"""
    return None
picle.App.model_mount(model: ModelMetaclass, path: Union[str, list[str]], description: str = None, default=None, **kwargs: dict) -> None
Method to mount pydantic model at provided path in relation to the root model.
Parameters:
- model (ModelMetaclass) – Pydantic model to mount.
- path (Union[str, list[str]]) – List of path segments to mount the model.
- description (str, default: None) – Description of the model.
- default – Default value for the model.
- kwargs (dict, default: {}) – Additional keyword arguments for the FieldInfo.
Source code in picle\picle.py
def model_mount(
    self,
    model: ModelMetaclass,
    path: Union[str, list[str]],
    description: str = None,
    default=None,
    **kwargs: dict,
) -> None:
    """
    Method to mount pydantic model at provided path in relation to the root model.
    :param model: Pydantic model to mount.
    :param path: List of path segments to mount the model.
    :param description: Description of the model.
    :param default: Default value for the model.
    :param kwargs: Additional keyword arguments for the FieldInfo.
    """
    if isinstance(path, str):
        path = [path.strip()]
    parent_model = self.root
    while path:
        mount_name = path.pop(0)
        if mount_name in parent_model.model_fields:
            parent_model = parent_model.model_fields[mount_name].annotation
        else:
            # handle when not all path items before last one are in models tree
            if len(path) > 0:
                raise KeyError(
                    f"'{mount_name}' not part of '{parent_model}' model fields, but remaining path still not empty - {path}"
                )
            parent_model.model_fields[mount_name] = FieldInfo(
                annotation=model,
                required=False,
                description=description,
                default=default,
                **kwargs,
            )
            break
picle.App.model_remove(path: list[str]) -> None
Method to remove pydantic model at provided path in relation to the root model.
Parameters:
- path (list[str]) – List of path segments to remove the model.
Source code in picle\picle.py
def model_remove(self, path: list[str]) -> None:
    """
    Method to remove pydantic model at provided path in relation to the root model.
    :param path: List of path segments to remove the model.
    """
    if isinstance(path, str):
        path = [path.strip()]
    parent_model = self.root
    while path:
        mount_name = path.pop(0)
        if mount_name in parent_model.model_fields:
            if len(path) == 0:
                parent_model = parent_model.model_fields.pop(mount_name)
            else:
                parent_model = parent_model.model_fields[mount_name].annotation
        else:
            raise KeyError(
                f"Failed to remove model at path '{mount_name}', parent model: '{parent_model}'"
            )
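The path semantics of mounting and removing can be illustrated with nested dictionaries in place of Pydantic models. This is a simplified analogy under that assumption: `mount` and `remove` are hypothetical helpers, and unlike the listed methods, which pop items from the list they are given, the sketch copies the path first.

```python
def mount(tree, path, value):
    """Attach value at path; every segment except the last must already exist."""
    path = [path] if isinstance(path, str) else list(path)
    node = tree
    while path:
        name = path.pop(0)
        if name in node:
            node = node[name]
        elif path:
            raise KeyError(f"'{name}' not found, remaining path: {path}")
        else:
            node[name] = value


def remove(tree, path):
    """Detach and return whatever is stored at path."""
    path = [path] if isinstance(path, str) else list(path)
    node = tree
    for name in path[:-1]:
        node = node[name]
    return node.pop(path[-1])


tree = {"show": {}}
mount(tree, ["show", "version"], {})
mount(tree, "another_command", {})
remove(tree, ["another_command"])
print(tree)
```

As in the listing, a missing intermediate segment raises KeyError, while a missing final segment is where the new item gets attached.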
picle.App.parse_command(command: str, collect_multiline: bool = False, is_help: bool = False) -> list
Function to parse command string and construct list of model
references and fields values.
Parameters:
- command (str) – command string to parse through
- is_help (bool, default: False) – indicates that a help command or tab completion command is being parsed; if set to True, disables presence argument handling for the last field
- collect_multiline (bool, default: False) – enables multiple input collection for fields
Returns:
- list – a list of lists of dictionaries with collected model details, each dictionary containing model, fields and parameter keys.
Source code in picle\picle.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492 | def parse_command(
self, command: str, collect_multiline: bool = False, is_help: bool = False
) -> list:
"""
Function to parse command string and construct list of model
references and fields values.
:param command: command string to parse through
:param is_help: indicates that parsing help command or tab completion command,
if set to True disables ``presence`` argument handling for last field
:param collect_multiline: enables multiple input collection for fields
:return: returns a list of lists of dictionaries with collected models details
each dictionary containing ``model``, ``fields`` and ``parameter`` keys.
"""
current_model = {
"model": self.shell,
"fields": [],
"parameter": ...,
"defaults": self.extract_model_defaults(self.shell),
}
current_field = {}
models = [current_model]
parameters = [i for i in command.split(" ") if i.strip()]
ret = [models]
# iterate over command parameters and decide if its a reference
# to a model or model's field value
while parameters:
parameter = parameters.pop(0)
# handle pipe - "|"
if parameter == "|":
# check if current model has pipe defined
if hasattr(current_model["model"], "PicleConfig") and getattr(
current_model["model"].PicleConfig, "pipe", None
):
if current_model["model"].PicleConfig.pipe == "self":
# reference pipe model to current model
current_model = {
"model": current_model["model"],
"fields": [],
"parameter": parameter,
}
else:
# goto pipe model
current_model = {
"model": current_model["model"].PicleConfig.pipe,
"fields": [],
"parameter": parameter,
}
models = [current_model]
ret.append(models)
else:
log.error(
f"'{current_model['model'].__name__}' does not support pipe handling"
)
break
# collect json dictionary string
elif parameter.strip().startswith("{") and current_field:
value_items = [parameter]
# collect further values
while parameters:
parameter = parameters.pop(0)
value_items.append(parameter)
if parameter.strip().endswith("}"):
break
value = " ".join(value_items) # form value string
self._save_collected_value(current_field, value)
# collect json list string
elif parameter.strip().startswith("[") and current_field:
value_items = [parameter]
# collect further values
while parameters:
parameter = parameters.pop(0)
value_items.append(parameter)
if parameter.strip().endswith("]"):
break
value = " ".join(value_items) # form value string
self._save_collected_value(current_field, value)
# collect double quoted field value
elif '"' in parameter and current_field:
value_items = [parameter.replace('"', "")]
# collect further values if first parameter not double quoted value e.g. "nrp1"
if parameter.count('"') != 2:
while parameters:
parameter = parameters.pop(0)
value_items.append(parameter.replace('"', ""))
if '"' in parameter:
break
value = " ".join(value_items) # form value string
self._save_collected_value(current_field, value)
# collect single quoted field value
elif "'" in parameter and current_field:
value_items = [parameter.replace("'", "")]
# collect further values if first parameter not double quoted value e.g. 'nrp1'
if parameter.count("'") != 2:
while parameters:
parameter = parameters.pop(0)
value_items.append(parameter.replace("'", ""))
if "'" in parameter:
break
value = " ".join(value_items) # form value string
self._save_collected_value(current_field, value)
# handle reference to model
elif current_model["model"].model_fields.get(parameter) or any(
parameter == f.alias
for f in current_model["model"].model_fields.values()
):
# source field by name
if current_model["model"].model_fields.get(parameter):
field = current_model["model"].model_fields[parameter]
else:
# source field by alias
for f_name, field in current_model["model"].model_fields.items():
if parameter == field.alias:
parameter = f_name # use actual field name
break
# handle next level model reference
if isinstance(field.annotation, ModelMetaclass):
# check need to record field presence before going to next model
if (
current_field.get("values") is ...
and current_field["field"].json_schema_extra is not None
and "presence" in current_field["field"].json_schema_extra
):
value = current_field["field"].json_schema_extra["presence"]
self._save_collected_value(current_field, value)
# goto next model
current_model = {
"model": field.annotation,
"fields": [],
"parameter": parameter,
}
models.append(current_model)
current_field = {} # empty current field
# extract first command default values from current model
if len(ret) == 1:
current_model["defaults"] = self.extract_model_defaults(
field.annotation
)
# handle actual field reference
elif isinstance(field, FieldInfo):
# check need to record field presence before going to next field
if (
current_field.get("values") is ...
and current_field["field"].json_schema_extra is not None
and "presence" in current_field["field"].json_schema_extra
):
value = current_field["field"].json_schema_extra["presence"]
self._save_collected_value(current_field, value)
# goto next field
current_field = {"name": parameter, "values": ..., "field": field}
# find and replace default value if present
for index, field in enumerate(current_model["fields"]):
if field["name"] == current_field["name"]:
current_model["fields"][index] = current_field
break
else:
current_model["fields"].append(current_field)
else:
raise TypeError(
f"Unsupported pydantic field type: '{type(field.annotation)}', "
f"parameter: '{parameter}', command: '{command}', current model: "
f"'{current_model['model']}'"
)
# check if parameter value partially matches any of the model fields
elif any(
field_name.startswith(parameter)
for field_name in current_model["model"].model_fields
):
raise FieldLooseMatchOnly(current_model, parameter)
# check if parameter value partially matches any of the model fields' aliases
elif any(
field.alias.startswith(parameter)
for field in current_model["model"].model_fields.values()
if field.alias is not None
):
raise FieldLooseMatchOnly(current_model, parameter)
# parameter is a value, save it to current model
elif current_field:
self._save_collected_value(current_field, parameter)
else:
raise FieldKeyError(current_model, parameter)
# check presence for last parameter is not is_help
if (
is_help is False
and current_field.get("values") is ...
and current_field["field"].json_schema_extra is not None
and "presence" in current_field["field"].json_schema_extra
):
value = current_field["field"].json_schema_extra["presence"]
self._save_collected_value(current_field, value)
# iterate over collected models and fields to see
# if need to collect multi-line input
if collect_multiline:
for command_models in ret:
for model in command_models:
for field in model["fields"]:
self._collect_multiline(field)
return ret
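The presence checks in the listing above use `...` (Ellipsis) as a marker for "no value collected yet". The following is a minimal standalone sketch of that idea, not PICLE's own code: `save_value` is a hypothetical stand-in for `_save_collected_value` (whose implementation is not shown here), and `SimpleNamespace` imitates only the `json_schema_extra` attribute of a Pydantic field.

```python
from types import SimpleNamespace


def save_value(field: dict, value) -> None:
    """Hypothetical stand-in for _save_collected_value: simply store the value."""
    field["values"] = value


def apply_presence(field: dict) -> None:
    """If nothing was collected for the field, fall back to its 'presence' value."""
    extra = field["field"].json_schema_extra
    if field.get("values") is ... and extra is not None and "presence" in extra:
        save_value(field, extra["presence"])


# a flag-like field that defines a presence value, and a plain field that does not
flag = {
    "name": "verbose",
    "values": ...,
    "field": SimpleNamespace(json_schema_extra={"presence": True}),
}
plain = {
    "name": "target",
    "values": ...,
    "field": SimpleNamespace(json_schema_extra=None),
}

apply_presence(flag)
apply_presence(plain)
print(flag["values"])   # True
print(plain["values"])  # Ellipsis
```

Using Ellipsis rather than `None` as the marker leaves `None` available as a legitimate collected value.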
picle.App.print_model_help(models: list, verbose: bool = False, match: str = None, print_help: bool = True) -> None
Function to form and print help message for model fields.
Parameters:
- match (str, default: None) – only collect help for fields that start with the match string
- print_help (bool, default: True) – if True, print the help message; otherwise return a tuple of the help lines list and the width of the longest line
Source code in picle\picle.py
def print_model_help(
    self,
    models: list,
    verbose: bool = False,
    match: str = None,
    print_help: bool = True,
) -> None:
    """
    Function to form and print help message for model fields.
    :param match: only collect help for fields that start with ``match`` string
    :param print_help: if true prints help, return tuple of help lines
        list and width of longest line
    """
    model = models[-1][-1]  # get last model
    last_field = model["fields"][-1] if model["fields"] else None
    fparam = self._get_field_params(last_field)
    lines = {}  # dict of {cmd: cmd_help}
    width = 0  # record longest command width for padding
    # print help message only for last collected field
    if last_field and last_field["values"] == ...:
        field = model["model"].model_fields[last_field["name"]]
        json_schema_extra = getattr(field, "json_schema_extra") or {}
        name = f"<'{last_field['name']}' value>"
        # check if field is callable
        if field.annotation is Callable:
            name = "<ENTER>"
            lines[name] = "Execute command"
            width = max(width, len(name))
        # check if field referencing function
        elif json_schema_extra.get("function"):
            lines[name] = f"{field.description}"
            name = "<ENTER>"
            lines[name] = "Execute command"
            width = max(width, len(name))
        # add options for enumerations
        elif isinstance(field.annotation, enum.EnumMeta):
            options = [i.value for i in field.annotation]
            lines[name] = ", ".join(options)
        # check if model has method to source field choices
        elif hasattr(model["model"], f"source_{last_field['name']}"):
            options = getattr(model["model"], f"source_{last_field['name']}")()
            lines[name] = ", ".join(options)
        else:
            lines[name] = f"{field.description}"
        # check if field supports multiline input
        if fparam.get("multiline") is True:
            lines["input"] = "Collect value using multi line input mode"
        if verbose:
            lines[name] += (
                f"; default '{field.get_default()}', type '{str(field.annotation)}', "
                f"is required - {field.is_required()}"
            )
    # collect help message for all fields of this model
    else:
        # check if model supports subshell
        if (
            hasattr(model["model"], "PicleConfig")
            and getattr(model["model"].PicleConfig, "subshell", None) is True
            # exclude <ENTER> if already in model's shell
            and not self.shells[-1] == model["model"]
        ):
            name = "<ENTER>"
            lines[name] = "Enter command subshell"
            width = max(width, len(name))
        # iterate over model fields
        for name, field in model["model"].model_fields.items():
            # skip fields that already have values
            if any(f["name"] == name for f in model["fields"]):
                continue
            # check if field has alias
            if field.alias:
                name = field.alias
            # filter fields
            if match and not name.startswith(match):
                continue
            lines[name] = f"{field.description}"
            if verbose:
                lines[name] += (
                    f"; default '{field.get_default()}', type '{str(field.annotation)}', "
                    f"is required - {field.is_required()}"
                )
            width = max(width, len(name))
        # check if model has pipe defined
        if hasattr(model["model"], "PicleConfig") and getattr(
            model["model"].PicleConfig, "pipe", None
        ):
            name = "|"
            lines[name] = "Execute pipe command"
            width = max(width, len(name))
    width = max(width, len(name))
    # form help lines
    help_msg = []
    for k in sorted(lines.keys()):
        padding = " " * (width - len(k)) + (" " * 4)
        help_msg.append(f" {k}{padding}{lines[k]}")
    if print_help:  # print help message
        self.write(self.newline.join(help_msg))
    else:
        return help_msg, width
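The final step of the listing above sorts the collected help entries and pads each name to a common width. A minimal standalone sketch of just that alignment step follows; `format_help` is a hypothetical helper written for illustration, not part of PICLE, and it computes the width in one pass instead of tracking it while collecting.

```python
def format_help(lines: dict, newline: str = "\r\n") -> str:
    """Sort help entries by name and align their descriptions in one column."""
    width = max((len(name) for name in lines), default=0)
    rows = []
    for name in sorted(lines):
        # pad to the longest name, then add a fixed four-space gutter
        padding = " " * (width - len(name)) + " " * 4
        rows.append(f" {name}{padding}{lines[name]}")
    return newline.join(rows)


help_lines = {
    "show": "Show commands",
    "|": "Execute pipe command",
    "<ENTER>": "Execute command",
}
text = format_help(help_lines, newline="\n")
print(text)
```

Because the entries are sorted as plain strings, `<ENTER>` sorts before alphabetic command names and `|` sorts after them.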
picle.App.write(text: str) -> None
Method to write output to stdout
Parameters:
- text (str) – text output
Source code in picle\picle.py
def write(self, text: str) -> None:
    """
    Method to write output to stdout
    :param text: text output
    """
    if not isinstance(text, str):
        text = str(text)
    if not text.endswith(self.newline):
        self.stdout.write(text + self.newline)
    else:
        self.stdout.write(text)
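The behaviour of `write` can be tried without a running shell. This is a sketch under stated assumptions: `Writer` is a hypothetical stand-in that exposes only the `stdout` and `newline` attributes the method relies on, and an in-memory buffer replaces the real stdout.

```python
import io


class Writer:
    """Hypothetical minimal stand-in with only the attributes write() uses."""

    def __init__(self, stdout, newline: str = "\r\n"):
        self.stdout = stdout
        self.newline = newline

    def write(self, text) -> None:
        # non-string values are converted, and the newline is appended only once
        if not isinstance(text, str):
            text = str(text)
        if not text.endswith(self.newline):
            text += self.newline
        self.stdout.write(text)


buffer = io.StringIO()
shell = Writer(buffer)
shell.write("interface up")  # newline appended
shell.write(42)              # converted to "42", newline appended
shell.write("done\r\n")      # already terminated, written as-is
print(repr(buffer.getvalue()))
```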
PICLE Built-in Models
picle.models.Filters
Bases: BaseModel
picle.models.Filters.filter_exclude(data: Any, exclude: Any = None) -> str
staticmethod
Filter data line by line using the provided pattern. Returns
only lines that do not contain the requested exclude pattern.
Parameters:
- data (Any) – data to filter
- exclude (Any, default: None) – pattern to filter data
Source code in picle\models.py
@staticmethod
def filter_exclude(data: Any, exclude: Any = None) -> str:
    """
    Filter data line by line using provided pattern. Returns
    only lines that does not contains requested ``exclude`` pattern.
    :param data: data to filter
    :param exclude: pattern to filter data
    """
    exclude = str(exclude)
    return "\n".join(
        [line for line in str(data).splitlines() if exclude not in line]
    )
picle.models.Filters.filter_include(data: Any, include: Any = None) -> str
staticmethod
Filter data line by line using the provided pattern. Returns
only lines that contain the requested include pattern.
Parameters:
- data (Any) – data to filter
- include (Any, default: None) – pattern to filter data
Source code in picle\models.py
@staticmethod
def filter_include(data: Any, include: Any = None) -> str:
    """
    Filter data line by line using provided pattern. Returns
    only lines that contains requested ``include`` pattern.
    :param data: data to filter
    :param include: pattern to filter data
    """
    include = str(include)
    return "\n".join([line for line in str(data).splitlines() if include in line])
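Both filters do a plain substring test on each line; no regular expressions are involved. The sketch below re-declares the two functions as free functions so it runs without PICLE installed; the sample text is made up for illustration.

```python
def filter_include(data, include=None) -> str:
    """Keep only lines that contain the include substring."""
    include = str(include)
    return "\n".join(line for line in str(data).splitlines() if include in line)


def filter_exclude(data, exclude=None) -> str:
    """Keep only lines that do not contain the exclude substring."""
    exclude = str(exclude)
    return "\n".join(line for line in str(data).splitlines() if exclude not in line)


output = "eth0 up\neth1 down\nlo up"
kept = filter_include(output, "up")
dropped = filter_exclude(output, "eth")
print(kept)     # eth0 up / lo up
print(dropped)  # lo up
```

Since the pattern is passed through `str()`, leaving it at its default of `None` searches for the literal text `None`, which is rarely what is wanted.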
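picle.models.Formatters
Bases: BaseModel
picle.models.Formatters.formatter_json(data: Any) -> str
staticmethod
Function to transform results into JSON string
Parameters:
- data (Any) – any data to convert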
Source code in picle\models.py
@staticmethod
def formatter_json(data: Any) -> str:
    """
    Function to transform results into JSON string
    :param data: any data to convert
    """
    return json.dumps(data, indent=4, sort_keys=True)
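picle.models.Formatters.formatter_kv(data: dict) -> str
staticmethod
Function to format dictionary result as a key: value output
Parameters:
- data (dict) – dictionary to format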
Source code in picle\models.py
@staticmethod
def formatter_kv(data: dict) -> str:
    """
    Function to format dictionary result as a key: value output
    :param data: dictionary to format
    """
    return "\n".join([f" {k}: {v}" for k, v in data.items()])
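To compare the JSON and key: value formatters side by side, the sketch below applies the same two expressions used in the listings above to a made-up result dictionary. It uses only the standard library, so PICLE itself is not imported.

```python
import json

result = {"hostname": "r1", "asn": 65001}

# same expression as formatter_json: indented, keys sorted alphabetically
as_json = json.dumps(result, indent=4, sort_keys=True)

# same expression as formatter_kv: one "key: value" line per item, insertion order kept
as_kv = "\n".join(f" {k}: {v}" for k, v in result.items())

print(as_json)
print(as_kv)
```

The JSON form reorders keys (`asn` before `hostname`), while the key: value form keeps the dictionary's own order.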
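picle.models.Formatters.formatter_pprint(data: Any) -> str
staticmethod
Function to pretty print results using the Python pprint module
Parameters:
- data (Any) – any data to pretty print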
Source code in picle\models.py
@staticmethod
def formatter_pprint(data: Any) -> str:
    """
    Function to pretty print results using python ``pprint`` module
    :param data: any data to pretty print
    """
    return pprint.pformat(data, indent=4)
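picle.models.Formatters.formatter_yaml(data: Any) -> str
staticmethod
Function to transform results into YAML string
Parameters:
- data (Any) – any data to convert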
Source code in picle\models.py
@staticmethod
def formatter_yaml(data: Any) -> str:
    """
    Function to transform results into YAML string
    :param data: any data to convert
    """
    if HAS_YAML:
        return yaml_dump(data, default_flow_style=False)
    else:
        return data
picle.models.Outputters
Bases: BaseModel
picle.models.Outputters.outputter_rich_json(data: Union[dict, list]) -> None
staticmethod
Function to pretty print JSON string using Rich library
Parameters:
- data (Union[dict, list]) – any data to print
Source code in picle\models.py
@staticmethod
def outputter_rich_json(data: Union[dict, list]) -> None:
    """
    Function to pretty print JSON string using Rich library
    :param data: any data to print
    """
    if isinstance(data, bytes):
        data = data.decode("utf-8")
    if not isinstance(data, str):
        data = json.dumps(data)
    # data should be a json string
    try:
        if HAS_RICH:
            RICHCONSOLE.print_json(data, sort_keys=True, indent=4)
        else:
            print(data)
    except Exception as e:
        print(f"ERROR: Data is not a valid JSON string:\n{data}\n\nError: '{e}'")
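Before printing, `outputter_rich_json` normalises its input to a JSON string. The sketch below isolates that normalisation step in a hypothetical helper named `to_json_text` (not part of PICLE) so the accepted input types can be checked without Rich installed.

```python
import json


def to_json_text(data) -> str:
    """Mirror the normalisation step: bytes are decoded, non-strings are serialised."""
    if isinstance(data, bytes):
        data = data.decode("utf-8")
    if not isinstance(data, str):
        data = json.dumps(data)
    return data


print(to_json_text(b'{"a": 1}'))  # bytes are decoded
print(to_json_text({"a": 1}))     # dict is serialised
print(to_json_text("not json"))   # strings pass through unchecked
```

Strings are passed through without validation, so an invalid JSON string is only detected later, when the printing step in the listing fails and reports the error.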
picle.models.Outputters.outputter_rich_print(data: Any) -> None
staticmethod
Function to pretty print output using Rich library
Parameters:
- data (Any) – any data to print
Source code in picle\models.py
@staticmethod
def outputter_rich_print(data: Any) -> None:
    """
    Function to pretty print output using Rich library
    :param data: any data to print
    """
    if HAS_RICH:
        RICHCONSOLE.print(data)
    else:
        print(data)
picle.models.Outputters.outputter_rich_table(data: list[dict], headers: list = None, title: str = None, sortby: str = None)
staticmethod
Function to pretty print output in table format using Rich library
Parameters:
- data (list[dict]) – list of dictionaries to print
Source code in picle\models.py
@staticmethod
def outputter_rich_table(
    data: list[dict], headers: list = None, title: str = None, sortby: str = None
):
    """
    Function to pretty print output in table format using Rich library
    :param data: list of dictionaries to print
    """
    if not HAS_RICH or not isinstance(data, list):
        print(data)
        return
    headers = headers or list(data[0].keys())
    table = RICHTABLE(title=title, box=False)
    # add table columns
    for h in headers:
        table.add_column(h, justify="left", no_wrap=True)
    # sort the table
    if sortby:
        # form dictionary keyed by sortby value and index
        items_to_sortby = {i[sortby]: index for index, i in enumerate(data)}
        # form a list of sorted sortby values
        sorted_keys = sorted(items_to_sortby.keys())
        # for sorted data list
        sorted_data = [data[items_to_sortby[key]] for key in sorted_keys]
    else:
        sorted_data = data
    # add table rows
    for item in sorted_data:
        cells = [item.get(h, "") for h in headers]
        table.add_row(*cells)
    RICHCONSOLE.print(table)
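One consequence of the sorting step in the listing above is worth checking before relying on `sortby`: rows are indexed in a dictionary keyed by the `sortby` value, so rows that share a value collapse to the last one. The sketch below reproduces that step on made-up data and contrasts it with a plain `sorted()` call; `sort_keep_all` is an illustrative alternative, not something PICLE provides.

```python
rows = [
    {"name": "r2", "site": "lon"},
    {"name": "r1", "site": "ams"},
    {"name": "r3", "site": "lon"},
]


def sort_as_listed(data: list, sortby: str) -> list:
    """Same steps as the listing: index rows by sortby value, then sort the keys."""
    index_by_value = {row[sortby]: index for index, row in enumerate(data)}
    return [data[index_by_value[key]] for key in sorted(index_by_value)]


def sort_keep_all(data: list, sortby: str) -> list:
    """Illustrative alternative: a stable sort that keeps rows with equal values."""
    return sorted(data, key=lambda row: row[sortby])


print(sort_as_listed(rows, "site"))  # two rows: r1 (ams) and r3 (lon)
print(sort_keep_all(rows, "site"))   # three rows: r1, r2, r3
```

Sorting by a column whose values are unique, such as `name` here, avoids the collapse.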
picle.models.PipeFunctionsModel
Bases: Filters, Formatters, Outputters
Collection of common pipe functions to use in PICLE shell models
picle.models.MAN
Bases: BaseModel
Model with manual/documentation related functions
picle.models.MAN.print_model_tree(root_model, **kwargs) -> None
staticmethod
Method to print model tree for shell model specified by dot separated path e.g. model.shell.command
Parameters:
- root_model – PICLE App root model to print tree for
Source code in picle\models.py
@staticmethod
def print_model_tree(root_model, **kwargs) -> None:
    """
    Method to print model tree for shell model specified by dot separated path e.g. model.shell.command
    :param root_model: PICLE App root model to print tree for
    """
    path = kwargs["tree"].split(".") if kwargs.get("tree") else []
    rich_tree = RICHTREE("[bold]root[/bold]")
    RICHCONSOLE.print(
        MAN._construct_model_tree(
            model=root_model.model_construct(), tree=rich_tree, path=path
        )
    )