Skip to content

Commit

Permalink
fix: #901, fix: switch back from jpeg encoding to png as it seems mpbm…
Browse files Browse the repository at this point in the history
…4max CPU is not fast enough for a 10-sized encoding queue; fix all tests, fix CI, fix Linux add.rs, logging
  • Loading branch information
louis030195 committed Feb 1, 2025
1 parent 9311d63 commit b0eaf36
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 178 deletions.
2 changes: 1 addition & 1 deletion screenpipe-events/src/events_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl EventManager {
pub fn send<T: Serialize + 'static>(&self, event: impl Into<String>, data: T) -> Result<()> {
let event_name = event.into();
let value = serde_json::to_value(data)?;
tracing::debug!("Sending event {} with data {:?}", event_name, value);
tracing::debug!("sending event {} ", event_name);
match self.sender.send(Event {
name: event_name.clone(),
data: value,
Expand Down
2 changes: 1 addition & 1 deletion screenpipe-server/src/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub async fn handle_index_command(
OcrEngine::WindowsNative => perform_ocr_windows(&frame).await.unwrap(),
_ => {
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
perform_ocr_tesseract(&frame, vec![]);
return perform_ocr_tesseract(&frame, vec![]);

Check failure on line 191 in screenpipe-server/src/add.rs

View workflow job for this annotation

GitHub Actions / test-linux

mismatched types

Check failure on line 191 in screenpipe-server/src/add.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

mismatched types

panic!("unsupported ocr engine");

Check warning on line 193 in screenpipe-server/src/add.rs

View workflow job for this annotation

GitHub Actions / test-linux

unreachable statement

Check warning on line 193 in screenpipe-server/src/add.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

unreachable statement
}
Expand Down
250 changes: 85 additions & 165 deletions screenpipe-server/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,31 +948,49 @@ impl DatabaseManager {
speaker_ids: Option<Vec<i64>>,
frame_name: Option<&str>,
) -> Result<usize, sqlx::Error> {
let mut json_array: String = "[]".to_string();
if let Some(ids) = speaker_ids {
// binding order:
// ?1 = query
// ?2 = start_time
// ?3 = end_time
// ?4 = app_name
// ?5 = window_name
// ?6 = frame_name (for OCR only)
// ?7 = min_length (for text/transcription length)
// ?8 = max_length (for text/transcription length)
// ?9 = json array for speaker_ids (for audio only)
let json_array = if let Some(ids) = speaker_ids {
if !ids.is_empty() {
json_array = serde_json::to_string(&ids).unwrap_or_default();
serde_json::to_string(&ids).unwrap_or_default()
} else {
"[]".to_string()
}
}
} else {
"[]".to_string()
};

let sql = match content_type {
ContentType::OCR => {
format!(
r#"
SELECT COUNT(DISTINCT frames.id)
FROM ocr_text_fts
JOIN ocr_text ON ocr_text_fts.frame_id = ocr_text.frame_id
FROM {table}
JOIN frames ON ocr_text.frame_id = frames.id
WHERE {}
WHERE {match_condition}
AND (?2 IS NULL OR frames.timestamp >= ?2)
AND (?3 IS NULL OR frames.timestamp <= ?3)
AND (?4 IS NULL OR ocr_text.app_name LIKE '%' || ?4 || '%')
AND (?5 IS NULL OR ocr_text.window_name LIKE '%' || ?5 || '%')
AND (?6 IS NULL OR LENGTH(ocr_text.text) >= ?6)
AND (?7 IS NULL OR LENGTH(ocr_text.text) <= ?7)
AND (?8 IS NULL OR frames.name LIKE '%' || ?8 || '%' COLLATE NOCASE)
AND (?6 IS NULL OR frames.name LIKE '%' || ?6 || '%' COLLATE NOCASE)
AND (?7 IS NULL OR LENGTH(ocr_text.text) >= ?7)
AND (?8 IS NULL OR LENGTH(ocr_text.text) <= ?8)
AND ocr_text.text != 'No text found'
"#,
if query.is_empty() {
table = if query.is_empty() {
"ocr_text"
} else {
"ocr_text_fts JOIN ocr_text ON ocr_text_fts.frame_id = ocr_text.frame_id"
},
match_condition = if query.is_empty() {
"1=1"
} else {
"ocr_text_fts MATCH ?1"
Expand All @@ -982,17 +1000,22 @@ impl DatabaseManager {
ContentType::Audio => {
format!(
r#"
SELECT COUNT(DISTINCT audio_transcriptions.audio_chunk_id || '_' || COALESCE(audio_transcriptions.start_time, '') || '_' || COALESCE(audio_transcriptions.end_time, ''))
FROM audio_transcriptions_fts
JOIN audio_transcriptions ON audio_transcriptions_fts.audio_chunk_id = audio_transcriptions.audio_chunk_id
WHERE {}
SELECT COUNT(DISTINCT audio_transcriptions.id)
FROM {table}
WHERE {match_condition}
AND (?2 IS NULL OR audio_transcriptions.timestamp >= ?2)
AND (?3 IS NULL OR audio_transcriptions.timestamp <= ?3)
AND (?6 IS NULL OR LENGTH(audio_transcriptions.transcription) >= ?6)
AND (?7 IS NULL OR LENGTH(audio_transcriptions.transcription) <= ?7)
AND (json_array_length(?8) = 0 OR audio_transcriptions.speaker_id IN (SELECT value FROM json_each(?8)))
AND audio_transcriptions.transcription != ''
AND (json_array_length(?9) = 0 OR audio_transcriptions.speaker_id IN (SELECT value FROM json_each(?9)))
"#,
if query.is_empty() {
table = if query.is_empty() {
"audio_transcriptions"
} else {
"audio_transcriptions_fts JOIN audio_transcriptions ON audio_transcriptions_fts.audio_chunk_id = audio_transcriptions.audio_chunk_id"
},
match_condition = if query.is_empty() {
"1=1"
} else {
"audio_transcriptions_fts MATCH ?1"
Expand All @@ -1003,17 +1026,22 @@ impl DatabaseManager {
format!(
r#"
SELECT COUNT(DISTINCT ui_monitoring.id)
FROM ui_monitoring_fts
JOIN ui_monitoring ON ui_monitoring_fts.ui_id = ui_monitoring.id
WHERE {}
FROM {table}
WHERE {match_condition}
AND (?2 IS NULL OR ui_monitoring.timestamp >= ?2)
AND (?3 IS NULL OR ui_monitoring.timestamp <= ?3)
AND (?4 IS NULL OR ui_monitoring.app LIKE '%' || ?4 || '%')
AND (?5 IS NULL OR ui_monitoring.window LIKE '%' || ?5 || '%')
AND (?6 IS NULL OR LENGTH(ui_monitoring.text_output) >= ?6)
AND (?7 IS NULL OR LENGTH(ui_monitoring.text_output) <= ?7)
AND (?7 IS NULL OR LENGTH(ui_monitoring.text_output) >= ?7)
AND (?8 IS NULL OR LENGTH(ui_monitoring.text_output) <= ?8)
AND 1=1 -- placeholder for ?9 (json speaker_ids, not used for ui)
"#,
if query.is_empty() {
table = if query.is_empty() {
"ui_monitoring"
} else {
"ui_monitoring_fts JOIN ui_monitoring ON ui_monitoring_fts.ui_id = ui_monitoring.id"
},
match_condition = if query.is_empty() {
"1=1"
} else {
"ui_monitoring_fts MATCH ?1"
Expand All @@ -1024,70 +1052,70 @@ impl DatabaseManager {
format!(
r#"
SELECT COUNT(*) FROM (
-- OCR part
SELECT DISTINCT frames.id
FROM {}
FROM {ocr_table}
JOIN frames ON ocr_text.frame_id = frames.id
WHERE {}
WHERE {ocr_match}
AND (?2 IS NULL OR frames.timestamp >= ?2)
AND (?3 IS NULL OR frames.timestamp <= ?3)
AND (?4 IS NULL OR ocr_text.app_name LIKE '%' || ?4 || '%')
AND (?5 IS NULL OR ocr_text.window_name LIKE '%' || ?5 || '%')
AND (?6 IS NULL OR LENGTH(ocr_text.text) >= ?6)
AND (?7 IS NULL OR LENGTH(ocr_text.text) <= ?7)
AND (?8 IS NULL OR frames.name LIKE '%' || ?8 || '%' COLLATE NOCASE)
AND (?6 IS NULL OR frames.name LIKE '%' || ?6 || '%' COLLATE NOCASE)
AND (?7 IS NULL OR LENGTH(ocr_text.text) >= ?7)
AND (?8 IS NULL OR LENGTH(ocr_text.text) <= ?8)
AND ocr_text.text != 'No text found'
UNION ALL
-- Audio part
SELECT DISTINCT audio_transcriptions.id
FROM {}
WHERE {}
FROM {audio_table}
WHERE {audio_match}
AND (?2 IS NULL OR audio_transcriptions.timestamp >= ?2)
AND (?3 IS NULL OR audio_transcriptions.timestamp <= ?3)
AND (?6 IS NULL OR LENGTH(audio_transcriptions.transcription) >= ?6)
AND (?7 IS NULL OR LENGTH(audio_transcriptions.transcription) <= ?7)
AND audio_transcriptions.transcription != ''
AND (json_array_length(?8) = 0 OR audio_transcriptions.speaker_id IN (SELECT value FROM json_each(?8)))
AND (json_array_length(?9) = 0 OR audio_transcriptions.speaker_id IN (SELECT value FROM json_each(?9)))
UNION ALL
-- UI part
SELECT DISTINCT ui_monitoring.id
FROM {}
WHERE {}
FROM {ui_table}
WHERE {ui_match}
AND (?2 IS NULL OR ui_monitoring.timestamp >= ?2)
AND (?3 IS NULL OR ui_monitoring.timestamp <= ?3)
AND (?4 IS NULL OR ui_monitoring.app LIKE '%' || ?4 || '%')
AND (?5 IS NULL OR ui_monitoring.window LIKE '%' || ?5 || '%')
AND (?6 IS NULL OR LENGTH(ui_monitoring.text_output) >= ?6)
AND (?7 IS NULL OR LENGTH(ui_monitoring.text_output) <= ?7)
AND ui_monitoring.text_output != ''
)"#,
if query.is_empty() {
AND (?7 IS NULL OR LENGTH(ui_monitoring.text_output) >= ?7)
AND (?8 IS NULL OR LENGTH(ui_monitoring.text_output) <= ?8)
AND 1=1 -- placeholder for ?9
)
"#,
ocr_table = if query.is_empty() {
"ocr_text"
} else {
"ocr_text_fts JOIN ocr_text ON ocr_text_fts.frame_id = ocr_text.frame_id"
},
if query.is_empty() {
ocr_match = if query.is_empty() {
"1=1"
} else {
"ocr_text_fts MATCH ?1"
},
if query.is_empty() {
audio_table = if query.is_empty() {
"audio_transcriptions"
} else {
"audio_transcriptions_fts JOIN audio_transcriptions ON audio_transcriptions_fts.rowid = audio_transcriptions.id"
"audio_transcriptions_fts JOIN audio_transcriptions ON audio_transcriptions_fts.audio_chunk_id = audio_transcriptions.audio_chunk_id"
},
if query.is_empty() {
audio_match = if query.is_empty() {
"1=1"
} else {
"audio_transcriptions_fts MATCH ?1"
},
if query.is_empty() {
ui_table = if query.is_empty() {
"ui_monitoring"
} else {
"ui_monitoring_fts JOIN ui_monitoring ON ui_monitoring_fts.ui_id = ui_monitoring.id"
},
if query.is_empty() {
ui_match = if query.is_empty() {
"1=1"
} else {
"ui_monitoring_fts MATCH ?1"
Expand All @@ -1098,18 +1126,17 @@ impl DatabaseManager {
};

let count: (i64,) = sqlx::query_as(&sql)
.bind(query)
.bind(start_time)
.bind(end_time)
.bind(app_name)
.bind(window_name)
.bind(frame_name)
.bind(min_length.map(|len| len as i64))
.bind(max_length.map(|len| len as i64))
.bind(json_array)
.bind(query) // ?1
.bind(start_time) // ?2
.bind(end_time) // ?3
.bind(app_name) // ?4
.bind(window_name) // ?5
.bind(frame_name) // ?6
.bind(min_length.map(|l| l as i64)) // ?7
.bind(max_length.map(|l| l as i64)) // ?8
.bind(json_array) // ?9
.fetch_one(&self.pool)
.await?;

Ok(count.0 as usize)
}

Expand Down Expand Up @@ -1362,113 +1389,6 @@ impl DatabaseManager {
))
}

// ! TODO: atm not sure what will happen if we have multiple transcriptions, OCR, etc for same timestamp (multi monitor, multi audio device...)
// ! just merging
// ! the offset is not quite right but we try to index around frames which is the central human experience and most important sense
// ! there should be a way to properly sync audio and video indexes
//pub async fn find_video_chunks(
// &self,
// start: DateTime<Utc>,
// end: DateTime<Utc>,
//) -> Result<TimeSeriesChunk, SqlxError> {
// // First get all frames in time range with their OCR data
// let frames_query = r#"
// SELECT
// f.id,
// f.timestamp,
// f.offset_index,
// ot.text,
// ot.app_name,
// ot.window_name,
// vc.device_name as screen_device,
// vc.file_path as video_path
// FROM frames f
// JOIN video_chunks vc ON f.video_chunk_id = vc.id
// LEFT JOIN ocr_text ot ON f.id = ot.frame_id
// WHERE f.timestamp >= ?1 AND f.timestamp <= ?2
// ORDER BY f.timestamp DESC, f.offset_index DESC
// "#;

// // Then get audio data that overlaps with these frames
// let audio_query = r#"
// SELECT
// at.timestamp,
// at.transcription,
// at.device as audio_device,
// at.is_input_device,
// ac.file_path as audio_path
// FROM audio_transcriptions at
// JOIN audio_chunks ac ON at.audio_chunk_id = ac.id
// WHERE at.timestamp >= ?1 AND at.timestamp <= ?2
// ORDER BY at.timestamp DESC
// "#;

// // Execute both queries
// let (frame_rows, audio_rows) = tokio::try_join!(
// sqlx::query(frames_query)
// .bind(start)
// .bind(end)
// .fetch_all(&self.pool),
// sqlx::query(audio_query)
// .bind(start)
// .bind(end)
// .fetch_all(&self.pool)
// )?;

// // Process into structured data
// let mut frames_map: BTreeMap<(DateTime<Utc>, i64), FrameData> = BTreeMap::new();

// // Process frame/OCR data
// for row in frame_rows {
// let timestamp: DateTime<Utc> = row.get("timestamp");
// let offset_index: i64 = row.get("offset_index");
// let key = (timestamp, offset_index);

// let frame_data = frames_map.entry(key).or_insert_with(|| FrameData {
// frame_id: row.get("id"),
// timestamp,
// offset_index,
// ocr_entries: Vec::new(),
// audio_entries: Vec::new(),
// });

// if let Ok(text) = row.try_get::<String, _>("text") {
// frame_data.ocr_entries.push(OCREntry {
// text,
// app_name: row.get("app_name"),
// window_name: row.get("window_name"),
// device_name: row.get("screen_device"),
// video_file_path: row.get("video_path"),
// });
// }
// }

// // Process audio data
// for row in audio_rows {
// let timestamp: DateTime<Utc> = row.get("timestamp");

// // Find the closest frame
// if let Some((&key, _)) = frames_map.range(..(timestamp, i64::MAX)).next_back() {
// if let Some(frame_data) = frames_map.get_mut(&key) {
// frame_data.audio_entries.push(AudioEntry {
// transcription: row.get("transcription"),
// device_name: row.get("audio_device"),
// is_input: row.get("is_input_device"),
// audio_file_path: row.get("audio_path"),
// // duration_secs: row.get("duration_secs"),
// duration_secs: 0.0, // TODO
// });
// }
// }
// }

// Ok(TimeSeriesChunk {
// frames: frames_map.into_values().rev().collect(),
// start_time: start,
// end_time: end,
// })
//}

pub async fn find_video_chunks(
&self,
start: DateTime<Utc>,
Expand Down
5 changes: 2 additions & 3 deletions screenpipe-server/src/video.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use chrono::Utc;
use crossbeam::queue::ArrayQueue;
use image::ImageFormat::{self};
use log::{debug, error};
use log::{info, warn};
use screenpipe_core::{find_ffmpeg_path, Language};
Expand Down Expand Up @@ -193,7 +192,7 @@ pub async fn start_ffmpeg_process(output_file: &str, fps: f64) -> Result<Child,
"-f",
"image2pipe",
"-vcodec",
"mjpeg",
"png", // consider using mjpeg and change to encode to jpeg below too
"-r",
&fps_str,
"-i",
Expand Down Expand Up @@ -345,7 +344,7 @@ fn encode_frame(frame: &CaptureResult) -> Vec<u8> {
let mut buffer = Vec::new();
frame
.image
.write_to(&mut std::io::Cursor::new(&mut buffer), ImageFormat::Png)
.write_to(&mut std::io::Cursor::new(&mut buffer), image::ImageFormat::Png)
.expect("Failed to encode frame");
buffer
}
Expand Down
Loading

0 comments on commit b0eaf36

Please sign in to comment.