# RoboTwin_image/script/env_health_checker.py
# Snapshot dated 2025-07-02 03:13:07 +00:00 (392 lines, 15 KiB, Python).
#
# NOTE: the web viewer this file was copied from warned that it contains
# Unicode characters that might be confused with other characters.

import os
import subprocess
import json
import sys
import platform
import re
import logging
import importlib
import warnings
import time
# Resolve the project root as the parent of the directory containing this
# script, so every path below is independent of the caller's working directory.
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)

# Configure logging: records go to the console and to a log file that lives
# in the project root.
log_file = os.path.join(project_root, 'environment_verify.log')
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        # The log messages are Chinese text, so pin the file encoding to
        # UTF-8 instead of inheriting the process locale. delay=True opens
        # the file on the first record rather than at import time.
        logging.FileHandler(log_file, encoding='utf-8', delay=True),
    ],
)
def run_command(command, description=None, show_output=False):
    """Run a shell command through bash and report whether it succeeded.

    Args:
        command: Command line passed to ``/bin/bash -c``.
        description: Optional heading that is logged before the command runs.
        show_output: When true, the captured stdout is logged on success.

    Returns:
        A ``(success, text)`` tuple. On success ``text`` is the decoded
        stdout. On a non-zero exit status it is the decoded stderr. If bash
        itself cannot be started it is the operating-system error message.
    """
    if description:
        logging.info(f"=== {description} ===")
    logging.info(f"执行: {command}")
    try:
        # Run explicitly under bash so bash-only syntax such as `source`
        # behaves the same regardless of the system's default shell.
        result = subprocess.run(
            ['/bin/bash', '-c', command],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        # errors='replace': tool output is not guaranteed to be valid UTF-8,
        # and a decode error here would mask the real command failure.
        error = e.stderr.decode('utf-8', errors='replace')
        logging.error(f"命令执行失败: {command}")
        logging.error(f"错误信息: {error}")
        return False, error
    except OSError as e:
        # bash is missing or not executable: report it as a failed command
        # instead of letting the exception escape.
        error = str(e)
        logging.error(f"命令执行失败: {command}")
        logging.error(f"错误信息: {error}")
        return False, error

    output = result.stdout.decode('utf-8', errors='replace')
    logging.info(f"命令执行成功: {command}")
    if show_output:
        logging.info(f"输出:\n{output}")
    return True, output
class SapienRenderTester:
    """Smoke test that checks whether the SAPIEN render pipeline can start."""

    def __init__(self):
        # Silence FutureWarning / UserWarning noise for the rest of the process.
        warnings.simplefilter(action='ignore', category=FutureWarning)
        warnings.simplefilter(action='ignore', category=UserWarning)
        # Make the project's tools directory importable. Guard the append so
        # constructing several testers does not pile up duplicate entries.
        tools_dir = os.path.join(project_root, 'tools')
        if tools_dir not in sys.path:
            sys.path.append(tools_dir)

    def test_render_pipeline(self):
        """Try to bring up the SAPIEN engine, renderer and an empty scene.

        Returns:
            A ``(ok, message)`` tuple. ``ok`` is False, and ``message``
            carries the exception text, if any import or setup call raises.
        """
        try:
            # Import the required packages. Several of these names are never
            # used below; presumably they are imported only to prove the
            # packages are importable.
            import numpy as np  # noqa: F401
            import torch  # noqa: F401
            import sapien.core as sapien
            from sapien.utils.viewer import Viewer  # noqa: F401
            import gymnasium as gym  # noqa: F401
            import transforms3d as t3d  # noqa: F401
            from collections import OrderedDict  # noqa: F401

            # Quieten toppra's logging.
            import toppra as ta
            ta.setup_logging("CRITICAL")

            # Create the engine.
            engine = sapien.Engine()

            # Configure and create the renderer, then hand it to the engine.
            from sapien.render import set_global_config
            set_global_config(max_num_materials=50000, max_num_textures=50000)
            renderer = sapien.SapienRenderer()
            engine.set_renderer(renderer)

            # Ray-tracing settings.
            sapien.render.set_camera_shader_dir("rt")
            sapien.render.set_ray_tracing_samples_per_pixel(32)
            sapien.render.set_ray_tracing_path_depth(8)
            sapien.render.set_ray_tracing_denoiser("oidn")

            # Create a scene. The object is discarded: reaching this point
            # without an exception is the whole test.
            scene_config = sapien.SceneConfig()
            engine.create_scene(scene_config)

            return True, "Sapien渲染管线初始化成功"
        except Exception as e:
            # Broad on purpose: any failure, from a missing package to a
            # driver error, must become a report entry rather than a crash.
            return False, f"Sapien渲染管线初始化失败: {str(e)}"
# Packages whose presence is checked by the core dependency step.
_CORE_PACKAGES = (
    "torch", "torchvision", "sapien", "scipy",
    "mplib", "gymnasium", "trimesh", "open3d",
    "imageio", "pydantic", "zarr", "huggingface_hub",
    "pytorch3d",
)

# Resource directories expected directly under the project root.
_RESOURCE_DIRECTORIES = ("aloha_maniskill_sim", "models")


def _make_result(component, passed, message):
    """Build one report entry in the shape used by the JSON output."""
    return {
        "component": component,
        "status": "成功" if passed else "失败",
        "message": message,
    }


def _has_active_occurrence(text, needle):
    """Return True if *needle* appears on some line outside a ``#`` comment."""
    for line in text.splitlines():
        position = line.find(needle)
        if position != -1 and '#' not in line[:position]:
            return True
    return False


def _check_python_version(activate_command):
    """Check that the activated environment runs Python 3.8 or 3.10."""
    success, output = run_command(activate_command + "python --version", "检查Python版本")
    if not success:
        return _make_result("Python版本", False, "无法获取Python版本")
    match = re.search(r"Python (\d+\.\d+\.\d+)", output)
    if not match:
        return _make_result("Python版本", False, "无法确定Python版本")
    py_version = match.group(1)
    if py_version.startswith(("3.8.", "3.10.")):
        return _make_result("Python版本", True, f"Python版本 {py_version} 符合要求")
    return _make_result(
        "Python版本", False, f"Python版本 {py_version} 不符合要求 (需要3.8或3.10)"
    )


def _check_core_packages():
    """Check that every core package can be located, without importing it."""
    import importlib.util

    logging.info("=== 检查核心依赖包 ===")
    start_total = time.time()
    results = []
    # NOTE(review): find_spec inspects the interpreter running this script,
    # not a freshly activated RoboTwin environment. Confirm the script is
    # always launched from inside that environment.
    for package in _CORE_PACKAGES:
        component = f"依赖包 {package}"
        try:
            spec = importlib.util.find_spec(package)
        except Exception as e:
            # A broken installation can raise while being located; record it
            # as a failed check rather than aborting the whole verification.
            results.append(
                _make_result(component, False, f"检查 {package} 时发生错误: {str(e)}")
            )
            continue
        if spec is not None:
            results.append(_make_result(component, True, f"已安装 {package}"))
        else:
            results.append(_make_result(component, False, f"未安装 {package}"))
    elapsed_total = time.time() - start_total
    logging.info(f"=== 核心依赖包检查完成 (总耗时: {elapsed_total:.2f}秒) ===")
    return results


def _check_vulkan():
    """Check that ``vulkaninfo`` runs, and report the instance version if shown."""
    command = "DISPLAY='' vulkaninfo --summary 2>/dev/null || echo 'Vulkan not installed'"
    success, output = run_command(command, "检查Vulkan")
    if not success or "Vulkan not installed" in output:
        return _make_result("Vulkan", False, "Vulkan 未安装或安装不正确")
    if "Vulkan Instance Version" not in output:
        return _make_result("Vulkan", True, "Vulkan 已安装")
    version_match = re.search(r"Vulkan Instance Version: (\d+\.\d+\.\d+)", output)
    version = version_match.group(1) if version_match else "未知"
    return _make_result("Vulkan", True, f"Vulkan 已安装 (版本 {version})")


def _check_resource_directories():
    """Check that each expected resource directory exists in the project root."""
    results = []
    for directory in _RESOURCE_DIRECTORIES:
        logging.info(f"=== 检查{directory}资源目录 ===")
        # Ask the filesystem directly instead of shelling out to `ls`: an
        # unquoted path breaks on spaces and shell metacharacters.
        if os.path.isdir(os.path.join(project_root, directory)):
            results.append(
                _make_result(f"资源目录 {directory}", True, f"{directory} 目录存在")
            )
        else:
            results.append(
                _make_result(f"资源目录 {directory}", False, f"{directory} 目录不存在")
            )
    return results


def _check_mplib_patches():
    """Check that the two expected edits were applied to mplib's planner.py."""
    import importlib.util

    logging.info("=== 检查mplib修改 ===")
    try:
        mplib_spec = importlib.util.find_spec("mplib")
        if not (mplib_spec and mplib_spec.origin):
            return [_make_result("mplib修改", False, "无法找到mplib安装位置")]
        # Derive the package location from the module file path.
        mplib_path = os.path.dirname(os.path.dirname(mplib_spec.origin))
        planner_path = os.path.join(mplib_path, "mplib", "planner.py")
        if not os.path.exists(planner_path):
            return [
                _make_result(
                    "mplib修改", False, f"mplib planner.py 文件不存在: {planner_path}"
                )
            ]
        # Python source files are UTF-8 by default; do not depend on the locale.
        with open(planner_path, 'r', encoding='utf-8') as f:
            file_content = f.read()
    except Exception as e:
        return [_make_result("mplib修改", False, f"检查mplib修改时发生错误: {str(e)}")]

    results = []
    # Edit 1: `convex=True,` must be commented out or removed. The check is
    # per line, so a commented copy cannot hide a still-active one, and
    # `#convex=True,` without a space still counts as commented.
    if not _has_active_occurrence(file_content, "convex=True,"):
        results.append(
            _make_result("mplib修改 - convex=True", True, "convex=True 参数已经被注释或移除")
        )
    else:
        results.append(
            _make_result("mplib修改 - convex=True", False, "convex=True 参数未被注释或移除")
        )
    # Edit 2: the `or collide` condition must be gone from active code.
    if not _has_active_occurrence(file_content, "or collide or not within_joint_limit"):
        results.append(
            _make_result("mplib修改 - or collide", True, "'or collide' 条件已被移除")
        )
    else:
        results.append(
            _make_result("mplib修改 - or collide", False, "'or collide' 条件未被移除")
        )
    return results


def verify_robotwin_environment():
    """Verify that the RoboTwin environment is installed and configured.

    The checks run in this order: conda availability, existence and
    activation of the ``RoboTwin`` conda environment, Python version, core
    packages, Vulkan, resource directories, and the mplib source edits. The
    first two are prerequisites: if either fails the function returns at
    once, without running the later checks or logging the summary.

    Returns:
        A ``(all_passed, results)`` tuple. ``results`` is a list of dicts
        with the keys ``component``, ``status`` and ``message``.
        ``all_passed`` is True only if every entry succeeded.
    """
    import shlex

    verification_results = []

    # Locate the conda installation.
    success, conda_path_output = run_command("conda info --base", "获取conda基础路径")
    if not success:
        logging.error("无法获取conda路径环境验证失败")
        verification_results.append(_make_result("Conda", False, "无法获取conda路径"))
        return False, verification_results
    conda_path = conda_path_output.strip()
    logging.info(f"Conda 基础路径: {conda_path}")

    # Make sure the RoboTwin environment exists.
    # NOTE(review): `grep -w` also matches "RoboTwin" inside the path of some
    # other environment; the activation check below is the stricter test.
    check_env_cmd = "conda env list | grep -w RoboTwin || echo ''"
    success, env_check_output = run_command(check_env_cmd, "检查RoboTwin环境是否存在")
    if not success or "RoboTwin" not in env_check_output:
        logging.error("RoboTwin环境不存在")
        verification_results.append(
            _make_result("RoboTwin环境", False, "RoboTwin conda环境不存在")
        )
        return False, verification_results

    # Prefix that activates the environment before a command. The init
    # script path is quoted so a conda base containing spaces still works.
    conda_init_script = shlex.quote(f"{conda_path}/etc/profile.d/conda.sh")
    activate_command = f"source {conda_init_script} && conda activate RoboTwin && "

    # Check that the environment can be activated.
    success, _ = run_command(activate_command + "conda info --envs", "激活RoboTwin环境")
    if success:
        verification_results.append(
            _make_result("RoboTwin环境激活", True, "可以成功激活RoboTwin环境")
        )
    else:
        logging.error("无法激活RoboTwin环境")
        verification_results.append(
            _make_result("RoboTwin环境激活", False, "无法激活RoboTwin环境")
        )

    # Remaining checks; each helper returns its own report entries.
    verification_results.append(_check_python_version(activate_command))
    verification_results.extend(_check_core_packages())
    verification_results.append(_check_vulkan())
    verification_results.extend(_check_resource_directories())
    verification_results.extend(_check_mplib_patches())

    # Final summary.
    logging.info("\n=== RoboTwin环境验证总结 ===")
    success_count = sum(1 for result in verification_results if result["status"] == "成功")
    total_items = len(verification_results)
    logging.info(f"总检查项: {total_items}, 成功: {success_count}, 失败: {total_items - success_count}")
    for result in verification_results:
        # Distinct markers, so pass and fail can be told apart in the log.
        status_symbol = "✓" if result["status"] == "成功" else "✗"
        logging.info(f"{status_symbol} {result['component']}: {result['message']}")

    return success_count == total_items, verification_results
def main():
    """Run the verification, emit the JSON report, and return the exit code.

    Returns:
        0 when every check passed, 1 otherwise.
    """
    # Make sure the working directory is the project root.
    os.chdir(project_root)

    # Run the environment verification.
    is_healthy, check_results = verify_robotwin_environment()

    # Assemble the report.
    output_result = {
        "env_check_result": "healthy" if is_healthy else "unhealthy",
        "check_result_message": check_results
    }

    # Print it to the terminal.
    print(json.dumps(output_result, indent=2, ensure_ascii=False))

    # Save it to a file. The report has already been printed, so a failed
    # write is logged rather than allowed to replace the exit status.
    results_path = os.path.join(project_root, 'environment_verify_results.json')
    try:
        with open(results_path, 'w', encoding='utf-8') as f:
            json.dump(output_result, f, indent=2, ensure_ascii=False)
    except OSError as e:
        logging.error(f"保存结果文件失败: {e}")

    return 0 if is_healthy else 1


if __name__ == "__main__":
    # Exit status reflects the health check.
    sys.exit(main())