作者:昇腾实战派,Ming-L

项目背景

随着文生图模型的发展,出现了更多“小而美”的文生图后训练模型,通过使用flux模型以及DanceGRPO框架做生成模型强化学习,能够在模型尺寸较小的情况下,提升生成图片的质量与效果。

版本环境与依赖清单

安装依赖套件如NPU固件、NPU驱动、CANN、transformers、pytorch、torch-npu
依赖版本如下:

依赖软件 版本
昇腾NPU驱动 商发版本
昇腾NPU固件 商发版本
CANN CANN8.2.RC1.B080
Python 3.10
PyTorch 2.6.0
torch_npu 2.6.0
torchvision 0.21.0
transformers 4.53.0

模型部署流程

  1. 拉取代码仓GitHub - XueZeyue/DanceGRPO
git clone https://github.com/XueZeyue/DanceGRPO.git
  1. 下载权重(具体看READEME.md):FLUX、HPS、open_clip
  2. 其他依赖安装:
    • 仓库未实现懒加载,所以会导入许多用不到的三方库,可以直接注释如:
# DanceGRPO/fastvideo/models/mochi_hf/modeling_mochi.py
# 注释掉以下
from liger_kernel.ops.swiglu import LigerSiLUMulFunction;flash_attn_no_pad.py

# flash_attn_no_pad.py
# 注释掉flash_attn的导包,flash_attn_no_pad注释掉中间逻辑,直接return;
cd DanceGRPO
./env_setup.sh fastvideo
  1. 修改:preprocess_flux_embedding.py:
# # 引入torch_npu
import torch_npu
from torch_npu.contrib import transfer_to_npu

# "./data/flux"写死的路径改成参数
# 原 : pipe = FluxPipeline.from_pretrained("./data/flux", torch_dtype=torch.bfloat16).to(device)
pipe = FluxPipeline.from_pretrained(args.model_path, torch_dtype=torch.bfloat16).to(device)
  1. 修改:train_grpo_flux.py
# # 引入torch_npu
import torch_npu
from torch_npu.contrib import transfer_to_npu
  1. 执行Flux GRPO脚本:
bash ./scripts/finetune/finetune_flux_grpo.sh

模型验证测试

模型的验证测试主要完成其精度对齐,此前大模型精度对齐主要涉及预训练、微调、推理等独立场景,常规精度对齐流程关注单个模型的前向、反向和长稳loss情况。

而强化学习作为多模型多流程融合的训练方式,与以往的单个模型精度对齐场景存在差异。对于GRPO场景,分为推理、reward、训练三个阶段,三个阶段使用的模型,框架可能存在差异,所以需要关注流程中每个模型的数据流是否对齐。

目前采用的方法是先将各个阶段单独抽取出来,对齐各个阶段的精度,再全流程去对齐,在过程中记录必要的对齐数据。

随机性固定

load版本,更准确,但操作也更麻烦

此处固定方法是把程序中涉及随机性的变量,通过torch.save,torch.load的方式,让NPU和GPU上保持一致,具体操作如下

  1. 数据顺序固定:关闭shuffle,固定训练的数据顺序
# fastvideo/train_grpo_flux.py中,shuffle设为false

sampler = DistributedSampler(train_dataset, rank=rank, num_replicas=world_size, shuffle=False, seed=args.sampler_seed)
  1. prev_sample 固定,GPU代码修改如下,在GPU上运行后保存下来。
#  1. 添加全局变量COFF_STEP,控制coff生成的step数  
COFF_STEP = 0  

def flux_step():  
    global COFF_STEP  
    ......  
    if grpo and prev_sample is None:  
    coff = torch.randn_like(prev_sample_mean)  
    torch.save(coff, f"saves/coff_{COFF_STEP}_{torch.distributed.get_rank()}.pt")  
    prev_sample = prev_sample_mean + coff * std_dev_t  
    COFF_STEP += 1

NPU上加载

coff = torch.load(f"saves/coff_{COFF_STEP}_{torch.distributed.get_rank()}.pt", map_location=f"cuda:{torch.cuda.current_device()}")
```3. input_latents 固定,GPU代码修改如下,在GPU上运行后保存下来。

```python
def sample_reference_model(  
    args,  
    device,  
    transformer,  
    vae,  
    encoder_hidden_states,  
    pooled_prompt_embeds,  
    text_ids,  
    reward_model,  
    tokenizer,  
    caption,  
    preprocess_val,  
    step,  # # # 增加参数输入,用于序列文件记录,找到相关调用处,加上该入参  
)  

def train_one_step(  
    args,  
    device,  
    transformer,  
    vae,  
    reward_model,  
    tokenizer,  
    optimizer,  
    lr_scheduler,  
    loader,  
    noise_scheduler,  
    max_grad_norm,  
    preprocess_val,  
    step,  # # # 增加参数输入,用于序列文件记录,找到相关调用处,加上该入参  
) 

def sample_reference_model();  
     ......  
    if args.init_same_noise:  
        input_latents = torch.randn(  
                (1, IN_CHANNELS, latent_h, latent_w),  # (c,t,h,w)  
                device=device,  
                dtype=torch.bfloat16,  
            )  
        torch.save(input_latents, f"saves/input_latents_{step}_{torch.distributed.get_rank()}.pt")

NPU上加载

input_latents = torch.load(f"saves/input_latents_{step}_{torch.distributed.get_rank()}.pt", map_location=f'cuda:{device}')
  1. perms 固定;
    GPU代码修改如下,在GPU上运行后保存下来。
def train_one_step():  
    ......  
    perms = torch.stack(  
            [  
                torch.randperm(len(samples["timesteps"][0]))  
                for _ in range(batch_size)  
            ]  
        ).to(device)  
    torch.save(perms, f"saves/perms_{step}_{torch.distributed.get_rank()}.pt")

NPU上加载

perms = torch.load(f"saves/perms_{step}_{torch.distributed.get_rank()}.pt", map_location=f'{device}')

使用CPU进行随机性固定

固定seed可用于模型训练复现,但是不同的设备如GPU和NPU在同样的seed下生成的值也是不一样的,但是不同设备上都有CPU,因此可以固定seed后使用CPU生成张量,以此让GPU和NPU上生成的张量输入保持相同。

  1. fastvideo/train_grpo_flux.py:91修改为:
if grpo and prev_sample is None:  
	prev_sample = prev_sample_mean + torch.randn_like(prev_sample_mean.cpu()).to(  
		prev_sample_mean.device) * std_dev_t
  1. fastvideo/train_grpo_flux.py:270修改为:
if args.init_same_noise:  
	input_latents = torch.randn(  
		(1, IN_CHANNELS, latent_h, latent_w),  #  (c,t,h,w)  
		dtype=torch.bfloat16,  
	).to(device)
  1. fastvideo/train_grpo_flux.py:657修改为:
sampler = DistributedSampler(  
		train_dataset, rank=rank, num_replicas=world_size, shuffle=False, seed=args.sampler_seed  
	)
  1. fastvideo/train_grpo_flux.py:1061增加:
import random  
def seed_all_own(seed=1234, mode=True, is_gpu=True):  
	random.seed(seed)  
	os.environ['PYTHONHASHSEED'] = str(seed)  
	os.environ['GLOBAL_SEED'] = str(seed)  
	np.random.seed(seed)  
	torch.manual_seed(seed)  
	torch.use_deterministic_algorithms(mode)  
	if is_gpu:  
		os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'  
		os.environ['CUDA_LAUNCH_BLOCKING'] = '1'  
		torch.cuda.manual_seed_all(seed)  
		torch.cuda.manual_seed(seed)  
		torch.backends.cudnn.deterministic = True  
		torch.backends.cudnn.enable = False  
		torch.backends.cudnn.benchmark = False  
	else:  
		import torch_npu  
		os.environ['HCCL_DETERMINISTIC'] = 'true'  
		os.environ['CLOSE_MATMUL_K_SHIFT'] = '1'  
		torch_npu.npu.manual_seed_all(seed)  
		torch_npu.npu.manual_seed(seed)  
	print("====== seed all ========")  
seed_all_own(is_gpu=False)  
from msprobe.pytorch import seed_all  
seed_all(mode=True)

推理流程对齐

推理流程对齐的内容主要是GRPO去噪后生成的latents,latents解码成图片后对比:固定随机性,将GPU、NPU上使用相同noise的latents使用vae解码,再保存,此时只需要对比生成图片的差异。 关键代码:decoded_image[0].save(img_path),这里会保存训练过程中,模型在每个step,每次generation中生成的图片,可以直观的看到训练过程中的的变化。

过程会生成多个图片,每个step,每个rank会处理一个prompt,然后生成12(参数: num_generations)个相似的图片进行比较,在GPU和NPU上分别保存这些图片,然后进行对比,对比同序号的,具体保存图片代码逻辑如下,对比效果参考附录A. NPU/GPU 推理流程对齐效果

# # # # sample_reference_model函数  
def sample_reference_model():  
    with torch.inference_mode():  
        with torch.autocast("cuda", dtype=torch.bfloat16):  
            latents = unpack_latents(latents, h, w, 8)  
                latents = (latents / 0.3611) + 0.1159  
                image = vae.decode(latents, return_dict=False)[0]  
                decoded_image = image_processor.postprocess(  
                image)  
        decoded_image[0].save(f"./images/flux_{step}_{rank}_{index}.png")

Reward Model对齐

DanceGRPO模型涉及多个model,强化学习中需要对齐的主要是loss和reward值,这里讲的是如何对齐reward。

此处采取的方法是把reward model单独拿出来,for循环多步,对比GPU和NPU的值reward值,代码修改如下:

for step in range(1, 1001):
             #  text = tokenizer([batch_caption[0]]).to(device=device, non_blocking=True)
         image = torch.load(f"/home/grpo/DanceGRPO/save/images-1/image_{step}_{rank}.pt")
         text = torch.load(f"/home/grpo/DanceGRPO/save/texts-1/text_{step}_{rank}.pt")
         
         #  torch.save(image, f"/home/GRPO/DanceGRPO/save/images-1/image_{step}_{rank}.pt")
         #  torch.save(text, f"/home/GRPO/DanceGRPO/save/texts-1/text_{step}_{rank}.pt")
         if rank == 0:
             print(f"image_{rank}_{step}: ", image, "\n\n")
             print(f"text_{rank}_{step}: ", text, "\n\n")
         with torch.no_grad():
             with torch.amp.autocast("cuda"):
                 outputs = reward_model(image, text)
         if rank == 0:
             print(f"output_{rank}_{step}: ", outputs, "\n\n")
         image_features, text_features = outputs["image_features"], outputs["text_features"]
         logits_per_image = image_features @ text_features.T
         hps_score = torch.diagonal(logits_per_image)
         all_rewards = []
         all_rewards.append(hps_score)
         all_rewards = torch.cat(all_rewards, dim=0)
         samples = {
             "rewards": all_rewards.to(torch.float32)
         }
         if rank == 0:
             print(f"samples_{rank}_{step}: ", samples, "\n\n")
         gathered_reward = gather_tensor(samples["rewards"])
         if rank == 0:
             print(f"gather_reward_{rank}_{step}: ", gathered_reward, "\n\n")
         if dist.get_rank() == 0:
             print("gathered_hps_reward", gathered_reward)
             with open('./hps_reward.txt', 'a') as f:
                 f.write(f"{gathered_reward.mean().item()}\n")
         samples_batched = {
             k: v.unsqueeze(1)
             for k, v in samples.items()
         }
         samples_batched_list = [
             dict(zip(samples_batched, x)) for x in zip(*samples_batched.values())
         ]
         for i, sample in list(enumerate(samples_batched_list)):
             if rank == 0:
                 print(f"sample_{rank}_{step}: ", sample["rewards"], "\n\n")
             if dist.get_rank() % 8 == 0:
                 print("hps reward", sample["rewards"].item(), "\n\n\n\n\n")
             #  print("ratio", ratio)
             #  print("advantage", sample["advantages"].item())
             #  print("final loss", loss.item())

生成1000个reward值,其精度对比效果如下(绝对误差≈0.015%):
在这里插入图片描述

端到端对齐

对齐标准

固定随机性后,需要按照如下标准关注对齐结果:

  1. 关注推理阶段生成的图片,主观对齐
  2. 关注训练过程中的loss(生成模型loss较小,参考价值有限)
  3. 关注reward scores,200步误差5%以内

对齐步骤

端到端对齐流程主要关注两方面,一方面是是综合度量模型训练的指标:loss+reward scores,另一方面是下游任务,也就是强化学习后的模型保存权重,在GPU、NPU上分别观察推理效果

对于模型训练的综合度量指标对齐,根据DanceGRPO端到端流程(如下图),对于其中的子阶段(推理阶段、reward阶段)。整体流程中,需要对齐实际训练中的loss指标与reward scores指标
在这里插入图片描述

全流程对齐具体步骤:

  1. 两边加载相同的预训练权重
  2. 固定随机性:整体随机性与确定性计算固定(seed_all,mode=True),noise在cpu侧生成
  3. 保存关键信息:推理阶段的图片、reward阶段的rewardvalues、训练阶段模型loss,同时保存权重,用于对齐推理效果

此处注意需要持续关注推理阶段生成图片的效果,具体例子为在替换rope融合算子时,loss结果与reward差异不大,但推理阶段出现了花图

对齐结果

在这里插入图片描述

常见问题与解决方案

如遇到ROPE部分不支持complex128计算问题,NPU场景需要适配修改lib/python3.10/site-packages/diffusers/models/embeddings.py1250行,修改如下

is_mps = ids.device.type == "mps"
is_npu = ids.device.type == "npu"  #增加改行

##下面增加is_npu判断
freqs_dtype = torch.float32 if is_mps or is_npu else torch.float64
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐