# /// script
# requires-python = ">=3.11"
# dependencies = [
# "ucimlrepo",
# "pandas",
# "requests",
# ]
# ///
import argparse
import os
import sys
import time
import json
import pandas as pd
import requests
from ucimlrepo import fetch_ucirepo
# Defaults
DEFAULT_BASE_URL = "https://beta.woodwide.ai"  # Woodwide API endpoint (beta environment); override with --base-url
UCI_DATASET_ID = 2  # UCI ML Repository ID 2 = "Adult" (census income) dataset
def setup_args():
    """Define and parse the command-line interface for this script."""
    parser = argparse.ArgumentParser(
        description="Test Woodwide API with UCI Adult dataset"
    )
    # (flags, kwargs) specs keep the option table easy to scan and extend.
    option_specs = [
        (("-k", "--api-key"), dict(required=True, help="Woodwide API Key")),
        (("-m", "--model-name"), dict(default="uci_adult_model", help="Name for the model")),
        (("-d", "--dataset-name"), dict(default="uci_adult", help="Name for the dataset")),
        (("-o", "--output-file"), dict(help="File path to save inference results")),
        (("--base-url",), dict(default=DEFAULT_BASE_URL, help="Base URL for API")),
        (("-c", "--clustering"), dict(action="store_true", help="Run clustering instead of prediction")),
    ]
    for flags, kwargs in option_specs:
        parser.add_argument(*flags, **kwargs)
    return parser.parse_args()
def fetch_and_prepare_data():
    """Download the UCI dataset, split it 80/20, and write both CSV files.

    Returns:
        (train_csv_path, test_csv_path, label_column_name)
    """
    print(f"Fetching UCI dataset ID={UCI_DATASET_ID}...")
    repo = fetch_ucirepo(id=UCI_DATASET_ID)
    features = repo.data.features
    targets = repo.data.targets
    # Features and targets come back as separate frames; keep them together.
    combined = pd.concat([features, targets], axis=1)
    # Reproducible 80/20 split: sample the training rows, the remainder is test.
    train_df = combined.sample(frac=0.8, random_state=42)
    test_df = combined.drop(train_df.index)
    train_path, test_path = "uci_train.csv", "uci_test.csv"
    for frame, path in ((train_df, train_path), (test_df, test_path)):
        frame.to_csv(path, index=False)
    label_column = targets.columns[0]
    print(f"Data prepared. Label column: '{label_column}'")
    print(f"Train shape: {train_df.shape}, Test shape: {test_df.shape}")
    return train_path, test_path, label_column
def upload_dataset(base_url, headers, file_path, name):
    """Upload a local CSV to the datasets endpoint and return its dataset id.

    Exits the process (sys.exit(1)) unless the API answers 200/201.
    """
    print(f"Uploading {file_path} as '{name}'...")
    started = time.time()
    form_fields = {"dataset_name": name, "overrides": "true"}
    with open(file_path, "rb") as csv_file:
        # The handle must stay open while requests streams the multipart body.
        multipart = {"file": (os.path.basename(file_path), csv_file, "text/csv")}
        response = requests.post(
            f"{base_url}/api/datasets",
            headers=headers,
            files=multipart,
            data=form_fields,
        )
    if response.status_code not in (200, 201):
        print(f"Error uploading dataset: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - started
    dataset_id = response.json().get("dataset_id")
    print(f"Upload took {elapsed:.2f}s. ID: {dataset_id}\n")
    return dataset_id
def train_model(base_url, headers, dataset_id, model_name, label_column, is_clustering=False):
    """Submit a training job and return the id of the new model.

    For prediction models the label column is included in the payload;
    clustering models are unsupervised and omit it.
    Exits the process (sys.exit(1)) unless the API answers 200/202.
    """
    model_type = "clustering" if is_clustering else "prediction"
    print(f"Training {model_type.capitalize()} Model '{model_name}' using dataset ID '{dataset_id}'...")
    body = {
        "model_name": model_name,
        "model_type": model_type,
        "dataset_id": dataset_id,
    }
    if not is_clustering:
        body["label_column"] = label_column
    started = time.time()
    response = requests.post(f"{base_url}/api/models/train", json=body, headers=headers)
    if response.status_code not in (200, 202):
        print(f"Error starting training: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - started
    model_id = response.json().get("model_id")
    print(f"Request took {elapsed:.2f}s. Model ID: {model_id}\n")
    return model_id
def wait_for_training(base_url, headers, model_id, poll_interval=5, timeout=3000):
    """Poll the model endpoint until training succeeds, fails, or times out.

    Args:
        base_url: API base URL (no trailing slash).
        headers: Auth headers for every request.
        model_id: Model to poll.
        poll_interval: Seconds to sleep between polls (was hard-coded 5).
        timeout: Overall wall-clock budget in seconds (was hard-coded 3000).

    Exits the process (sys.exit(1)) on an API error status, a failed
    training run, or when the timeout is exceeded.
    """
    print(f"Waiting for Model Training to Complete (ID: {model_id})...")
    start_time = time.time()
    while True:
        try:
            # Per-request timeout: without it a single hung connection would
            # block forever and the overall wall-clock guard below could
            # never fire.
            response = requests.get(
                f"{base_url}/api/models/{model_id}", headers=headers, timeout=30
            )
        except requests.RequestException as exc:
            # Transient network failures previously crashed the poller with a
            # traceback; retry instead until the overall deadline.
            print(f"Warning: status request failed ({exc}). Retrying...")
        else:
            if response.status_code != 200:
                print(f"Error checking status: {response.status_code}")
                sys.exit(1)
            model_data = response.json()
            status = model_data.get("status")
            if status == "ready":
                print("Training Complete.")
                break
            elif status == "failed":
                print("Error: Model Training Failed.")
                print(json.dumps(model_data, indent=2))
                sys.exit(1)
            print(f"Status: {status}. Waiting...")
        if time.time() - start_time >= timeout:
            print(f"Error: Training Timed Out after {timeout} seconds.")
            sys.exit(1)
        time.sleep(poll_interval)
    print(f"Success: Took {time.time() - start_time:.2f} seconds to train model.\n")
def run_inference(base_url, headers, model_id, test_file_path, output_file):
    """Run synchronous inference on a model with a CSV file of rows.

    If output_file is given, the full JSON response is written there;
    otherwise a small sample is printed to avoid flooding the terminal.
    Exits the process (sys.exit(1)) unless the API answers 200.
    """
    print(f"Running Synchronous Inference on Model {model_id} using file {test_file_path}...")
    started = time.time()
    with open(test_file_path, "rb") as src:
        # Ask the API for JSON so the result is easy to handle in the script.
        response = requests.post(
            f"{base_url}/api/models/{model_id}/infer",
            headers=headers,
            files={"file": (os.path.basename(test_file_path), src, "text/csv")},
            data={"output_type": "json"},
        )
    if response.status_code != 200:
        print(f"Error running inference: {response.status_code}")
        print(response.text)
        sys.exit(1)
    elapsed = time.time() - started
    print(f"Inference took {elapsed:.2f}s")
    result = response.json()
    formatted_result = json.dumps(result, indent=2)
    if output_file:
        with open(output_file, "w") as sink:
            sink.write(formatted_result)
        print(f"Inference results saved to {output_file}")
    else:
        print("Inference Response (Sample):")
        # Show at most three columns, five values each, to keep output short.
        output_data = result.get("output", {})
        sample = {}
        for key, value in list(output_data.items())[:3]:
            sample[key] = value[:5] if isinstance(value, list) else value
        print(json.dumps({"job_id": result.get("job_id"), "output_sample": sample}, indent=2))
    print("")
def main():
    """End-to-end driver: fetch data, upload, train, poll, infer, clean up."""
    args = setup_args()
    api_base = args.base_url.rstrip("/")
    auth_headers = {"Authorization": f"Bearer {args.api_key}"}
    # 1. Fetch the UCI data and split it into local CSV files.
    train_path, test_path, label_column = fetch_and_prepare_data()
    try:
        # 2. Upload the training split.
        train_dataset_id = upload_dataset(
            api_base, auth_headers, train_path, args.dataset_name
        )
        # 3. Kick off training on the uploaded dataset.
        model_id = train_model(
            api_base,
            auth_headers,
            train_dataset_id,
            args.model_name,
            label_column,
            is_clustering=args.clustering,
        )
        # 4. Block until the model reports ready (or fail/timeout).
        wait_for_training(api_base, auth_headers, model_id)
        # 5. Score the held-out split.
        run_inference(api_base, auth_headers, model_id, test_path, args.output_file)
    finally:
        # Always remove the temporary CSVs, even when a step exits early.
        for temp_path in (train_path, test_path):
            if os.path.exists(temp_path):
                os.remove(temp_path)
if __name__ == "__main__":
    main()