RoboTwin_image/script/env_installer.py
2025-07-02 03:13:07 +00:00

276 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
import sys
import json
import logging
# 获取当前脚本所在目录的父目录(项目根目录)
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
# 配置日志 - 使用相对路径
log_file = os.path.join(project_root, 'environment_setup.log')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file)
]
)
def run_command(command, description=None, show_output=False):
"""运行命令并实时打印输出"""
if description:
logging.info(f"=== {description} ===")
logging.info(f"执行: {command}")
try:
# 创建进程并实时获取输出
process = subprocess.Popen(
['/bin/bash', '-c', command],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 将stderr重定向到stdout
universal_newlines=True,
bufsize=1
)
# 收集所有输出
all_output = []
# 实时读取并显示输出
for line in process.stdout:
line = line.rstrip()
all_output.append(line)
print(line) # 直接打印到控制台
# 等待进程结束
process.wait()
# 检查返回码
if process.returncode == 0:
logging.info(f"命令执行成功: {command}")
return True, '\n'.join(all_output)
else:
logging.error(f"命令执行失败: {command}")
return False, '\n'.join(all_output)
except Exception as e:
logging.error(f"命令执行异常: {command}")
logging.error(f"异常信息: {str(e)}")
return False, str(e)
def setup_environment():
"""执行环境配置步骤"""
# 确保当前工作目录是项目根目录
os.chdir(project_root)
# 检查环境检查结果是否存在
results_file = os.path.join(project_root, 'environment_check_results.json')
try:
with open(results_file, 'r') as f:
check_data = json.load(f)
dependencies_met = check_data.get("dependencies_met", False)
check_results = check_data.get("check_results", [])
except (FileNotFoundError, json.JSONDecodeError):
logging.warning("未找到环境检查结果,请先运行 check_environment.py")
dependencies_met = False
check_results = []
results = []
# 记录检查结果
results.append({
"step": "环境依赖检查",
"success": dependencies_met,
"output": "环境依赖检查已完成",
"details": check_results
})
if not dependencies_met:
logging.warning("环境检查未通过,但将继续配置环境,可能会遇到兼容性问题。")
# 获取conda路径提前获取避免变量引用错误
success, conda_path_output = run_command("conda info --base", "获取conda基础路径")
if success:
conda_path = conda_path_output.strip()
logging.info(f"Conda 基础路径: {conda_path}")
else:
logging.error("无法获取conda路径环境配置失败")
results.append({"step": "获取conda路径", "success": False, "output": "无法获取conda基础路径"})
return {
"dependencies_check": {
"passed": dependencies_met,
"details": check_results
},
"installation": {
"total_steps": 1,
"success_steps": 0,
"results": results[1:]
}
}
# 步骤0: 安装Vulkan
success, output = run_command(
"sudo apt install -y libvulkan1 mesa-vulkan-drivers vulkan-tools",
"安装Vulkan"
)
results.append({"step": "安装Vulkan", "success": success, "output": output})
# 步骤1: 检查是否已存在RoboTwin环境如果有则直接激活否则创建新环境
check_env_cmd = "conda env list | grep -w RoboTwin || echo ''"
success, env_check_output = run_command(
check_env_cmd,
"检查是否存在RoboTwin环境"
)
# 更严格地验证环境是否存在
if success and env_check_output.strip() and "RoboTwin" in env_check_output:
# 进一步验证 - 尝试列出环境中的包
validate_cmd = "conda activate RoboTwin && conda list || echo 'fail'"
success_validate, validate_output = run_command(
f"source {conda_path}/etc/profile.d/conda.sh && {validate_cmd}",
"验证RoboTwin环境"
)
if success_validate and "fail" not in validate_output:
logging.info("RoboTwin环境验证成功将直接激活")
results.append({"step": "检查conda环境", "success": True, "output": "RoboTwin环境已存在验证成功"})
else:
# 环境存在但无法激活,可能已损坏,重新创建
logging.warning("RoboTwin环境存在但无法激活将重新创建")
remove_cmd = "conda env remove -n RoboTwin -y"
run_command(remove_cmd, "移除损坏的RoboTwin环境")
create_cmd = "conda create -n RoboTwin python=3.8 -y"
success, output = run_command(create_cmd, "创建conda环境")
results.append({"step": "创建conda环境", "success": success, "output": output})
else:
# 创建新的conda环境
logging.info("RoboTwin环境不存在将创建新环境")
success, output = run_command(
"conda create -n RoboTwin python=3.8 -y",
"创建conda环境"
)
results.append({"step": "创建conda环境", "success": success, "output": output})
# 定义激活命令conda_path已经在前面获取了
activate_command = f"source {conda_path}/etc/profile.d/conda.sh && conda activate RoboTwin && "
# 步骤2: 安装基本依赖
logging.info("开始安装基本依赖包...")
pip_install_cmd = activate_command + "pip install torch==2.4.1 torchvision sapien==3.0.0b1 scipy==1.10.1 mplib==0.1.1 gymnasium==0.29.1 trimesh==4.4.3 open3d==0.18.0 imageio==2.34.2 pydantic zarr openai huggingface_hub==0.25.0"
success, output = run_command(
pip_install_cmd,
"安装基本依赖",
show_output=True
)
results.append({"step": "安装基本依赖", "success": success, "output": output})
# 步骤3: 安装pytorch3d
logging.info("开始安装pytorch3d...")
pytorch3d_cmd = activate_command + "cd third_party/pytorch3d_simplified && pip install -e . && cd ../.."
success, output = run_command(
pytorch3d_cmd,
"安装pytorch3d",
show_output=True
)
results.append({"step": "安装pytorch3d", "success": success, "output": output})
# 步骤4: 检查资源文件是否已存在,如果已存在则跳过下载
check_assets_cmd = "ls -la aloha_urdf/ main_models/ 2>/dev/null || echo '资源文件不存在'"
success, assets_check = run_command(
check_assets_cmd,
"检查资源文件是否已存在"
)
if success and "资源文件不存在" not in assets_check and "aloha_urdf" in assets_check and "main_models" in assets_check:
logging.info("资源文件已存在,跳过下载步骤")
results.append({"step": "下载资源", "success": True, "output": "资源文件已存在,跳过下载步骤"})
else:
# 需要下载资源
logging.info("开始下载资源文件...")
# 根据当前 git 分支决定解压哪个 zip 文件
branch_cmd = "$(git rev-parse --abbrev-ref HEAD)"
unzip_cmd = f'if [ "{branch_cmd}" = "main" ]; then unzip -o aloha_urdf.zip && unzip -o main_models.zip; else unzip -o aloha_urdf.zip && unzip -o gpt_models.zip; fi'
download_cmd = activate_command + f"python ./script/download_asset.py && {unzip_cmd}"
success, output = run_command(
download_cmd,
"下载资源"
)
results.append({"step": "下载资源", "success": success, "output": output})
# 步骤5: 修改 mplib 库代码
# 查找 mplib 位置
logging.info("开始修改 mplib 库代码...")
mplib_locate_cmd = activate_command + "pip show mplib | grep Location | awk '{print $2}'"
success, mplib_path = run_command(mplib_locate_cmd, "查找mplib位置")
if success and mplib_path:
mplib_path = mplib_path.strip()
planner_path = os.path.join(mplib_path, "mplib", "planner.py")
logging.info(f"mplib planner.py 路径: {planner_path}")
if os.path.exists(planner_path):
# 首先检查文件内容,看是否已经修改过
check_content_cmd = f"cat {planner_path}"
success, file_content = run_command(check_content_cmd, "检查mplib代码内容", show_output=False)
# 步骤5.1: 检查并移除 convex=True
convex_exists = "convex=True" in file_content
if convex_exists:
logging.info("正在移除 convex=True 参数...")
sed_convex_cmd = f"sed -i 's/convex=True,/# convex=True,/' {planner_path}"
success, output = run_command(
sed_convex_cmd,
"修改mplib库代码 - 移除convex=True"
)
results.append({"step": "修改mplib - 移除convex=True", "success": success, "output": output})
else:
logging.info("mplib中的convex=True已经被移除跳过此步骤")
results.append({"step": "修改mplib - 移除convex=True", "success": True, "output": "已经被移除,无需修改"})
# 步骤5.2: 检查并移除 or collide
collide_exists = "or collide or not within_joint_limit" in file_content
if collide_exists:
logging.info("正在移除 or collide 条件...")
sed_collide_cmd = f"sed -i 's/or collide or not within_joint_limit/or not within_joint_limit/' {planner_path}"
success, output = run_command(
sed_collide_cmd,
"修改mplib库代码 - 移除 or collide"
)
results.append({"step": "修改mplib - 移除 or collide", "success": success, "output": output})
else:
logging.info("mplib中的'or collide'已经被移除,跳过此步骤")
results.append({"step": "修改mplib - 移除 or collide", "success": True, "output": "已经被移除,无需修改"})
else:
logging.error(f"mplib planner.py 文件不存在: {planner_path}")
results.append({"step": "查找mplib文件", "success": False, "output": f"文件不存在: {planner_path}"})
else:
logging.error("无法找到mplib安装路径")
results.append({"step": "查找mplib路径", "success": False, "output": "无法找到mplib安装路径"})
# 总结安装结果
logging.info("\n=== 环境配置总结 ===")
success_count = sum(1 for result in results[1:] if result["success"]) # 不包括依赖检查
total_steps = len(results) - 1 # 不包括依赖检查
logging.info(f"总步骤: {total_steps}, 成功: {success_count}, 失败: {total_steps - success_count}")
for result in results[1:]: # 不包括依赖检查
status = "" if result["success"] else ""
logging.info(f"{status} {result['step']}")
return {
"dependencies_check": {
"passed": dependencies_met,
"details": check_results
},
"installation": {
"total_steps": total_steps,
"success_steps": success_count,
"results": results[1:] # 不包括依赖检查
}
}
if __name__ == "__main__":
setup_environment()