RoboTwin_image/daemon/robotwin_daemon.py
2025-07-02 03:13:07 +00:00

72 lines
2.0 KiB
Python

def check_and_install_dependencies(dependencies=("fastapi", "pydantic", "uvicorn")):
    """Make sure each required package is importable, pip-installing any that is not.

    Args:
        dependencies: Iterable of package names. Each name is used both as the
            module name to import and as the requirement handed to pip, so it
            only suits packages whose import name and distribution name match.
            Defaults to the third-party packages this daemon imports.

    Raises:
        subprocess.CalledProcessError: If ``pip install`` exits with a
            non-zero status.
    """
    import importlib
    import subprocess
    import sys

    for package in dependencies:
        try:
            importlib.import_module(package)
        except ImportError:
            print(f"Installing {package}...")
            # Invoke pip through the running interpreter so the package is
            # installed into the same environment this process imports from.
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            # The path finders cache directory contents; refresh them so the
            # freshly installed package is visible to later imports in this
            # same process.
            importlib.invalidate_caches()
        print(f"{package} installed")
# Run the dependency check first: the fastapi/uvicorn imports below would fail
# if those packages were missing, so this call must stay ahead of them.
check_and_install_dependencies()
from fastapi import FastAPI
import uvicorn
import sys
import os
# Add the parent of this file's directory to the module search path.
# NOTE(review): presumably this is what lets `script.env_health_checker`
# (imported in check_env_health) resolve — confirm against the repo layout.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Application object on which the route decorators below register handlers.
app = FastAPI()
@app.get("/v1/get_all_task_name")
def get_all_tasks():
    """Return the ``task_name`` of every task definition in ``gpt_api/task_info.py``.

    The module is loaded from its file path on every call. A module member
    counts as a task definition when it is a dict containing a ``'task_name'``
    key. Results follow the order in which ``inspect.getmembers`` yields
    members, i.e. sorted by member name.

    Returns:
        list: The ``'task_name'`` value of each matching dict.
    """
    import os
    import inspect
    import importlib.util

    def _is_task_definition(member):
        # A task definition is any dict that carries a 'task_name' entry.
        return isinstance(member, dict) and 'task_name' in member

    # gpt_api/ sits next to this file's directory, under the same parent.
    daemon_dir = os.path.dirname(os.path.abspath(__file__))
    module_path = os.path.join(os.path.dirname(daemon_dir), "gpt_api", "task_info.py")

    # Load task_info.py directly from its path instead of via sys.path.
    module_spec = importlib.util.spec_from_file_location("task_info", module_path)
    loaded_module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(loaded_module)

    names = []
    for _member_name, definition in inspect.getmembers(loaded_module, _is_task_definition):
        names.append(definition['task_name'])
    return names
@app.get("/v1/check_daemon_health")
def check_daemon_health():
    """Liveness probe: return a fixed payload whenever the daemon can answer.

    Returns:
        dict: ``{"status": "200"}`` — note the status is the string ``"200"``
        in the response body, not an integer.
    """
    health_payload = {"status": "200"}
    return health_payload
@app.get("/v1/check_env_health")
def check_env_health():
    """Run the RoboTwin environment check and report the outcome.

    Returns:
        dict: ``"env_check_result"`` is ``"healthy"`` or ``"unhealthy"``
        according to the first value returned by
        ``verify_robotwin_environment``; ``"check_result_message"`` carries
        the second returned value unchanged.
    """
    # Imported lazily, at request time rather than at module import.
    from script.env_health_checker import verify_robotwin_environment

    healthy, details = verify_robotwin_environment()
    if healthy:
        verdict = "healthy"
    else:
        verdict = "unhealthy"
    return {"env_check_result": verdict, "check_result_message": details}
if __name__ == "__main__":
    # When run as a script, serve the app on a Unix domain socket (uvicorn's
    # `uds` option) instead of a TCP host/port.
    socket_path = "/tmp/robotwin.sock"
    uvicorn.run(app, uds=socket_path)