Skip to content

Commit

Permalink
First pass adding scheduler support
Browse files Browse the repository at this point in the history
* Only IO endpoints calling back into scheduler so far
* IO::Buffer totally stubbed out
* Organization of utility methods likely to change
  • Loading branch information
headius committed Sep 15, 2023
1 parent ce45348 commit 9a23a82
Show file tree
Hide file tree
Showing 7 changed files with 781 additions and 5 deletions.
157 changes: 157 additions & 0 deletions core/src/main/java/org/jruby/FiberScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package org.jruby;

import org.jruby.runtime.Helpers;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.io.OpenFile;

public class FiberScheduler {
// MRI: rb_fiber_scheduler_kernel_sleep
public static IRubyObject kernelSleep(ThreadContext context, IRubyObject scheduler, IRubyObject timeout) {
return scheduler.callMethod(context, "kernel_sleep", timeout);
}

// MRI: rb_fiber_scheduler_kernel_sleepv
public static IRubyObject kernelSleep(ThreadContext context, IRubyObject scheduler, IRubyObject[] args) {
return scheduler.callMethod(context, "kernel_sleep", args);
}

// MRI: rb_fiber_scheduler_process_wait
public static IRubyObject processWait(ThreadContext context, IRubyObject scheduler, long pid, int flags) {
return Helpers.invokeChecked(context, scheduler, "process_wait", context.runtime.newFixnum(pid), context.runtime.newFixnum(flags));
}

// MRI: rb_fiber_scheduler_block
public static IRubyObject block(ThreadContext context, IRubyObject scheduler, IRubyObject blocker, IRubyObject timeout) {
return Helpers.invoke(context, scheduler, "block", blocker, timeout);
}

// MRI: rb_fiber_scheduler_unblock
public static IRubyObject unblock(ThreadContext context, IRubyObject scheduler, IRubyObject blocker, IRubyObject fiber) {
return Helpers.invoke(context, scheduler, "unblock", blocker, fiber);
}

// MRI: rb_fiber_scheduler_io_wait
public static IRubyObject ioWait(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject events, IRubyObject timeout) {
return Helpers.invoke(context, scheduler, "io_wait", io, events, timeout);
}

// MRI: rb_fiber_scheduler_io_wait_readable
public static IRubyObject ioWaitReadable(ThreadContext context, IRubyObject scheduler, IRubyObject io) {
return ioWait(context, scheduler, io, context.runtime.newFixnum(OpenFile.READABLE), context.nil);
}

// MRI: rb_fiber_scheduler_io_wait_writable
public static IRubyObject ioWaitWritable(ThreadContext context, IRubyObject scheduler, IRubyObject io) {
return ioWait(context, scheduler, io, context.runtime.newFixnum(OpenFile.WRITABLE), context.nil);
}

// MRI: rb_fiber_scheduler_io_select
public static IRubyObject ioSelect(ThreadContext context, IRubyObject scheduler, IRubyObject readables, IRubyObject writables, IRubyObject exceptables, IRubyObject timeout) {
return ioSelectv(context, scheduler, readables, writables, exceptables, timeout);
}

// MRI: rb_fiber_scheduler_io_selectv
public static IRubyObject ioSelectv(ThreadContext context, IRubyObject scheduler, IRubyObject... args) {
return Helpers.invokeChecked(context, scheduler, "io_select", args);
}

// MRI: rb_fiber_scheduler_io_read
public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
}

// MRI: rb_fiber_scheduler_io_pread
public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
return Helpers.invokeChecked(context, scheduler, "io_pread", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
}

// MRI: rb_fiber_scheduler_io_write
public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
}

// MRI: rb_fiber_scheduler_io_pwrite
public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
return Helpers.invokeChecked(context, scheduler, "io_pwrite", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
}

// MRI: rb_fiber_scheduler_io_read_memory
public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED);

IRubyObject result = ioRead(context, scheduler, io, buffer, length);

buffer.unlock(context);
buffer.free(context);

return result;
}

// MRI: rb_fiber_scheduler_io_write_memory
public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);

IRubyObject result = ioWrite(context, scheduler, io, buffer, length);

buffer.unlock(context);
buffer.free(context);

return result;
}

// MRI: rb_fiber_scheduler_io_close
public static IRubyObject ioClose(ThreadContext context, IRubyObject scheduler, IRubyObject io) {
return Helpers.invokeChecked(context, scheduler, "io_close", io);
}

// MRI: rb_fiber_scheduler_address_resolve
public static IRubyObject addressResolve(ThreadContext context, IRubyObject scheduler, IRubyObject hostname) {
return Helpers.invokeChecked(context, scheduler, "address_resolve", hostname);
}

// MRI: verify_scheduler
static void verifyInterface(IRubyObject scheduler) {
if (!scheduler.respondsTo("block")) {
throw scheduler.getRuntime().newArgumentError("Scheduler must implement #block");
}

if (!scheduler.respondsTo("unblock")) {
throw scheduler.getRuntime().newArgumentError("Scheduler must implement #unblock");
}

if (!scheduler.respondsTo("kernel_sleep")) {
throw scheduler.getRuntime().newArgumentError("Scheduler must implement #kernel_sleep");
}

if (!scheduler.respondsTo("io_wait")) {
throw scheduler.getRuntime().newArgumentError("Scheduler must implement #io_wait");
}
}

// MRI: rb_fiber_scheduler_close
public static IRubyObject close(ThreadContext context, IRubyObject scheduler) {
// VM_ASSERT(ruby_thread_has_gvl_p());

IRubyObject result;

result = Helpers.invokeChecked(context, scheduler, "scheduler_close");
if (result != RubyBasicObject.UNDEF) return result;

result = Helpers.invokeChecked(context, scheduler, "close");
if (result != RubyBasicObject.UNDEF) return result;

return context.nil;
}

// MRI: rb_fiber_scheduler_io_result_apply
public static int resultApply(ThreadContext context, IRubyObject result) {
int resultInt;
if (result instanceof RubyFixnum && (resultInt = RubyNumeric.num2int(result)) < 0) {
context.runtime.getPosix().errno(-resultInt);
return -1;
} else {
return RubyNumeric.num2int(result);
}
}
}
27 changes: 27 additions & 0 deletions core/src/main/java/org/jruby/Ruby.java
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ private Ruby(RubyInstanceConfig config) {
randomClass = null;
}
ioClass = RubyIO.createIOClass(this);
ioBufferClass = RubyIOBuffer.createIOBufferClass(this);

structClass = profile.allowClass("Struct") ? RubyStruct.createStructClass(this) : null;
bindingClass = profile.allowClass("Binding") ? RubyBinding.createBindingClass(this) : null;
Expand Down Expand Up @@ -1714,6 +1715,14 @@ private void initExceptions() {
ifAllowed("KeyError", (ruby) -> keyError = RubyKeyError.define(ruby, indexError));
ifAllowed("DomainError", (ruby) -> mathDomainError = RubyDomainError.define(ruby, argumentError, mathModule));

RubyClass runtimeError = this.runtimeError;
ObjectAllocator runtimeErrorAllocator = runtimeError.getAllocator();
bufferLockedError = ioBufferClass.defineClassUnder("LockedError", runtimeError, runtimeErrorAllocator);
ioBufferClass.defineClassUnder("AllocationError", runtimeError, runtimeErrorAllocator);
ioBufferClass.defineClassUnder("AccessError", runtimeError, runtimeErrorAllocator);
ioBufferClass.defineClassUnder("InvalidatedError", runtimeError, runtimeErrorAllocator);
ioBufferClass.defineClassUnder("MaskError", runtimeError, runtimeErrorAllocator);

initErrno();

initNativeException();
Expand Down Expand Up @@ -2174,6 +2183,10 @@ public RubyClass getIO() {
return ioClass;
}

public RubyClass getIOBuffer() {
return ioBufferClass;
}

public RubyClass getThread() {
return threadClass;
}
Expand Down Expand Up @@ -2485,6 +2498,10 @@ public RubyClass getInvalidByteSequenceError() {
return invalidByteSequenceError;
}

public RubyClass getBufferLockedError() {
return bufferLockedError;
}

@Deprecated
RubyRandom.RandomType defaultRand;

Expand Down Expand Up @@ -4246,6 +4263,10 @@ public RaiseException newInvalidByteSequenceError(String message) {
return newRaiseException(getInvalidByteSequenceError(), message);
}

public RaiseException newBufferLockedError(String message) {
return newRaiseException(getBufferLockedError(), message);
}

/**
* Construct a new RaiseException wrapping a new Ruby exception object appropriate to the given exception class.
*
Expand Down Expand Up @@ -5361,6 +5382,7 @@ public RubyClass getData() {
private final RubyClass fileClass;
private final RubyClass fileStatClass;
private final RubyClass ioClass;
private final RubyClass ioBufferClass;
private final RubyClass threadClass;
private final RubyClass threadGroupClass;
private final RubyClass continuationClass;
Expand Down Expand Up @@ -5422,6 +5444,11 @@ public RubyClass getData() {
private RubyClass keyError;
private RubyClass locationClass;
private RubyClass interruptedRegexpError;
private RubyClass bufferLockedError;
private RubyClass bufferAllocationError;
private RubyClass bufferAccessError;
private RubyClass bufferInvalidatedError;
private RubyClass bufferMaskError;

/**
* All the core modules we keep direct references to, for quick access and
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -3794,6 +3794,12 @@ public static RubyIO convertToIO(ThreadContext context, IRubyObject obj) {

@JRubyMethod(name = "select", required = 1, optional = 3, checkArity = false, meta = true)
public static IRubyObject select(ThreadContext context, IRubyObject recv, IRubyObject[] argv) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioSelectv(context, scheduler, argv);
if (result != UNDEF) return result;
}

int argc = Arity.checkArgumentCount(context, argv, 1, 4);

IRubyObject read, write, except, _timeout;
Expand Down
Loading

0 comments on commit 9a23a82

Please sign in to comment.