276 lines
12 KiB
Python
276 lines
12 KiB
Python
import os
|
||
import subprocess
|
||
import sys
|
||
import json
|
||
import logging
|
||
|
||
# 获取当前脚本所在目录的父目录(项目根目录)
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
project_root = os.path.dirname(script_dir)
|
||
|
||
# 配置日志 - 使用相对路径
|
||
log_file = os.path.join(project_root, 'environment_setup.log')
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.StreamHandler(),
|
||
logging.FileHandler(log_file)
|
||
]
|
||
)
|
||
|
||
def run_command(command, description=None, show_output=False):
|
||
"""运行命令并实时打印输出"""
|
||
if description:
|
||
logging.info(f"=== {description} ===")
|
||
logging.info(f"执行: {command}")
|
||
|
||
try:
|
||
# 创建进程并实时获取输出
|
||
process = subprocess.Popen(
|
||
['/bin/bash', '-c', command],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT, # 将stderr重定向到stdout
|
||
universal_newlines=True,
|
||
bufsize=1
|
||
)
|
||
|
||
# 收集所有输出
|
||
all_output = []
|
||
|
||
# 实时读取并显示输出
|
||
for line in process.stdout:
|
||
line = line.rstrip()
|
||
all_output.append(line)
|
||
print(line) # 直接打印到控制台
|
||
|
||
# 等待进程结束
|
||
process.wait()
|
||
|
||
# 检查返回码
|
||
if process.returncode == 0:
|
||
logging.info(f"命令执行成功: {command}")
|
||
return True, '\n'.join(all_output)
|
||
else:
|
||
logging.error(f"命令执行失败: {command}")
|
||
return False, '\n'.join(all_output)
|
||
|
||
except Exception as e:
|
||
logging.error(f"命令执行异常: {command}")
|
||
logging.error(f"异常信息: {str(e)}")
|
||
return False, str(e)
|
||
|
||
def setup_environment():
|
||
"""执行环境配置步骤"""
|
||
# 确保当前工作目录是项目根目录
|
||
os.chdir(project_root)
|
||
|
||
# 检查环境检查结果是否存在
|
||
results_file = os.path.join(project_root, 'environment_check_results.json')
|
||
try:
|
||
with open(results_file, 'r') as f:
|
||
check_data = json.load(f)
|
||
dependencies_met = check_data.get("dependencies_met", False)
|
||
check_results = check_data.get("check_results", [])
|
||
except (FileNotFoundError, json.JSONDecodeError):
|
||
logging.warning("未找到环境检查结果,请先运行 check_environment.py")
|
||
dependencies_met = False
|
||
check_results = []
|
||
|
||
results = []
|
||
|
||
# 记录检查结果
|
||
results.append({
|
||
"step": "环境依赖检查",
|
||
"success": dependencies_met,
|
||
"output": "环境依赖检查已完成",
|
||
"details": check_results
|
||
})
|
||
|
||
if not dependencies_met:
|
||
logging.warning("环境检查未通过,但将继续配置环境,可能会遇到兼容性问题。")
|
||
|
||
# 获取conda路径(提前获取,避免变量引用错误)
|
||
success, conda_path_output = run_command("conda info --base", "获取conda基础路径")
|
||
if success:
|
||
conda_path = conda_path_output.strip()
|
||
logging.info(f"Conda 基础路径: {conda_path}")
|
||
else:
|
||
logging.error("无法获取conda路径,环境配置失败")
|
||
results.append({"step": "获取conda路径", "success": False, "output": "无法获取conda基础路径"})
|
||
return {
|
||
"dependencies_check": {
|
||
"passed": dependencies_met,
|
||
"details": check_results
|
||
},
|
||
"installation": {
|
||
"total_steps": 1,
|
||
"success_steps": 0,
|
||
"results": results[1:]
|
||
}
|
||
}
|
||
|
||
# 步骤0: 安装Vulkan
|
||
success, output = run_command(
|
||
"sudo apt install -y libvulkan1 mesa-vulkan-drivers vulkan-tools",
|
||
"安装Vulkan"
|
||
)
|
||
results.append({"step": "安装Vulkan", "success": success, "output": output})
|
||
|
||
# 步骤1: 检查是否已存在RoboTwin环境,如果有则直接激活,否则创建新环境
|
||
check_env_cmd = "conda env list | grep -w RoboTwin || echo ''"
|
||
success, env_check_output = run_command(
|
||
check_env_cmd,
|
||
"检查是否存在RoboTwin环境"
|
||
)
|
||
|
||
# 更严格地验证环境是否存在
|
||
if success and env_check_output.strip() and "RoboTwin" in env_check_output:
|
||
# 进一步验证 - 尝试列出环境中的包
|
||
validate_cmd = "conda activate RoboTwin && conda list || echo 'fail'"
|
||
success_validate, validate_output = run_command(
|
||
f"source {conda_path}/etc/profile.d/conda.sh && {validate_cmd}",
|
||
"验证RoboTwin环境"
|
||
)
|
||
|
||
if success_validate and "fail" not in validate_output:
|
||
logging.info("RoboTwin环境验证成功,将直接激活")
|
||
results.append({"step": "检查conda环境", "success": True, "output": "RoboTwin环境已存在,验证成功"})
|
||
else:
|
||
# 环境存在但无法激活,可能已损坏,重新创建
|
||
logging.warning("RoboTwin环境存在但无法激活,将重新创建")
|
||
remove_cmd = "conda env remove -n RoboTwin -y"
|
||
run_command(remove_cmd, "移除损坏的RoboTwin环境")
|
||
create_cmd = "conda create -n RoboTwin python=3.8 -y"
|
||
success, output = run_command(create_cmd, "创建conda环境")
|
||
results.append({"step": "创建conda环境", "success": success, "output": output})
|
||
else:
|
||
# 创建新的conda环境
|
||
logging.info("RoboTwin环境不存在,将创建新环境")
|
||
success, output = run_command(
|
||
"conda create -n RoboTwin python=3.8 -y",
|
||
"创建conda环境"
|
||
)
|
||
results.append({"step": "创建conda环境", "success": success, "output": output})
|
||
|
||
# 定义激活命令(conda_path已经在前面获取了)
|
||
activate_command = f"source {conda_path}/etc/profile.d/conda.sh && conda activate RoboTwin && "
|
||
|
||
# 步骤2: 安装基本依赖
|
||
logging.info("开始安装基本依赖包...")
|
||
pip_install_cmd = activate_command + "pip install torch==2.4.1 torchvision sapien==3.0.0b1 scipy==1.10.1 mplib==0.1.1 gymnasium==0.29.1 trimesh==4.4.3 open3d==0.18.0 imageio==2.34.2 pydantic zarr openai huggingface_hub==0.25.0"
|
||
success, output = run_command(
|
||
pip_install_cmd,
|
||
"安装基本依赖",
|
||
show_output=True
|
||
)
|
||
results.append({"step": "安装基本依赖", "success": success, "output": output})
|
||
|
||
# 步骤3: 安装pytorch3d
|
||
logging.info("开始安装pytorch3d...")
|
||
pytorch3d_cmd = activate_command + "cd third_party/pytorch3d_simplified && pip install -e . && cd ../.."
|
||
success, output = run_command(
|
||
pytorch3d_cmd,
|
||
"安装pytorch3d",
|
||
show_output=True
|
||
)
|
||
results.append({"step": "安装pytorch3d", "success": success, "output": output})
|
||
|
||
# 步骤4: 检查资源文件是否已存在,如果已存在则跳过下载
|
||
check_assets_cmd = "ls -la aloha_urdf/ main_models/ 2>/dev/null || echo '资源文件不存在'"
|
||
success, assets_check = run_command(
|
||
check_assets_cmd,
|
||
"检查资源文件是否已存在"
|
||
)
|
||
|
||
if success and "资源文件不存在" not in assets_check and "aloha_urdf" in assets_check and "main_models" in assets_check:
|
||
logging.info("资源文件已存在,跳过下载步骤")
|
||
results.append({"step": "下载资源", "success": True, "output": "资源文件已存在,跳过下载步骤"})
|
||
else:
|
||
# 需要下载资源
|
||
logging.info("开始下载资源文件...")
|
||
# 根据当前 git 分支决定解压哪个 zip 文件
|
||
branch_cmd = "$(git rev-parse --abbrev-ref HEAD)"
|
||
unzip_cmd = f'if [ "{branch_cmd}" = "main" ]; then unzip -o aloha_urdf.zip && unzip -o main_models.zip; else unzip -o aloha_urdf.zip && unzip -o gpt_models.zip; fi'
|
||
download_cmd = activate_command + f"python ./script/download_asset.py && {unzip_cmd}"
|
||
|
||
success, output = run_command(
|
||
download_cmd,
|
||
"下载资源"
|
||
)
|
||
results.append({"step": "下载资源", "success": success, "output": output})
|
||
|
||
# 步骤5: 修改 mplib 库代码
|
||
# 查找 mplib 位置
|
||
logging.info("开始修改 mplib 库代码...")
|
||
mplib_locate_cmd = activate_command + "pip show mplib | grep Location | awk '{print $2}'"
|
||
success, mplib_path = run_command(mplib_locate_cmd, "查找mplib位置")
|
||
|
||
if success and mplib_path:
|
||
mplib_path = mplib_path.strip()
|
||
planner_path = os.path.join(mplib_path, "mplib", "planner.py")
|
||
logging.info(f"mplib planner.py 路径: {planner_path}")
|
||
|
||
if os.path.exists(planner_path):
|
||
# 首先检查文件内容,看是否已经修改过
|
||
check_content_cmd = f"cat {planner_path}"
|
||
success, file_content = run_command(check_content_cmd, "检查mplib代码内容", show_output=False)
|
||
|
||
# 步骤5.1: 检查并移除 convex=True
|
||
convex_exists = "convex=True" in file_content
|
||
if convex_exists:
|
||
logging.info("正在移除 convex=True 参数...")
|
||
sed_convex_cmd = f"sed -i 's/convex=True,/# convex=True,/' {planner_path}"
|
||
success, output = run_command(
|
||
sed_convex_cmd,
|
||
"修改mplib库代码 - 移除convex=True"
|
||
)
|
||
results.append({"step": "修改mplib - 移除convex=True", "success": success, "output": output})
|
||
else:
|
||
logging.info("mplib中的convex=True已经被移除,跳过此步骤")
|
||
results.append({"step": "修改mplib - 移除convex=True", "success": True, "output": "已经被移除,无需修改"})
|
||
|
||
# 步骤5.2: 检查并移除 or collide
|
||
collide_exists = "or collide or not within_joint_limit" in file_content
|
||
if collide_exists:
|
||
logging.info("正在移除 or collide 条件...")
|
||
sed_collide_cmd = f"sed -i 's/or collide or not within_joint_limit/or not within_joint_limit/' {planner_path}"
|
||
success, output = run_command(
|
||
sed_collide_cmd,
|
||
"修改mplib库代码 - 移除 or collide"
|
||
)
|
||
results.append({"step": "修改mplib - 移除 or collide", "success": success, "output": output})
|
||
else:
|
||
logging.info("mplib中的'or collide'已经被移除,跳过此步骤")
|
||
results.append({"step": "修改mplib - 移除 or collide", "success": True, "output": "已经被移除,无需修改"})
|
||
else:
|
||
logging.error(f"mplib planner.py 文件不存在: {planner_path}")
|
||
results.append({"step": "查找mplib文件", "success": False, "output": f"文件不存在: {planner_path}"})
|
||
else:
|
||
logging.error("无法找到mplib安装路径")
|
||
results.append({"step": "查找mplib路径", "success": False, "output": "无法找到mplib安装路径"})
|
||
|
||
# 总结安装结果
|
||
logging.info("\n=== 环境配置总结 ===")
|
||
success_count = sum(1 for result in results[1:] if result["success"]) # 不包括依赖检查
|
||
total_steps = len(results) - 1 # 不包括依赖检查
|
||
logging.info(f"总步骤: {total_steps}, 成功: {success_count}, 失败: {total_steps - success_count}")
|
||
|
||
for result in results[1:]: # 不包括依赖检查
|
||
status = "✓" if result["success"] else "✗"
|
||
logging.info(f"{status} {result['step']}")
|
||
|
||
return {
|
||
"dependencies_check": {
|
||
"passed": dependencies_met,
|
||
"details": check_results
|
||
},
|
||
"installation": {
|
||
"total_steps": total_steps,
|
||
"success_steps": success_count,
|
||
"results": results[1:] # 不包括依赖检查
|
||
}
|
||
}
|
||
|
||
if __name__ == "__main__":
|
||
setup_environment() |