掘金 人工智能 6小时前
阿里云PAI平台实现基于Isaac仿真的机器人操作数据扩增与模仿学习
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了阿里云PAI人工智能平台如何整合NVIDIA Physical AI套件,实现机器人操作数据的扩增与模仿学习。该过程包括人工演示、数据扩增(利用Isaac Lab Mimic)、数据增强(利用Cosmos-Transfer1模型提升真实感)、模仿学习(训练BC-RNN模型)以及模型测评。通过该流程,尤其是在数据增强后,训练出的模型在面对复杂视觉环境变化时展现出更强的泛化能力,为具身智能机器人视觉操作提供了端到端的解决方案。

🤖 **端到端实现机器人操作数据处理流程**:文章详细阐述了从人工少量演示开始,经过数据扩增、数据增强,到模仿学习和模型测评的完整流程。在阿里云PAI平台上,通过整合Isaac Sim、Isaac Lab、Cosmos等工具栈,开发者可以高效地完成Physical AI模型的训练,尤其是在数据处理和模型训练方面提供了最佳实践。

✨ **数据扩增与增强提升模型泛化能力**:通过Isaac Lab Mimic功能,可以从少量人工演示数据生成大规模仿真数据集;利用Cosmos-Transfer1模型对仿真视频进行增强,显著提升了数据的真实感和多样性。实验结果表明,经过数据扩增与增强后的模型,在光照强度、颜色、纹理等多种环境变化下,成功率大幅提升,有效增强了模型的sim2real泛化能力。

🎓 **模仿学习与模型测评验证效果**:文章介绍了使用BC-RNN模型进行模仿学习,并提供了在不同环境扰动下的模型测评方法。测评结果清晰地展示了数据增强对提升模型鲁棒性和泛化能力的关键作用,为具身智能机器人在复杂真实世界任务中的应用提供了坚实的技术基础。

Physical AI是AI技术演进的一个热门话题,目的是基于Transformer、Diffusion等主流大模型结构,训练得到可以在实际物理空间中指导机器人本体完成各种任务的AI模型。

在Physical AI模型的开发过程中,需要用到遥操采集、数据合成、数据增强、模仿学习、模型测评等多个过程,也会用到Isaac Sim、Isaac Lab、Cosmos等多种工具栈和模型。

基于和NVIDIA Physical AI套件的深度整合,阿里云PAI人工智能平台整合了上述全套工具栈,并在Notebook Gallery中上线了多套开箱即用的最佳实践,供开发者熟悉技术、快速启动Physical AI项目使用。

从本期开始,我们会陆续介绍5款最佳实践:

本期我们介绍基于Isaac仿真的操作动作数据扩增与模仿学习

Physical AI模型模仿学习的一般过程

和LLM一样,Physical AI模型的训练也是神经网络模型,也需要大量数据训练,才能学会特定技能。但是与LLM使用文本数据训练不同,Physical AI模型训练使用的数据,需要是针对特定场景、特定对象、特定任务的多模态数据,一般包含视频、触觉传感信息、关节位置数据。

一般来说,遥控真实机器人完成特定任务,即可获取上述数据。但是,由于真实机器人尚未大规模普及,为了特定任务专门搭建演示场景的成本又较高,一般使用人工演示 + 数据扩增 + 数据增强的方式来获取足够的训练数据,其中:

在PAI的Notebook Gallery中,我们已经预置了一个最佳实践,就是这个过程的一个具体示例:gallery.pai-ml.com/#/preview/d…

下面我们来详细解读这个示例。

人工少量演示

人工少量演示的目的是给后续的数据扩增打下样例。人工演示可以在真实环境下通过遥控真机完成,也可以在仿真环境下通过遥控仿真环境下的本体完成。

在本最佳实践中,我们基于MimicGen进行数据扩增,而MimicGen需要仿真环境提供的真值数据,因此我们在Isaac Lab仿真环境下采集人工演示。以下是在DSW中启动Isaac Lab仿真环境的一个样例:

# 使用键盘遥操仿真环境进行数据集标注cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/environments/teleoperation/teleop_se3_agent.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --num_envs 1 --teleop_device keyboard --livestream 1"    print(f"执行命令: {cmd}")!{cmd}

在DSW中执行此命令,稍等片刻,即可通过Isaac Sim WebRTC Streaming Client连接DSW公网IP从而启动遥操界面

通过以下键盘操作可以控制机械臂的动作:

Toggle gripper (open/close): KMove arm along x-axis: W/SMove arm along y-axis: A/DMove arm along z-axis: Q/ERotate arm along x-axis: Z/XRotate arm along y-axis: T/GRotate arm along z-axis: C/V

在完成遥控操作后,还需要使用Mimic Gen管线中的annotate_demos.py对人工演示数据进行子任务标注,这是确保Isaac Lab Mimic功能正常运行的关键步骤。

cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/annotate_demos.py \--enable_cameras --task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Mimic-v0 --auto \--input_file {dataset_path} --output_file {annotated_dataset_path} --headless"    print(f"执行命令: {cmd}")!{cmd}

完成子任务标注后,可以通过以下命令,查看人工演示的数据:

# 使用LiveStream预览已标注数据集os.environ["ACCEPT_EULA"] = "Y"cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /isaac-sim/python.sh /workspace/isaaclab/scripts/tools/replay_demos.py     --dataset_file /mnt/data/isaac_tmp/dataset/annotated_dataset.hdf5     --task Isaac-Stack-Cube-Franka-IK-Rel-v0     --num_envs 1  --livestream 1"print(f"执行命令: {cmd}")!{cmd}

视频演示 >>

数据扩增

使用Isaac Lab Mimic功能,从少量已标注的专家演示中自动扩增额外的演示数据。用户可以调节命令中的num_envs参数,以增加或减少并行环境数量,充分利用显存。还可以调节generation_num_trials,以调节希望扩增的演示数据数量。例如,可以设置num_envs为8,generation_num_trials为1000,这样可以以8份数据为一组,不断重复,直至得到1000份数据。

# 使用Isaac Lab Mimic生成数据集os.environ["ACCEPT_EULA"] = "Y"cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/generate_dataset.py \--enable_cameras --headless --num_envs 8 --generation_num_trials 1000 \--input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \--task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0  --livestream 1"    print(f"执行命令: {cmd}")!{cmd}

视频演示 >>

对于规模较大的生成或者mimic任务,也可以利用DLC拉起离线仿真任务;此外,DLC也提供了对Ray的支持,我们也提供了示例,使用DLC-Ray拉起多任务,实现计算资源的编排和充分使用。

# 创建DLC作业。    create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({        'WorkspaceId': workspace_id,        'DisplayName': display_name,        'JobType': 'RayJob',        'ResourceId': resource_quota_id,        'JobSpecs': [            {                #假设我们可用资源是8GPU * 128CPU * 1024G内存,可以按照如下分配head和worker                "Type": "Head",                "Image": image_uri,                "PodCount": 1,                "ResourceConfig": {                    "CPU": "8",       # 指定CPU核心数                    "Memory": "32Gi",   # 指定内存大小(单位:GB)                    "GPU": "0"        # Head节点无需GPU,因此设置为0                }            },            {                "Type": "Worker",                "Image": image_uri,                "PodCount": 2, #模拟两个worker node                "ResourceConfig": {                    "CPU": "56",       # 指定CPU核心数                    "Memory": "448Gi",   # 指定内存大小(单位:GB)                    "GPU": "4"        # 指定GPU数量,与UserCommand中的--num_per_worker一致,以保证一个任务使用一个gpu                }            },        ],        'DataSources': [            {                "DataSourceId": dataset_id,                "MountPath": "/mnt/data",  # 挂载路径            },            {                "DataSourceId": pub_dataset_id,                "MountPath": "/mnt/isaac_assets",  # 挂载路径            }        ],       'UserVpc': {            "VpcId": vpc_id,  # 替换为实际 VPC ID            "SwitchId": switch_id,  # 替换为实际交换机 ID            "SecurityGroupId": security_groupid  # 替换为实际安全组 ID        },                # 根据可用资源以及任务,示例中为每个任务分配1gpu * 14cpu * 96G内存,由于每个worker我们设置了4张卡,因此可以启动4个任务        "UserCommand": f"""        /workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/ray_isaac_new.py \        --command "/workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/generate_dataset_ray.py \        --enable_cameras --headless --num_envs 8 --generation_num_trials 125 \        --input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \        --task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0 \        --asset_path {asset_root_dir}" \        --gpu 1 \        --cpu 14 \        --memory 96 \        --num_per_worker 4        """    }))        job_id = create_job_resp.body.job_id    wait_for_job_to_terminate(dlc_client, job_id)

数据增强

通过Isaac Lab提供的格式转换功能,可以提取hdf5文件中记录的视频帧,将其转换为mp4文件:

    cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/tools/hdf5_to_mp4.py \    --input_file {input_file} \    --output_dir {output_dir} \    --input_keys {' '.join(input_keys)}"        print(f"执行命令: {cmd}")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    print(result.stdout)

不过,仿真得到的mp4文件通常不会十分逼真,例如:视频演示 >>

我们可以通过cosmos-transer1-7b模型将其增强。

首先在Model Gallery中找到cosmos-transfer1-7b模型并将其部署:

在完成部署后,通过以下代码可以调用cosmos服务,从而增强扩增的数据:

import jsonfrom pathlib import Pathimport osimport shutilimport requestsimport gradio_client.client as gradio_clientimport gradio_client.utils as gradio_utilsdef load_prompts_from_file(prompts_file_path):    """从文件中加载提示词。"""    if not prompts_file_path.exists():        return []    with open(prompts_file_path, 'r', encoding='utf-8') as f:        return [line.strip() for line in f if line.strip()]def create_cosmos_request(input_video_path, prompt=None, depth_control_path=None, seg_control_path=None):    """动态创建Cosmos请求参数。"""    request = {        "vis": {            "control_weight": 0.1,        },        "edge": {            "control_weight": 0.3,        },        "depth": {            "control_weight": 0.6,            "input_control": depth_control_path,        },        "seg": {            "control_weight": 0.7,            "input_control": seg_control_path,        },        "input_video_path": input_video_path,        "negative_prompt": "The video captures a game playing, with bad crappy graphics and cartoonish frames. It represents a recording of old outdated games. The lighting looks very fake. The textures are very raw and basic. The geometries are very primitive. The images are very pixelated and of poor CG quality. There are many subtitles in the footage. Overall, the video is unrealistic at all.",        "prompt": prompt,        "sigma_max": 50.0,    }    return requestdef cosmos_sync_with_upload(client, input_video_path, output_dir, prompt, depth_video_path=None, seg_video_path=None):    """上传文件,调用API进行增强,并下载结果。"""        def upload_file(filepath):        if not filepath or not filepath.exists():            return None        print(f"  Uploading: {filepath.name}")        file_descriptor = gradio_utils.handle_file(str(filepath))        upload_result_str = client.predict(file_descriptor, api_name="/upload_file")        return json.loads(upload_result_str).get("path")    remote_main_video_path = upload_file(input_video_path)    if not remote_main_video_path:        print(f"  主视频文件上传失败: {input_video_path.name}")        return False, "主视频上传失败"    remote_depth_path = upload_file(depth_video_path)    remote_seg_path = upload_file(seg_video_path)        request_dict = create_cosmos_request(remote_main_video_path, prompt, remote_depth_path, remote_seg_path)    print("  Sending generation request...")    result = client.predict(json.dumps(request_dict), api_name="/generate_video")        if result and isinstance(result, tuple) and len(result) >= 2:        video_info, message = result        print(video_info, message)        if isinstance(video_info, dict) and "video" in video_info:            video_path = video_info["video"]            if os.path.exists(video_path):                output_file = Path(output_dir) / f"{Path(input_video_path).name}"                import shutil                shutil.copy2(video_path, output_file)                return True, str(output_file)            if video_path.startswith(("http://", "https://")):                import requests                resp = requests.get(video_path, stream=True)                output_file = Path(output_dir) / f"cosmos_{Path(input_video_path).name}"                with open(output_file, "wb") as f:                    for chunk in resp.iter_content(chunk_size=8192):                        if chunk:                            f.write(chunk)                return True, str(output_file)def run_cosmos_augmentation_direct(mp4_input_dir, cosmos_output_dir, prompts_file_path):    """    主执行函数:遍历输入目录,为每个主视频调用增强服务。    """    input_dir = Path(mp4_input_dir)    output_dir = Path(cosmos_output_dir)    output_dir.mkdir(parents=True, exist_ok=True)    main_video_files = sorted(list(input_dir.glob("*cam.mp4")))    if not main_video_files:        print(f"在目录 {input_dir} 中未找到任何 '*cam.mp4' 文件。")        return 0, []            print(f"找到 {len(main_video_files)} 个主视频文件待处理。")    prompts = load_prompts_from_file(prompts_file_path)        client = gradio_client.Client(COSMOS_SERVICE_URL, hf_token=EAS_TOKEN)        success_count = 0    failed_files = []    for idx, main_video_path in enumerate(main_video_files, 1):        print("-" * 50)        print(f"[{idx}/{len(main_video_files)}] Processing: {main_video_path.name}")                base_name = main_video_path.name.replace("_cam.mp4", "")        depth_path = main_video_path.with_name(f"{base_name}_cam_depth.mp4")        seg_path = main_video_path.with_name(f"{base_name}_cam_shaded_segmentation.mp4")        depth_path = depth_path if depth_path.exists() else None        seg_path = seg_path if seg_path.exists() else None                prompt = prompts[(idx-1) % len(prompts)]        ok, result_info = cosmos_sync_with_upload(            client, main_video_path, output_dir, prompt,            depth_video_path=depth_path, seg_video_path=seg_path        )        if ok:            success_count += 1            print(result_info)        else:            failed_files.append(main_video_path.name)    print("\n" + "="*20 + " 处理完成统计 " + "="*20)    print(f"成功: {success_count}/{len(main_video_files)}")    print(f"失败: {len(failed_files)}")    if failed_files:        print("失败文件列表:", failed_files)        return success_count, failed_files# --- 程序入口 ---if __name__ == "__main__":    if not mp4_output_dir.exists():        print(f"错误:输入目录 {mp4_output_dir} 不存在!")    else:        print("开始进行Cosmos视觉增强...")        success_count, _ = run_cosmos_augmentation_direct(            # mp4_output_dir,            "/mnt/data/isaac_tmp/dataset/mimic_dataset_1k_mp4_backup",            cosmos_output_dir,            prompts_file        )        print(f"Cosmos增强流程结束!共成功处理 {success_count} 个文件。")

完成增强后的视频文件可以得到更逼真的视觉效果:视频演示 >>

模仿学习

在本最佳实践中,使用简单的BC-RNN模型作为模仿学习的示例:

def train_model(task_name, dataset_path, model_name):    """    训练视觉运动BC代理    """    cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \    --task {task_name} --algo bc \    --dataset {dataset_path} \    --name {model_name}"        print(f"训练模型: {cmd}")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    print(result.stdout)        return f"logs/robomimic/{task_name}/{model_name}"# 选择要使用的数据集if merged_dataset_path.exists():    training_dataset = merged_dataset_path    print(f"使用合并数据集进行训练: {training_dataset}")elif cosmos_dataset_path.exists():    training_dataset = cosmos_dataset_path    print(f"使用Cosmos数据集进行训练: {training_dataset}")elif mimic_dataset_path.exists():    training_dataset = mimic_dataset_path    print(f"使用Mimic数据集进行训练: {training_dataset}")else:    print("没有可用的训练数据集")    training_dataset = Noneif training_dataset:    model_dir = train_model(task_name, training_dataset, model_name)    print(f"模型训练完成,保存在: {model_dir}")

同样,也可以使用DLC启动分布式任务来提高训练效率:

# 创建DLC作业。    create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({        'WorkspaceId': workspace_id,        'DisplayName': display_name,        'JobType': 'PyTorchJob',        'ResourceId': resource_quota_id,        'JobSpecs': [            {                "Type": "Master",                "Image": image_uri,                "PodCount": 1,                # "EcsSpec": ecs_spec,                "ResourceConfig": {                    "CPU": "48",       # 指定CPU核心数                    "Memory": "256Gi",   # 指定内存大小(单位:GB)                    "GPU": "2"        # 指定GPU数量,与UserCommand中的--num_per_worker一致,以保证一个任务使用一个gpu                }            },        ],        'DataSources': [            {                "DataSourceId": dataset_id,                "MountPath": "/mnt/data",  # 挂载路径            },            {                "DataSourceId": pub_dataset_id,                "MountPath": "/mnt/isaac_assets",  # 挂载路径            }        ],       'UserVpc': {            "VpcId": vpc_id,  # 替换为实际 VPC ID            "SwitchId": switch_id,  # 替换为实际交换机 ID            "SecurityGroupId": security_groupid  # 替换为实际安全组 ID        },        "UserCommand": f"cd {workspace_dir} && \        /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \        --task {task_name} --algo bc \        --dataset {dataset_path} \        --name {model_name} && \        sleep 30",    }))    job_id = create_job_resp.body.job_id    wait_for_job_to_terminate(dlc_client, job_id)

完成训练后,BC-RNN模型即可获得模仿人工演示任务的能力:

模型测评

可以使用以下代码,在不同的环境条件变化下,测试不同模型模仿人工演示任务的成功率

import globdef evaluate_model(task_name, model_dir, log_dir, num_rollouts=15):    """    评估最新的训练权重    """        timestamp_dirs = glob.glob(f"{model_dir}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")    timestamp_dirs.sort(key=lambda x: os.path.getmtime(x), reverse=True)    latest_dir = timestamp_dirs[0]    model_dir_path = f"{latest_dir}/models"        cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/robust_eval.py \    --task {task_name} \    --input_dir {model_dir_path} \    --log_dir {log_dir} \    --log_file result \    --enable_cameras \    --livestream 1 \    --seeds 0 \    --num_rollouts {num_rollouts} \    --headless"        print(f"评估模型: {cmd}")    print("\n 注意: 评估过程可能需要很长时间(超过一天),这是正常现象。")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    print(result.stdout)# 评估设置说明evaluation_settings = {    "Vanilla": "与Mimic数据生成时完全相同的设置",    "Light Intensity": "光照强度/亮度变化,其他方面保持不变",    "Light Color": "光照颜色变化,其他方面保持不变",    "Light Texture (Background)": "光照纹理/背景变化,其他方面保持不变",    "Table Texture": "桌面视觉纹理变化,其他方面保持不变",    "Robot Arm Texture": "机器人手臂视觉纹理变化,其他方面保持不变"}print("评估设置说明:")for setting, description in evaluation_settings.items():    print(f"- {setting}: {description}")# 执行评估(如果模型已训练)# 假如您希望使用预训练的模型,请取消下列路径其中之一的注释# model_name = "franka_stack_mimic_1k_table_only"# model_name = "franka_stack_mimic_2k_table_only"# model_name = "franka_stack_mimic_cosmos_2k_table_only"model_dir = f"{workspace_dir}/logs/robomimic/{task_name}/{model_name}"if Path(model_dir).exists():    log_dir = f"robust_results/{model_name}"    evaluate_model(task_name, model_dir, log_dir)else:    print("模型目录不存在,跳过评估步骤")

如果一切正常,可以得到以下测试结果:

测试结果显示,在大部分环境条件变化下,使用数据扩增+数据增强训练得到的模型,总能得到最好的结果。尤其在光照强度、光照颜色和光照纹理变化的条件下,使用数据扩增+数据增强得到的模型,成功率得到了大幅提升。

总结

在本最佳实践中,基于阿里云 PAI 平台的特性,我们实现了基于Isaac仿真的操作动作数据扩增与模仿学习,包含从人工少量演示、数据扩增、数据增强、模仿学习再到模型测评的端到端实现:

Cosmos 数据增强后的训练模型在各个场景下的成功率均有较高提升,这一工作流程为具身智能的机器人视觉操作提供了一套完整的技术解决方案,在 sim2real 的训练过程中提高了模型在复杂视觉环境下的泛化能力。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Physical AI 阿里云PAI Isaac Sim 模仿学习 数据增强 机器人 具身智能 仿真 NVIDIA Physical AI Alibaba Cloud PAI Isaac Sim Imitation Learning Data Augmentation Robotics Embodied AI Simulation NVIDIA
相关文章