From af6376e2a0b4163f6f5af53bfc80a64d7e83478f Mon Sep 17 00:00:00 2001 From: Christopher Moyer Date: Tue, 17 Dec 2024 00:32:36 -0500 Subject: [PATCH] add mutex around call transcription --- src/main.rs | 58 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index fea7475..1e6b222 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use std::env; use std::fs::File; use std::path::Path; use std::process::Command; +use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use lapin::{Connection, ConnectionProperties, Consumer}; use lapin::message::{Delivery, DeliveryResult}; @@ -40,27 +41,42 @@ _|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| {======|_|"""""|_|""""" let channel = connection.create_channel().await.unwrap(); let consumer = channel.basic_consume("transcribe", "whisper-worker", BasicConsumeOptions::default(), FieldTable::default()).await.unwrap(); - consumer.set_delegate(move |delivery: DeliveryResult| async move { - info!("Consuming mq message"); - let delivery = match delivery { - Ok(Some(delivery)) => delivery, - Ok(None) => return, - Err(error) => { - warn!("Failed to consume queue message {}", error); - return; + let processing_lock = Arc::new(Mutex::new(())); + consumer.set_delegate({ + let processing_lock = Arc::clone(&processing_lock); + move |delivery: DeliveryResult| { + let processing_lock = Arc::clone(&processing_lock); + async move { + info!("Consuming mq message"); + let delivery = match delivery { + Ok(Some(delivery)) => delivery, + Ok(None) => return, + Err(error) => { + warn!("Failed to consume queue message {}", error); + return; + } + }; + + let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap(); + let path = Path::new(&transcription_request.audio_file_path); + if !path.exists() { + warn!("File not found: {}", &transcription_request.audio_file_path); + return delivery.ack(BasicAckOptions::default()).await.unwrap(); + } + + + let result = { + info!("Waiting for lock"); + let _lock = processing_lock.lock(); + info!("Acquired lock!"); + transcribe_call(path); + }; + info!("Lock released"); + + info!("Call transcription done"); + delivery.ack(BasicAckOptions::default()).await.unwrap() } - }; - - let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap(); - let path = Path::new(&transcription_request.audio_file_path); - if !path.exists() { - warn!("File not found: {}", &transcription_request.audio_file_path); - return delivery.ack(BasicAckOptions::default()).await.unwrap(); } - transcribe_call(path); - - info!("Call transcription done"); - delivery.ack(BasicAckOptions::default()).await.unwrap() }); info!("Waiting for messages"); @@ -68,8 +84,9 @@ _|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| {======|_|"""""|_|""""" } fn transcribe_call(file_path: &Path) { + info!("Transcribing file {}", file_path.display()); let output_directory = file_path.parent().unwrap(); - Command::new("whisperx") + let output = Command::new("whisperx") .args(["--language", "en"]) .args(["--model", "largev3"]) .args(["--batch_size", "4"]) @@ -78,6 +95,7 @@ fn transcribe_call(file_path: &Path) { .args(["--output directory", output_directory.parent().unwrap().to_str().unwrap()]) .arg("file") .output().expect("TODO: panic message"); + info!("Transcription done, {}", &output.status); } #[derive(Serialize, Deserialize, Default, Debug)]