Skip to content

Commit

Permalink
Hook up IO scheduler callbacks correctly
Browse files Browse the repository at this point in the history
Working on failures in mri/fiber/test_io_buffer.rb
  • Loading branch information
headius committed Oct 20, 2023
1 parent 082842d commit 24615ae
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 23 deletions.
38 changes: 32 additions & 6 deletions core/src/main/java/org/jruby/FiberScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public static IRubyObject ioSelectv(ThreadContext context, IRubyObject scheduler
}

// MRI: rb_fiber_scheduler_io_read
public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
Ruby runtime = context.runtime;
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, runtime.newFixnum(length), runtime.newFixnum(offset));
}

public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
Expand All @@ -78,8 +79,9 @@ public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler,
}

// MRI: rb_fiber_scheduler_io_write
public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
Ruby runtime = context.runtime;
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, runtime.newFixnum(length), runtime.newFixnum(offset));
}

public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
Expand All @@ -99,7 +101,19 @@ public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler,
public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED);

IRubyObject result = ioRead(context, scheduler, io, buffer, length);
IRubyObject result = ioRead(context, scheduler, io, buffer, length, 0);

buffer.unlock(context);
buffer.free(context);

return result;
}

// MRI: rb_fiber_scheduler_io_pread_memory
public static IRubyObject ioPReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int from, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED);

IRubyObject result = ioPRead(context, scheduler, io, buffer, from, length, 0);

buffer.unlock(context);
buffer.free(context);
Expand All @@ -111,7 +125,19 @@ public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject schedu
public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);

IRubyObject result = ioWrite(context, scheduler, io, buffer, length);
IRubyObject result = ioWrite(context, scheduler, io, buffer, length, 0);

buffer.unlock(context);
buffer.free(context);

return result;
}

// MRI: p
public static IRubyObject ioPWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int from, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);

IRubyObject result = ioPWrite(context, scheduler, io, buffer, from, length, 0);

buffer.unlock(context);
buffer.free(context);
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -4816,7 +4816,7 @@ public IRubyObject pread(ThreadContext context, IRubyObject _length, IRubyObject
int read;

ByteBuffer wrap = ByteBuffer.wrap(strByteList.unsafeBytes(), strByteList.begin(), length);
read = OpenFile.preadInternal(context, fd, wrap, from, length);
read = OpenFile.preadInternal(context, fptr, fd, wrap, from, length);

string.setReadLength(read);

Expand Down Expand Up @@ -4851,7 +4851,7 @@ public IRubyObject pwrite(ThreadContext context, IRubyObject str, IRubyObject of

int written;

written = OpenFile.pwriteInternal(context, fd, wrap, off, length);
written = OpenFile.pwriteInternal(context, fptr, fd, wrap, off, length);

return context.runtime.newFixnum(written);
}
Expand Down Expand Up @@ -5202,7 +5202,7 @@ public final OpenFile MakeOpenFile() {
rb_io_fptr_finalize(runtime, openFile);
openFile = null;
}
openFile = new OpenFile(runtime.getNil());
openFile = new OpenFile(this, runtime.getNil());
runtime.addInternalFinalizer(openFile);
return openFile;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/jruby/RubyIOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1916,7 +1916,7 @@ private static IRubyObject preadInternal(ThreadContext context, RubyIO io, ByteB
try {
base.position(offset);
base.limit(offset + size);
int result = OpenFile.preadInternal(context, fptr.fd(), base, from, size);
int result = OpenFile.preadInternal(context, fptr, fptr.fd(), base, from, size);
return FiberScheduler.result(context.runtime, result, fptr.errno());
} finally {
base.clear();
Expand Down Expand Up @@ -2114,7 +2114,7 @@ private static IRubyObject pwriteInternal(ThreadContext context, RubyIO io, Byte
try {
base.position(offset);
base.limit(offset + size);
int result = OpenFile.pwriteInternal(context, fptr.fd(), base, from, size);
int result = OpenFile.pwriteInternal(context, fptr, fptr.fd(), base, from, size);
return FiberScheduler.result(context.runtime, result, fptr.errno());
} finally {
base.clear();
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/jruby/ext/fiber/FiberQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public FiberQueue(Ruby runtime) {
this.queue = new ArrayBlockingQueue<>(1, false);
}

final RubyThread.Task<FiberQueue, FiberRequest> takeTask = new RubyThread.Task<FiberQueue, FiberRequest>() {
private static final RubyThread.Task<FiberQueue, FiberRequest> TAKE_TASK = new RubyThread.Task<FiberQueue, FiberRequest>() {
@Override
public FiberRequest run(ThreadContext context, FiberQueue queue) throws InterruptedException {
return queue.getQueueSafe().take();
Expand Down Expand Up @@ -89,7 +89,7 @@ public synchronized void checkShutdown() {

public FiberRequest pop(ThreadContext context) {
try {
return context.getThread().executeTaskBlocking(context, this, takeTask);
return context.getThread().executeTaskBlocking(context, this, TAKE_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in FiberQueue.pop");
}
Expand Down
48 changes: 38 additions & 10 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
public class OpenFile implements Finalizable {

// RB_IO_FPTR_NEW, minus fields that Java already initializes the same way
public OpenFile(IRubyObject nil) {
public OpenFile(RubyIO io, IRubyObject nil) {
this.io = io;
runtime = nil.getRuntime();
writeconvAsciicompat = null;
writeconvPreEcopts = nil;
Expand Down Expand Up @@ -147,6 +148,7 @@ public static class Buffer {

public final Buffer wbuf = new Buffer(), rbuf = new Buffer(), cbuf = new Buffer();

public final RubyIO io;
public RubyIO tiedIOForWriting;

private boolean nonblock = false;
Expand Down Expand Up @@ -1474,15 +1476,19 @@ public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD f
}

// rb_io_buffer_read_internal
public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer bufBytes, int buf, int count) {
public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int buf, int count) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);
IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.io, buffer, buf, count);

if (result != null) {
FiberScheduler.resultApply(context, result);
return FiberScheduler.resultApply(context, result);
}
}

// proceed to builtin read logic

// if we can do selection and this is not a non-blocking call, do selection

/*
Expand All @@ -1499,24 +1505,46 @@ simple read(2) because EINTR does not damage the descriptor.
preRead(context, fptr, fd);

try {
return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, READ_TASK);
return context.getThread().executeReadWrite(context, fptr, buffer, buf, count, READ_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newConcurrencyError("IO operation interrupted");
}
}

public static int preadInternal(ThreadContext context, ChannelFD fd, ByteBuffer buffer, int from, int length) {
public static int preadInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPReadMemory(context, scheduler, fptr.io, buffer, from, length, 0);

if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}

// proceed to builtin pread logic
try {
return context.getThread().executeReadWrite(context, fd, buffer, from, length, PREAD_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newConcurrencyError("IO operation interrupted");
}
}

public static int pwriteInternal(ThreadContext context, ChannelFD fd, ByteBuffer bytes, int from, int length) {
public static int pwriteInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPWriteMemory(context, scheduler, fptr.io, buffer, from, length, 0);

if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}

// proceed to builtin pread logic
int written;
try {
written = context.getThread().executeReadWrite(context, fd, bytes, from, length, PWRITE_TASK);
written = context.getThread().executeReadWrite(context, fd, buffer, from, length, PWRITE_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newConcurrencyError("IO operation interrupted");
}
Expand Down Expand Up @@ -2453,10 +2481,10 @@ public static int writeInternal(ThreadContext context, OpenFile fptr, byte[] buf
public static int writeInternal(ThreadContext context, OpenFile fptr, ByteBuffer bufBytes, int buf, int count) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);
IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.io, bufBytes, buf, count);

if (result != null) {
FiberScheduler.resultApply(context, result);
return FiberScheduler.resultApply(context, result);
}
}

Expand Down

0 comments on commit 24615ae

Please sign in to comment.