| | from comfy_execution.graph_utils import GraphBuilder
|
| | from .tools import VariantSupport
|
| |
|
@VariantSupport()
class TestAccumulateNode:
    """Test node that appends one value to an accumulation.

    An accumulation is a dict of the form ``{"accum": [...]}``.
    """

    RETURN_TYPES = ("ACCUMULATION",)
    FUNCTION = "accumulate"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the value to add and the optional accumulation to extend."""
        required = {"to_add": ("*",)}
        optional = {"accumulation": ("ACCUMULATION",)}
        return {"required": required, "optional": optional}

    def accumulate(self, to_add, accumulation=None):
        """Return a 1-tuple holding a new accumulation that ends with ``to_add``.

        When ``accumulation`` is None a fresh single-item accumulation is
        started. The incoming accumulation's sequence is not mutated; the
        result is built by concatenation.
        """
        previous = [] if accumulation is None else accumulation["accum"]
        return ({"accum": previous + [to_add]},)
|
| |
|
@VariantSupport()
class TestAccumulationHeadNode:
    """Test node that splits the first item off an accumulation."""

    RETURN_TYPES = ("ACCUMULATION", "*",)
    FUNCTION = "accumulation_head"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumulation_head(self, accumulation):
        """Return ``(remaining_accumulation, first_item)``.

        For an empty accumulation the input is handed back unchanged and the
        item slot is None.
        """
        items = accumulation["accum"]
        if len(items) == 0:
            return (accumulation, None)
        remainder = {"accum": items[1:]}
        return (remainder, items[0])
|
| |
|
class TestAccumulationTailNode:
    """Test node that splits the last item off an accumulation.

    An accumulation is a dict of the form ``{"accum": [...]}``.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION", "*",)
    FUNCTION = "accumulation_tail"

    CATEGORY = "Testing/Lists"

    def accumulation_tail(self, accumulation):
        """Return ``(remaining_accumulation, last_item)``.

        Args:
            accumulation: dict whose ``"accum"`` entry is the stored sequence.

        Returns:
            A 2-tuple ordered like ``RETURN_TYPES``: the accumulation without
            its last item, then that item. For an empty accumulation the
            input is returned unchanged and the item slot is None.
        """
        accum = accumulation["accum"]
        if len(accum) == 0:
            # Keep the (ACCUMULATION, value) slot order of RETURN_TYPES even
            # when there is nothing to pop.
            return (accumulation, None)
        return ({"accum": accum[:-1]}, accum[-1])
|
| |
|
@VariantSupport()
class TestAccumulationToListNode:
    """Test node that exposes the items stored in an accumulation as its output."""

    RETURN_TYPES = ("*",)
    OUTPUT_IS_LIST = (True,)
    FUNCTION = "accumulation_to_list"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumulation_to_list(self, accumulation):
        """Return a 1-tuple holding the accumulation's ``"accum"`` sequence itself (no copy)."""
        items = accumulation["accum"]
        return (items,)
|
| |
|
@VariantSupport()
class TestListToAccumulationNode:
    """Test node that wraps an incoming list in an accumulation dict."""

    RETURN_TYPES = ("ACCUMULATION",)
    # NOTE(review): this is a truthy 1-tuple rather than a plain bool —
    # confirm which form the executor expects.
    INPUT_IS_LIST = (True,)
    FUNCTION = "list_to_accumulation"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``list`` input."""
        required = {"list": ("*",)}
        return {"required": required}

    def list_to_accumulation(self, list):
        """Return a 1-tuple holding ``{"accum": list}``; the input is stored as-is.

        The parameter name shadows the builtin because it must match the
        ``"list"`` input key declared in ``INPUT_TYPES``.
        """
        wrapped = {"accum": list}
        return (wrapped,)
|
| |
|
@VariantSupport()
class TestAccumulationGetLengthNode:
    """Test node that reports how many items an accumulation holds."""

    RETURN_TYPES = ("INT",)
    FUNCTION = "accumlength"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumlength(self, accumulation):
        """Return a 1-tuple holding ``len`` of the accumulation's ``"accum"`` sequence."""
        count = len(accumulation["accum"])
        return (count,)
|
| |
|
@VariantSupport()
class TestAccumulationGetItemNode:
    """Test node that reads one item of an accumulation by index."""

    RETURN_TYPES = ("*",)
    FUNCTION = "get_item"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the ``accumulation`` and integer ``index`` inputs."""
        required = {
            "accumulation": ("ACCUMULATION",),
            "index": ("INT", {"default": 0, "step": 1}),
        }
        return {"required": required}

    def get_item(self, accumulation, index):
        """Return a 1-tuple holding ``accumulation["accum"][index]``.

        Indexing follows normal Python sequence rules, so an out-of-range
        index propagates whatever error the underlying sequence raises.
        """
        items = accumulation["accum"]
        return (items[index],)
|
| |
|
@VariantSupport()
class TestAccumulationSetItemNode:
    """Test node that replaces one item of an accumulation by index."""

    RETURN_TYPES = ("ACCUMULATION",)
    FUNCTION = "set_item"
    CATEGORY = "Testing/Lists"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the ``accumulation``, integer ``index`` and ``value`` inputs."""
        required = {
            "accumulation": ("ACCUMULATION",),
            "index": ("INT", {"default": 0, "step": 1}),
            "value": ("*",),
        }
        return {"required": required}

    def set_item(self, accumulation, index, value):
        """Return a 1-tuple holding a new accumulation with ``value`` at ``index``.

        The stored sequence is copied with a full slice first, so the input
        accumulation is left untouched.
        """
        updated = accumulation["accum"][:]
        updated[index] = value
        result = {"accum": updated}
        return (result,)
|
| |
|
class TestIntMathOperation:
    """Test node that applies one named binary operation to ``a`` and ``b``."""

    # Dispatch table: operation name -> implementation. Insertion order is
    # the order in which INPUT_TYPES lists the choices.
    _OPERATIONS = {
        "add": lambda a, b: a + b,
        "subtract": lambda a, b: a - b,
        "multiply": lambda a, b: a * b,
        "divide": lambda a, b: a // b,  # floor division
        "modulo": lambda a, b: a % b,
        "power": lambda a, b: a ** b,
    }

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two integer operands and the operation choice list."""
        return {
            "required": {
                "a": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                "b": ("INT", {"default": 0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 1}),
                # Derived from the dispatch table so the two cannot drift apart.
                "operation": (list(cls._OPERATIONS),),
            },
        }

    RETURN_TYPES = ("INT",)
    FUNCTION = "int_math_operation"

    CATEGORY = "Testing/Logic"

    def int_math_operation(self, a, b, operation):
        """Return a 1-tuple holding the result of ``operation`` on ``a`` and ``b``.

        Args:
            a: left operand.
            b: right operand.
            operation: one of "add", "subtract", "multiply", "divide"
                (floor division), "modulo" or "power".

        Raises:
            ValueError: if ``operation`` is not a known operation name.
            ZeroDivisionError: for "divide" or "modulo" when ``b`` is 0.
        """
        apply = self._OPERATIONS.get(operation)
        if apply is None:
            # Fail loudly instead of implicitly returning None, which is not
            # a valid 1-tuple result for RETURN_TYPES.
            raise ValueError(f"Unknown operation: {operation!r}")
        return (apply(a, b),)
|
| |
|
| |
|
| | from .flow_control import NUM_FLOW_SOCKETS
|
@VariantSupport()
class TestForLoopOpen:
    """Test node that opens a counted loop by expanding into a ``TestWhileLoopOpen`` node."""

    RETURN_TYPES = ("FLOW_CONTROL", "INT") + ("*",) * (NUM_FLOW_SOCKETS - 1)
    RETURN_NAMES = ("flow_control", "remaining") + tuple(
        f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)
    )
    FUNCTION = "for_loop_open"
    CATEGORY = "Testing/Flow"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the loop counter, the optional carried values and the hidden slot 0."""
        carried_slots = {
            f"initial_value{i}": ("*",) for i in range(1, NUM_FLOW_SOCKETS)
        }
        return {
            "required": {
                "remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
            },
            "optional": carried_slots,
            "hidden": {"initial_value0": ("*",)},
        }

    def for_loop_open(self, remaining, **kwargs):
        """Build the expansion graph and this node's own result tuple.

        If ``initial_value0`` is supplied it overrides ``remaining``. The
        counter is passed to the ``TestWhileLoopOpen`` node both as its
        condition and as its slot 0; the other slots are forwarded from
        ``kwargs`` (None when absent).

        Returns:
            A dict with ``"result"`` (the placeholder string "stub", the
            counter, then the forwarded values) and ``"expand"`` (the
            finalized graph).
        """
        graph = GraphBuilder()
        remaining = kwargs.get("initial_value0", remaining)
        carried = {
            f"initial_value{i}": kwargs.get(f"initial_value{i}")
            for i in range(1, NUM_FLOW_SOCKETS)
        }
        graph.node(
            "TestWhileLoopOpen",
            condition=remaining,
            initial_value0=remaining,
            **carried,
        )
        return {
            "result": ("stub", remaining, *carried.values()),
            "expand": graph.finalize(),
        }
|
| |
|
@VariantSupport()
class TestForLoopClose:
    """Test node that closes a counted loop by expanding into math, bool and while-close nodes."""

    RETURN_TYPES = ("*",) * (NUM_FLOW_SOCKETS - 1)
    RETURN_NAMES = tuple(f"value{i}" for i in range(1, NUM_FLOW_SOCKETS))
    FUNCTION = "for_loop_close"
    CATEGORY = "Testing/Flow"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the flow-control link and the optional carried values (all ``rawLink``)."""
        carried_slots = {
            f"initial_value{i}": ("*", {"rawLink": True})
            for i in range(1, NUM_FLOW_SOCKETS)
        }
        return {
            "required": {
                "flow_control": ("FLOW_CONTROL", {"rawLink": True}),
            },
            "optional": carried_slots,
        }

    def for_loop_close(self, flow_control, **kwargs):
        """Build the expansion graph that decrements the counter and closes the loop.

        Nodes are created in this order: a ``TestIntMathOperation`` that
        subtracts 1, a ``TestToBoolNode`` fed by that result, and a
        ``TestWhileLoopClose`` that receives the bool as its condition and
        the decremented value as slot 0.

        Returns:
            A dict with ``"result"`` (outputs 1..NUM_FLOW_SOCKETS-1 of the
            while-close node) and ``"expand"`` (the finalized graph).
        """
        graph = GraphBuilder()
        open_node = flow_control[0]
        # presumably [node, 1] links to output 1 of the opening node, which
        # TestForLoopOpen names "remaining" — confirm against the executor.
        decremented = graph.node(
            "TestIntMathOperation",
            operation="subtract",
            a=[open_node, 1],
            b=1,
        )
        keep_going = graph.node("TestToBoolNode", value=decremented.out(0))

        carried = {}
        for i in range(1, NUM_FLOW_SOCKETS):
            slot = f"initial_value{i}"
            carried[slot] = kwargs.get(slot)

        loop_close = graph.node(
            "TestWhileLoopClose",
            flow_control=flow_control,
            condition=keep_going.out(0),
            initial_value0=decremented.out(0),
            **carried,
        )
        return {
            "result": tuple(loop_close.out(i) for i in range(1, NUM_FLOW_SOCKETS)),
            "expand": graph.finalize(),
        }
|
| |
|
# Upper bound (exclusive) on the numeric suffix of the "value{i}" inputs.
NUM_LIST_SOCKETS = 10


@VariantSupport()
class TestMakeListNode:
    """Test node that gathers its supplied ``value{i}`` inputs into one list."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``value1`` as required and ``value2``.. as optional inputs."""
        return {
            "required": {
                "value1": ("*",),
            },
            "optional": {
                # Start at 2: "value1" is already declared as required, so it
                # must not be declared a second time as optional.
                f"value{i}": ("*",) for i in range(2, NUM_LIST_SOCKETS)
            },
        }

    RETURN_TYPES = ("*",)
    FUNCTION = "make_list"
    OUTPUT_IS_LIST = (True,)

    CATEGORY = "Testing/Lists"

    def make_list(self, **kwargs):
        """Return a 1-tuple holding the supplied values in ascending index order.

        Keys ``value0`` .. ``value{NUM_LIST_SOCKETS - 1}`` are looked up in
        ``kwargs``; absent keys are skipped. Index 0 is still checked even
        though ``INPUT_TYPES`` declares no ``value0``, so existing callers
        see the same result as before.
        """
        names = (f"value{i}" for i in range(NUM_LIST_SOCKETS))
        return ([kwargs[name] for name in names if name in kwargs],)
|
| |
|
# Maps each node identifier string to the class in this module that
# implements it. Every key equals the name of the class it maps to.
# NOTE(review): presumably merged into the package-level node registry by
# the importing module — confirm against the caller.
UTILITY_NODE_CLASS_MAPPINGS = {
    "TestAccumulateNode": TestAccumulateNode,
    "TestAccumulationHeadNode": TestAccumulationHeadNode,
    "TestAccumulationTailNode": TestAccumulationTailNode,
    "TestAccumulationToListNode": TestAccumulationToListNode,
    "TestListToAccumulationNode": TestListToAccumulationNode,
    "TestAccumulationGetLengthNode": TestAccumulationGetLengthNode,
    "TestAccumulationGetItemNode": TestAccumulationGetItemNode,
    "TestAccumulationSetItemNode": TestAccumulationSetItemNode,
    "TestForLoopOpen": TestForLoopOpen,
    "TestForLoopClose": TestForLoopClose,
    "TestIntMathOperation": TestIntMathOperation,
    "TestMakeListNode": TestMakeListNode,
}
|
# Maps each node identifier string to a human-readable display name. The
# keys are the same identifiers, in the same order, as in
# UTILITY_NODE_CLASS_MAPPINGS.
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = {
    "TestAccumulateNode": "Accumulate",
    "TestAccumulationHeadNode": "Accumulation Head",
    "TestAccumulationTailNode": "Accumulation Tail",
    "TestAccumulationToListNode": "Accumulation to List",
    "TestListToAccumulationNode": "List to Accumulation",
    "TestAccumulationGetLengthNode": "Accumulation Get Length",
    "TestAccumulationGetItemNode": "Accumulation Get Item",
    "TestAccumulationSetItemNode": "Accumulation Set Item",
    "TestForLoopOpen": "For Loop Open",
    "TestForLoopClose": "For Loop Close",
    "TestIntMathOperation": "Int Math Operation",
    "TestMakeListNode": "Make List",
}
|
| |
|