Porting Paraformer to the Ascend NPU
V1: Static shape
The following code was generated by ChatGPT:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import acl
import numpy as np
import librosa
import kaldi_native_fbank as knf
# -----------------------------
# Feature extraction
# -----------------------------
def compute_feat(filename):
    """Compute stacked (low-frame-rate) 80-bin fbank features for an audio file.

    The audio is loaded with librosa at 16 kHz, turned into fbank frames with
    kaldi-native-fbank, and then every 7 consecutive frames are concatenated
    into one output row, advancing 6 frames per row.

    Args:
        filename: Path of the audio file to read.

    Returns:
        A C-contiguous float32 array of shape ``(T, 80 * 7)`` with
        ``T = (num_frames - 7) // 6 + 1``; ``T`` is 0 when fewer than
        7 fbank frames are available.

    Raises:
        ValueError: If no fbank frame could be computed from the audio.
    """
    sample_rate = 16000
    samples, _ = librosa.load(filename, sr=sample_rate)

    opts = knf.FbankOptions()
    opts.frame_opts.dither = 0
    opts.frame_opts.snip_edges = False
    opts.frame_opts.samp_freq = sample_rate
    opts.mel_opts.num_bins = 80

    online_fbank = knf.OnlineFbank(opts)
    # NOTE(review): librosa presumably yields floats in [-1, 1]; the factor
    # 32768 rescales them to the int16 value range before fbank extraction.
    online_fbank.accept_waveform(sample_rate, (samples * 32768).tolist())
    online_fbank.input_finished()

    num_frames = online_fbank.num_frames_ready
    if num_frames == 0:
        raise ValueError(f"No fbank frames could be computed from {filename}")

    # Enforce (rather than assert) the layout the strided view below relies
    # on; asserts disappear under ``python -O``.
    features = np.ascontiguousarray(
        np.stack([online_fbank.get_frame(i) for i in range(num_frames)]),
        dtype=np.float32,
    )

    window_size = 7
    window_shift = 6
    num_bins = features.shape[1]
    itemsize = features.itemsize
    T = (features.shape[0] - window_size) // window_shift + 1

    # Row t is a view over frames [t * window_shift, t * window_shift +
    # window_size) laid out back to back; the last window ends inside the
    # buffer by construction of T.
    features = np.lib.stride_tricks.as_strided(
        features,
        shape=(T, num_bins * window_size),
        strides=(window_shift * num_bins * itemsize, itemsize),
    )
    # Materialise the overlapping view into an independent array.
    return np.copy(features)
# -----------------------------
# Acoustic embedding
# -----------------------------
def get_acoustic_embedding(alpha: np.ndarray, hidden: np.ndarray):
    """Turn per-frame weights and hidden states into per-token embeddings.

    The weights are accumulated frame by frame. Each time the running sum
    reaches 1, the weighted sum of hidden states gathered so far is emitted
    as one embedding; the part of the current frame's weight that exceeds 1
    is carried over to start the next embedding.

    Args:
        alpha: 1-D array of per-frame weights, shape ``(T,)``.
        hidden: 2-D array of per-frame hidden states, shape ``(T, C)``.

    Returns:
        Array of shape ``(num_tokens, C)``. A partially accumulated
        embedding left over after the last frame is discarded.

    Raises:
        ValueError: If the accumulated weight never reaches 1.
    """
    weights = alpha.tolist()
    acc = 0
    embeddings = []
    cur_embedding = np.zeros((hidden.shape[1],), dtype=np.float32)

    for i, w in enumerate(weights):
        acc += w
        if acc >= 1:
            overflow = acc - 1
            remain = w - overflow

            cur_embedding += remain * hidden[i]
            embeddings.append(cur_embedding)

            # Rebind (do not modify in place): the array just appended must
            # stay untouched while the next embedding is accumulated.
            cur_embedding = overflow * hidden[i]
            acc = overflow
        else:
            cur_embedding += w * hidden[i]

    if not embeddings:
        raise ValueError("No speech in the audio file")

    return np.array(embeddings)
# -----------------------------
# Tokens
# -----------------------------
def load_tokens():
    """Build the id-to-token table from ``tokens.txt``.

    Returns:
        A dict mapping each zero-based line number of ``tokens.txt`` to the
        first whitespace-separated field found on that line.
    """
    with open("tokens.txt", encoding="utf-8") as token_file:
        return {
            line_no: line.strip().split()[0]
            for line_no, line in enumerate(token_file)
        }
# -----------------------------
# ACL model loading
# -----------------------------
# Numeric values of the AscendCL enums used by this script, mirrored by hand
# because the script refers to them by name.
#
# aclrtMemMallocPolicy: HUGE_FIRST = 0, HUGE_ONLY = 1, NORMAL_ONLY = 2
ACL_MEM_MALLOC_NORMAL_ONLY = 2
# aclrtMemcpyKind: HOST_TO_HOST = 0, HOST_TO_DEVICE = 1,
#                  DEVICE_TO_HOST = 2, DEVICE_TO_DEVICE = 3
ACL_MEMCPY_HOST_TO_DEVICE = 1
ACL_MEMCPY_DEVICE_TO_HOST = 2
def load_om_model(path):
    """Load an offline model file and fetch its description.

    Args:
        path: Path of the ``.om`` model file.

    Returns:
        A ``(model_id, model_desc)`` tuple. The caller is responsible for
        releasing both (``acl.mdl.unload`` / ``acl.mdl.destroy_desc``).

    Raises:
        RuntimeError: If the model cannot be loaded or its description
            cannot be retrieved.
    """
    model_id, ret = acl.mdl.load_from_file(path)
    if ret != 0:
        raise RuntimeError(f"Load model failed: {path}, ret={ret}")

    model_desc = acl.mdl.create_desc()
    ret = acl.mdl.get_desc(model_desc, model_id)
    if ret != 0:
        # Release the half-initialised handles before reporting the error.
        acl.mdl.destroy_desc(model_desc)
        acl.mdl.unload(model_id)
        raise RuntimeError(f"Get model description failed: {path}, ret={ret}")

    return model_id, model_desc
# -----------------------------
# Single-input model inference
# -----------------------------
def _check_acl(ret, action):
    """Raise ``RuntimeError`` when an ACL call reports a non-zero status."""
    if ret != 0:
        raise RuntimeError(f"{action} failed, ret={ret}")


def _execute_model(model_id, model_desc, inputs, execute_label):
    """Run a loaded model on host arrays and return output 0 as flat float32.

    Every input is copied to freshly allocated device memory, the model is
    executed, and output 0 is copied back to the host. All ACL resources
    created here are released again, also when a step fails.

    Args:
        model_id: Id of a loaded model.
        model_desc: Description handle belonging to ``model_id``.
        inputs: Host arrays, in the order the model expects its inputs.
        execute_label: Prefix of the error message raised if execution fails.

    Returns:
        1-D float32 array holding the bytes of output 0.

    Raises:
        RuntimeError: If any ACL call returns a non-zero status.
    """
    device_ptrs = []
    data_buffers = []
    datasets = []
    output_host = None
    try:
        input_dataset = acl.mdl.create_dataset()
        datasets.append(input_dataset)
        for array in inputs:
            # ``ctypes.data`` only describes the payload of a C-contiguous
            # array, so make sure that is what gets copied.
            array = np.ascontiguousarray(array)
            size = array.nbytes

            device_ptr, ret = acl.rt.malloc(size, ACL_MEM_MALLOC_NORMAL_ONLY)
            _check_acl(ret, "acl.rt.malloc")
            device_ptrs.append(device_ptr)

            ret = acl.rt.memcpy(
                device_ptr, size, array.ctypes.data, size, ACL_MEMCPY_HOST_TO_DEVICE
            )
            _check_acl(ret, "acl.rt.memcpy (host to device)")

            data_buffer = acl.create_data_buffer(device_ptr, size)
            data_buffers.append(data_buffer)
            _, ret = acl.mdl.add_dataset_buffer(input_dataset, data_buffer)
            _check_acl(ret, "acl.mdl.add_dataset_buffer")

        # Output 0 is the only output this script consumes.
        output_size = acl.mdl.get_output_size_by_index(model_desc, 0)
        output_dev, ret = acl.rt.malloc(output_size, ACL_MEM_MALLOC_NORMAL_ONLY)
        _check_acl(ret, "acl.rt.malloc")
        device_ptrs.append(output_dev)

        output_dataset = acl.mdl.create_dataset()
        datasets.append(output_dataset)
        output_buffer = acl.create_data_buffer(output_dev, output_size)
        data_buffers.append(output_buffer)
        _, ret = acl.mdl.add_dataset_buffer(output_dataset, output_buffer)
        _check_acl(ret, "acl.mdl.add_dataset_buffer")

        ret = acl.mdl.execute(model_id, input_dataset, output_dataset)
        _check_acl(ret, execute_label)

        # Copy the result back to the host.
        host_ptr, ret = acl.rt.malloc_host(output_size)
        _check_acl(ret, "acl.rt.malloc_host")
        output_host = host_ptr
        ret = acl.rt.memcpy(
            output_host, output_size, output_dev, output_size, ACL_MEMCPY_DEVICE_TO_HOST
        )
        _check_acl(ret, "acl.rt.memcpy (device to host)")

        raw = acl.util.ptr_to_bytes(output_host, output_size)
        # Copy so the result owns writable memory that does not depend on
        # the host buffer released below.
        return np.frombuffer(raw, dtype=np.float32).copy()
    finally:
        # Destroying a dataset does not destroy the data buffers added to
        # it, so release those explicitly first.
        for data_buffer in data_buffers:
            acl.destroy_data_buffer(data_buffer)
        for dataset in datasets:
            acl.mdl.destroy_dataset(dataset)
        for device_ptr in device_ptrs:
            acl.rt.free(device_ptr)
        if output_host is not None:
            acl.rt.free_host(output_host)


def run_model_single_input(model_id, model_desc, input_array):
    """Run a single-input model and return output 0 as a flat float32 array.

    Args:
        model_id: Id of a loaded model.
        model_desc: Description handle belonging to ``model_id``.
        input_array: Host array passed as the model's only input.

    Returns:
        1-D float32 array with the contents of output 0.

    Raises:
        RuntimeError: If any ACL call fails.
    """
    return _execute_model(model_id, model_desc, [input_array], "Execute")


# -----------------------------
# Multi-input decoder inference
# -----------------------------
def run_model_decoder(model_id, model_desc, encoder_out, acoustic_embedding, mask):
    """Run the three-input decoder and return output 0 as a flat float32 array.

    Args:
        model_id: Id of the loaded decoder model.
        model_desc: Description handle belonging to ``model_id``.
        encoder_out: Host array passed as input 0.
        acoustic_embedding: Host array passed as input 1.
        mask: Host array passed as input 2.

    Returns:
        1-D float32 array with the contents of output 0.

    Raises:
        RuntimeError: If any ACL call fails.
    """
    return _execute_model(
        model_id,
        model_desc,
        [encoder_out, acoustic_embedding, mask],
        "Decoder execute",
    )
# -----------------------------
# Main function
# -----------------------------
def _pad_or_trim_frames(features, num_frames):
    """Return exactly ``num_frames`` rows: trim the tail, or pad by repeating trailing rows."""
    if features.shape[0] == 0:
        raise ValueError("Cannot pad an empty feature matrix")
    features = features[:num_frames]
    # A single pass is enough when at least half of the rows are present;
    # shorter inputs need several passes because the slice below can never
    # return more rows than currently exist.
    while features.shape[0] < num_frames:
        missing = num_frames - features.shape[0]
        features = np.concatenate([features, features[-missing:]])
    return features


def main():
    """Recognise ``./1.wav`` with the encoder/predictor/decoder OM models.

    Initialises ACL on device 0, runs the three models in sequence, prints
    intermediate statistics and the recognised text, and always tears the
    ACL resources down again, even when a step fails.
    """
    # NOTE(review): presumably the fixed sequence length and hidden size the
    # .om models were exported with -- confirm against the export script.
    num_frames = 83
    hidden_dim = 512

    acl.init()
    device_id = 0
    acl.rt.set_device(device_id)
    context, _ = acl.rt.create_context(device_id)
    models = []
    try:
        # Features
        features = compute_feat("./1.wav")
        features = _pad_or_trim_frames(features, num_frames)
        features = features[None, ...]  # batch=1

        # Load the OM models one by one so that models already loaded are
        # still unloaded if a later load fails.
        for path in ("encoder.om", "predictor.om", "decoder.om"):
            models.append(load_om_model(path))
        (enc_id, enc_desc), (pred_id, pred_desc), (dec_id, dec_desc) = models

        # encoder
        encoder_out = run_model_single_input(enc_id, enc_desc, features)
        encoder_out = encoder_out.reshape(-1, hidden_dim)
        print("encoder_out", encoder_out.shape, encoder_out.sum(), encoder_out.mean())

        # predictor
        alpha = run_model_single_input(pred_id, pred_desc, encoder_out)
        print("alpha", alpha.shape, alpha.sum(), alpha.mean())

        # acoustic embedding
        acoustic_embedding = get_acoustic_embedding(alpha, encoder_out)
        print('acoustic_embedding', acoustic_embedding.shape)
        num_tokens = acoustic_embedding.shape[0]
        print('num_tokens', num_tokens)

        # Zero-pad the embeddings to the fixed length; the mask marks which
        # rows are real tokens.
        padding = np.zeros((num_frames - num_tokens, hidden_dim), dtype=np.float32)
        acoustic_embedding = np.concatenate([acoustic_embedding, padding], axis=0)
        mask = np.zeros((num_frames,), dtype=np.float32)
        mask[:num_tokens] = 1
        print('mask', mask.sum(), mask.shape)
        decoder_input = acoustic_embedding[None, ...]

        # decoder
        decoder_out = run_model_decoder(dec_id, dec_desc, encoder_out, decoder_input, mask)
        decoder_out = decoder_out.reshape(num_frames, -1)
        print("decoder_out", decoder_out.shape, decoder_out.sum(), decoder_out.mean())

        # Convert token ids to text; ids 1 and 2 are skipped.
        yseq = decoder_out[:num_tokens].argmax(axis=-1).tolist()
        print('yseq', yseq)
        tokens = load_tokens()
        words = [tokens[i] for i in yseq if i not in (1, 2)]
        text = "".join(words)
        print("Recognized text:", text)
    finally:
        # Unload the models and release the ACL runtime.
        for model_id, model_desc in models:
            acl.mdl.unload(model_id)
            acl.mdl.destroy_desc(model_desc)
        acl.rt.destroy_context(context)
        acl.rt.reset_device(device_id)
        acl.finalize()


if __name__ == "__main__":
    main()
V2: Static shape + simplified API
#!/usr/bin/env python3
# Copyright (c) 2025 Xiaomi Corporation
import kaldi_native_fbank as knf
import librosa
import torch
import numpy as np
from ais_bench.infer.interface import InferSession
class SinusoidalPositionEncoder(torch.nn.Module):
    """Produce sinusoidal position encodings (sines followed by cosines)."""

    def encode(
        self,
        positions: torch.Tensor = None,
        depth: int = None,
        dtype: torch.dtype = torch.float32,
    ):
        """Encode position indices as ``[sin | cos]`` features.

        Args:
            positions: Position indices; ``forward`` passes shape
                ``(1, timesteps)``.
            depth: Size of the encoding; half of it holds sines, the other
                half cosines.
            dtype: Floating-point type used for the computation and result.

        Returns:
            Tensor of shape ``(1, num_positions, depth)`` for even ``depth``.
        """
        leading = positions.size(0)
        positions = positions.type(dtype)
        dev = positions.device

        half_depth = depth / 2
        base = torch.tensor([10000], dtype=dtype, device=dev)
        step = torch.log(base) / (half_depth - 1)

        # Geometric progression of inverse timescales from 1 down to 1/10000.
        exponents = torch.arange(half_depth, device=dev).type(dtype)
        inv_timescales = torch.exp(exponents * (-step))
        inv_timescales = inv_timescales.reshape(leading, -1)

        angles = positions.reshape(1, -1, 1) * inv_timescales.reshape(1, 1, -1)
        sin_cos = (torch.sin(angles), torch.cos(angles))
        return torch.cat(sin_cos, dim=2).type(dtype)

    def forward(self, batch_size, timesteps, input_dim):
        """Return encodings for positions ``1..timesteps``; ``batch_size`` is not used."""
        positions = torch.arange(1, timesteps + 1).unsqueeze(0)
        return self.encode(positions, input_dim, torch.float32)
def compute_feat(filename):
    """Compute stacked (low-frame-rate) 80-bin fbank features for an audio file.

    The audio is loaded with librosa at 16 kHz, turned into fbank frames with
    kaldi-native-fbank, and then every ``window_size`` consecutive frames are
    concatenated into one output row, advancing ``window_shift`` frames per
    row.

    Args:
        filename: Path of the audio file to read.

    Returns:
        A C-contiguous float32 array of shape ``(T, 80 * 7)`` with
        ``T = (num_frames - 7) // 6 + 1``; ``T`` is 0 when fewer than
        7 fbank frames are available.

    Raises:
        ValueError: If no fbank frame could be computed from the audio.
    """
    sample_rate = 16000
    samples, _ = librosa.load(filename, sr=sample_rate)

    opts = knf.FbankOptions()
    opts.frame_opts.dither = 0
    opts.frame_opts.snip_edges = False
    opts.frame_opts.samp_freq = sample_rate
    opts.mel_opts.num_bins = 80

    online_fbank = knf.OnlineFbank(opts)
    # NOTE(review): librosa presumably yields floats in [-1, 1]; the factor
    # 32768 rescales them to the int16 value range before fbank extraction.
    online_fbank.accept_waveform(sample_rate, (samples * 32768).tolist())
    online_fbank.input_finished()

    num_frames = online_fbank.num_frames_ready
    if num_frames == 0:
        raise ValueError(f"No fbank frames could be computed from {filename}")

    # Enforce (rather than assert) the layout the strided view below relies
    # on; asserts disappear under ``python -O``.
    features = np.ascontiguousarray(
        np.stack([online_fbank.get_frame(i) for i in range(num_frames)]),
        dtype=np.float32,
    )
    print("features sum", features.sum(), features.shape)

    window_size = 7  # lfr_m
    window_shift = 6  # lfr_n
    num_bins = features.shape[1]
    itemsize = features.itemsize
    T = (features.shape[0] - window_size) // window_shift + 1

    # Row t is a view over frames [t * window_shift, t * window_shift +
    # window_size) laid out back to back; the last window ends inside the
    # buffer by construction of T.
    features = np.lib.stride_tricks.as_strided(
        features,
        shape=(T, num_bins * window_size),
        strides=(window_shift * num_bins * itemsize, itemsize),
    )
    # Materialise the overlapping view into an independent array.
    return np.copy(features)
def load_tokens():
    """Read ``tokens.txt`` into an id-to-token dict.

    Returns:
        A dict whose keys are zero-based line numbers and whose values are
        the first whitespace-separated field of the corresponding line.
    """
    id_to_token = {}
    with open("tokens.txt", encoding="utf-8") as handle:
        for index, raw_line in enumerate(handle):
            fields = raw_line.strip().split()
            id_to_token[index] = fields[0]
    return id_to_token
class OmModel:
    """Holds one inference session each for the encoder, decoder and predictor."""

    def __init__(self):
        """Open the three ``.om`` models on device 0 and print their I/O descriptions."""
        print("init encoder")
        self.encoder = InferSession(device_id=0, model_path='./encoder.om', debug=False)
        self.decoder = InferSession(device_id=0, model_path='./decoder.om', debug=False)
        self.predictor = InferSession(device_id=0, model_path='./predictor.om', debug=False)

        sessions = (
            ("encoder", self.encoder),
            ("decoder", self.decoder),
            ("predictor", self.predictor),
        )
        for label, session in sessions:
            print(f"---{label}---")
            for node in session.get_inputs():
                print(node.name, node.datatype, node.shape)
            print("-----")
            for node in session.get_outputs():
                print(node.name, node.datatype, node.shape)

    def run_encoder(self, features):
        """Return the first output of the encoder for ``features``."""
        outputs = self.encoder.infer([features], mode='static')
        return outputs[0]

    def run_predictor(self, encoder_out):
        """Return the first output of the predictor for ``encoder_out``."""
        outputs = self.predictor.infer([encoder_out], mode='static')
        return outputs[0]

    def run_decoder(self, encoder_out, acoustic_embedding, mask):
        """Return the first output of the decoder for its three inputs."""
        decoder_inputs = [encoder_out, acoustic_embedding, mask]
        outputs = self.decoder.infer(decoder_inputs, mode='static')
        return outputs[0]
def get_acoustic_embedding(alpha: np.ndarray, hidden: np.ndarray):
    """Turn per-frame weights and hidden states into per-token embeddings.

    The weights are accumulated frame by frame. Each time the running sum
    reaches 1, the weighted sum of hidden states gathered so far is emitted
    as one embedding; the part of the current frame's weight that exceeds 1
    is carried over to start the next embedding.

    Args:
        alpha: (T,)
        hidden: (T, C)

    Returns:
        acoustic_embeds: (num_tokens, C). A partially accumulated embedding
        left over after the last frame is discarded.

    Raises:
        ValueError: If the accumulated weight never reaches 1.
    """
    weights = alpha.tolist()
    acc = 0
    embeddings = []
    cur_embedding = np.zeros((hidden.shape[1],), dtype=np.float32)

    for i, w in enumerate(weights):
        acc += w
        if acc >= 1:
            overflow = acc - 1
            remain = w - overflow

            cur_embedding += remain * hidden[i]
            embeddings.append(cur_embedding)

            # Rebind (do not modify in place): the array just appended must
            # stay untouched while the next embedding is accumulated.
            cur_embedding = overflow * hidden[i]
            acc = overflow
        else:
            cur_embedding += w * hidden[i]

    if not embeddings:
        raise ValueError("No speech in the audio file")

    return np.array(embeddings)
def _pad_or_trim_frames(features, num_frames):
    """Return exactly ``num_frames`` rows: trim the tail, or pad by repeating trailing rows."""
    if features.shape[0] == 0:
        raise ValueError("Cannot pad an empty feature matrix")
    features = features[:num_frames]
    # A single pass is enough when at least half of the rows are present;
    # shorter inputs need several passes because the slice below can never
    # return more rows than currently exist.
    while features.shape[0] < num_frames:
        missing = num_frames - features.shape[0]
        features = np.concatenate([features, features[-missing:]])
    return features


def main():
    """Recognise ``./1.wav`` with the encoder/predictor/decoder OM models.

    Computes the features, fits them to the fixed sequence length, runs the
    three models in sequence and prints intermediate statistics followed by
    the recognised text.
    """
    # NOTE(review): presumably the fixed sequence length and hidden size the
    # .om models were exported with -- confirm against the export script.
    num_frames = 83
    hidden_dim = 512

    features = compute_feat("./1.wav")
    print("here", features.shape, features.shape[0] > num_frames)
    if features.shape[0] < num_frames:
        print(
            "padding",
            features.shape,
            (num_frames - features.shape[0], features.shape[1]),
        )
    features = _pad_or_trim_frames(features, num_frames)

    # The position embedding is only used by the debug prints below; the
    # encoder is called with the features alone.
    pos_emb = (
        SinusoidalPositionEncoder()(1, features.shape[0], features.shape[1])
        .squeeze(0)
        .numpy()
    )
    print("features.shape", features.shape, pos_emb.shape)
    print("sum", features.sum(), features.mean(), pos_emb.sum(), pos_emb.mean())

    model = OmModel()

    encoder_out = model.run_encoder(features[None])
    print("encoder_out.shape", encoder_out.shape)
    print("encoder_out.sum", encoder_out.sum(), encoder_out.mean())

    alpha = model.run_predictor(encoder_out)
    print("alpha.shape", alpha.shape)
    print("alpha.sum()", alpha.sum(), alpha.mean())

    acoustic_embedding = get_acoustic_embedding(alpha[0], encoder_out[0])
    print("acoustic_embedding.shape", acoustic_embedding.shape)
    num_tokens = acoustic_embedding.shape[0]

    # Zero-pad the embeddings to the fixed length; the mask marks which rows
    # are real tokens.
    padding = np.zeros((num_frames - num_tokens, hidden_dim), dtype=np.float32)
    print("padding.shape", padding.shape, acoustic_embedding.shape)
    acoustic_embedding = np.concatenate([acoustic_embedding, padding], axis=0)
    print("acoustic_embedding.shape", acoustic_embedding.shape)
    print("acoustic_embedding.sum", acoustic_embedding.sum(), acoustic_embedding.mean())

    mask = np.zeros((num_frames,), dtype=np.float32)
    mask[:num_tokens] = 1
    print(mask)

    decoder_out = model.run_decoder(encoder_out, acoustic_embedding[None], mask)
    print("decoder_out", decoder_out.shape)
    print("decoder_out.sum", decoder_out.sum(), decoder_out.mean())

    # Convert token ids to text; ids 1 and 2 are skipped.
    yseq = decoder_out[0, :num_tokens].argmax(axis=-1).tolist()
    print(yseq, "-->", len(yseq))

    tokens = load_tokens()
    words = [tokens[i] for i in yseq if i not in (1, 2)]
    print(words)
    text = "".join(words)
    print(text)


if __name__ == "__main__":
    main()