use serde_with::BoolFromInt; use chrono::serde::ts_milliseconds; use std::{env, fs}; use std::fs::File; use std::io::Error; use std::path::{Path, PathBuf}; use std::process::{Command, Output}; use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use lapin::{Connection, ConnectionProperties, Consumer}; use lapin::message::{Delivery, DeliveryResult}; use lapin::options::{BasicAckOptions, BasicConsumeOptions}; use lapin::types::FieldTable; use serde::{Deserialize, Serialize}; use log::{error, info, trace, warn}; use meilisearch_sdk::client::Client; use meilisearch_sdk::task_info::TaskInfo; use meilisearch_sdk::tasks::Task; use uuid::Uuid; #[tokio::main(flavor = "multi_thread")] async fn main() { let ascii = r#" __ ___ _ _ __ __ __ _ \ \ / / |_ (_) ___ | '_ \ ___ _ _ o O O\ \ / / ___ _ _ | |__ ___ _ _ \ \/\/ /| ' \ | | (_-< | .__/ / -_) | '_| o \ \/\/ / / _ \ | '_| | / / / -_) | '_| \_/\_/ |_||_| _|_|_ /__/_ |_|__ \___| _|_|_ TS__[O] \_/\_/ \___/ _|_|_ |_\_\ \___| _|_|_ _|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| {======|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| "`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'./o--000'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-' "#; println!("{ascii}"); let args: Vec = env::args().collect(); // let cfg: AppConfig = confy::load_path(Path::new(&args[1])).expect("Couldn't read config"); let cfg: AppConfig = confy::load_path("./config.toml").expect("Couldn't read config"); match Path::new("log4rs.yaml").exists() { true => log4rs::init_file("log4rs.yaml", Default::default()).unwrap(), false => println!("No log4rs.yaml file found. 
Logging will not be enabled") } info!("Setting up mq consumer"); let options = ConnectionProperties::default() .with_executor(tokio_executor_trait::Tokio::current()) .with_reactor(tokio_reactor_trait::Tokio); let connection = Connection::connect(&cfg.rabbit_mq_config.connection_string, options).await.unwrap(); let channel = connection.create_channel().await.unwrap(); let consumer = channel.basic_consume("transcribe", "whisper-worker", BasicConsumeOptions::default(), FieldTable::default()).await.unwrap(); let processing_lock = Arc::new(Mutex::new(())); consumer.set_delegate({ let processing_lock = Arc::clone(&processing_lock); move |delivery: DeliveryResult| { let meilisearch_client = Client::new(&cfg.meilisearch_config.connection_string, Some(&cfg.meilisearch_config.api_key)).expect("Couldn't create meilisearch client"); let processing_lock = Arc::clone(&processing_lock); async move { info!("Consuming mq message"); let delivery = match delivery { Ok(Some(delivery)) => delivery, Ok(None) => return, Err(error) => { warn!("Failed to consume queue message {}", error); return; } }; let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap(); let path = Path::new(&transcription_request.audio_file_path); if !path.exists() { warn!("File not found: {}", &transcription_request.audio_file_path); return delivery.ack(BasicAckOptions::default()).await.unwrap(); } let transcription_result = { let _lock = processing_lock.lock(); transcribe_call(path) }; match transcription_result { Ok(result) if result.status.success() => { info!("Successfully transcribed {}", &transcription_request.audio_file_path); if !result.stdout.is_empty() { trace!("Stdout: {}", String::from_utf8_lossy(&result.stdout)); } if !result.stderr.is_empty() { trace!("Stderr: {}", String::from_utf8_lossy(&result.stderr)); } } Ok(result) => { error!("Failed to transcribe {}, Exit code {}", &transcription_request.audio_file_path, result.status.code().unwrap()); error!("Stdout: 
{}", String::from_utf8_lossy(&result.stdout)); error!("Stderr: {}", String::from_utf8_lossy(&result.stderr)); return delivery.ack(BasicAckOptions::default()).await.unwrap(); } Err(error) => { error!("Failed to transcribe {}, {}", &transcription_request.audio_file_path, error); return delivery.ack(BasicAckOptions::default()).await.unwrap(); } } let transcript_path = path.with_extension("txt"); let transcript = match fs::read_to_string(&transcript_path) { Ok(transcript) => transcript, Err(_) => { error!("Failed to read transcript file {}", &transcript_path.display()); return delivery.ack(BasicAckOptions::default()).await.unwrap(); } }; let meilisearch_task = match write_to_meilisearch(&meilisearch_client, transcription_request.call_metadata, transcript).await { Ok(task) => task, Err(error) => { error!("Failed to send message to meilisearch, {}", error); return delivery.ack(BasicAckOptions::default()).await.unwrap(); } }; info!("Sent to meilisearch, waiting for task to complete"); match wait_for_task_to_complete(&meilisearch_client, &meilisearch_task).await { Ok(task) if task.is_failure() => { error!("Failed to send message to meilisearch, {}", task.unwrap_failure()); } Err(error) => { error!("Failed to send message to meilisearch, {}", error); } _ => {} } info!("Meilisearch task completed"); delivery.ack(BasicAckOptions::default()).await.unwrap() } } }); info!("Startup Complete!"); std::future::pending::<()>().await; } fn transcribe_call(file_path: &Path) -> Result { info!("Transcribing file {}", file_path.display()); let output_directory = file_path.parent().unwrap(); Command::new("whisperx") .args(["--language", "en"]) .args(["--model", "large-v3"]) .args(["--batch_size", "4"]) .args(["--compute_type", "int8"]) .args(["--output_format", "txt"]) .args(["--output_dir", output_directory.to_str().unwrap()]) .arg(file_path) .output() } async fn write_to_meilisearch(client: &Client, call: Call, transcript: String) -> Result { let doc = MeilisearchCall { id: 
String::from(Uuid::new_v4()), transcript, metadata: call }; info!("Sending message to meilisearch"); client .index("calls") .add_documents(&[doc], None) .await } async fn wait_for_task_to_complete(client: &Client, task: &TaskInfo) -> Result { loop { let result = client.get_task(task).await; match &result { Ok(task) if task.is_pending() => { tokio::time::sleep(std::time::Duration::from_secs(2)).await; } _ => { return result; } } } } #[derive(Serialize, Deserialize, Default, Debug)] struct TranscriptionRequest { audio_file_path: String, call_metadata: Call } #[derive(Serialize, Deserialize, Debug)] struct MeilisearchCall { id: String, transcript: String, metadata: Call } #[serde_with::serde_as] #[derive(Serialize, Deserialize, Default, Debug)] struct Call { freq: u32, freq_error: i32, signal: i32, noise: i32, source_num: u32, recorder_num: u32, tdma_slot: u32, #[serde_as(as = "BoolFromInt")] phase2_tdma: bool, #[serde(with = "ts_milliseconds")] start_time: DateTime, #[serde(with = "ts_milliseconds")] stop_time: DateTime, #[serde_as(as = "BoolFromInt")] emergency: bool, priority: i32, #[serde_as(as = "BoolFromInt")] mode: bool, #[serde_as(as = "BoolFromInt")] duplex: bool, #[serde_as(as = "BoolFromInt")] encrypted: bool, call_length: i32, talkgroup: u64, talkgroup_tag: String, talkgroup_description: String, talkgroup_group_tag: String, talkgroup_group: String, audio_type: String, short_name: String, #[serde(rename = "freqList")] freq_list: Vec, #[serde(rename = "srcList")] src_list: Vec, patched_talkgroups: Option> } #[serde_with::serde_as] #[derive(Serialize, Deserialize, Debug)] struct CallSource { src: i64, #[serde(with = "ts_milliseconds")] time: DateTime, pos: f64, #[serde_as(as = "BoolFromInt")] emergency: bool, signal_system: String, tag: String } #[serde_with::serde_as] #[derive(Serialize, Deserialize, Debug)] struct CallFrequency { freq: f64, #[serde(with = "ts_milliseconds")] time: DateTime, pos: f64, len: f64, error_count: i32, spike_count: i32 } 
/// Connection settings for the RabbitMQ broker the worker consumes from.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct RabbitMqConfig {
    connection_string: String,
}

/// Connection settings for the Meilisearch instance transcripts are indexed into.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct MeilisearchConfig {
    connection_string: String,
    api_key: String,
}

/// Top-level application configuration, loaded from TOML via confy.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct AppConfig {
    rabbit_mq_config: RabbitMqConfig,
    meilisearch_config: MeilisearchConfig,
}

/// confy requires `Default` and falls back to it when the config file is
/// missing. There is no sensible default here, so this aborts instead.
impl Default for AppConfig {
    fn default() -> AppConfig {
        panic!("Could not find config file")
    }
}