# End-to-end script with an example dataset with prediction and clustering

# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "ucimlrepo",
#   "pandas",
#   "requests",
# ]
# ///
import argparse
import os
import sys
import time
import json
import pandas as pd
import requests
from ucimlrepo import fetch_ucirepo
# Defaults
DEFAULT_BASE_URL = "https://beta.woodwide.ai"
UCI_DATASET_ID = 2  # Adult dataset
def setup_args():
    """Build the command-line parser and return the parsed arguments.

    ``-k/--api-key`` is mandatory; every other option falls back to a default.
    """
    parser = argparse.ArgumentParser(
        description="Test Woodwide API with UCI Adult dataset"
    )
    # Declarative table of (flags, options) keeps the argument set easy to scan.
    arg_specs = [
        (("-k", "--api-key"), {"required": True, "help": "Woodwide API Key"}),
        (("-m", "--model-name"),
         {"default": "uci_adult_model", "help": "Name for the model"}),
        (("-d", "--dataset-name"),
         {"default": "uci_adult", "help": "Name for the dataset"}),
        (("-o", "--output-file"),
         {"help": "File path to save inference results"}),
        (("--base-url",),
         {"default": DEFAULT_BASE_URL, "help": "Base URL for API"}),
        (("-c", "--clustering"),
         {"action": "store_true",
          "help": "Run clustering instead of prediction"}),
    ]
    for flags, options in arg_specs:
        parser.add_argument(*flags, **options)
    return parser.parse_args()
def fetch_and_prepare_data():
    """Download the UCI dataset, split it 80/20, and write two local CSVs.

    Returns:
        Tuple of (train_csv_path, test_csv_path, label_column_name).
    """
    print(f"Fetching UCI dataset ID={UCI_DATASET_ID}...")
    repo = fetch_ucirepo(id=UCI_DATASET_ID)
    features = repo.data.features
    targets = repo.data.targets
    # Put features and targets side by side in a single frame.
    full_df = pd.concat([features, targets], axis=1)
    # Deterministic 80/20 split: sample the training rows, keep the rest for test.
    train_df = full_df.sample(frac=0.8, random_state=42)
    test_df = full_df.drop(train_df.index)
    # Materialize both splits as temporary CSVs in the working directory.
    train_path = "uci_train.csv"
    test_path = "uci_test.csv"
    train_df.to_csv(train_path, index=False)
    test_df.to_csv(test_path, index=False)
    label_column = targets.columns[0]
    print(f"Data prepared. Label column: '{label_column}'")
    print(f"Train shape: {train_df.shape}, Test shape: {test_df.shape}")
    return train_path, test_path, label_column
def upload_dataset(base_url, headers, file_path, name):
    """Upload a CSV file to the Woodwide datasets endpoint.

    Args:
        base_url: API base URL with no trailing slash.
        headers: Dict holding the Authorization header.
        file_path: Local path of the CSV file to upload.
        name: Dataset name to register on the server.

    Returns:
        The server-assigned dataset ID.

    Exits the process on any response other than 200/201.
    """
    print(f"Uploading {file_path} as '{name}'...")
    start_time = time.time()
    with open(file_path, "rb") as f:
        files = {"file": (os.path.basename(file_path), f, "text/csv")}
        data = {"dataset_name": name, "overrides": "true"}
        # timeout prevents the request from hanging forever (requests has no
        # default timeout); uploads can be large, so allow a generous window.
        response = requests.post(
            f"{base_url}/api/datasets",
            headers=headers,
            files=files,
            data=data,
            timeout=300,
        )
    if response.status_code not in (200, 201):
        print(f"Error uploading dataset: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - start_time
    dataset_id = response.json().get("dataset_id")
    print(f"Upload took {elapsed:.2f}s. ID: {dataset_id}\n")
    return dataset_id
def train_model(base_url, headers, dataset_id, model_name, label_column, is_clustering=False):
    """Start a training job for a prediction or clustering model.

    Args:
        base_url: API base URL with no trailing slash.
        headers: Dict holding the Authorization header.
        dataset_id: ID of a previously uploaded dataset.
        model_name: Name to give the new model.
        label_column: Target column name (only used for prediction models).
        is_clustering: When True, train a clustering model and omit the label.

    Returns:
        The server-assigned model ID.

    Exits the process on any response other than 200/202.
    """
    model_type = "clustering" if is_clustering else "prediction"
    print(f"Training {model_type.capitalize()} Model '{model_name}' using dataset ID '{dataset_id}'...")

    payload = {
        "model_name": model_name,
        "model_type": model_type,
        "dataset_id": dataset_id
    }

    # Clustering is unsupervised, so the label column only applies to prediction.
    if not is_clustering:
        payload["label_column"] = label_column
    start_time = time.time()
    # timeout bounds the kickoff request itself (training runs asynchronously;
    # completion is polled separately), so a hung connection can't stall the run.
    response = requests.post(
        f"{base_url}/api/models/train",
        json=payload,
        headers=headers,
        timeout=60,
    )
    if response.status_code not in (200, 202):
        print(f"Error starting training: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - start_time
    model_id = response.json().get("model_id")
    print(f"Request took {elapsed:.2f}s. Model ID: {model_id}\n")
    return model_id
def wait_for_training(base_url, headers, model_id):
    """Poll the model status endpoint until training finishes.

    Polls every 5 seconds. Exits the process if the status request fails,
    the model reports "failed", or the overall timeout elapses.

    Args:
        base_url: API base URL with no trailing slash.
        headers: Dict holding the Authorization header.
        model_id: ID of the model being trained.
    """
    print(f"Waiting for Model Training to Complete (ID: {model_id})...")
    start_time = time.time()
    timeout = 3000  # overall wall-clock budget (seconds) for training
    while True:
        # Per-request timeout: without it a single hung poll would block
        # forever and defeat the overall timeout check below.
        response = requests.get(
            f"{base_url}/api/models/{model_id}", headers=headers, timeout=30
        )
        if response.status_code != 200:
            print(f"Error checking status: {response.status_code}")
            sys.exit(1)

        model_data = response.json()
        status = model_data.get("status")
        if status == "ready":
            print("Training Complete.")
            break
        elif status == "failed":
            print("Error: Model Training Failed.")
            print(json.dumps(model_data, indent=2))
            sys.exit(1)
        elapsed = time.time() - start_time
        if elapsed >= timeout:
            print(f"Error: Training Timed Out after {timeout} seconds.")
            sys.exit(1)
        print(f"Status: {status}. Waiting...")
        time.sleep(5)
    print(f"Success: Took {time.time() - start_time:.2f} seconds to train model.\n")
def run_inference(base_url, headers, model_id, test_file_path, output_file):
    """Run synchronous inference on a trained model with a local CSV.

    Args:
        base_url: API base URL with no trailing slash.
        headers: Dict holding the Authorization header.
        model_id: ID of a trained model.
        test_file_path: Local CSV of rows to score.
        output_file: Optional path; when set, full JSON results are written
            there, otherwise a small sample is printed to stdout.

    Exits the process on any non-200 response.
    """
    print(f"Running Synchronous Inference on Model {model_id} using file {test_file_path}...")
    start_time = time.time()
    with open(test_file_path, "rb") as f:
        files = {"file": (os.path.basename(test_file_path), f, "text/csv")}
        # Requesting JSON output for easy handling in the script
        data = {"output_type": "json"}
        # Synchronous inference can take a while on larger files, so use a
        # generous timeout — but still bound it so the script can't hang.
        response = requests.post(
            f"{base_url}/api/models/{model_id}/infer",
            headers=headers,
            files=files,
            data=data,
            timeout=600,
        )
    if response.status_code != 200:
        print(f"Error running inference: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - start_time
    print(f"Inference took {elapsed:.2f}s")
    result = response.json()
    formatted_result = json.dumps(result, indent=2)
    if output_file:
        with open(output_file, "w") as f:
            f.write(formatted_result)
        print(f"Inference results saved to {output_file}")
    else:
        print("Inference Response (Sample):")
        # Print a sample of the output to avoid flooding the terminal
        output_data = result.get("output", {})
        sample = {k: v[:5] if isinstance(v, list) else v for k, v in list(output_data.items())[:3]}
        print(json.dumps({"job_id": result.get("job_id"), "output_sample": sample}, indent=2))
    print("")
def main():
    """Orchestrate the end-to-end flow: fetch data, upload, train, infer."""
    args = setup_args()
    base_url = args.base_url.rstrip("/")
    headers = {"Authorization": f"Bearer {args.api_key}"}

    # Step 1: pull the UCI data and materialize train/test CSVs locally.
    train_path, test_path, label_column = fetch_and_prepare_data()
    try:
        # Step 2: push the training split to the API.
        dataset_id = upload_dataset(base_url, headers, train_path, args.dataset_name)
        # Step 3: kick off training (prediction or clustering).
        model_id = train_model(
            base_url,
            headers,
            dataset_id,
            args.model_name,
            label_column,
            is_clustering=args.clustering,
        )
        # Step 4: block until the model reports ready (or fails).
        wait_for_training(base_url, headers, model_id)
        # Step 5: score the held-out split with the trained model.
        run_inference(base_url, headers, model_id, test_path, args.output_file)
    finally:
        # Always remove the temporary CSVs, even when a step aborted early.
        for temp_file in (train_path, test_path):
            if os.path.exists(temp_file):
                os.remove(temp_file)


if __name__ == "__main__":
    main()