diff --git a/screenpipe-server/Cargo.toml b/screenpipe-server/Cargo.toml index 1b62caf97c..b82aac2dbe 100644 --- a/screenpipe-server/Cargo.toml +++ b/screenpipe-server/Cargo.toml @@ -170,4 +170,5 @@ nix = { version = "0.29", features = ["signal"] } windows = { version = "0.58", features = [ "Win32_System_Threading", "Win32_Foundation", + "Win32_Networking_WinSock", ] } diff --git a/screenpipe-server/src/auto_destruct.rs b/screenpipe-server/src/auto_destruct.rs index 553f9c82b0..c479396961 100644 --- a/screenpipe-server/src/auto_destruct.rs +++ b/screenpipe-server/src/auto_destruct.rs @@ -2,89 +2,276 @@ use std::process::Command; use std::time::Duration; use tokio::time::sleep; #[cfg(target_os = "windows")] -use tracing::debug; -use tracing::info; -#[cfg(target_os = "windows")] use windows::Win32::Foundation::{CloseHandle, HANDLE, STILL_ACTIVE}; #[cfg(target_os = "windows")] use windows::Win32::System::Threading::{ - GetExitCodeProcess, OpenProcess, PROCESS_QUERY_INFORMATION, + CreateJobObjectW, AssignProcessToJobObject, GetExitCodeProcess, OpenProcess, TerminateProcess, + PROCESS_QUERY_INFORMATION, PROCESS_TERMINATE, PROCESS_ALL_ACCESS, }; +use tracing::{debug, error, info, warn}; +use std::io::Error as IoError; +use thiserror::Error; + +#[cfg(target_os = "windows")] +#[derive(Error, Debug)] +pub enum ProcessError { + #[error("Failed to open process: {0}")] + OpenProcess(IoError), + #[error("Failed to terminate process: {0}")] + TerminateProcess(IoError), + #[error("Process became unresponsive")] + Unresponsive, + #[error("Port is still in use")] + PortInUse, + #[error("Failed to kill port: {0}")] + KillPort(String), +} + +#[cfg(target_os = "windows")] +#[derive(Debug)] +pub struct ProcessStatus { + pub terminated: bool, + pub exit_code: Option, + pub port_released: bool, +} + +#[cfg(target_os = "windows")] +pub struct ProcessManager { + pid: u32, + port: u16, + job_handle: HANDLE, +} #[cfg(target_os = "windows")] -fn is_process_alive(pid: u32) -> bool { - unsafe { - let process: HANDLE = match OpenProcess(PROCESS_QUERY_INFORMATION, false, pid) { - Ok(handle) => handle, - Err(e) => { - debug!("Failed to open process with PID ({}): {:?}", pid, e); - return false; +impl ProcessManager { + pub fn new(pid: u32, port: u16) -> windows::core::Result { + unsafe { + let job_handle = CreateJobObjectW(None, None)?; + let process_handle = OpenProcess(PROCESS_ALL_ACCESS, false, pid)?; + AssignProcessToJobObject(job_handle, process_handle)?; + CloseHandle(process_handle).expect("Failed to close process handle"); + Ok(Self { + pid, + port, + job_handle, + }) + } + } + + fn is_process_alive(&self) -> Result { + unsafe { + let process = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_TERMINATE, false, self.pid) + .map_err(|e| ProcessError::OpenProcess(IoError::from_raw_os_error(e.code().0)))?; + + if process.is_invalid() { + return Ok(false); } - }; - if process.is_invalid() { - return false; - } - let mut exit_code: u32 = 0; - let result = GetExitCodeProcess(process, &mut exit_code); - CloseHandle(process).expect("Failed to close process handle"); - if result.is_err() { - debug!("Failed to get exit code for process with PID ({})", pid); - return false; - } - exit_code == STILL_ACTIVE.0 as u32 + + let mut exit_code: u32 = 0; + GetExitCodeProcess(process, &mut exit_code).map_err(|_| ProcessError::Unresponsive)?; + CloseHandle(process).expect("Failed to close process handle"); + + Ok(exit_code == STILL_ACTIVE.0 as u32) + } } -} -pub async fn watch_pid(pid: u32) -> bool { - info!("starting to watch for app termination (pid: {})", pid); + fn get_port_pids(&self) -> Vec { + let netstat_output = Command::new("netstat") + .args(&["-ano"]) + .output() + .expect("failed to execute netstat"); - loop { - #[cfg(target_os = "windows")] - { - // Try Windows API first - if !is_process_alive(pid) { - debug!("Process ({}) not found via windows api", pid); - return true; + let output = String::from_utf8_lossy(&netstat_output.stdout); + let mut pids = Vec::new(); + + for line in output.lines() { + if line.contains(&format!(":{}", self.port)) { + if let Some(pid_str) = line.split_whitespace().last() { + if let Ok(pid) = pid_str.parse::() { + pids.push(pid); + } + } + } + } + pids + } + + fn kill_port_forcefully(&self) -> Result<(), ProcessError> { + // First try using killport crate + if let Err(e) = killport::kill(self.port as u16) { + warn!("killport failed: {:?}, trying alternative methods", e); + } + + // Then try killing all processes using the port + for pid in self.get_port_pids() { + unsafe { + if let Ok(process) = OpenProcess(PROCESS_TERMINATE, false, pid) { + let _ = TerminateProcess(process, 1); + CloseHandle(process).expect("Failed to close process handle"); + } + } + } + + // Verify port is released + if self.is_port_in_use() { + Err(ProcessError::KillPort(format!("Failed to release port {}", self.port))) + } else { + Ok(()) + } + } + + fn is_port_in_use(&self) -> bool { + let netstat_output = Command::new("netstat") + .args(&["-ano"]) + .output() + .expect("failed to execute netstat"); + + let output = String::from_utf8_lossy(&netstat_output.stdout); + output.contains(&format!(":{}", self.port)) + } + + async fn try_graceful_shutdown(&self) -> Result<(), ProcessError> { + for _ in 0..3 { + if !self.is_process_alive()? { + // Even if process is dead, ensure port is released + if !self.is_port_in_use() { + return Ok(()); + } + } + sleep(Duration::from_secs(1)).await; + } + Err(ProcessError::Unresponsive) + } + + pub async fn force_terminate(&self) -> Result<(), ProcessError> { + // First try normal process termination + unsafe { + let process = OpenProcess(PROCESS_TERMINATE, false, self.pid) + .map_err(|e| ProcessError::OpenProcess(IoError::from_raw_os_error(e.code().0)))?; + + if !process.is_invalid() { + TerminateProcess(process, 1) + .map_err(|e| ProcessError::TerminateProcess(IoError::from_raw_os_error(e.code().0)))?; + CloseHandle(process).expect("Failed to close process handle"); } + } + + // Then ensure port is released + if self.is_port_in_use() { + self.kill_port_forcefully()?; + } + + Ok(()) + } - // Fallback to Command approach - let pid_output = Command::new("tasklist") - .args(&["/FI", &format!("PID eq {}", pid), "/NH", "/FO", "CSV"]) - .output() - .expect("failed to check pid"); - - let app_output = Command::new("tasklist") - .args(&[ - "/FI", - "IMAGENAME eq screenpipe-app.exe", - "/NH", - "/FO", - "CSV", - ]) - .output() - .expect("failed to check app name"); - - let pid_alive = String::from_utf8_lossy(&pid_output.stdout).contains(&pid.to_string()); - let app_alive = !String::from_utf8_lossy(&app_output.stdout).is_empty(); - - info!("pid alive: {}, app alive: {}", pid_alive, app_alive); - - if !pid_alive || !app_alive { - return true; + pub async fn watch_and_cleanup(&self) -> Result { + info!("starting enhanced process monitoring (pid: {}, port: {})", self.pid, self.port); + + let mut consecutive_fails = 0; + let max_fails = 3; + + loop { + match self.is_process_alive() { + Ok(false) => { + debug!("Process ({}) not found via Windows API", self.pid); + break; + } + Ok(true) => { + consecutive_fails = 0; + } + Err(e) => { + consecutive_fails += 1; + error!("Error checking process status: {:?}", e); + if consecutive_fails >= max_fails { + return Err(ProcessError::Unresponsive); + } + } } + + sleep(Duration::from_secs(1)).await; + } + + // Try graceful shutdown first + if let Err(e) = self.try_graceful_shutdown().await { + warn!("Graceful shutdown failed: {:?}, attempting force terminate", e); + self.force_terminate().await?; + } + + // If port is still in use after process termination, force kill it + if self.is_port_in_use() { + warn!("Port {} still in use after process termination, force killing", self.port); + self.kill_port_forcefully()?; + } + + let mut port_released = false; + for _ in 0..5 { + if !self.is_port_in_use() { + port_released = true; + break; + } + sleep(Duration::from_secs(1)).await; + } + + if !port_released { + return Err(ProcessError::PortInUse); } - #[cfg(not(target_os = "windows"))] - { - let output = Command::new("ps") - .args(&["-p", &pid.to_string()]) - .output() - .expect("failed to execute process check command"); + Ok(ProcessStatus { + terminated: true, + exit_code: None, + port_released, + }) + } +} + +#[cfg(target_os = "windows")] +impl Drop for ProcessManager { + fn drop(&mut self) { + unsafe { + CloseHandle(self.job_handle).expect("Failed to close job handle"); + } + } +} - if !output.status.success() || output.stdout.is_empty() { - return true; +#[cfg(target_os = "windows")] +pub async fn watch_pid(pid: u32) -> bool { + let port = 3030; + match ProcessManager::new(pid, port) { + Ok(manager) => { + match manager.watch_and_cleanup().await { + Ok(status) => { + info!( + "Process terminated successfully. Port released: {}", + status.port_released + ); + true + } + Err(e) => { + error!("Failed to manage process: {:?}", e); + false + } } } + Err(e) => { + error!("Failed to create process manager: {:?}", e); + false + } + } +} + +#[cfg(not(target_os = "windows"))] +pub async fn watch_pid(pid: u32) -> bool { + info!("starting to watch for app termination (pid: {})", pid); + + loop { + let output = Command::new("ps") + .args(&["-p", &pid.to_string()]) + .output() + .expect("failed to execute process check command"); + + if !output.status.success() || output.stdout.is_empty() { + return true; + } sleep(Duration::from_secs(1)).await; }