Skip to content

Commit

Permalink
feat: 1.0.17
Browse files Browse the repository at this point in the history
- Support for multiplexing a single WebSocket connection across multiple Channels.
- Enhanced the logical correctness of Pty-related interfaces.
- Enabled login shell mode for Pty during login.
- Optimized the timeout logic for Session and Channel.
  • Loading branch information
motalin committed May 16, 2024
1 parent a1f3011 commit 7d392fd
Show file tree
Hide file tree
Showing 21 changed files with 2,298 additions and 1,971 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file.

## [1.0.17] - 2024-05-15

## Changed

- Support for multiplexing a single WebSocket connection across multiple Channels.
- Enhanced the logical correctness of Pty-related interfaces.
- Enabled login shell mode for Pty during login.
- Optimized the timeout logic for Session and Channel.

## [1.0.16] - 2024-05-09

### Changed
Expand Down
53 changes: 3 additions & 50 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tat_agent"
version = "1.0.16"
version = "1.0.17"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -12,7 +12,6 @@ reqwest = { version = "0.10", features = ["blocking", "json", "stream"] }
websocket = "0.26.2"
bytes = "0.5.6"
futures = "0.3.28"
tokio-test = "0.4.2"
serde_json = "1.0.57"
serde_bytes = "0.11.7"
serde = { version = "1.0.115", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions install/tat_agent.service
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ExecStartPost=/bin/sleep 0.2
KillMode=process
Restart=always
RestartSec=1s
OOMPolicy=continue

[Install]
WantedBy=multi-user.target
90 changes: 89 additions & 1 deletion src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,101 @@
use std::ffi::{OsStr, OsString};
#[cfg(windows)]
use std::os::windows::ffi::{OsStrExt, OsStringExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use rsa::pkcs1::{EncodeRsaPrivateKey, EncodeRsaPublicKey};
use rsa::pkcs8::LineEnding;
use rsa::{RsaPrivateKey, RsaPublicKey};
use std::time::UNIX_EPOCH;
use std::time::{Duration, UNIX_EPOCH};
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::time::{delay_until, Instant};

pub struct Stopper(Mutex<Option<Sender<()>>>, Mutex<Option<Receiver<()>>>);

impl Stopper {
pub fn new() -> Self {
let (tx, rx) = oneshot::channel();
Self(Mutex::new(Some(tx)), Mutex::new(Some(rx)))
}

pub fn stop(&self) {
self.0.lock().expect("lock failed").take();
}

pub fn get_receiver(&self) -> Option<Receiver<()>> {
self.1.lock().expect("lock failed").take()
}
}

pub struct Timer {
last_time: Mutex<Instant>,
interval: u64,
freeze_count: AtomicUsize,
}

impl Timer {
pub fn new(interval: u64) -> Self {
Self {
last_time: Mutex::new(Instant::now()),
interval,
freeze_count: AtomicUsize::new(0),
}
}

pub fn freeze(&self) {
self.freeze_count.fetch_add(1, Ordering::SeqCst);
}

pub fn unfreeze(&self) {
self.refresh();
self.freeze_count.fetch_sub(1, Ordering::SeqCst);
}

pub fn refresh(&self) {
self.refresh_with(Instant::now());
}

pub async fn timeout(&self) {
while !self.is_timeout() {
delay_until(self.may_timeout_at()).await
}
}

pub fn is_timeout_refresh(&self, inst: Instant) -> bool {
if self.is_timeout_with(inst) {
return true;
}
self.refresh_with(inst);
false
}

fn refresh_with(&self, inst: Instant) {
*self.last_time.lock().expect("lock failed") = inst;
}

fn is_timeout_with(&self, inst: Instant) -> bool {
if self.freeze_count.load(Ordering::SeqCst) != 0 {
self.refresh();
return false;
}
inst.elapsed().as_secs() >= self.interval
}

fn is_timeout(&self) -> bool {
self.is_timeout_with(self.last_time())
}

fn may_timeout_at(&self) -> Instant {
self.last_time() + Duration::from_secs(self.interval)
}

fn last_time(&self) -> Instant {
*self.last_time.lock().expect("lock failed")
}
}

pub fn generate_rsa_key() -> Option<(String, String)> {
let private_key = RsaPrivateKey::new(&mut rand::thread_rng(), 2048).ok()?;
Expand Down
Loading

0 comments on commit 7d392fd

Please sign in to comment.