How to detect if the queue has/has not changed? #18

Open
MoonMachine1 opened this issue Oct 12, 2024 · 4 comments

MoonMachine1 commented Oct 12, 2024

Great library here!

I have a use case where I need to monitor the queue/ringbuffer to see if values have changed.

Basically, I have a stream that publishes every 1s or so and processing is done within 5ms, so I want to poll the queue/ringbuffer to see if the values have changed, to alert downstream systems.

I see that the sequence is stored on the producer; checking if that value has changed would also work, but it is not pub.

I'm curious what the best strategy would be for polling for changes from an outside process, or whether there is a supported strategy that I overlooked.

@nicholassm
Owner

Hi @MoonMachine1,

You can get the sequence for the event being published as one of the arguments to the closure for event handling:

let processor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
   // Your processing logic here. I.e. business logic.
};

let monitor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
   // Your monitoring logic here.
};

let size = 64;
let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
   .handle_events_with(processor)
   .and_then()
       .handle_events_with(monitor)
   .build();

The only way to "read" events is to register an event processor, as shown above. There's no way for another thread (outside the Disruptor) to see what happens on the ringbuffer.
Note that the monitor closure will only read an event after the processor closure is done with it.
So this way you can measure the time it takes to process the event (assuming the producer publishes a timestamp of the publication time).
That should achieve what you're after, as all event processors read all published events.
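
For illustration, here is a minimal sketch of that measurement idea, assuming the producer stamps each event with a publication time. The Event type, its fields, and the use of std::time::Instant are illustrative here, not part of the library API:

```rust
use std::time::Instant;
use disruptor::{build_single_producer, BusySpin, Sequence};

// Hypothetical event type carrying the publication timestamp.
struct Event {
    value: u64,
    published_at: Instant,
}

fn main() {
    let factory = || Event { value: 0, published_at: Instant::now() };

    let processor = |e: &Event, _sequence: Sequence, _end_of_batch: bool| {
        // Business logic goes here.
        let _ = e.value;
    };

    // Runs after `processor` for each event and measures how long the event
    // took from publication until it reached this handler.
    let monitor = |e: &Event, sequence: Sequence, _end_of_batch: bool| {
        let latency = e.published_at.elapsed();
        println!("sequence {sequence} observed after {latency:?}");
    };

    let mut producer = build_single_producer(64, factory, BusySpin)
        .handle_events_with(processor)
        .and_then()
            .handle_events_with(monitor)
        .build();

    producer.publish(|e| {
        e.value = 42;
        e.published_at = Instant::now();
    });
}
```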

Did that answer your questions? Or did I misunderstand?

Kind regards,
Nicholas

@MoonMachine1
Author

Thank you @nicholassm!

That is a reasonable solution. I moved my monitoring logic out of the main handler into the secondary handler and it did improve throughput.

Maybe as a feature request: is it possible to assign a different WaitStrategy to each handler? I put several monitor threads on the same core, which I am guessing has a similar side effect due to contention.

@MoonMachine1
Author

Also, a side note on benchmarking: I just ported a tokio::spawn-based version of my pipeline to disruptor and improved end-to-end completion time by ~300-600ms (which is great).

One thing I have noticed is that when I publish to a separate processor (from inside the processor closure), there is a ~1 μs delay before the first event is picked up, and then an additional ~10 μs of latency on each subsequent event, i.e. event pick-up by the second processor (timed on arrival, with only a tracing::info log): 1st ~1 μs, 2nd ~10 μs, 3rd ~20 μs.

I tested with batch_publish and publish and saw about the same amount of latency. Each thread is pinned to a different core.

I am new to Rust, so the issue may be in my code rather than the library.

Example

```rust
#[derive(Clone)]
struct EventData {
    // Several String and u64 fields in the real code.
    some_string: String,
    some_u64: u64,
}
...
    pub fn publish_events(&mut self, events: Vec<EventData>, id: u64, ts: QuantaInstant) {
        self.queue.batch_publish(events.len(), |iter| {
            let mut i = 0;

            for event in iter {
                event.data = events[i].clone();
                event.ts = ts;
                event.id = id;
                i += 1;
            }
        });
    }

    pub fn publish_event(&mut self, data: EventData, id: u64, ts: QuantaInstant) {
        self.queue.publish(|obj: &mut PumpFunSwapEventPayload| {
            obj.data = data;
            obj.ts = ts;
            obj.id = id;
        });
    }
```
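
For reference, the chained setup described above (publishing into a second Disruptor from inside the first Disruptor's handler) can be sketched roughly as below. The types, field names, and buffer sizes are illustrative placeholders, and it assumes the producer handle for the second Disruptor is simply moved into the first handler's closure:

```rust
use disruptor::{build_single_producer, BusySpin, Sequence};

// Hypothetical event types for the two stages.
struct FirstEvent { id: u64 }
struct SecondEvent { id: u64 }

fn main() {
    // Build the downstream Disruptor first so its producer can be moved
    // into the upstream handler.
    let downstream_handler = |e: &SecondEvent, _seq: Sequence, _eob: bool| {
        // E.g. the tracing::info-style logging mentioned above.
        let _ = e.id;
    };
    let mut downstream = build_single_producer(64, || SecondEvent { id: 0 }, BusySpin)
        .handle_events_with(downstream_handler)
        .build();

    // The upstream handler republishes each event into the downstream Disruptor.
    let upstream_handler = move |e: &FirstEvent, _seq: Sequence, _eob: bool| {
        downstream.publish(|out| out.id = e.id);
    };
    let mut upstream = build_single_producer(64, || FirstEvent { id: 0 }, BusySpin)
        .handle_events_with(upstream_handler)
        .build();

    upstream.publish(|e| e.id = 1);
}
```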

@nicholassm
Owner

Hi @MoonMachine1

First of all, apologies for the late reply. Busy week.

Great that the performance improved compared to a pipeline based on tokio::spawn!

Regarding assigning different `WaitStrategy`s to different event handlers: I never really thought of this, as I've focused a lot on achieving the lowest latency possible.
I do see how that can be beneficial if you want to do e.g. in-process monitoring with multiple "low-priority" threads on the same core.
However, any monitoring can affect your system and in particular the critical path. Would it be possible to monitor from outside the process instead, ideally on the network? The latter has the added benefit that no latency can "hide", because you're seeing the true end-to-end latency, including jitter from your network card, OS, etc.

That leads me to your last question: the latency you see when you publish from one Disruptor's handler onto another Disruptor (I read the scenario correctly, right?). There are many potential sources of latency:

  1. Is your ringbuffer large enough that your monitoring threads never block your producers?
  2. What latency does the logging itself induce? Could it be that code adding the latency?
  3. I see that your EventData struct includes Strings. That implies allocation on the critical path when you clone them, and that can cause jitter because your allocator is shared with the other threads in your application, so there is locking involved. Generally, try to avoid allocating (and freeing) on the critical path; see the sketch after this list for one way to avoid the clone.
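
To make point 3 concrete, here is a rough sketch of one way to avoid the per-event allocation: pre-allocate the String in the event factory and overwrite it in place when publishing, instead of cloning a fresh String into the slot. The Event type and field names below are illustrative placeholders, not the actual PumpFunSwapEventPayload:

```rust
// Hypothetical event type stored in the ring buffer slots.
struct Event {
    symbol: String, // allocated once by the factory; its capacity is reused
    id: u64,
}

fn main() {
    let mut producer = disruptor::build_single_producer(
        64,
        || Event { symbol: String::with_capacity(64), id: 0 },
        disruptor::BusySpin,
    )
    .handle_events_with(|e: &Event, _seq: disruptor::Sequence, _eob: bool| {
        // Hand the event to business logic or monitoring here.
        let _ = (&e.symbol, e.id);
    })
    .build();

    // Overwrite the slot in place instead of assigning a freshly cloned String:
    // `clear` keeps the existing capacity and `push_str` only allocates if the
    // new value is longer than anything previously stored in this slot, so
    // steady-state publishing stays allocation-free.
    producer.publish(|e| {
        e.symbol.clear();
        e.symbol.push_str("example payload");
        e.id = 1;
    });
}
```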

I hope that gives you a few ideas to pursue.

Just out of curiosity - may I ask what problem your application is trying to tackle? (And it's perfectly fine if you don't want to share).

Have a nice weekend,
Nicholas
