Skip to content

Commit

Permalink
Add batch processor (#25)
Browse files Browse the repository at this point in the history
* Added a simple batch processor to skip cross-platform processor for now

* Fixed the way batch processor separate expired items from the internal vector

---------

Co-authored-by: Roman Glushko <[email protected]>
  • Loading branch information
roma-glushko and Roman Glushko authored Dec 27, 2023
1 parent aaa27c6 commit 794159b
Show file tree
Hide file tree
Showing 7 changed files with 879 additions and 766 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "notifykit_lib"
version = "0.0.4"
version = "0.0.5"
edition = "2021"
license = "A toolkit for building applications watching filesystem changes"
homepage = "https://github.com/roma-glushko/notifykit"
Expand Down
1,289 changes: 671 additions & 618 deletions pdm.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "notifykit"
version = "0.0.4"
version = "0.0.5"
description = "A modern efficient Python toolkit for building applications that need to watch filesystem changes"
authors = [
{name = "Roman Glushko", email = "[email protected]"},
Expand Down
1 change: 1 addition & 0 deletions src/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct FileCache {
roots: Vec<(PathBuf, RecursiveMode)>,
}

#[allow(unused)]
impl FileCache {
/// Construct an empty cache.
pub fn new() -> Self {
Expand Down
323 changes: 191 additions & 132 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,15 @@ impl FileEventQueue {
}
}

pub(crate) trait EventProcessor {
fn get_events(&mut self) -> Vec<RawEvent>;
fn get_errors(&mut self) -> Vec<NotifyError>;
fn add_event(&mut self, event: NotifyEvent);
fn add_error(&mut self, error: NotifyError);
}

#[derive(Debug)]
pub struct EventProcessor<T> {
pub struct CrossPlatformEventProcessor<T> {
events_by_file: HashMap<PathBuf, FileEventQueue>,
file_cache: T,
rename_event: Option<(RawEvent, Option<FileId>)>,
Expand All @@ -98,7 +105,9 @@ pub struct EventProcessor<T> {
buffering_time: Duration,
}

impl<T: FileIdCache> EventProcessor<T> {
// TODO: allow to specify cross-platform event processor when it's bugs are fixed
#[allow(unused)]
impl<T: FileIdCache> CrossPlatformEventProcessor<T> {
pub fn new(file_cache: T, buffering_time: Duration) -> Self {
Self {
events_by_file: HashMap::new(),
Expand All @@ -110,136 +119,6 @@ impl<T: FileIdCache> EventProcessor<T> {
}
}

pub fn get_events(&mut self) -> Vec<RawEvent> {
let now = Instant::now();
let mut events_to_return = Vec::with_capacity(self.events_by_file.len());
let mut remaining_events = HashMap::with_capacity(self.events_by_file.len());

if let Some(rescan_event) = self.rescan_event.take() {
if now.saturating_duration_since(rescan_event.time) >= self.buffering_time {
// log::trace!("debounced event: {rescan_event:?}");

events_to_return.push(rescan_event);
} else {
self.rescan_event = Some(rescan_event);
}
}

// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
for (path, mut events) in self.events_by_file.drain() {
let mut kind_index = HashMap::new();

while let Some(event) = events.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.buffering_time {
// remove previous event of the same kind
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_to_return.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}

kind_index.insert(event.kind, events_to_return.len());

events_to_return.push(event);
} else {
events.events.push_front(event);
break;
}
}

if !events.events.is_empty() {
remaining_events.insert(path, events);
}
}

self.events_by_file = remaining_events;

// order events for different files chronologically, but keep the order of events for the same file
events_to_return.sort_by(|event_a, event_b| {
// use the last path because rename events are emitted for the target path
if event_a.paths.last() == event_b.paths.last() {
std::cmp::Ordering::Equal
} else {
event_a.time.cmp(&event_b.time)
}
});

events_to_return
}

/// Returns all currently stored errors
pub fn get_errors(&mut self) -> Vec<NotifyError> {
let mut v = Vec::new();
std::mem::swap(&mut v, &mut self.errors);
v
}

/// Add an error entry to re-send later on
pub fn add_error(&mut self, error: NotifyError) {
self.errors.push(error);
}

/// Add new event to debouncer cache
pub fn add_event(&mut self, event: NotifyEvent) {
// log::trace!("raw event: {event:?}");

if event.need_rescan() {
self.file_cache.rescan();
self.rescan_event = Some(event.into());
return;
}

let path = &event.paths[0];

match &event.kind {
EventKind::Create(_) => {
self.file_cache.add_path(path);

self.push_event(event, Instant::now());
}
EventKind::Modify(ModifyKind::Name(rename_mode)) => {
match rename_mode {
RenameMode::Any => {
if event.paths[0].exists() {
self.handle_rename_to(event);
} else {
self.handle_rename_from(event);
}
}
RenameMode::To => {
self.handle_rename_to(event);
}
RenameMode::From => {
self.handle_rename_from(event);
}
RenameMode::Both => {
// ignore and handle `To` and `From` events instead
}
RenameMode::Other => {
// unused
}
}
}
EventKind::Remove(_) => {
self.push_remove_event(event, Instant::now());
}
EventKind::Other => {
// ignore meta events
}
_ => {
if self.file_cache.get_file_id(path).is_none() {
self.file_cache.add_path(path);
}

self.push_event(event, Instant::now());
}
}
}

fn handle_rename_from(&mut self, event: NotifyEvent) {
let time = Instant::now();
let path = &event.paths[0];
Expand Down Expand Up @@ -405,3 +284,183 @@ impl<T: FileIdCache> EventProcessor<T> {
}
}
}

impl<T: FileIdCache> EventProcessor for CrossPlatformEventProcessor<T> {
fn get_events(&mut self) -> Vec<RawEvent> {
let now = Instant::now();
let mut events_to_return = Vec::with_capacity(self.events_by_file.len());
let mut remaining_events = HashMap::with_capacity(self.events_by_file.len());

if let Some(rescan_event) = self.rescan_event.take() {
if now.saturating_duration_since(rescan_event.time) >= self.buffering_time {
// log::trace!("debounced event: {rescan_event:?}");

events_to_return.push(rescan_event);
} else {
self.rescan_event = Some(rescan_event);
}
}

// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
for (path, mut events) in self.events_by_file.drain() {
let mut kind_index = HashMap::new();

while let Some(event) = events.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.buffering_time {
// remove previous event of the same kind
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_to_return.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}

kind_index.insert(event.kind, events_to_return.len());

events_to_return.push(event);
} else {
events.events.push_front(event);
break;
}
}

if !events.events.is_empty() {
remaining_events.insert(path, events);
}
}

self.events_by_file = remaining_events;

// order events for different files chronologically, but keep the order of events for the same file
events_to_return.sort_by(|event_a, event_b| {
// use the last path because rename events are emitted for the target path
if event_a.paths.last() == event_b.paths.last() {
std::cmp::Ordering::Equal
} else {
event_a.time.cmp(&event_b.time)
}
});

events_to_return
}

/// Returns all currently stored errors
fn get_errors(&mut self) -> Vec<NotifyError> {
let mut v = Vec::new();
std::mem::swap(&mut v, &mut self.errors);
v
}

/// Add new event to debouncer cache
fn add_event(&mut self, event: NotifyEvent) {
// log::trace!("raw event: {event:?}");

if event.need_rescan() {
self.file_cache.rescan();
self.rescan_event = Some(event.into());
return;
}

let path = &event.paths[0];

match &event.kind {
EventKind::Create(_) => {
self.file_cache.add_path(path);

self.push_event(event, Instant::now());
}
EventKind::Modify(ModifyKind::Name(rename_mode)) => {
match rename_mode {
RenameMode::Any => {
if event.paths[0].exists() {
self.handle_rename_to(event);
} else {
self.handle_rename_from(event);
}
}
RenameMode::To => {
self.handle_rename_to(event);
}
RenameMode::From => {
self.handle_rename_from(event);
}
RenameMode::Both => {
// ignore and handle `To` and `From` events instead
}
RenameMode::Other => {
// unused
}
}
}
EventKind::Remove(_) => {
self.push_remove_event(event, Instant::now());
}
EventKind::Other => {
// ignore meta events
}
_ => {
if self.file_cache.get_file_id(path).is_none() {
self.file_cache.add_path(path);
}

self.push_event(event, Instant::now());
}
}
}

/// Add an error entry to re-send later on
fn add_error(&mut self, error: NotifyError) {
self.errors.push(error);
}
}

#[derive(Debug)]
pub struct BatchProcessor {
buffering_time: Duration,
events: Vec<RawEvent>,
errors: Vec<NotifyError>,
}

/// A simple event processor that buffers incoming events for a given amount
/// of time without any modification to the underlying events
impl BatchProcessor {
pub fn new(buffering_time: Duration) -> Self {
Self {
events: Vec::new(),
errors: Vec::new(),
buffering_time,
}
}
}

impl EventProcessor for BatchProcessor {
fn get_events(&mut self) -> Vec<RawEvent> {
let now = Instant::now();

// Assuming expired items are contiguous and at the beginning of the vector
let first_expired_index = self
.events
.iter()
.position(|event| now.saturating_duration_since(event.time) >= self.buffering_time)
.unwrap_or(self.events.len());

self.events.drain(0..first_expired_index).collect()
}

fn get_errors(&mut self) -> Vec<NotifyError> {
let mut v = Vec::new();
std::mem::swap(&mut v, &mut self.errors);
v
}

fn add_event(&mut self, event: NotifyEvent) {
self.events.push(RawEvent::new(event, Instant::now()));
}

fn add_error(&mut self, error: NotifyError) {
self.errors.push(error);
}
}
Loading

0 comments on commit 794159b

Please sign in to comment.