# Cloud API Reference

> Complete API reference with code examples for Comfy Cloud

<Warning>
  **Experimental API:** This API is experimental and subject to change. Endpoints, request/response formats, and behavior may be modified without notice. Some endpoints are maintained for compatibility with local ComfyUI but may have different semantics (e.g., ignored fields).
</Warning>

This page provides complete examples for common Comfy Cloud API operations.

<Note>
  **Subscription Required:** Running workflows via the API requires an active Comfy Cloud subscription. See [pricing plans](https://www.comfy.org/cloud/pricing?utm_source=docs\&utm_campaign=cloud-api) for details.
</Note>

## Setup

All examples use these common imports and configuration:

<CodeGroup>
  ```bash curl theme={null}
  export COMFY_CLOUD_API_KEY="your-api-key"
  export BASE_URL="https://cloud.comfy.org"
  ```

  ```typescript TypeScript theme={null}
  import { readFile, writeFile } from "fs/promises";

  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return {
      "X-API-Key": API_KEY,
      "Content-Type": "application/json",
    };
  }
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import time
  import asyncio
  import aiohttp

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {
          "X-API-Key": API_KEY,
          "Content-Type": "application/json"
      }
  ```
</CodeGroup>

***

## Object Info

Retrieve available node definitions. This is useful for understanding what nodes are available and their input/output specifications.

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/object_info" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getObjectInfo(): Promise<Record<string, any>> {
    const response = await fetch(`${BASE_URL}/api/object_info`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const objectInfo = await getObjectInfo();
  console.log(`Available nodes: ${Object.keys(objectInfo).length}`);

  const ksampler = objectInfo["KSampler"] ?? {};
  console.log(`KSampler inputs: ${Object.keys(ksampler.input?.required ?? {})}`);
  ```

  ```python Python theme={null}
  def get_object_info():
      """Fetch all available node definitions from cloud."""
      response = requests.get(
          f"{BASE_URL}/api/object_info",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  # Get all nodes
  object_info = get_object_info()
  print(f"Available nodes: {len(object_info)}")

  # Get a specific node's definition
  ksampler = object_info.get("KSampler", {})
  inputs = list(ksampler.get('input', {}).get('required', {}).keys())
  print(f"KSampler inputs: {inputs}")
  ```
</CodeGroup>
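
One use for the node definitions is a pre-flight check before submitting a workflow. The following is a minimal sketch, not part of the Cloud API: `find_unknown_node_types` is a hypothetical helper, and it assumes that each node in an API-format workflow stores its type under a `class_type` key. The sample data is hand-written for illustration and is not a real API response.

```python
def find_unknown_node_types(workflow: dict, object_info: dict) -> dict:
    """Map node ID -> class_type for nodes that object_info does not define.

    Assumption: each workflow node carries a "class_type" key (API format).
    """
    unknown = {}
    for node_id, node in workflow.items():
        class_type = node.get("class_type")
        if class_type not in object_info:
            unknown[node_id] = class_type
    return unknown


# Hand-written sample data (hypothetical, not a real /api/object_info response)
sample_object_info = {"KSampler": {}, "CLIPTextEncode": {}}
sample_workflow = {
    "3": {"class_type": "KSampler", "inputs": {"seed": 1}},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"text": "hi"}},
    "9": {"class_type": "MyCustomNode", "inputs": {}},
}

print(find_unknown_node_types(sample_workflow, sample_object_info))
```

In practice you would pass the result of `get_object_info()` as the second argument and refuse to submit if the returned dict is non-empty.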

***

## Uploading Inputs

Upload images, masks, or other files for use in workflows.

### Direct Upload (Multipart)

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/image" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@my_image.png" \
    -F "type=input" \
    -F "overwrite=true"
  ```

  ```typescript TypeScript theme={null}
  async function uploadInput(
    filePath: string,
    inputType: string = "input"
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", inputType);
    formData.append("overwrite", "true");

    const response = await fetch(`${BASE_URL}/api/upload/image`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const result = await uploadInput("my_image.png");
  console.log(`Uploaded: ${result.name} to ${result.subfolder}`);
  ```

  ```python Python theme={null}
  def upload_input(file_path: str, input_type: str = "input") -> dict:
      """Upload a file directly to cloud.
      
      Args:
          file_path: Path to the file to upload
          input_type: "input" for images, "temp" for temporary files
          
      Returns:
          Upload response with filename and subfolder
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {"type": input_type, "overwrite": "true"}
          
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},  # No Content-Type for multipart
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()

  # Upload an image
  result = upload_input("my_image.png")
  print(f"Uploaded: {result['name']} to {result['subfolder']}")
  ```
</CodeGroup>
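
After an upload, the returned `name` is what a workflow refers to. The sketch below shows one way to wire that in. It assumes the workflow contains an image-loading node whose `image` input takes the uploaded file name; the helper name, the node ID `"10"`, and the sample upload response are all hypothetical.

```python
def use_uploaded_image(
    workflow: dict,
    node_id: str,
    upload_result: dict,
    input_name: str = "image",
) -> dict:
    """Point an image-loading node at a previously uploaded file.

    Assumption: the node accepts the uploaded file name in `input_name`.
    """
    if node_id not in workflow:
        raise KeyError(f"Node {node_id!r} not found in workflow")
    workflow[node_id]["inputs"][input_name] = upload_result["name"]
    return workflow


# Hypothetical workflow fragment and upload response, for illustration only
sample_workflow = {"10": {"class_type": "LoadImage", "inputs": {"image": "old.png"}}}
sample_upload = {"name": "my_image.png", "subfolder": ""}

use_uploaded_image(sample_workflow, "10", sample_upload)
print(sample_workflow["10"]["inputs"]["image"])
```

Raising on a missing node ID, rather than skipping it silently, makes a mismatched workflow file fail before the job is submitted.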

### Upload Mask

<Note>
  The `subfolder` parameter is accepted for API compatibility but ignored in cloud storage. All files are stored in a flat, content-addressed namespace.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/mask" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@mask.png" \
    -F "type=input" \
    -F "subfolder=clipspace" \
    -F 'original_ref={"filename":"my_image.png","subfolder":"","type":"input"}'
  ```

  ```typescript TypeScript theme={null}
  async function uploadMask(
    filePath: string,
    originalRef: { filename: string; subfolder: string; type: string }
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", "input");
    formData.append("subfolder", "clipspace");
    formData.append("original_ref", JSON.stringify(originalRef));

    const response = await fetch(`${BASE_URL}/api/upload/mask`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const maskResult = await uploadMask("mask.png", {
    filename: "my_image.png",
    subfolder: "",
    type: "input",
  });
  console.log(`Uploaded mask: ${maskResult.name}`);
  ```

  ```python Python theme={null}
  def upload_mask(file_path: str, original_ref: dict) -> dict:
      """Upload a mask associated with an original image.
      
      Args:
          file_path: Path to the mask file
          original_ref: Reference to the original image {"filename": "...", "subfolder": "...", "type": "..."}
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {
              "type": "input",
              "subfolder": "clipspace",
              "original_ref": json.dumps(original_ref)
          }
          
          response = requests.post(
              f"{BASE_URL}/api/upload/mask",
              headers={"X-API-Key": API_KEY},
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

***

## Running Workflows

Submit a workflow for execution.

<Info>
  **Concurrent submissions supported:** Depending on your subscription tier, you can submit multiple workflows without waiting for previous jobs to complete. Jobs run in parallel up to your tier's limit, and additional jobs queue automatically. See [Parallel Execution](/development/cloud/overview#parallel-execution-concurrent-jobs) for details and concurrency limits.
</Info>

### Submit Workflow

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"prompt": '"$(cat workflow_api.json)"'}'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();

    if (result.error) {
      throw new Error(`Workflow error: ${result.error}`);
    }
    return result.prompt_id;
  }

  const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  const promptId = await submitWorkflow(workflow);
  console.log(`Submitted job: ${promptId}`);
  ```

  ```python Python theme={null}
  def submit_workflow(workflow: dict) -> str:
      """Submit a workflow and return the prompt_id (job ID).
      
      Args:
          workflow: ComfyUI workflow in API format
          
      Returns:
          prompt_id for tracking the job
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      result = response.json()
      
      if "error" in result:
          raise ValueError(f"Workflow error: {result['error']}")
      
      return result["prompt_id"]

  # Load and submit a workflow
  with open("workflow_api.json") as f:
      workflow = json.load(f)

  prompt_id = submit_workflow(workflow)
  print(f"Submitted job: {prompt_id}")
  ```
</CodeGroup>

### Using Partner Nodes

If your workflow contains [Partner Nodes](/tutorials/api-nodes/overview) (nodes that call external AI services like Flux Pro, Ideogram, etc.), you must include your Comfy API key in the `extra_data` field of the request payload.

<Note>
  The ComfyUI frontend automatically packages your API key into `extra_data` when running workflows in the browser. This section is only relevant when calling the API directly.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{
      "prompt": '"$(cat workflow_api.json)"',
      "extra_data": {
        "api_key_comfy_org": "your-comfy-api-key"
      }
    }'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflowWithPartnerNodes(
    workflow: Record<string, any>,
    apiKey: string
  ): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({
        prompt: workflow,
        extra_data: {
          api_key_comfy_org: apiKey,
        },
      }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();
    return result.prompt_id;
  }

  // Use when workflow contains Partner Nodes (e.g., Flux Pro, Ideogram, etc.)
  const promptId = await submitWorkflowWithPartnerNodes(workflow, API_KEY);
  ```

  ```python Python theme={null}
  def submit_workflow_with_partner_nodes(workflow: dict, api_key: str) -> str:
      """Submit a workflow that uses Partner Nodes.
      
      Args:
          workflow: ComfyUI workflow in API format
          api_key: Your API key from platform.comfy.org
          
      Returns:
          prompt_id for tracking the job
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={
              "prompt": workflow,
              "extra_data": {
                  "api_key_comfy_org": api_key
              }
          }
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  # Use when workflow contains Partner Nodes
  prompt_id = submit_workflow_with_partner_nodes(workflow, API_KEY)
  ```
</CodeGroup>

<Info>
  Generate your API key at [platform.comfy.org](https://platform.comfy.org/login). This is the same key used for Cloud API authentication (`X-API-Key` header).
</Info>
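
The two submit variants differ only in whether `extra_data` is present, so the request body can be built in one place. This is a minimal sketch with a hypothetical helper name, `build_prompt_payload`; the field names `prompt`, `extra_data`, and `api_key_comfy_org` are the ones used in the examples above.

```python
from typing import Optional


def build_prompt_payload(workflow: dict, partner_api_key: Optional[str] = None) -> dict:
    """Build the JSON body for POST /api/prompt.

    `extra_data` is added only when a key for Partner Nodes is supplied.
    """
    payload = {"prompt": workflow}
    if partner_api_key:
        payload["extra_data"] = {"api_key_comfy_org": partner_api_key}
    return payload


# Hypothetical one-node workflow, for illustration only
sample_workflow = {"3": {"class_type": "KSampler", "inputs": {"seed": 42}}}

print(build_prompt_payload(sample_workflow))
print(build_prompt_payload(sample_workflow, "your-comfy-api-key"))
```

The resulting dict can be passed as `json=` to `requests.post`, as in `submit_workflow`.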

### Modify Workflow Inputs

A workflow in API format is keyed by node ID, and each node keeps its parameters under `inputs`. To change a parameter before submitting, set the corresponding entry. The helpers below modify the workflow in place and skip node IDs that do not exist.

<CodeGroup>
  ```typescript TypeScript theme={null}
  function setWorkflowInput(
    workflow: Record<string, any>,
    nodeId: string,
    inputName: string,
    value: any
  ): Record<string, any> {
    if (workflow[nodeId]) {
      workflow[nodeId].inputs[inputName] = value;
    }
    return workflow;
  }

  // Example: Set seed and prompt
  let workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  workflow = setWorkflowInput(workflow, "3", "seed", 12345);
  workflow = setWorkflowInput(workflow, "6", "text", "a beautiful landscape");
  ```

  ```python Python theme={null}
  def set_workflow_input(workflow: dict, node_id: str, input_name: str, value) -> dict:
      """Modify a workflow input value.
      
      Args:
          workflow: The workflow dict
          node_id: ID of the node to modify
          input_name: Name of the input field
          value: New value
          
      Returns:
          Modified workflow
      """
      if node_id in workflow:
          workflow[node_id]["inputs"][input_name] = value
      return workflow

  # Example: Set seed and prompt
  workflow = set_workflow_input(workflow, "3", "seed", 12345)
  workflow = set_workflow_input(workflow, "6", "text", "a beautiful landscape")
  ```
</CodeGroup>
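
Because the helpers above mutate the workflow they are given, generating several variations from one template needs a copy per variation. The following is a sketch of that idea using `copy.deepcopy`; `make_input_variants` is a hypothetical name, and the sample workflow is hand-written.

```python
import copy


def make_input_variants(workflow: dict, node_id: str, input_name: str, values: list) -> list:
    """Return one independent copy of `workflow` per value, leaving the original untouched."""
    variants = []
    for value in values:
        variant = copy.deepcopy(workflow)
        variant[node_id]["inputs"][input_name] = value
        variants.append(variant)
    return variants


# Hypothetical template workflow, for illustration only
template = {"3": {"class_type": "KSampler", "inputs": {"seed": 0}}}
variants = make_input_variants(template, "3", "seed", [1, 2, 3])

print([v["3"]["inputs"]["seed"] for v in variants])
print(template["3"]["inputs"]["seed"])
```

Each variant can then be passed to `submit_workflow` separately, which fits the concurrent submission model described earlier.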

***

## Checking Job Status

Poll the job status endpoint until the job reaches a terminal state (`completed`, `failed`, or `cancelled`).

**Job Status Values:**

The API returns one of the following status values:

| Status        | Description                        |
| ------------- | ---------------------------------- |
| `pending`     | Job is queued and waiting to start |
| `in_progress` | Job is currently executing         |
| `completed`   | Job finished successfully          |
| `failed`      | Job encountered an error           |
| `cancelled`   | Job was cancelled by user          |

<CodeGroup>
  ```bash curl theme={null}
  # Poll for job completion
  curl -X GET "$BASE_URL/api/job/{prompt_id}/status" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"

  # Response examples:
  # {"status": "pending"}      - Job is queued
  # {"status": "in_progress"}  - Job is currently running
  # {"status": "completed"}    - Job finished successfully
  # {"status": "failed"}       - Job encountered an error
  # {"status": "cancelled"}    - Job was cancelled
  ```

  ```typescript TypeScript theme={null}
  interface JobStatus {
    status: string;
  }

  async function getJobStatus(promptId: string): Promise<JobStatus> {
    const response = await fetch(`${BASE_URL}/api/job/${promptId}/status`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  async function pollForCompletion(
    promptId: string,
    timeout: number = 300, // seconds
    pollInterval: number = 2000 // milliseconds
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeout * 1000) {
      const { status } = await getJobStatus(promptId);

      if (status === "completed") {
        return;
      } else if (["failed", "cancelled"].includes(status)) {
        throw new Error(`Job ended with status: ${status}`);
      }

      await new Promise((resolve) => setTimeout(resolve, pollInterval));
    }

    throw new Error(`Job ${promptId} did not complete within ${timeout}s`);
  }

  await pollForCompletion(promptId);
  console.log("Job completed!");
  ```

  ```python Python theme={null}
  def get_job_status(prompt_id: str) -> str:
      """Get the current status of a job."""
      response = requests.get(
          f"{BASE_URL}/api/job/{prompt_id}/status",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()["status"]

  def poll_for_completion(prompt_id: str, timeout: int = 300, poll_interval: float = 2.0) -> None:
      """Poll until job completes or times out."""
      start_time = time.time()

      while time.time() - start_time < timeout:
          status = get_job_status(prompt_id)

          if status == "completed":
              return
          elif status in ("failed", "cancelled"):
              raise RuntimeError(f"Job ended with status: {status}")

          time.sleep(poll_interval)

      raise TimeoutError(f"Job {prompt_id} did not complete within {timeout}s")

  poll_for_completion(prompt_id)
  print("Job completed!")
  ```
</CodeGroup>
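
Fixed-interval polling is simple but sends many requests for long jobs. One alternative, sketched here, is exponential backoff: the wait doubles after each non-terminal status up to a cap. `poll_with_backoff`, its parameters, and the default intervals are illustrative choices, not values recommended by the API. The status source and the sleep function are injected so the logic can be exercised without network access.

```python
import time
from typing import Callable

# Terminal values from the status table above
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def poll_with_backoff(
    fetch_status: Callable[[], str],
    timeout: float = 300.0,
    initial_interval: float = 1.0,
    max_interval: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fetch_status until it returns a terminal status, doubling the wait each time."""
    deadline = time.monotonic() + timeout
    interval = initial_interval
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"No terminal status within {timeout}s")
        sleep(interval)
        interval = min(interval * 2, max_interval)


# Offline demonstration with a scripted status sequence and a recording sleep
scripted = iter(["pending", "in_progress", "completed"])
recorded_delays = []
final_status = poll_with_backoff(lambda: next(scripted), sleep=recorded_delays.append)
print(final_status, recorded_delays)
```

Against the real API, `fetch_status` would be something like `lambda: get_job_status(prompt_id)`. The function returns the terminal status instead of raising, so the caller decides how to treat `failed` and `cancelled`.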

***

## WebSocket for Real-Time Progress

Connect to the WebSocket for real-time execution updates.

<Note>
  The `clientId` parameter is currently ignored: all connections for a user receive the same messages. Pass a unique `clientId` anyway for forward compatibility.
</Note>

<CodeGroup>
  ```typescript TypeScript theme={null}
  async function listenForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error(`Job did not complete within ${timeout / 1000}s`));
      }, timeout);

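      ws.onmessage = (event) => {
        // Preview images arrive as binary frames; only text frames carry JSON
        if (typeof event.data !== "string") return;
        const data = JSON.parse(event.data);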
        const msgType = data.type;
        const msgData = data.data ?? {};

        // Filter to our job
        if (msgData.prompt_id !== promptId) return;

        if (msgType === "executing") {
          const node = msgData.node;
          if (node) {
            console.log(`Executing node: ${node}`);
          } else {
            console.log("Execution complete");
          }
        } else if (msgType === "progress") {
          console.log(`Progress: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          console.log("Job completed successfully!");
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          const errorMsg = msgData.exception_message ?? "Unknown error";
          clearTimeout(timer);
          ws.close();
          reject(new Error(`Execution error: ${errorMsg}`));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

  // Wait for completion and collect outputs
  const outputs = await listenForCompletion(promptId);
  ```

  ```python Python theme={null}
  import asyncio
  import aiohttp
  import json
  import uuid

  async def listen_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """Connect to WebSocket and listen for job completion.

      Returns:
          Final outputs from the job
      """
      ws_url = BASE_URL.replace("https://", "wss://")
      client_id = str(uuid.uuid4())
      ws_url = f"{ws_url}/ws?clientId={client_id}&token={API_KEY}"

      outputs = {}

      async with aiohttp.ClientSession() as session:
          async with session.ws_connect(ws_url) as ws:
              async def receive_messages():
                  async for msg in ws:
                      if msg.type == aiohttp.WSMsgType.TEXT:
                          data = json.loads(msg.data)
                          msg_type = data.get("type")
                          msg_data = data.get("data", {})

                          # Filter to our job
                          if msg_data.get("prompt_id") != prompt_id:
                              continue

                          if msg_type == "executing":
                              node = msg_data.get("node")
                              if node:
                                  print(f"Executing node: {node}")

                          elif msg_type == "progress":
                              value = msg_data.get("value", 0)
                              max_val = msg_data.get("max", 100)
                              print(f"Progress: {value}/{max_val}")

                          elif msg_type == "executed":
                              node_id = msg_data.get("node")
                              output = msg_data.get("output", {})
                              if output:
                                  outputs[node_id] = output

                          elif msg_type == "execution_success":
                              print("Job completed successfully!")
                              return outputs

                          elif msg_type == "execution_error":
                              error_msg = msg_data.get("exception_message", "Unknown error")
                              raise RuntimeError(f"Execution error: {error_msg}")

                      elif msg.type == aiohttp.WSMsgType.ERROR:
                          raise RuntimeError(f"WebSocket error: {ws.exception()}")

              try:
                  return await asyncio.wait_for(receive_messages(), timeout=timeout)
              except asyncio.TimeoutError:
                  raise TimeoutError(f"Job did not complete within {timeout}s")

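  # Wait for completion and collect outputs
  # (asyncio.run is needed in a regular script; top-level await only works in a notebook or async REPL)
  outputs = asyncio.run(listen_for_completion(prompt_id))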
  ```
</CodeGroup>

### WebSocket Message Types

Messages are sent as JSON text frames unless otherwise noted.

| Type                    | Description                                                                              |
| ----------------------- | ---------------------------------------------------------------------------------------- |
| `status`                | Queue status update with `queue_remaining` count                                         |
| `notification`          | User-friendly status message (`value` field contains text like "Executing workflow\...") |
| `execution_start`       | Workflow execution has started                                                           |
| `executing`             | A specific node is now executing (node ID in `node` field)                               |
| `progress`              | Step progress within a node (`value`/`max` for sampling steps)                           |
| `progress_state`        | Extended progress state with node metadata (nested `nodes` object)                       |
| `executed`              | Node completed with outputs (images, video, etc. in `output` field)                      |
| `execution_cached`      | Nodes skipped because outputs are cached (`nodes` array)                                 |
| `execution_success`     | Entire workflow completed successfully                                                   |
| `execution_error`       | Workflow failed (includes `exception_type`, `exception_message`, `traceback`)            |
| `execution_interrupted` | Workflow was cancelled by user                                                           |

#### Binary Messages (Preview Images)

During image generation, ComfyUI sends **binary WebSocket frames** containing preview images and text output from nodes. These are raw binary data (not JSON):

| Binary Type                   | Value | Description                                   |
| ----------------------------- | ----- | --------------------------------------------- |
| `PREVIEW_IMAGE`               | `1`   | In-progress preview during diffusion sampling |
| `TEXT`                        | `3`   | Text output from nodes (progress text)        |
| `PREVIEW_IMAGE_WITH_METADATA` | `4`   | Preview image with node context metadata      |

**Binary frame formats** (all integers are big-endian):

<Tabs>
  <Tab title="PREVIEW_IMAGE (1)">
    | Offset | Size     | Field        | Description                 |
    | ------ | -------- | ------------ | --------------------------- |
    | 0      | 4 bytes  | `type`       | `0x00000001`                |
    | 4      | 4 bytes  | `image_type` | Format code (1=JPEG, 2=PNG) |
    | 8      | variable | `image_data` | Raw image bytes             |
  </Tab>

  <Tab title="TEXT (3)">
    | Offset | Size     | Field         | Description                 |
    | ------ | -------- | ------------- | --------------------------- |
    | 0      | 4 bytes  | `type`        | `0x00000003`                |
    | 4      | 4 bytes  | `node_id_len` | Length of node\_id string   |
    | 8      | N bytes  | `node_id`     | UTF-8 encoded node ID       |
    | 8+N    | variable | `text`        | UTF-8 encoded progress text |
  </Tab>

  <Tab title="PREVIEW_IMAGE_WITH_METADATA (4)">
    | Offset | Size     | Field          | Description             |
    | ------ | -------- | -------------- | ----------------------- |
    | 0      | 4 bytes  | `type`         | `0x00000004`            |
    | 4      | 4 bytes  | `metadata_len` | Length of metadata JSON |
    | 8      | N bytes  | `metadata`     | UTF-8 JSON (see below)  |
    | 8+N    | variable | `image_data`   | Raw JPEG/PNG bytes      |

    **Metadata JSON structure:**

    ```json theme={null}
    {
      "node_id": "3",
      "display_node_id": "3",
      "real_node_id": "3",
      "prompt_id": "abc-123",
      "parent_node_id": null
    }
    ```
  </Tab>
</Tabs>
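
The frame layouts above can be decoded with Python's `struct` module. The following is a sketch under the stated layouts (two big-endian 32-bit integers followed by a variable-length body); `parse_binary_frame` and the shape of its return value are illustrative choices, and the length fields are assumed to be unsigned.

```python
import json
import struct

PREVIEW_IMAGE = 1
TEXT = 3
PREVIEW_IMAGE_WITH_METADATA = 4


def parse_binary_frame(frame: bytes) -> dict:
    """Decode one binary WebSocket frame according to the tables above."""
    if len(frame) < 8:
        raise ValueError("Frame too short for an 8-byte header")
    # Both header fields are big-endian 32-bit integers (assumed unsigned)
    frame_type, second = struct.unpack(">II", frame[:8])
    body = frame[8:]

    if frame_type == PREVIEW_IMAGE:
        # `second` is the image format code (1=JPEG, 2=PNG)
        return {"type": "preview_image", "image_type": second, "image_data": body}

    if second > len(body):
        raise ValueError("Length field exceeds frame size")

    if frame_type == TEXT:
        # `second` is the byte length of the node ID
        return {
            "type": "text",
            "node_id": body[:second].decode("utf-8"),
            "text": body[second:].decode("utf-8"),
        }
    if frame_type == PREVIEW_IMAGE_WITH_METADATA:
        # `second` is the byte length of the metadata JSON
        return {
            "type": "preview_image_with_metadata",
            "metadata": json.loads(body[:second].decode("utf-8")),
            "image_data": body[second:],
        }
    raise ValueError(f"Unknown binary frame type: {frame_type}")


# Build a TEXT frame by hand and decode it again
node_id = b"12"
sample_frame = struct.pack(">II", TEXT, len(node_id)) + node_id + "step 5".encode("utf-8")
print(parse_binary_frame(sample_frame))
```

With `aiohttp`, frames of this kind arrive as messages of type `aiohttp.WSMsgType.BINARY`, with the raw bytes in `msg.data`.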

<Note>
  See the [OpenAPI Specification](/development/cloud/openapi) for complete schema definitions of each JSON message type.
</Note>

***

## Downloading Outputs

Retrieve generated files after job completion.

<CodeGroup>
  ```bash curl theme={null}
  # Download a single output file (follow 302 redirect with -L)
  curl -L "$BASE_URL/api/view?filename=output.png&subfolder=&type=output" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -o output.png
  ```

  ```typescript TypeScript theme={null}
  async function downloadOutput(
    filename: string,
    subfolder: string = "",
    outputType: string = "output"
  ): Promise<ArrayBuffer> {
    const params = new URLSearchParams({ filename, subfolder, type: outputType });
    // Get the redirect URL
    const response = await fetch(`${BASE_URL}/api/view?${params}`, {
      headers: { "X-API-Key": API_KEY },
      redirect: "manual",
    });
    if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
    const signedUrl = response.headers.get("location")!;

    // Fetch from signed URL
    const fileResponse = await fetch(signedUrl);
    if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);
    return fileResponse.arrayBuffer();
  }

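  import { mkdir } from "fs/promises";

  async function saveOutputs(
    outputs: Record<string, any>,
    outputDir: string = "."
  ): Promise<void> {
    // Create the output directory if it does not exist yet
    await mkdir(outputDir, { recursive: true });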
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const data = await downloadOutput(
            fileInfo.filename,
            fileInfo.subfolder ?? "",
            fileInfo.type ?? "output"
          );
          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(data));
          console.log(`Saved: ${path}`);
        }
      }
    }
  }

  // Download all outputs
  await saveOutputs(outputs, "./my_outputs");
  ```

  ```python Python theme={null}
  def download_output(filename: str, subfolder: str = "", output_type: str = "output") -> bytes:
      """Download an output file.

      Args:
          filename: Name of the file
          subfolder: Subfolder path (usually empty)
          output_type: "output" for final outputs, "temp" for previews

      Returns:
          File bytes
      """
      params = {
          "filename": filename,
          "subfolder": subfolder,
          "type": output_type
      }

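      # Request the signed URL without following the 302 redirect,
      # so the API key is not forwarded to the storage host
      response = requests.get(
          f"{BASE_URL}/api/view",
          headers={"X-API-Key": API_KEY},
          params=params,
          allow_redirects=False
      )
      if response.status_code != 302:
          raise RuntimeError(f"Expected redirect, got HTTP {response.status_code}")

      # Fetch the file from the signed URL without auth headers
      file_response = requests.get(response.headers["Location"])
      file_response.raise_for_status()
      return file_response.content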

  def save_outputs(outputs: dict, output_dir: str = "."):
      """Save all outputs from a job to disk.

      Args:
          outputs: Outputs dict from job (node_id -> output_data)
          output_dir: Directory to save files to
      """
      import os
      os.makedirs(output_dir, exist_ok=True)

      for node_id, node_outputs in outputs.items():
          for key in ("images", "video", "audio"):
              for file_info in node_outputs.get(key, []):
                  filename = file_info["filename"]
                  subfolder = file_info.get("subfolder", "")
                  output_type = file_info.get("type", "output")

                  data = download_output(filename, subfolder, output_type)

                  output_path = os.path.join(output_dir, filename)
                  with open(output_path, "wb") as f:
                      f.write(data)
                  print(f"Saved: {output_path}")

  # Download all outputs
  save_outputs(outputs, "./my_outputs")
  ```
</CodeGroup>
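
Before downloading, it can help to flatten the outputs dict into a plain list, for example to log what a job produced or to download selectively. This sketch mirrors the traversal in `save_outputs` (keys `images`, `video`, `audio`, with the same defaults for `subfolder` and `type`); `list_output_files` and the sample outputs are hypothetical.

```python
def list_output_files(outputs: dict) -> list:
    """Flatten a node_id -> output_data mapping into a list of file descriptors."""
    files = []
    for node_id, node_outputs in outputs.items():
        for kind in ("images", "video", "audio"):
            for file_info in node_outputs.get(kind, []):
                files.append({
                    "node_id": node_id,
                    "kind": kind,
                    "filename": file_info["filename"],
                    "subfolder": file_info.get("subfolder", ""),
                    "type": file_info.get("type", "output"),
                })
    return files


# Hand-written sample outputs (hypothetical, for illustration only)
sample_outputs = {
    "9": {"images": [{"filename": "result_00001_.png", "subfolder": "", "type": "output"}]},
    "12": {"audio": [{"filename": "clip.flac"}]},
}

for entry in list_output_files(sample_outputs):
    print(entry["node_id"], entry["kind"], entry["filename"])
```

Each entry carries the three parameters that `download_output` expects, so the list can be filtered and then passed on entry by entry.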

***

## Complete End-to-End Example

Here's a full example that ties everything together:

<CodeGroup>
  ```typescript TypeScript theme={null}
  import { readFile, writeFile } from "fs/promises";

  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return { "X-API-Key": API_KEY, "Content-Type": "application/json" };
  }

  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return (await response.json()).prompt_id;
  }

  async function waitForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error("Job timed out"));
      }, timeout);

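      ws.onmessage = (event) => {
        // Skip binary preview frames; only text frames carry JSON
        if (typeof event.data !== "string") return;
        const data = JSON.parse(event.data);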
        if (data.data?.prompt_id !== promptId) return;

        const msgType = data.type;
        const msgData = data.data ?? {};

        if (msgType === "progress") {
          console.log(`Progress: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          clearTimeout(timer);
          ws.close();
          reject(new Error(msgData.exception_message ?? "Unknown error"));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

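  import { mkdir } from "fs/promises";

  async function downloadOutputs(
    outputs: Record<string, any>,
    outputDir: string
  ): Promise<void> {
    // Create the output directory if it does not exist yet
    await mkdir(outputDir, { recursive: true });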
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const params = new URLSearchParams({
            filename: fileInfo.filename,
            subfolder: fileInfo.subfolder ?? "",
            type: fileInfo.type ?? "output",
          });
          // Get redirect URL (don't follow to avoid sending auth to storage)
          const response = await fetch(`${BASE_URL}/api/view?${params}`, {
            headers: { "X-API-Key": API_KEY },
            redirect: "manual",
          });
          if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
          const signedUrl = response.headers.get("location")!;
          // Fetch from signed URL without auth headers
          const fileResponse = await fetch(signedUrl);
          if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);

          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(await fileResponse.arrayBuffer()));
          console.log(`Downloaded: ${path}`);
        }
      }
    }
  }

  async function main() {
    // 1. Load workflow
    const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));

    // 2. Modify workflow parameters
    workflow["3"].inputs.seed = 42;
    workflow["6"].inputs.text = "a beautiful sunset over mountains";

    // 3. Submit workflow
    const promptId = await submitWorkflow(workflow);
    console.log(`Job submitted: ${promptId}`);

    // 4. Wait for completion with progress
    const outputs = await waitForCompletion(promptId);
    console.log(`Job completed! Found ${Object.keys(outputs).length} output nodes`);

    // 5. Download outputs
    await downloadOutputs(outputs, "./outputs");
    console.log("Done!");
  }

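  // Report failures instead of leaving an unhandled promise rejection
  main().catch((err) => {
    console.error(err);
    process.exit(1);
  });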
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import asyncio
  import aiohttp
  import uuid

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {"X-API-Key": API_KEY, "Content-Type": "application/json"}

  def upload_image(file_path: str) -> dict:
      """Upload an image and return the reference for use in workflows."""
      with open(file_path, "rb") as f:
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},
              files={"image": f},
              data={"type": "input", "overwrite": "true"}
          )
      response.raise_for_status()
      return response.json()

  def submit_workflow(workflow: dict) -> str:
      """Submit workflow and return prompt_id."""
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  async def wait_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """Wait for job completion via WebSocket."""
      ws_url = BASE_URL.replace("https://", "wss://") + f"/ws?clientId={uuid.uuid4()}&token={API_KEY}"
      outputs = {}
      
      async def listen() -> dict:
          async with aiohttp.ClientSession() as session:
              async with session.ws_connect(ws_url) as ws:
                  async for msg in ws:
                      if msg.type != aiohttp.WSMsgType.TEXT:
                          continue

                      data = json.loads(msg.data)
                      msg_data = data.get("data") or {}
                      # Skip messages that belong to other jobs
                      if msg_data.get("prompt_id") != prompt_id:
                          continue

                      msg_type = data.get("type")

                      if msg_type == "progress":
                          print(f"Progress: {msg_data.get('value')}/{msg_data.get('max')}")
                      elif msg_type == "executed":
                          if output := msg_data.get("output"):
                              outputs[msg_data["node"]] = output
                      elif msg_type == "execution_success":
                          return outputs
                      elif msg_type == "execution_error":
                          raise RuntimeError(msg_data.get("exception_message", "Unknown error"))

          # The socket closed before a terminal message arrived
          return outputs

      try:
          # wait_for enforces the deadline even when no messages arrive
          return await asyncio.wait_for(listen(), timeout)
      except asyncio.TimeoutError:
          raise TimeoutError("Job timed out") from None

  def download_outputs(outputs: dict, output_dir: str):
      """Download all output files."""
      os.makedirs(output_dir, exist_ok=True)
      
      for node_outputs in outputs.values():
          for key in ["images", "video", "audio"]:
              for file_info in node_outputs.get(key, []):
                  params = {
                      "filename": file_info["filename"],
                      "subfolder": file_info.get("subfolder", ""),
                      "type": file_info.get("type", "output")
                  }
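                  # Get redirect URL (don't follow to avoid sending auth to storage)
                  response = requests.get(
                      f"{BASE_URL}/api/view",
                      headers={"X-API-Key": API_KEY},
                      params=params,
                      allow_redirects=False,
                  )
                  if response.status_code != 302:
                      raise RuntimeError(f"HTTP {response.status_code}")
                  # Fetch from signed URL without auth headers
                  response = requests.get(response.headers["Location"])
                  response.raise_for_status()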
                  
                  path = os.path.join(output_dir, file_info["filename"])
                  with open(path, "wb") as f:
                      f.write(response.content)
                  print(f"Downloaded: {path}")

  async def main():
      # 1. Load workflow
      with open("workflow_api.json") as f:
          workflow = json.load(f)
      
      # 2. Optionally upload input images
      # image_ref = upload_image("input.png")
      # workflow["1"]["inputs"]["image"] = image_ref["name"]
      
      # 3. Modify workflow parameters
      workflow["3"]["inputs"]["seed"] = 42
      workflow["6"]["inputs"]["text"] = "a beautiful sunset over mountains"
      
      # 4. Submit workflow
      prompt_id = submit_workflow(workflow)
      print(f"Job submitted: {prompt_id}")
      
      # 5. Wait for completion with progress
      outputs = await wait_for_completion(prompt_id)
      print(f"Job completed! Found {len(outputs)} output nodes")
      
      # 6. Download outputs
      download_outputs(outputs, "./outputs")
      print("Done!")

  if __name__ == "__main__":
      asyncio.run(main())
  ```
</CodeGroup>
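
The message handling inside `wait_for_completion` can also be written as a pure function, which makes it possible to test without a live connection. The following is a minimal sketch, not part of the Comfy Cloud API: `apply_message` and `JobFailed` are hypothetical names, and the message shapes are assumed to match the ones used in the example above.

```python
import json


class JobFailed(RuntimeError):
    """Hypothetical exception raised when an execution_error message arrives."""


def apply_message(outputs: dict, raw: str, prompt_id: str) -> bool:
    """Fold one WebSocket text frame into `outputs`.

    Returns True once the job has finished successfully, False otherwise.
    """
    message = json.loads(raw)
    data = message.get("data") or {}

    # Ignore frames that belong to other jobs or carry no prompt_id.
    if data.get("prompt_id") != prompt_id:
        return False

    msg_type = message.get("type")
    if msg_type == "executed" and data.get("output"):
        # Store the node's output under its node ID.
        outputs[data["node"]] = data["output"]
    elif msg_type == "execution_success":
        return True
    elif msg_type == "execution_error":
        raise JobFailed(data.get("exception_message", "Unknown error"))
    return False
```

Inside the receive loop, each text frame would be passed as `apply_message(outputs, msg.data, prompt_id)`, and the loop would return `outputs` as soon as the call returns `True`.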

***

## Queue Management

### Get Queue Status

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getQueue(): Promise<{
    queue_running: any[];
    queue_pending: any[];
  }> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const queue = await getQueue();
  console.log(`Running: ${queue.queue_running.length}`);
  console.log(`Pending: ${queue.queue_pending.length}`);
  ```

  ```python Python theme={null}
  def get_queue():
      """Get current queue status."""
      response = requests.get(
          f"{BASE_URL}/api/queue",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  queue = get_queue()
  print(f"Running: {len(queue.get('queue_running', []))}")
  print(f"Pending: {len(queue.get('queue_pending', []))}")
  ```
</CodeGroup>

### Cancel a Job

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"delete": ["PROMPT_ID_HERE"]}'
  ```

  ```typescript TypeScript theme={null}
  async function cancelJob(promptId: string): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ delete: [promptId] }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def cancel_job(prompt_id: str):
      """Cancel a pending or running job."""
      response = requests.post(
          f"{BASE_URL}/api/queue",
          headers=get_headers(),
          json={"delete": [prompt_id]}
      )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

### Interrupt Current Execution

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/interrupt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function interrupt(): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/interrupt`, {
      method: "POST",
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def interrupt():
      """Interrupt the currently running job."""
      response = requests.post(
          f"{BASE_URL}/api/interrupt",
          headers=get_headers()
      )
      response.raise_for_status()
  ```
</CodeGroup>

***

## Error Handling

### HTTP Errors

REST API endpoints return standard HTTP status codes:

| Status | Description                                    |
| ------ | ---------------------------------------------- |
| `400`  | Invalid request (bad workflow, missing fields) |
| `401`  | Unauthorized (invalid or missing API key)      |
| `402`  | Insufficient credits                           |
| `429`  | Subscription inactive                          |
| `500`  | Internal server error                          |
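
One way to surface these codes in client code is to translate them into a single exception type that carries the description from the table. This is a minimal sketch: `CloudAPIError` and `check_status` are hypothetical helpers, not part of any Comfy SDK, and the fallback text for unlisted codes is an assumption.

```python
# Descriptions copied from the status table above.
STATUS_DESCRIPTIONS = {
    400: "Invalid request (bad workflow, missing fields)",
    401: "Unauthorized (invalid or missing API key)",
    402: "Insufficient credits",
    429: "Subscription inactive",
    500: "Internal server error",
}


class CloudAPIError(Exception):
    """Hypothetical error type that keeps the HTTP status alongside its description."""

    def __init__(self, status: int, description: str):
        super().__init__(f"HTTP {status}: {description}")
        self.status = status
        self.description = description


def check_status(status: int) -> None:
    """Raise CloudAPIError for any 4xx/5xx status; do nothing otherwise."""
    if status < 400:
        return
    # Codes missing from the table get a generic description (an assumption).
    raise CloudAPIError(status, STATUS_DESCRIPTIONS.get(status, "Unexpected error"))
```

With `requests`, calling `check_status(response.status_code)` in place of `response.raise_for_status()` lets callers branch on `err.status`, for example to prompt for billing on `402`.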

### Execution Errors

During workflow execution, errors are delivered via the `execution_error` WebSocket message. The `exception_type` field identifies the error category:

| Exception Type              | Description                                        |
| --------------------------- | -------------------------------------------------- |
| `ValidationError`           | Invalid workflow or inputs                         |
| `ModelDownloadError`        | Required model not available or failed to download |
| `ImageDownloadError`        | Failed to download input image from URL            |
| `OOMError`                  | Out of GPU memory                                  |
| `InsufficientFundsError`    | Account balance too low (for Partner Nodes)        |
| `InactiveSubscriptionError` | Subscription not active                            |
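
A handler for `execution_error` messages can combine `exception_type` with the `exception_message` field used in the examples above to produce a readable summary. The sketch below assumes the message payload is a dict containing those two fields. `describe_execution_error` is a hypothetical helper, and the fallback strings for unknown types are assumptions.

```python
# Descriptions copied from the exception type table above.
EXCEPTION_DESCRIPTIONS = {
    "ValidationError": "Invalid workflow or inputs",
    "ModelDownloadError": "Required model not available or failed to download",
    "ImageDownloadError": "Failed to download input image from URL",
    "OOMError": "Out of GPU memory",
    "InsufficientFundsError": "Account balance too low (for Partner Nodes)",
    "InactiveSubscriptionError": "Subscription not active",
}


def describe_execution_error(msg_data: dict) -> str:
    """Build a one-line summary from an execution_error message payload."""
    exc_type = msg_data.get("exception_type", "UnknownError")
    description = EXCEPTION_DESCRIPTIONS.get(exc_type, "Unrecognized exception type")
    detail = msg_data.get("exception_message", "")
    summary = f"{exc_type}: {description}"
    # Append the server-provided message only when one is present.
    return f"{summary} ({detail})" if detail else summary
```

In the WebSocket loop, the `execution_error` branch could raise `RuntimeError(describe_execution_error(msg_data))` so that the error category appears alongside the raw message.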
