class KlingTextToVideoNode(KlingNodeBase):
    """Node that turns a text prompt into a video through the Kling API.

    ``api_call`` submits a ``KlingText2VideoRequest`` to
    ``PATH_TEXT_TO_VIDEO``, polls the created task with
    ``poll_for_task_status`` and returns the first video of the result along
    with its id and duration.
    """

    @staticmethod
    def poll_for_task_status(task_id: str, auth_token: str) -> KlingText2VideoResponse:
        """Poll the text-to-video task endpoint until a terminal state is reached.

        Args:
            task_id: Task identifier; appended to ``PATH_TEXT_TO_VIDEO`` to
                build the polling path.
            auth_token: Token forwarded to the polling operation.

        Returns:
            The result of executing the polling operation.
        """

        def _task_status(response):
            # Report None when the response has no data or no task status yet.
            data = response.data
            if not (data and data.task_status):
                return None
            return data.task_status.value

        status_endpoint = ApiEndpoint(
            path=f"{PATH_TEXT_TO_VIDEO}/{task_id}",
            method=HttpMethod.GET,
            request_model=EmptyRequest,
            response_model=KlingText2VideoResponse,
        )
        poller = PollingOperation(
            poll_endpoint=status_endpoint,
            completed_statuses=[TaskStatus.succeed.value],
            failed_statuses=[TaskStatus.failed.value],
            status_extractor=_task_status,
            auth_token=auth_token,
        )
        return poller.execute()

    @classmethod
    def INPUT_TYPES(s):
        """Return the node's input specification.

        Every required input is derived from the matching field of
        ``KlingText2VideoRequest`` via ``model_field_to_node_input``; the
        ``auth_token`` input is declared as hidden.
        """

        def request_field(io_type, field_name, **options):
            # All inputs share the same request model; only the IO type,
            # field name and extra options vary.
            return model_field_to_node_input(
                io_type, KlingText2VideoRequest, field_name, **options
            )

        # Key order is kept as declared: prompt fields first, then the
        # generation settings.
        required_inputs = {
            "prompt": request_field(IO.STRING, "prompt", multiline=True),
            "negative_prompt": request_field(
                IO.STRING, "negative_prompt", multiline=True
            ),
            "model_name": request_field(
                IO.COMBO,
                "model_name",
                enum_type=ModelName,
                default="kling-v2-master",
            ),
            "cfg_scale": request_field(IO.FLOAT, "cfg_scale"),
            "mode": request_field(IO.COMBO, "mode", enum_type=Mode),
            "duration": request_field(IO.COMBO, "duration", enum_type=Duration),
            "aspect_ratio": request_field(
                IO.COMBO, "aspect_ratio", enum_type=AspectRatio
            ),
        }
        hidden_inputs = {"auth_token": "AUTH_TOKEN_COMFY_ORG"}
        return {"required": required_inputs, "hidden": hidden_inputs}

    # Three outputs, matching the 3-tuple returned by ``api_call``.
    RETURN_TYPES = ("VIDEO", "STRING", "STRING")
    RETURN_NAMES = ("VIDEO", "Kling ID", "Duration (sec)")
    DESCRIPTION = "Kling Text to Video Node"

    def api_call(
        self,
        prompt: str,
        negative_prompt: str,
        model_name: str,
        cfg_scale: float,
        mode: str,
        duration: int,
        aspect_ratio: str,
        camera_control: Optional[CameraControl] = None,
        auth_token: Optional[str] = None,
    ) -> tuple[VideoFromFile, str, str]:
        """Submit a text-to-video request, wait for the task and fetch the video.

        Args:
            prompt: Text prompt; a falsy value is sent as ``None``.
            negative_prompt: Negative prompt; a falsy value is sent as ``None``.
            model_name: Value converted with ``ModelName``.
            cfg_scale: Passed through to the request unchanged.
            mode: Value converted with ``Mode``.
            duration: Value converted with ``Duration``.
            aspect_ratio: Value converted with ``AspectRatio``.
            camera_control: Optional camera control forwarded to the request.
            auth_token: Token used for both the initial request and polling.

        Returns:
            A tuple of the downloaded video, the video id as a string and the
            video duration as a string.

        Raises:
            KlingApiError: If the initial response is not valid, or if the
                polled response does not pass ``is_valid_video_response``.
        """
        validate_prompts(prompt, negative_prompt, MAX_PROMPT_LENGTH_T2V)

        submit_endpoint = ApiEndpoint(
            path=PATH_TEXT_TO_VIDEO,
            method=HttpMethod.POST,
            request_model=KlingText2VideoRequest,
            response_model=KlingText2VideoResponse,
        )
        # Empty prompt strings are normalised to None before being sent.
        submit_request = KlingText2VideoRequest(
            prompt=prompt or None,
            negative_prompt=negative_prompt or None,
            duration=Duration(duration),
            mode=Mode(mode),
            model_name=ModelName(model_name),
            cfg_scale=cfg_scale,
            aspect_ratio=AspectRatio(aspect_ratio),
            camera_control=camera_control,
        )
        initial_response = SynchronousOperation(
            endpoint=submit_endpoint,
            request=submit_request,
            auth_token=auth_token,
        ).execute()

        if not is_valid_initial_response(initial_response):
            error_msg = f"Kling initial request failed. Code: {initial_response.code}, Message: {initial_response.message}, Data: {initial_response.data}"
            logging.error(error_msg)
            raise KlingApiError(error_msg)

        task_id = initial_response.data.task_id
        polled_response = self.poll_for_task_status(task_id, auth_token)

        if not is_valid_video_response(polled_response):
            error_msg = f"Kling task {task_id} succeeded but no video data found in response."
            logging.error(error_msg)
            raise KlingApiError(error_msg)

        # Only the first video of the task result is returned.
        first_video = polled_response.data.task_result.videos[0]
        logging.debug(
            "Kling task %s succeeded. Video URL: %s", task_id, first_video.url
        )
        video_output = download_url_to_video_output(first_video.url)
        return video_output, str(first_video.id), str(first_video.duration)