add mutex around call transcription

This commit is contained in:
Christopher Moyer 2024-12-17 00:32:36 -05:00
parent 4f5c1f8ac4
commit af6376e2a0

View file

@ -4,6 +4,7 @@ use std::env;
use std::fs::File;
use std::path::Path;
use std::process::Command;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use lapin::{Connection, ConnectionProperties, Consumer};
use lapin::message::{Delivery, DeliveryResult};
@ -40,27 +41,42 @@ _|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| {======|_|"""""|_|"""""
let channel = connection.create_channel().await.unwrap();
let consumer = channel.basic_consume("transcribe", "whisper-worker", BasicConsumeOptions::default(), FieldTable::default()).await.unwrap();
consumer.set_delegate(move |delivery: DeliveryResult| async move {
info!("Consuming mq message");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => return,
Err(error) => {
warn!("Failed to consume queue message {}", error);
return;
let processing_lock = Arc::new(Mutex::new(()));
consumer.set_delegate({
let processing_lock = Arc::clone(&processing_lock);
move |delivery: DeliveryResult| {
let processing_lock = Arc::clone(&processing_lock);
async move {
info!("Consuming mq message");
let delivery = match delivery {
Ok(Some(delivery)) => delivery,
Ok(None) => return,
Err(error) => {
warn!("Failed to consume queue message {}", error);
return;
}
};
let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap();
let path = Path::new(&transcription_request.audio_file_path);
if !path.exists() {
warn!("File not found: {}", &transcription_request.audio_file_path);
return delivery.ack(BasicAckOptions::default()).await.unwrap();
}
let result = {
info!("Waiting for lock");
let _lock = processing_lock.lock();
info!("Acquired lock!");
transcribe_call(path);
};
info!("Lock released");
info!("Call transcription done");
delivery.ack(BasicAckOptions::default()).await.unwrap()
}
};
let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap();
let path = Path::new(&transcription_request.audio_file_path);
if !path.exists() {
warn!("File not found: {}", &transcription_request.audio_file_path);
return delivery.ack(BasicAckOptions::default()).await.unwrap();
}
transcribe_call(path);
info!("Call transcription done");
delivery.ack(BasicAckOptions::default()).await.unwrap()
});
info!("Waiting for messages");
@ -68,8 +84,9 @@ _|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| {======|_|"""""|_|"""""
}
fn transcribe_call(file_path: &Path) {
info!("Transcribing file {}", file_path.display());
let output_directory = file_path.parent().unwrap();
Command::new("whisperx")
let output = Command::new("whisperx")
.args(["--language", "en"])
.args(["--model", "largev3"])
.args(["--batch_size", "4"])
@ -78,6 +95,7 @@ fn transcribe_call(file_path: &Path) {
.args(["--output directory", output_directory.parent().unwrap().to_str().unwrap()])
.arg("file")
.output().expect("TODO: panic message");
info!("Transcription done, {}", &output.status);
}
#[derive(Serialize, Deserialize, Default, Debug)]