setup mq worker
This commit is contained in:
parent
7d6b3458e7
commit
99a67fcfa8
3 changed files with 2735 additions and 0 deletions
2568
Cargo.lock
generated
Normal file
2568
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
16
Cargo.toml
Normal file
16
Cargo.toml
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "whisper-worker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.42.0", features = ["rt-multi-thread", "rt", "macros"] }
|
||||
chrono = {version = "0.4.39", features = ["serde"] }
|
||||
serde = { version = "1.0.216", features = ["derive"] }
|
||||
serde_json = "1.0.133"
|
||||
serde_with = "3.11.0"
|
||||
lapin = "2.5.0"
|
||||
confy = "0.6.1"
|
||||
tokio-executor-trait = "2.1.3"
|
||||
tokio-reactor-trait = "1.1.0"
|
||||
log = "0.4.22"
|
||||
151
src/main.rs
Normal file
151
src/main.rs
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
use serde_with::BoolFromInt;
|
||||
use chrono::serde::ts_milliseconds;
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use std::process::Command;
|
||||
use chrono::{DateTime, Utc};
|
||||
use lapin::{Connection, ConnectionProperties, Consumer};
|
||||
use lapin::message::{Delivery, DeliveryResult};
|
||||
use lapin::options::{BasicAckOptions, BasicConsumeOptions};
|
||||
use lapin::types::FieldTable;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use log::warn;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
// let cfg: AppConfig = confy::load_path(Path::new(&args[1])).expect("Couldn't read config");
|
||||
let cfg: AppConfig = confy::load_path("./config.toml").expect("Couldn't read config");
|
||||
|
||||
let options = ConnectionProperties::default()
|
||||
.with_executor(tokio_executor_trait::Tokio::current())
|
||||
.with_reactor(tokio_reactor_trait::Tokio);
|
||||
let connection = Connection::connect(&cfg.rabbit_mq_config.connection_string, options).await.unwrap();
|
||||
let channel = connection.create_channel().await.unwrap();
|
||||
let consumer = channel.basic_consume("transcribe", "whisper-worker", BasicConsumeOptions::default(), FieldTable::default()).await.unwrap();
|
||||
|
||||
consumer.set_delegate(move |delivery: DeliveryResult| async move {
|
||||
let delivery = match delivery {
|
||||
Ok(Some(delivery)) => delivery,
|
||||
Ok(None) => return,
|
||||
Err(error) => {
|
||||
dbg!("Failed to consume queue message {}", error);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let transcription_request: TranscriptionRequest = serde_json::from_slice(&delivery.data).unwrap();
|
||||
let path = Path::new(&transcription_request.audio_file_path);
|
||||
if !path.exists() {
|
||||
warn!("File not found: {}", &transcription_request.audio_file_path);
|
||||
return delivery.ack(BasicAckOptions::default()).await.unwrap();
|
||||
}
|
||||
transcribe_call(path);
|
||||
|
||||
delivery.ack(BasicAckOptions::default()).await.unwrap()
|
||||
});
|
||||
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
|
||||
fn transcribe_call(file_path: &Path) {
|
||||
let output_directory = file_path.parent().unwrap();
|
||||
Command::new("whisperx")
|
||||
.args(["--language", "en"])
|
||||
.args(["--model", "largev3"])
|
||||
.args(["--batch_size", "4"])
|
||||
.args(["--compute_type", "int8"])
|
||||
.args(["--output_format", "txt"])
|
||||
.args(["--output directory", output_directory.parent().unwrap().to_str().unwrap()])
|
||||
.arg("file")
|
||||
.output().expect("TODO: panic message");
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Default, Debug)]
|
||||
struct TranscriptionRequest {
|
||||
audio_file_path: String,
|
||||
call_metadata: Call
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Serialize, Deserialize, Default, Debug)]
|
||||
struct Call {
|
||||
freq: u32,
|
||||
freq_error: i32,
|
||||
signal: i32,
|
||||
noise: i32,
|
||||
source_num: u32,
|
||||
recorder_num: u32,
|
||||
tdma_slot: u32,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
phase2_tdma: bool,
|
||||
#[serde(with = "ts_milliseconds")]
|
||||
start_time: DateTime<Utc>,
|
||||
#[serde(with = "ts_milliseconds")]
|
||||
stop_time: DateTime<Utc>,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
emergency: bool,
|
||||
priority: i32,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
mode: bool,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
duplex: bool,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
encrypted: bool,
|
||||
call_length: i32,
|
||||
talkgroup: u64,
|
||||
talkgroup_tag: String,
|
||||
talkgroup_description: String,
|
||||
talkgroup_group_tag: String,
|
||||
talkgroup_group: String,
|
||||
audio_type: String,
|
||||
short_name: String,
|
||||
#[serde(rename = "freqList")]
|
||||
freq_list: Vec<CallFrequency>,
|
||||
#[serde(rename = "srcList")]
|
||||
src_list: Vec<CallSource>,
|
||||
patched_talkgroups: Option<Vec<u32>>
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct CallSource {
|
||||
src: i64,
|
||||
#[serde(with = "ts_milliseconds")]
|
||||
time: DateTime<Utc>,
|
||||
pos: f64,
|
||||
#[serde_as(as = "BoolFromInt")]
|
||||
emergency: bool,
|
||||
signal_system: String,
|
||||
tag: String
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct CallFrequency {
|
||||
freq: f64,
|
||||
#[serde(with = "ts_milliseconds")]
|
||||
time: DateTime<Utc>,
|
||||
pos: f64,
|
||||
len: f64,
|
||||
error_count: i32,
|
||||
spike_count: i32
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct AppConfig {
|
||||
rabbit_mq_config: RabbitMqConfig,
|
||||
file_storage_path: String
|
||||
}
|
||||
|
||||
impl Default for AppConfig {
|
||||
fn default() -> Self {
|
||||
panic!("Could not find config file")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct RabbitMqConfig {
|
||||
connection_string: String,
|
||||
}
|
||||
Loading…
Reference in a new issue