import torch

from nodes import MAX_RESOLUTION

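# Llama-3-style chat template for HunyuanVideo-style image-to-video encoding; the
# wording is kept verbatim (including its uneven punctuation) so the encoded
# prompt exactly matches the template the text encoder expects.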
PROMPT_TEMPLATE_ENCODE_VIDEO_I2V = (
    "<|start_header_id|>system<|end_header_id|>\n\n<image>\nDescribe the video by detailing the following aspects according to the reference image: "
    "1. The main content and theme of the video."
    "2. The color, shape, size, texture, quantity, text, and spatial relationships of the objects."
    "3. Actions, events, behaviors temporal relationships, physical movement changes of the objects."
    "4. background environment, light, style and atmosphere."
    "5. camera angles, movements, and transitions used in the video:<|eot_id|>\n\n"
    "<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
    "<|start_header_id|>assistant<|end_header_id|>\n\n"
)
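# ChatML-style template for Qwen image-edit-plus models, likewise kept verbatim.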
PROMPT_TEMPLATE_QWEN_IMAGE_EDIT_PLUS = "<|im_start|>system\nDescribe the key features of the input image (color, shape, size, texture, objects, background), then explain how the user's text instruction should alter or modify the image. Generate a new image that meets the user's requirements while maintaining consistency with the original input where appropriate.<|im_end|>\n<|im_start|>user\n{}<|im_end|>\n<|im_start|>assistant\n"


class SwarmClipTextEncodeAdvanced:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "clip": ("CLIP", ),
                "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "How many sampling steps will be run - this is needed for per-step features (from-to/alternate/...) to work properly."}),
                "prompt": ("STRING", {"multiline": True, "dynamicPrompts": True, "tooltip": "Your actual prompt text."}),
                "width": ("INT", {"default": 1024, "min": 0, "max": MAX_RESOLUTION, "tooltip": "Intended width of the image, used by some models (eg SDXL)."}),
                "height": ("INT", {"default": 1024, "min": 0, "max": MAX_RESOLUTION, "tooltip": "Intended height of the image, used by some models (eg SDXL)."}),
                "target_width": ("INT", {"default": 1024, "min": 0, "max": MAX_RESOLUTION, "tooltip": "Actual width of the image, used by some models (eg SDXL)."}),
                "target_height": ("INT", {"default": 1024, "min": 0, "max": MAX_RESOLUTION, "tooltip": "Actual height of the image, used by some models (eg SDXL)."}),
            },
            "optional": {
                "guidance": ("FLOAT", {"default": -1, "min": -1, "max": 100.0, "step": 0.1, "tooltip": "Guidance value to embed, used by some models (eg Flux)."}),
                "llama_template": ("STRING", {"default": "", "multiline": True, "tooltip": "Template for the LLaMA model, if applicable."}),
                "clip_vision_output": ("CLIP_VISION_OUTPUT", {"default": None, "tooltip": "Optional CLIP Vision Output to use for the LLaMA model, if applicable."}),
                "images": ("IMAGE", {"default": None, "tooltip": "Optional images to use for a text-vision model, if applicable."}),
            }
        }

    CATEGORY = "SwarmUI/clip"
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "encode"
    DESCRIPTION = "Acts like the regular CLIPTextEncode, but supports more advanced special features like '<break>', '[from:to:when]', '[alter|nate]', ..."

    def encode(self, clip, steps: int, prompt: str, width: int, height: int, target_width: int, target_height: int, guidance: float = -1, llama_template=None, clip_vision_output=None, images=None):
        image_prompt = ""
        if llama_template == "hunyuan_image":
            llama_template = PROMPT_TEMPLATE_ENCODE_VIDEO_I2V
        elif llama_template == "qwen_image_edit_plus":
            llama_template = PROMPT_TEMPLATE_QWEN_IMAGE_EDIT_PLUS
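        # Normalize 'images' into a list of single-image batches, and emit one
        # "Picture N:" vision-placeholder block per image (the token convention
        # Qwen-style vision-language models use to interleave images with text).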
        if images is not None:
            if len(images.shape) == 3:
                images = [images]
            else:
                images = [i.unsqueeze(0) for i in images]
            for i, image in enumerate(images):
                image_prompt += f"Picture {i + 1}: <|vision_start|><|image_pad|><|vision_end|>"

        def tokenize(text: str):
            if clip_vision_output is not None:
                return clip.tokenize(text, llama_template=llama_template, image_embeds=clip_vision_output.mm_projected)
            elif images is not None:
                return clip.tokenize(image_prompt + text, llama_template=llama_template, images=images)
            else:
                return clip.tokenize(text)

        encoding_cache = {}

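        # Encodes one prompt variant as conditioning covering [start_percent, end_percent]
        # of sampling. "<break>" splits the text into separately-encoded chunks whose
        # token embeddings are concatenated along the sequence dimension (dim=1).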
        def text_to_cond(text: str, start_percent: float, end_percent: float):
            text = text.replace("\0\1", "[").replace("\0\2", "]").replace("\0\3", "embedding:")
            if text in encoding_cache:
                cond_arr = encoding_cache[text]
            else:
                cond_chunks = text.split("<break>")
                tokens = tokenize(cond_chunks[0])
                cond_arr = clip.encode_from_tokens_scheduled(tokens)
                if len(cond_chunks) > 1:
                    for chunk in cond_chunks[1:]:
                        tokens = tokenize(chunk)
                        cond_arr_chunk = clip.encode_from_tokens_scheduled(tokens)
                        catted_cond = torch.cat([cond_arr[0][0], cond_arr_chunk[0][0]], dim=1)
                        cond_arr[0] = [catted_cond, cond_arr[0][1]]
                encoding_cache[text] = cond_arr
            result = {"pooled_output": cond_arr[0][1]["pooled_output"], "width": width, "height": height, "crop_w": 0, "crop_h": 0, "target_width": target_width, "target_height": target_height, "start_percent": start_percent, "end_percent": end_percent}
            if guidance >= 0:
                result["guidance"] = guidance
            out_cond_arr = [[cond_arr[0][0], result]]
            out_cond_arr.extend(cond_arr[1:])
            return out_cond_arr
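
        # Hide escaped brackets and "embedding:" references behind control-character
        # sentinels so the bracket parser below never sees them; text_to_cond swaps
        # them back just before encoding.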
        prompt = prompt.replace("\\[", "\0\1").replace("\\]", "\0\2").replace("embedding:", "\0\3")

        chunks = []
        any = [False]
        escapable = ["\\", "[", "]", ":", "|", "(", ")", "<", ">"]
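
        # Each parsed chunk is {'text': ..., 'applies_to': [step indices]}; a step's
        # final prompt is the concatenation of every chunk active on that step. The
        # one-element 'any' list above lets the nested closures flag, by mutation,
        # whether any per-step syntax was actually found.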
        def append_chunk(text: str, applies_to: list[int], can_subprocess: bool, limit_to: list[int]):
            applies_to = [i for i in applies_to if i in limit_to]
            fixed_text = ""
            do_skip = False
            for i in range(len(text)):
                if text[i] == "\\" and not do_skip and i + 1 < len(text) and text[i + 1] in escapable:
                    do_skip = True
                else:
                    do_skip = False
                    fixed_text += text[i]
            if can_subprocess and '[' in fixed_text:
                get_chunks(fixed_text, applies_to)
            else:
                chunks.append({'text': text, 'applies_to': applies_to})
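
        # Scan 'remaining' left-to-right for the next top-level [ ... ] block,
        # honoring backslash escapes and nested brackets. Inside a block, top-level
        # '|' means alternation ([a|b] swaps each step) and ':' means scheduling
        # ([a:b:0.5] switches from a to b halfway; [a:0.5] enables a from halfway;
        # a fraction below 1 is scaled by 'steps', a value >= 1 is a raw step index).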
        def get_chunks(remaining: str, limit_to: list[int] = [i for i in range(steps)]):
            while True:
                start = remaining.find("[")
                if start == -1:
                    append_chunk(remaining, [i for i in range(steps)], False, limit_to)
                    break
                end = -1
                count = 0
                do_skip = False
                colon_indices = []
                pipe_indices = []
                for i in range(start + 1, len(remaining)):
                    char = remaining[i]
                    if char == "\\" and not do_skip and i + 1 < len(remaining) and remaining[i + 1] in escapable:
                        do_skip = True
                    elif do_skip:
                        do_skip = False
                    elif char == "[":
                        count += 1
                    elif char == "]":
                        if count == 0:
                            end = i
                            break
                        count -= 1
                    elif char == ":" and count == 0 and len(pipe_indices) == 0:
                        colon_indices.append(i)
                    elif char == "|" and count == 0 and len(colon_indices) == 0:
                        pipe_indices.append(i)
                if end == -1:
                    # Unclosed '[': treat the rest as literal text on the last chunk.
                    chunks[-1]['text'] += remaining
                    break
                append_chunk(remaining[:start], [i for i in range(steps)], False, limit_to)
                control = remaining[start + 1:end]
                if len(pipe_indices) > 0:
                    data = split_text_on(control, pipe_indices, start + 1)
                    for i in range(len(data)):
                        append_chunk(data[i], [step for step in range(steps) if step % len(data) == i], True, limit_to)
                    any[0] = True
                elif len(colon_indices) == 2:
                    coloned = split_text_on(control, colon_indices, start + 1)
                    when = float(coloned[2])
                    if when < 1:
                        when = when * steps
                    append_chunk(coloned[0], [i for i in range(steps) if i < when], True, limit_to)
                    append_chunk(coloned[1], [i for i in range(steps) if i >= when], True, limit_to)
                    any[0] = True
                elif len(colon_indices) == 1:
                    coloned = split_text_on(control, colon_indices, start + 1)
                    when = float(coloned[1])
                    if when < 1:
                        when = when * steps
                    append_chunk(coloned[0], [i for i in range(steps) if i >= when], True, limit_to)
                    any[0] = True
                else:
                    append_chunk(control, [i for i in range(steps)], False, limit_to)

                remaining = remaining[end + 1:]

        get_chunks(prompt)
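
        # Illustrative example (assuming steps=10): "a [cat:dog:0.5] [photo|painting]"
        # parses into chunks roughly like:
        #   {'text': 'a ',       'applies_to': [0..9]}
        #   {'text': 'cat',      'applies_to': [0..4]}
        #   {'text': 'dog',      'applies_to': [5..9]}
        #   {'text': ' ',        'applies_to': [0..9]}
        #   {'text': 'photo',    'applies_to': [0, 2, 4, 6, 8]}
        #   {'text': 'painting', 'applies_to': [1, 3, 5, 7, 9]}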

        if not any[0]:
            return (text_to_cond(prompt, 0, 1), )
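
        # Walk the steps; whenever the assembled prompt text changes, emit a
        # conditioning entry covering the percent range the previous text was active
        # for, padded by 0.001 so float rounding can't leave gaps in the schedule.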
        conds_out = []
        last_text = ""
        start_perc = 0
        for i in range(steps):
            perc = i / steps
            text = ""
            for chunk in chunks:
                if i in chunk['applies_to']:
                    text += chunk['text']
            if text != last_text or i == 0:
                if i != 0:
                    conds_out.extend(text_to_cond(last_text, start_perc - 0.001, perc + 0.001))
                last_text = text
                start_perc = perc
        conds_out.extend(text_to_cond(last_text, start_perc - 0.001, 1))
        return (conds_out, )


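# Splits 'text' at the given absolute indices ('offset' converts them to positions
# within 'text'), dropping the separator characters themselves.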
def split_text_on(text: str, indices: list[int], offset: int) -> list[str]:
    indices = [i - offset for i in indices]
    result = []
    result.append(text[:indices[0]])
    for i in range(len(indices) - 1):
        result.append(text[indices[i] + 1:indices[i + 1]])
    result.append(text[indices[-1] + 1:])
    return result


NODE_CLASS_MAPPINGS = {
    "SwarmClipTextEncodeAdvanced": SwarmClipTextEncodeAdvanced,
}