> ## Documentation Index
> Fetch the complete documentation index at: https://docs.comfy.org/llms.txt
> Use this file to discover all available pages before exploring further.

# 클라우드 API 참조

> Comfy Cloud의 코드 예제와 함께 제공되는 완벽한 API 참조

<Warning>
  **실험적 API:** 이 API는 실험적이며 변경될 수 있습니다. 엔드포인트, 요청/응답 형식 및 동작은 사전 통지 없이 수정될 수 있습니다. 일부 엔드포인트는 로컬 ComfyUI와의 호환성을 위해 유지되지만 다른 의미를 가질 수 있습니다(예: 무시된 필드).
</Warning>

이 페이지에서는 일반적인 Comfy Cloud API 작업에 대한 완벽한 예제를 제공합니다.

<Note>
  **구독 필요:** API를 통해 워크플로우를 실행하려면 활성 Comfy Cloud 구독이 필요합니다. 자세한 내용은 [가격 계획](https://www.comfy.org/cloud/pricing?utm_source=docs\&utm_campaign=cloud-api)을 참조하세요.
</Note>

## 설정

모든 예제는 다음과 같은 공통 임포트와 구성을 사용합니다:

<CodeGroup>
  ```bash curl theme={null}
  export COMFY_CLOUD_API_KEY="your-api-key"
  export BASE_URL="https://cloud.comfy.org"
  ```

  ```typescript TypeScript theme={null}
  import { readFile, writeFile } from "fs/promises";

  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return {
      "X-API-Key": API_KEY,
      "Content-Type": "application/json",
    };
  }
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import time
  import asyncio
  import aiohttp

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {
          "X-API-Key": API_KEY,
          "Content-Type": "application/json"
      }
  ```
</CodeGroup>

***

## 객체 정보

사용 가능한 노드 정의를 가져옵니다. 이는 어떤 노드들이 있는지, 그리고 그 입력과 출력 사양을 이해하는 데 유용합니다.

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/object_info" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getObjectInfo(): Promise<Record<string, any>> {
    const response = await fetch(`${BASE_URL}/api/object_info`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const objectInfo = await getObjectInfo();
  console.log(`사용 가능한 노드: ${Object.keys(objectInfo).length}`);

  const ksampler = objectInfo["KSampler"] ?? {};
  console.log(`KSampler 입력: ${Object.keys(ksampler.input?.required ?? {})}`);
  ```

  ```python Python theme={null}
  def get_object_info():
      """클라우드에서 사용 가능한 모든 노드 정의를 가져옵니다."""
      response = requests.get(
          f"{BASE_URL}/api/object_info",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  # 모든 노드 가져오기
  object_info = get_object_info()
  print(f"사용 가능한 노드: {len(object_info)}")

  # 특정 노드의 정의 가져오기
  ksampler = object_info.get("KSampler", {})
  inputs = list(ksampler.get('input', {}).get('required', {}).keys())
  print(f"KSampler 입력: {inputs}")
  ```
</CodeGroup>

***

## 입력 업로드

워크플로우에서 사용할 이미지, 마스크 또는 기타 파일을 업로드합니다.

### 직접 업로드 (멀티파트)

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/image" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@my_image.png" \
    -F "type=input" \
    -F "overwrite=true"
  ```

  ```typescript TypeScript theme={null}
  async function uploadInput(
    filePath: string,
    inputType: string = "input"
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", inputType);
    formData.append("overwrite", "true");

    const response = await fetch(`${BASE_URL}/api/upload/image`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const result = await uploadInput("my_image.png");
  console.log(`업로드됨: ${result.name} to ${result.subfolder}`);
  ```

  ```python Python theme={null}
  def upload_input(file_path: str, input_type: str = "input") -> dict:
      """파일을 클라우드에 직접 업로드합니다.
      
      Args:
          file_path: 업로드할 파일 경로
          input_type: "input" - 이미지용, "temp" - 임시 파일용
          
      Returns:
          업로드 응답, 파일명과 하위 폴더 포함
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {"type": input_type, "overwrite": "true"}
          
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},  # 멀티파트이므로 Content-Type 없음
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()

  # 이미지 업로드
  result = upload_input("my_image.png")
  print(f"업로드됨: {result['name']} to {result['subfolder']}")
  ```
</CodeGroup>

### 마스크 업로드

<Note>
  `subfolder` 매개변수는 API 호환성을 위해 허용되지만 클라우드 스토리지에서는 무시됩니다. 모든 파일은 평탄하고 콘텐츠 주소 기반 네임스페이스에 저장됩니다.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/mask" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@mask.png" \
    -F "type=input" \
    -F "subfolder=clipspace" \
    -F 'original_ref={"filename":"my_image.png","subfolder":"","type":"input"}'
  ```

  ```typescript TypeScript theme={null}
  async function uploadMask(
    filePath: string,
    originalRef: { filename: string; subfolder: string; type: string }
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", "input");
    formData.append("subfolder", "clipspace");
    formData.append("original_ref", JSON.stringify(originalRef));

    const response = await fetch(`${BASE_URL}/api/upload/mask`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const maskResult = await uploadMask("mask.png", {
    filename: "my_image.png",
    subfolder: "",
    type: "input",
  });
  console.log(`업로드된 마스크: ${maskResult.name}`);
  ```

  ```python Python theme={null}
  def upload_mask(file_path: str, original_ref: dict) -> dict:
      """원본 이미지와 연관된 마스크를 업로드합니다.
      
      Args:
          file_path: 마스크 파일 경로
          original_ref: 원본 이미지 참조 {"filename": "...", "subfolder": "...", "type": "..."}
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {
              "type": "input",
              "subfolder": "clipspace",
              "original_ref": json.dumps(original_ref)
          }
          
          response = requests.post(
              f"{BASE_URL}/api/upload/mask",
              headers={"X-API-Key": API_KEY},
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

***

## 워크플로우 실행

워크플로우를 제출하여 실행합니다.

<Info>
  **동시 제출 지원:** 구독 등급에 따라 이전 작업이 완료될 때까지 기다리지 않고 여러 워크플로우를 제출할 수 있습니다. 등급 한도 내에서 작업이 병렬로 실행되며, 초과 작업은 자동으로 대기열에 추가됩니다. 자세한 내용과 동시성 한도는 [병렬 실행](/ko/development/cloud/overview#parallel-execution-concurrent-jobs)을 참조하세요.
</Info>

### 워크플로우 제출

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"prompt": '"$(cat workflow_api.json)"'}'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();

    if (result.error) {
      throw new Error(`워크플로우 오류: ${result.error}`);
    }
    return result.prompt_id;
  }

  const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  const promptId = await submitWorkflow(workflow);
  console.log(`제출된 작업: ${promptId}`);
  ```

  ```python Python theme={null}
  def submit_workflow(workflow: dict) -> str:
      """워크플로우를 제출하고 prompt_id(작업 ID)를 반환합니다.
      
      Args:
          workflow: API 형식의 ComfyUI 워크플로우
          
      Returns:
          prompt_id, 작업 추적용
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      result = response.json()
      
      if "error" in result:
          raise ValueError(f"워크플로우 오류: {result['error']}")
      
      return result["prompt_id"]

  # 워크플로우 로드 및 제출
  with open("workflow_api.json") as f:
      workflow = json.load(f)

  prompt_id = submit_workflow(workflow)
  print(f"제출된 작업: {prompt_id}")
  ```
</CodeGroup>

### 파트너 노드 사용

워크플로우에 [파트너 노드](/ko/tutorials/partner-nodes/overview) (Flux Pro, Ideogram 등 외부 AI 서비스를 호출하는 노드)가 포함된 경우, 요청 페이로드의 `extra_data` 필드에 [Comfy API 키](/ko/development/api-development/getting-an-api-key)를 포함해야 합니다.

<Note>
  ComfyUI 프론트엔드는 브라우저에서 워크플로우를 실행할 때 자동으로 API 키를 `extra_data`에 패키징합니다. 이 섹션은 직접 API를 호출할 때만 관련이 있습니다.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{
      "prompt": '"$(cat workflow_api.json)"',
      "extra_data": {
        "api_key_comfy_org": "your-comfy-api-key"
      }
    }'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflowWithPartnerNodes(
    workflow: Record<string, any>,
    apiKey: string
  ): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({
        prompt: workflow,
        extra_data: {
          api_key_comfy_org: apiKey,
        },
      }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();
    return result.prompt_id;
  }

  // Use when workflow contains Partner Nodes (e.g., Flux Pro, Ideogram, etc.)
  const promptId = await submitWorkflowWithPartnerNodes(workflow, API_KEY);
  ```

  ```python Python theme={null}
  def submit_workflow_with_partner_nodes(workflow: dict, api_key: str) -> str:
      """Submit a workflow that uses Partner Nodes.
      
      Args:
          workflow: ComfyUI workflow in API format
          api_key: Your API key from platform.comfy.org
          
      Returns:
          prompt_id for tracking the job
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={
              "prompt": workflow,
              "extra_data": {
                  "api_key_comfy_org": api_key
              }
          }
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  # Use when workflow contains Partner Nodes
  prompt_id = submit_workflow_with_partner_nodes(workflow, API_KEY)
  ```
</CodeGroup>

<Info>
  [platform.comfy.org](https://platform.comfy.org/login)에서 API 키를 생성하세요. 이는 클라우드 API 인증에 사용되는 키(`X-API-Key` 헤더)와 동일합니다. 자세한 가이드는 [API 키 받기](/ko/development/api-development/getting-an-api-key)를 참조하세요.
</Info>

### 워크플로우 입력 수정

<CodeGroup>
  ```typescript TypeScript theme={null}
  function setWorkflowInput(
    workflow: Record<string, any>,
    nodeId: string,
    inputName: string,
    value: any
  ): Record<string, any> {
    if (workflow[nodeId]) {
      workflow[nodeId].inputs[inputName] = value;
    }
    return workflow;
  }

  // 예제: 시드와 프롬프트 설정
  let workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  workflow = setWorkflowInput(workflow, "3", "seed", 12345);
  workflow = setWorkflowInput(workflow, "6", "text", "아름다운 풍경");
  ```

  ```python Python theme={null}
  def set_workflow_input(workflow: dict, node_id: str, input_name: str, value) -> dict:
      """워크플로우 입력 값을 수정합니다.
      
      Args:
          workflow: 워크플로우 딕셔너리
          node_id: 수정할 노드 ID
          input_name: 입력 필드 이름
          value: 새로운 값
          
      Returns:
          수정된 워크플로우
      """
      if node_id in workflow:
          workflow[node_id]["inputs"][input_name] = value
      return workflow

  # 예제: 시드와 프롬프트 설정
  workflow = set_workflow_input(workflow, "3", "seed", 12345)
  workflow = set_workflow_input(workflow, "6", "text", "아름다운 풍경")
  ```
</CodeGroup>

***

## 작업 상태 확인

작업 완료 여부를 확인합니다.

**작업 상태 값:**

API는 다음 상태 값을 반환합니다:

| 상태            | 설명                       |
| ------------- | ------------------------ |
| `pending`     | 작업이 대기 중이며 시작을 기다리고 있습니다 |
| `in_progress` | 작업이 현재 실행 중입니다           |
| `completed`   | 작업이 성공적으로 완료되었습니다        |
| `failed`      | 작업 중 오류가 발생했습니다          |
| `cancelled`   | 사용자가 작업을 취소했습니다          |

<CodeGroup>
  ```bash curl theme={null}
  # 작업 완료 여부를 풀링합니다
  curl -X GET "$BASE_URL/api/job/{prompt_id}/status" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"

  # 응답 예시:
  # {"status": "pending"}      - 작업이 대기 중입니다
  # {"status": "in_progress"}  - 작업이 현재 실행 중입니다
  # {"status": "completed"}    - 작업이 성공적으로 완료되었습니다
  # {"status": "failed"}       - 작업 중 오류가 발생했습니다
  # {"status": "cancelled"}    - 작업이 취소되었습니다
  ```

  ```typescript TypeScript theme={null}
  interface JobStatus {
    status: string;
  }

  async function getJobStatus(promptId: string): Promise<JobStatus> {
    const response = await fetch(`${BASE_URL}/api/job/${promptId}/status`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  async function pollForCompletion(
    promptId: string,
    timeout: number = 300,
    pollInterval: number = 2000
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeout * 1000) {
      const { status } = await getJobStatus(promptId);

      if (status === "completed") {
        return;
      } else if (["failed", "cancelled"].includes(status)) {
        throw new Error(`작업이 실패했습니다: ${status}`);
      }

      await new Promise((resolve) => setTimeout(resolve, pollInterval));
    }

    throw new Error(`작업 ${promptId}이 ${timeout}s 내에 완료되지 않았습니다`);
  }

  await pollForCompletion(promptId);
  console.log("작업 완료!");
  ```

  ```python Python theme={null}
  def get_job_status(prompt_id: str) -> str:
      """작업의 현재 상태를 가져옵니다."""
      response = requests.get(
          f"{BASE_URL}/api/job/{prompt_id}/status",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()["status"]

  def poll_for_completion(prompt_id: str, timeout: int = 300, poll_interval: float = 2.0) -> None:
      """작업이 완료되거나 시간 초과될 때까지 풀링합니다."""
      start_time = time.time()

      while time.time() - start_time < timeout:
          status = get_job_status(prompt_id)

          if status == "completed":
              return
          elif status in ("failed", "cancelled"):
              raise RuntimeError(f"작업이 실패했습니다: {status}")

          time.sleep(poll_interval)

      raise TimeoutError(f"작업 {prompt_id}이 {timeout}s 내에 완료되지 않았습니다")

  poll_for_completion(prompt_id)
  print("작업 완료!")
  ```
</CodeGroup>

***

## 실시간 진행 상황을 위한 WebSocket

WebSocket 연결을 통해 실시간 실행 업데이트를 받습니다.

<Note>
  `clientId` 매개변수는 현재 무시됩니다—사용자당 모든 연결이 동일한 메시지를 받습니다. 앞으로의 호환성을 위해 고유한 `clientId`를 전달하세요.
</Note>

<CodeGroup>
  ```typescript TypeScript theme={null}
  async function listenForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error(`작업이 ${timeout / 1000}s 내에 완료되지 않았습니다`));
      }, timeout);

      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        const msgType = data.type;
        const msgData = data.data ?? {};

        // 우리의 작업으로 필터링
        if (msgData.prompt_id !== promptId) return;

        if (msgType === "executing") {
          const node = msgData.node;
          if (node) {
            console.log(`실행 중인 노드: ${node}`);
          } else {
            console.log("실행 완료");
          }
        } else if (msgType === "progress") {
          console.log(`진행률: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          console.log("작업이 성공적으로 완료되었습니다!");
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          const errorMsg = msgData.exception_message ?? "알 수 없는 오류";
          clearTimeout(timer);
          ws.close();
          reject(new Error(`실행 오류: ${errorMsg}`));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

  // 완료를 기다리고 출력값 수집
  const outputs = await listenForCompletion(promptId);
  ```

  ```python Python theme={null}
  import asyncio
  import aiohttp
  import json
  import uuid

  async def listen_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """WebSocket에 연결하여 작업 완료를 모니터링합니다.

      Returns:
          작업의 최종 출력값
      """
      ws_url = BASE_URL.replace("https://", "wss://")
      client_id = str(uuid.uuid4())
      ws_url = f"{ws_url}/ws?clientId={client_id}&token={API_KEY}"

      outputs = {}

      async with aiohttp.ClientSession() as session:
          async with session.ws_connect(ws_url) as ws:
              async def receive_messages():
                  async for msg in ws:
                      if msg.type == aiohttp.WSMsgType.TEXT:
                          data = json.loads(msg.data)
                          msg_type = data.get("type")
                          msg_data = data.get("data", {})

                          # 우리의 작업으로 필터링
                          if msg_data.get("prompt_id") != prompt_id:
                              continue

                          if msg_type == "executing":
                              node = msg_data.get("node")
                              if node:
                                  print(f"실행 중인 노드: {node}")

                          elif msg_type == "progress":
                              value = msg_data.get("value", 0)
                              max_val = msg_data.get("max", 100)
                              print(f"진행률: {value}/{max_val}")

                          elif msg_type == "executed":
                              node_id = msg_data.get("node")
                              output = msg_data.get("output", {})
                              if output:
                                  outputs[node_id] = output

                          elif msg_type == "execution_success":
                              print("작업이 성공적으로 완료되었습니다!")
                              return outputs

                          elif msg_type == "execution_error":
                              error_msg = msg_data.get("exception_message", "알 수 없는 오류")
                              raise RuntimeError(f"실행 오류: {error_msg}")

                      elif msg.type == aiohttp.WSMsgType.ERROR:
                          raise RuntimeError(f"WebSocket 오류: {ws.exception()}")

              try:
                  return await asyncio.wait_for(receive_messages(), timeout=timeout)
              except asyncio.TimeoutError:
                  raise TimeoutError(f"작업이 {timeout}s 내에 완료되지 않았습니다")

  # 완료를 기다리고 출력값 수집
  outputs = await listen_for_completion(prompt_id)
  ```
</CodeGroup>

### WebSocket 메시지 유형

메시지는 특별히 언급되지 않은 한 JSON 텍스트 프레임으로 전송됩니다.

| 유형                      | 설명                                                               |
| ----------------------- | ---------------------------------------------------------------- |
| `status`                | 대기열 상태 업데이트, `queue_remaining` 카운트 포함                            |
| `notification`          | 사용자 친화적 상태 메시지 (`value` 필드에는 "워크플로우 실행 중..."과 같은 텍스트 포함)         |
| `execution_start`       | 워크플로우 실행 시작                                                      |
| `executing`             | 특정 노드가 현재 실행 중 (노드 ID는 `node` 필드에 있음)                            |
| `progress`              | 노드 내 단계 진행 상황 (`value`/`max`는 샘플링 단계)                            |
| `progress_state`        | 노드 메타데이터를 포함한 확장된 진행 상태 (중첩된 `nodes` 객체)                         |
| `executed`              | 노드가 완료되고 출력물(이미지, 비디오 등 `output` 필드에 있음) 생성                      |
| `execution_cached`      | 출력물이 캐시되어 건너뛴 노드 (`nodes` 배열)                                    |
| `execution_success`     | 전체 워크플로우 성공적으로 완료                                                |
| `execution_error`       | 워크플로우 실패 (`exception_type`, `exception_message`, `traceback` 포함) |
| `execution_interrupted` | 사용자가 워크플로우를 취소함                                                  |

#### 바이너리 메시지 (미리보기 이미지)

이미지 생성 중에 ComfyUI는 미리보기 이미지를 포함한 **바이너리 WebSocket 프레임**을 전송합니다. 이는 원시 바이너리 데이터(JSON 아님)입니다:

| 바이너리 유형                       | 값   | 설명                          |
| ----------------------------- | --- | --------------------------- |
| `PREVIEW_IMAGE`               | `1` | 디퓨전 샘플링 중 진행 중인 미리보기        |
| `TEXT`                        | `3` | 노드에서 출력된 텍스트 (진행률 텍스트)      |
| `PREVIEW_IMAGE_WITH_METADATA` | `4` | 노드 컨텍스트 메타데이터를 포함한 미리보기 이미지 |

**바이너리 프레임 형식** (모든 정수는 빅엔디안):

<Tabs>
  <Tab title="PREVIEW_IMAGE (1)">
    | 오프셋 | 크기   | 필드           | 설명                    |
    | --- | ---- | ------------ | --------------------- |
    | 0   | 4바이트 | `type`       | `0x00000001`          |
    | 4   | 4바이트 | `image_type` | 포맷 코드 (1=JPEG, 2=PNG) |
    | 8   | 변수   | `image_data` | 이미지 바이트 원시 데이터        |
  </Tab>

  <Tab title="TEXT (3)">
    | 오프셋 | 크기   | 필드            | 설명                 |
    | --- | ---- | ------------- | ------------------ |
    | 0   | 4바이트 | `type`        | `0x00000003`       |
    | 4   | 4바이트 | `node_id_len` | 노드 ID 문자열 길이       |
    | 8   | N바이트 | `node_id`     | UTF-8 인코딩된 노드 ID   |
    | 8+N | 변수   | `text`        | UTF-8 인코딩된 진행률 텍스트 |
  </Tab>

  <Tab title="PREVIEW_WITH_METADATA (4)">
    | 오프셋 | 크기   | 필드             | 설명                 |
    | --- | ---- | -------------- | ------------------ |
    | 0   | 4바이트 | `type`         | `0x00000004`       |
    | 4   | 4바이트 | `metadata_len` | 메타데이터 JSON 길이      |
    | 8   | N바이트 | `metadata`     | UTF-8 JSON (아래 참조) |
    | 8+N | 변수   | `image_data`   | JPEG/PNG 원시 바이트    |

    **메타데이터 JSON 구조:**

    ```json theme={null}
    {
      "node_id": "3",
      "display_node_id": "3",
      "real_node_id": "3",
      "prompt_id": "abc-123",
      "parent_node_id": null
    }
    ```
  </Tab>
</Tabs>

<Note>
  각 JSON 메시지 유형의 완전한 스키마 정의는 [OpenAPI 사양](/ko/development/cloud/openapi)을 참조하세요.
</Note>

***

## 출력 다운로드

작업 완료 후 생성된 파일을 가져옵니다.

<CodeGroup>
  ```bash curl theme={null}
  # 단일 출력 파일 다운로드하기 (302 리디렉션은 -L 옵션으로 따라가기)
  curl -L "$BASE_URL/api/view?filename=output.png&subfolder=&type=output" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -o output.png
  ```

  ```typescript TypeScript theme={null}
  async function downloadOutput(
    filename: string,
    subfolder: string = "",
    outputType: string = "output"
  ): Promise<ArrayBuffer> {
    const params = new URLSearchParams({ filename, subfolder, type: outputType });
    // 리디렉션 URL 가져오기
    const response = await fetch(`${BASE_URL}/api/view?${params}`, {
      headers: { "X-API-Key": API_KEY },
      redirect: "manual",
    });
    if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
    const signedUrl = response.headers.get("location")!;

    // 서명된 URL에서 가져오기
    const fileResponse = await fetch(signedUrl);
    if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);
    return fileResponse.arrayBuffer();
  }

  async function saveOutputs(
    outputs: Record<string, any>,
    outputDir: string = "."
  ): Promise<void> {
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const data = await downloadOutput(
            fileInfo.filename,
            fileInfo.subfolder ?? "",
            fileInfo.type ?? "output"
          );
          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(data));
          console.log(`저장됨: ${path}`);
        }
      }
    }
  }

  // 모든 출력 다운로드
  await saveOutputs(outputs, "./my_outputs");
  ```

  ```python Python theme={null}
  def download_output(filename: str, subfolder: str = "", output_type: str = "output") -> bytes:
      """출력 파일을 다운로드합니다.

      Args:
          filename: 파일 이름
          subfolder: 하위 폴더 경로 (보통 비어 있음)
          output_type: 최종 출력일 경우 "output", 미리보기일 경우 "temp"

      Returns:
          파일 바이트
      """
      params = {
          "filename": filename,
          "subfolder": subfolder,
          "type": output_type
      }

      response = requests.get(
          f"{BASE_URL}/api/view",
          headers=get_headers(),
          params=params
      )
      response.raise_for_status()
      return response.content

  def save_outputs(outputs: dict, output_dir: str = "."):
      """작업의 모든 출력을 디스크에 저장합니다.

      Args:
          outputs: 작업에서 가져온 출력 사전 (노드 ID -> 출력 데이터)
          output_dir: 파일을 저장할 디렉터리
      """
      import os
      os.makedirs(output_dir, exist_ok=True)

      for node_id, node_outputs in outputs.items():
          for key in ("images", "video", "audio"):
              for file_info in node_outputs.get(key, []):
                  filename = file_info["filename"]
                  subfolder = file_info.get("subfolder", "")
                  output_type = file_info.get("type", "output")

                  data = download_output(filename, subfolder, output_type)

                  output_path = os.path.join(output_dir, filename)
                  with open(output_path, "wb") as f:
                      f.write(data)
                  print(f"저장됨: {output_path}")

  # 모든 출력 다운로드
  save_outputs(outputs, "./my_outputs")
  ```
</CodeGroup>

***

## 완전한 엔드투엔드 예제

모든 것을 하나로 묶은 전체 예제는 다음과 같습니다:

<CodeGroup>
  ```typescript TypeScript theme={null}
  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return { "X-API-Key": API_KEY, "Content-Type": "application/json" };
  }

  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return (await response.json()).prompt_id;
  }

  async function waitForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error("작업 시간 초과"));
      }, timeout);

      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        if (data.data?.prompt_id !== promptId) return;

        const msgType = data.type;
        const msgData = data.data ?? {};

        if (msgType === "progress") {
          console.log(`진행률: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          clearTimeout(timer);
          ws.close();
          reject(new Error(msgData.exception_message ?? "알 수 없는 오류"));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

  async function downloadOutputs(
    outputs: Record<string, any>,
    outputDir: string
  ): Promise<void> {
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const params = new URLSearchParams({
            filename: fileInfo.filename,
            subfolder: fileInfo.subfolder ?? "",
            type: fileInfo.type ?? "output",
          });
          // 리디렉션 URL 가져오기 (인증 정보를 저장소로 보내지 않도록)
          const response = await fetch(`${BASE_URL}/api/view?${params}`, {
            headers: { "X-API-Key": API_KEY },
            redirect: "manual",
          });
          if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
          const signedUrl = response.headers.get("location")!;
          // 서명된 URL에서 인증 헤더 없이 가져오기
          const fileResponse = await fetch(signedUrl);
          if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);

          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(await fileResponse.arrayBuffer()));
          console.log(`다운로드됨: ${path}`);
        }
      }
    }
  }

  async function main() {
    // 1. 워크플로우 로드
    const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));

    // 2. 워크플로우 파라미터 수정
    workflow["3"].inputs.seed = 42;
    workflow["6"].inputs.text = "산 위의 아름다운 일몰";

    // 3. 워크플로우 제출
    const promptId = await submitWorkflow(workflow);
    console.log(`작업 제출됨: ${promptId}`);

    // 4. 진행 상황과 함께 완료 대기
    const outputs = await waitForCompletion(promptId);
    console.log(`작업 완료! ${Object.keys(outputs).length}개의 출력 노드 발견`);

    // 5. 출력 다운로드
    await downloadOutputs(outputs, "./outputs");
    console.log("완료!");
  }

  main();
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import asyncio
  import aiohttp
  import uuid

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {"X-API-Key": API_KEY, "Content-Type": "application/json"}

  def upload_image(file_path: str) -> dict:
      """이미지를 업로드하고 워크플로우에서 사용할 참조값을 반환합니다."""
      with open(file_path, "rb") as f:
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},
              files={"image": f},
              data={"type": "input", "overwrite": "true"}
          )
      response.raise_for_status()
      return response.json()

  def submit_workflow(workflow: dict) -> str:
      """워크플로우를 제출하고 prompt_id를 반환합니다."""
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  async def wait_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """WebSocket을 통해 작업 완료를 기다립니다."""
      ws_url = BASE_URL.replace("https://", "wss://") + f"/ws?clientId={uuid.uuid4()}&token={API_KEY}"
      outputs = {}
      
      async with aiohttp.ClientSession() as session:
          async with session.ws_connect(ws_url) as ws:
              start = asyncio.get_event_loop().time()
              async for msg in ws:
                  if asyncio.get_event_loop().time() - start > timeout:
                      raise TimeoutError("작업 시간 초과")
                  
                  if msg.type != aiohttp.WSMsgType.TEXT:
                      continue
                      
                  data = json.loads(msg.data)
                  if data.get("data", {}).get("prompt_id") != prompt_id:
                      continue
                  
                  msg_type = data.get("type")
                  msg_data = data.get("data", {})
                  
                  if msg_type == "progress":
                      print(f"진행률: {msg_data.get('value')}/{msg_data.get('max')}")
                  elif msg_type == "executed":
                      if output := msg_data.get("output"):
                          outputs[msg_data["node"]] = output
                  elif msg_type == "execution_success":
                      return outputs
                  elif msg_type == "execution_error":
                      raise RuntimeError(msg_data.get("exception_message", "알 수 없는 오류"))
      
      return outputs

  def download_outputs(outputs: dict, output_dir: str):
      """모든 출력 파일을 다운로드합니다."""
      os.makedirs(output_dir, exist_ok=True)
      
      for node_outputs in outputs.values():
          for key in ["images", "video", "audio"]:
              for file_info in node_outputs.get(key, []):
                  params = {
                      "filename": file_info["filename"],
                      "subfolder": file_info.get("subfolder", ""),
                      "type": file_info.get("type", "output")
                  }
                  response = requests.get(f"{BASE_URL}/api/view", headers=get_headers(), params=params)
                  response.raise_for_status()
                  
                  path = os.path.join(output_dir, file_info["filename"])
                  with open(path, "wb") as f:
                      f.write(response.content)
                  print(f"다운로드됨: {path}")

  async def main():
      # 1. 워크플로우 로드
      with open("workflow_api.json") as f:
          workflow = json.load(f)
      
      # 2. 선택적으로 입력 이미지 업로드
      # image_ref = upload_image("input.png")
      # workflow["1"]["inputs"]["image"] = image_ref["name"]
      
      # 3. 워크플로우 파라미터 수정
      workflow["3"]["inputs"]["seed"] = 42
      workflow["6"]["inputs"]["text"] = "산 위의 아름다운 일몰"
      
      # 4. 워크플로우 제출
      prompt_id = submit_workflow(workflow)
      print(f"작업 제출됨: {prompt_id}")
      
      # 5. 진행 상황과 함께 완료 대기
      outputs = await wait_for_completion(prompt_id)
      print(f"작업 완료! {len(outputs)}개의 출력 노드 발견")
      
      # 6. 출력 다운로드
      download_outputs(outputs, "./outputs")
      print("완료!")

  if __name__ == "__main__":
      asyncio.run(main())
  ```
</CodeGroup>

***

## 대기열 관리

### 대기열 상태 가져오기

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getQueue(): Promise<{
    queue_running: any[];
    queue_pending: any[];
  }> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const queue = await getQueue();
  console.log(`실행 중: ${queue.queue_running.length}`);
  console.log(`대기 중: ${queue.queue_pending.length}`);
  ```

  ```python Python theme={null}
  def get_queue():
      """현재 대기열 상태를 가져옵니다."""
      response = requests.get(
          f"{BASE_URL}/api/queue",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  queue = get_queue()
  print(f"실행 중: {len(queue.get('queue_running', []))}")
  print(f"대기 중: {len(queue.get('queue_pending', []))}")
  ```
</CodeGroup>

### 작업 취소

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"delete": ["PROMPT_ID_HERE"]}'
  ```

  ```typescript TypeScript theme={null}
  async function cancelJob(promptId: string): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ delete: [promptId] }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def cancel_job(prompt_id: str):
      """대기 중이거나 실행 중인 작업을 취소합니다."""
      response = requests.post(
          f"{BASE_URL}/api/queue",
          headers=get_headers(),
          json={"delete": [prompt_id]}
      )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

### 현재 실행 중단

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/interrupt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function interrupt(): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/interrupt`, {
      method: "POST",
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def interrupt():
      """현재 실행 중인 작업을 중단합니다."""
      response = requests.post(
          f"{BASE_URL}/api/interrupt",
          headers=get_headers()
      )
      response.raise_for_status()
  ```
</CodeGroup>

***

## 오류 처리

### HTTP 오류

REST API 엔드포인트는 표준 HTTP 상태 코드를 반환합니다:

| 상태    | 설명                         |
| ----- | -------------------------- |
| `400` | 잘못된 요청 (잘못된 워크플로우, 필드 누락)  |
| `401` | 인증되지 않음 (잘못된 또는 누락된 API 키) |
| `402` | 크레딧 부족                     |
| `429` | 구독 비활성                     |
| `500` | 내부 서버 오류                   |

### 실행 오류

워크플로우 실행 중 오류는 `execution_error` WebSocket 메시지를 통해 전달됩니다. `exception_type` 필드는 오류 범주를 식별합니다:

| 예외 유형                       | 설명                   |
| --------------------------- | -------------------- |
| `ValidationError`           | 유효하지 않은 워크플로우 또는 입력  |
| `ModelDownloadError`        | 필요한 모델이 없거나 다운로드 실패  |
| `ImageDownloadError`        | URL에서 입력 이미지 다운로드 실패 |
| `OOMError`                  | GPU 메모리 부족           |
| `InsufficientFundsError`    | 계정 잔액 부족 (파트너 노드용)   |
| `InactiveSubscriptionError` | 구독 비활성               |
