import os import subprocess import json import sys import platform import re import logging import importlib import warnings import time # 获取当前脚本所在目录的父目录(项目根目录) script_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(script_dir) # 配置日志 - 使用相对路径 log_file = os.path.join(project_root, 'environment_verify.log') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler(log_file) ] ) def run_command(command, description=None, show_output=False): """运行命令并打印状态""" if description: logging.info(f"=== {description} ===") logging.info(f"执行: {command}") try: # 明确指定使用bash执行命令 result = subprocess.run(['/bin/bash', '-c', command], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = result.stdout.decode('utf-8') logging.info(f"命令执行成功: {command}") if show_output: logging.info(f"输出:\n{output}") return True, output except subprocess.CalledProcessError as e: error = e.stderr.decode('utf-8') logging.error(f"命令执行失败: {command}") logging.error(f"错误信息: {error}") return False, error class SapienRenderTester: def __init__(self): warnings.simplefilter(action='ignore', category=FutureWarning) warnings.simplefilter(action='ignore', category=UserWarning) # 添加tools路径 sys.path.append(os.path.join(project_root, 'tools')) def test_render_pipeline(self): """测试Sapien渲染管线是否正常工作""" try: # 导入必要的包 import numpy as np import torch import sapien.core as sapien from sapien.utils.viewer import Viewer import gymnasium as gym import transforms3d as t3d from collections import OrderedDict # 设置日志级别 import toppra as ta ta.setup_logging("CRITICAL") # 隐藏日志 # 初始化引擎 engine = sapien.Engine() # 声明sapien渲染器 from sapien.render import set_global_config set_global_config(max_num_materials=50000, max_num_textures=50000) renderer = sapien.SapienRenderer() # 将渲染器提供给sapien模拟器 engine.set_renderer(renderer) # 设置光线追踪 sapien.render.set_camera_shader_dir("rt") sapien.render.set_ray_tracing_samples_per_pixel(32) sapien.render.set_ray_tracing_path_depth(8) sapien.render.set_ray_tracing_denoiser("oidn") # 声明sapien场景 scene_config = sapien.SceneConfig() scene = engine.create_scene(scene_config) # 如果以上步骤都没有异常,则渲染管线工作正常 return True, "Sapien渲染管线初始化成功" except Exception as e: return False, f"Sapien渲染管线初始化失败: {str(e)}" def verify_robotwin_environment(): """验证RoboTwin环境是否正常安装和配置""" verification_results = [] all_passed = True # 获取conda路径 success, conda_path_output = run_command("conda info --base", "获取conda基础路径") if success: conda_path = conda_path_output.strip() logging.info(f"Conda 基础路径: {conda_path}") else: logging.error("无法获取conda路径,环境验证失败") verification_results.append({ "component": "Conda", "status": "失败", "message": "无法获取conda路径" }) all_passed = False return all_passed, verification_results # 检查RoboTwin环境是否存在 check_env_cmd = "conda env list | grep -w RoboTwin || echo ''" success, env_check_output = run_command(check_env_cmd, "检查RoboTwin环境是否存在") if not success or not env_check_output.strip() or "RoboTwin" not in env_check_output: logging.error("RoboTwin环境不存在") verification_results.append({ "component": "RoboTwin环境", "status": "失败", "message": "RoboTwin conda环境不存在" }) all_passed = False return all_passed, verification_results # 验证RoboTwin环境 activate_command = f"source {conda_path}/etc/profile.d/conda.sh && conda activate RoboTwin && " # 1. 检查环境是否可激活 check_activate_cmd = activate_command + "conda info --envs" success, activate_output = run_command(check_activate_cmd, "激活RoboTwin环境") if not success: logging.error("无法激活RoboTwin环境") verification_results.append({ "component": "RoboTwin环境激活", "status": "失败", "message": "无法激活RoboTwin环境" }) all_passed = False else: verification_results.append({ "component": "RoboTwin环境激活", "status": "成功", "message": "可以成功激活RoboTwin环境" }) # 2. 检查Python版本 check_py_cmd = activate_command + "python --version" success, py_version_output = run_command(check_py_cmd, "检查Python版本") if success: # 提取版本号 match = re.search(r"Python (\d+\.\d+\.\d+)", py_version_output) if match: py_version = match.group(1) if py_version.startswith("3.8.") or py_version.startswith("3.10."): verification_results.append({ "component": "Python版本", "status": "成功", "message": f"Python版本 {py_version} 符合要求" }) else: verification_results.append({ "component": "Python版本", "status": "失败", "message": f"Python版本 {py_version} 不符合要求 (需要3.8或3.10)" }) all_passed = False else: verification_results.append({ "component": "Python版本", "status": "失败", "message": "无法确定Python版本" }) all_passed = False else: verification_results.append({ "component": "Python版本", "status": "失败", "message": "无法获取Python版本" }) all_passed = False # 3. 检查核心依赖包 core_packages = [ "torch", "torchvision", "sapien", "scipy", "mplib", "gymnasium", "trimesh", "open3d", "imageio", "pydantic", "zarr", "huggingface_hub", "pytorch3d" # 将pytorch3d添加到核心依赖包列表中 ] # 使用importlib.util.find_spec快速检查包是否安装,而不实际导入它们 import importlib.util logging.info("=== 检查核心依赖包 ===") start_total = time.time() for package in core_packages: try: # 仅检查包是否存在,不导入 spec = importlib.util.find_spec(package) if spec is not None: verification_results.append({ "component": f"依赖包 {package}", "status": "成功", "message": f"已安装 {package}" }) else: verification_results.append({ "component": f"依赖包 {package}", "status": "失败", "message": f"未安装 {package}" }) all_passed = False except Exception as e: verification_results.append({ "component": f"依赖包 {package}", "status": "失败", "message": f"检查 {package} 时发生错误: {str(e)}" }) all_passed = False elapsed_total = time.time() - start_total logging.info(f"=== 核心依赖包检查完成 (总耗时: {elapsed_total:.2f}秒) ===") # 5. 检查Vulkan check_vulkan_cmd = "DISPLAY='' vulkaninfo --summary 2>/dev/null || echo 'Vulkan not installed'" success, vulkan_output = run_command(check_vulkan_cmd, "检查Vulkan") if success and "Vulkan not installed" not in vulkan_output: if "Vulkan Instance Version" in vulkan_output: # 提取Vulkan版本 version_match = re.search(r"Vulkan Instance Version: (\d+\.\d+\.\d+)", vulkan_output) version = version_match.group(1) if version_match else "未知" verification_results.append({ "component": "Vulkan", "status": "成功", "message": f"Vulkan 已安装 (版本 {version})" }) else: verification_results.append({ "component": "Vulkan", "status": "成功", "message": "Vulkan 已安装" }) else: verification_results.append({ "component": "Vulkan", "status": "失败", "message": "Vulkan 未安装或安装不正确" }) all_passed = False # 6. 检查资源文件 resource_directories = ["aloha_maniskill_sim", "models"] for directory in resource_directories: check_dir_cmd = f"ls -la {os.path.join(project_root, directory)} 2>/dev/null || echo 'not found'" success, dir_output = run_command(check_dir_cmd, f"检查{directory}资源目录") if success and "not found" not in dir_output: verification_results.append({ "component": f"资源目录 {directory}", "status": "成功", "message": f"{directory} 目录存在" }) else: verification_results.append({ "component": f"资源目录 {directory}", "status": "失败", "message": f"{directory} 目录不存在" }) all_passed = False # 7. 检查mplib修改 logging.info("=== 检查mplib修改 ===") try: # 直接使用Python获取mplib包路径 import importlib.util mplib_spec = importlib.util.find_spec("mplib") if mplib_spec and mplib_spec.origin: # 从模块文件路径获取包路径 mplib_path = os.path.dirname(os.path.dirname(mplib_spec.origin)) planner_path = os.path.join(mplib_path, "mplib", "planner.py") if os.path.exists(planner_path): # 直接读取文件内容 with open(planner_path, 'r') as f: file_content = f.read() # 检查修改1: convex=True 是否已注释 convex_exists = "convex=True," in file_content convex_commented = "# convex=True," in file_content if not convex_exists or convex_commented: verification_results.append({ "component": "mplib修改 - convex=True", "status": "成功", "message": "convex=True 参数已经被注释或移除" }) else: verification_results.append({ "component": "mplib修改 - convex=True", "status": "失败", "message": "convex=True 参数未被注释或移除" }) all_passed = False # 检查修改2: 'or collide' 是否已移除 collide_exists = "or collide or not within_joint_limit" in file_content if not collide_exists: verification_results.append({ "component": "mplib修改 - or collide", "status": "成功", "message": "'or collide' 条件已被移除" }) else: verification_results.append({ "component": "mplib修改 - or collide", "status": "失败", "message": "'or collide' 条件未被移除" }) all_passed = False else: verification_results.append({ "component": "mplib修改", "status": "失败", "message": f"mplib planner.py 文件不存在: {planner_path}" }) all_passed = False else: verification_results.append({ "component": "mplib修改", "status": "失败", "message": "无法找到mplib安装位置" }) all_passed = False except Exception as e: verification_results.append({ "component": "mplib修改", "status": "失败", "message": f"检查mplib修改时发生错误: {str(e)}" }) all_passed = False # 生成最终报告 logging.info("\n=== RoboTwin环境验证总结 ===") success_count = sum(1 for result in verification_results if result["status"] == "成功") total_items = len(verification_results) logging.info(f"总检查项: {total_items}, 成功: {success_count}, 失败: {total_items - success_count}") for result in verification_results: status_symbol = "✓" if result["status"] == "成功" else "✗" logging.info(f"{status_symbol} {result['component']}: {result['message']}") return all_passed, verification_results if __name__ == "__main__": # 确保当前工作目录是项目根目录 os.chdir(project_root) # 运行环境验证 is_healthy, check_results = verify_robotwin_environment() # 准备输出结果 output_result = { "env_check_result": "healthy" if is_healthy else "unhealthy", "check_result_message": check_results } # 输出到终端 print(json.dumps(output_result, indent=2, ensure_ascii=False)) # 保存结果到文件 with open(os.path.join(project_root, 'environment_verify_results.json'), 'w', encoding='utf-8') as f: json.dump(output_result, f, indent=2, ensure_ascii=False) # 退出状态码 sys.exit(0 if is_healthy else 1)