From 24615aeae79d5f630f96a2eecaf7933aa65c8daa Mon Sep 17 00:00:00 2001 From: Charles Oliver Nutter Date: Fri, 20 Oct 2023 17:23:16 -0500 Subject: [PATCH] Hook up IO scheduler callbacks correctly Working on failures in mri/fiber/test_io_buffer.rb --- .../main/java/org/jruby/FiberScheduler.java | 38 ++++++++++++--- core/src/main/java/org/jruby/RubyIO.java | 6 +-- .../src/main/java/org/jruby/RubyIOBuffer.java | 4 +- .../java/org/jruby/ext/fiber/FiberQueue.java | 4 +- .../main/java/org/jruby/util/io/OpenFile.java | 48 +++++++++++++++---- 5 files changed, 77 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/jruby/FiberScheduler.java b/core/src/main/java/org/jruby/FiberScheduler.java index b1f2b20e5fa..2509cf088fc 100644 --- a/core/src/main/java/org/jruby/FiberScheduler.java +++ b/core/src/main/java/org/jruby/FiberScheduler.java @@ -60,8 +60,9 @@ public static IRubyObject ioSelectv(ThreadContext context, IRubyObject scheduler } // MRI: rb_fiber_scheduler_io_read - public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) { - return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length)); + public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) { + Ruby runtime = context.runtime; + return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, runtime.newFixnum(length), runtime.newFixnum(offset)); } public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) { @@ -78,8 +79,9 @@ public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, } // MRI: rb_fiber_scheduler_io_write - public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) { - return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length)); + public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) { + Ruby runtime = context.runtime; + return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, runtime.newFixnum(length), runtime.newFixnum(offset)); } public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) { @@ -99,7 +101,19 @@ public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) { RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED); - IRubyObject result = ioRead(context, scheduler, io, buffer, length); + IRubyObject result = ioRead(context, scheduler, io, buffer, length, 0); + + buffer.unlock(context); + buffer.free(context); + + return result; + } + + // MRI: rb_fiber_scheduler_io_pread_memory + public static IRubyObject ioPReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int from, int size, int length) { + RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED); + + IRubyObject result = ioPRead(context, scheduler, io, buffer, from, length, 0); buffer.unlock(context); buffer.free(context); @@ -111,7 +125,19 @@ public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject schedu public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) { RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY); - IRubyObject result = ioWrite(context, scheduler, io, buffer, length); + IRubyObject result = ioWrite(context, scheduler, io, buffer, length, 0); + + buffer.unlock(context); + buffer.free(context); + + return result; + } + + // MRI: p + public static IRubyObject ioPWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int from, int size, int length) { + RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY); + + IRubyObject result = ioPWrite(context, scheduler, io, buffer, from, length, 0); buffer.unlock(context); buffer.free(context); diff --git a/core/src/main/java/org/jruby/RubyIO.java b/core/src/main/java/org/jruby/RubyIO.java index 7c03fd1679c..1fd0e2c8155 100644 --- a/core/src/main/java/org/jruby/RubyIO.java +++ b/core/src/main/java/org/jruby/RubyIO.java @@ -4816,7 +4816,7 @@ public IRubyObject pread(ThreadContext context, IRubyObject _length, IRubyObject int read; ByteBuffer wrap = ByteBuffer.wrap(strByteList.unsafeBytes(), strByteList.begin(), length); - read = OpenFile.preadInternal(context, fd, wrap, from, length); + read = OpenFile.preadInternal(context, fptr, fd, wrap, from, length); string.setReadLength(read); @@ -4851,7 +4851,7 @@ public IRubyObject pwrite(ThreadContext context, IRubyObject str, IRubyObject of int written; - written = OpenFile.pwriteInternal(context, fd, wrap, off, length); + written = OpenFile.pwriteInternal(context, fptr, fd, wrap, off, length); return context.runtime.newFixnum(written); } @@ -5202,7 +5202,7 @@ public final OpenFile MakeOpenFile() { rb_io_fptr_finalize(runtime, openFile); openFile = null; } - openFile = new OpenFile(runtime.getNil()); + openFile = new OpenFile(this, runtime.getNil()); runtime.addInternalFinalizer(openFile); return openFile; } diff --git a/core/src/main/java/org/jruby/RubyIOBuffer.java b/core/src/main/java/org/jruby/RubyIOBuffer.java index 2f47166971a..93cddc756f4 100644 --- a/core/src/main/java/org/jruby/RubyIOBuffer.java +++ b/core/src/main/java/org/jruby/RubyIOBuffer.java @@ -1916,7 +1916,7 @@ private static IRubyObject preadInternal(ThreadContext context, RubyIO io, ByteB try { base.position(offset); base.limit(offset + size); - int result = OpenFile.preadInternal(context, fptr.fd(), base, from, size); + int result = OpenFile.preadInternal(context, fptr, fptr.fd(), base, from, size); return FiberScheduler.result(context.runtime, result, fptr.errno()); } finally { base.clear(); @@ -2114,7 +2114,7 @@ private static IRubyObject pwriteInternal(ThreadContext context, RubyIO io, Byte try { base.position(offset); base.limit(offset + size); - int result = OpenFile.pwriteInternal(context, fptr.fd(), base, from, size); + int result = OpenFile.pwriteInternal(context, fptr, fptr.fd(), base, from, size); return FiberScheduler.result(context.runtime, result, fptr.errno()); } finally { base.clear(); diff --git a/core/src/main/java/org/jruby/ext/fiber/FiberQueue.java b/core/src/main/java/org/jruby/ext/fiber/FiberQueue.java index 50b6484d0bc..0a9aa0b2814 100644 --- a/core/src/main/java/org/jruby/ext/fiber/FiberQueue.java +++ b/core/src/main/java/org/jruby/ext/fiber/FiberQueue.java @@ -50,7 +50,7 @@ public FiberQueue(Ruby runtime) { this.queue = new ArrayBlockingQueue<>(1, false); } - final RubyThread.Task takeTask = new RubyThread.Task() { + private static final RubyThread.Task TAKE_TASK = new RubyThread.Task() { @Override public FiberRequest run(ThreadContext context, FiberQueue queue) throws InterruptedException { return queue.getQueueSafe().take(); @@ -89,7 +89,7 @@ public synchronized void checkShutdown() { public FiberRequest pop(ThreadContext context) { try { - return context.getThread().executeTaskBlocking(context, this, takeTask); + return context.getThread().executeTaskBlocking(context, this, TAKE_TASK); } catch (InterruptedException ie) { throw context.runtime.newThreadError("interrupted in FiberQueue.pop"); } diff --git a/core/src/main/java/org/jruby/util/io/OpenFile.java b/core/src/main/java/org/jruby/util/io/OpenFile.java index ea9bb91e77d..6a3ec3a7cd7 100644 --- a/core/src/main/java/org/jruby/util/io/OpenFile.java +++ b/core/src/main/java/org/jruby/util/io/OpenFile.java @@ -54,7 +54,8 @@ public class OpenFile implements Finalizable { // RB_IO_FPTR_NEW, minus fields that Java already initializes the same way - public OpenFile(IRubyObject nil) { + public OpenFile(RubyIO io, IRubyObject nil) { + this.io = io; runtime = nil.getRuntime(); writeconvAsciicompat = null; writeconvPreEcopts = nil; @@ -147,6 +148,7 @@ public static class Buffer { public final Buffer wbuf = new Buffer(), rbuf = new Buffer(), cbuf = new Buffer(); + public final RubyIO io; public RubyIO tiedIOForWriting; private boolean nonblock = false; @@ -1474,15 +1476,19 @@ public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD f } // rb_io_buffer_read_internal - public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer bufBytes, int buf, int count) { + public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int buf, int count) { + // try scheduler first IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent(); if (!scheduler.isNil()) { - IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count); + IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.io, buffer, buf, count); if (result != null) { - FiberScheduler.resultApply(context, result); + return FiberScheduler.resultApply(context, result); } } + + // proceed to builtin read logic + // if we can do selection and this is not a non-blocking call, do selection /* @@ -1499,13 +1505,24 @@ simple read(2) because EINTR does not damage the descriptor. preRead(context, fptr, fd); try { - return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, READ_TASK); + return context.getThread().executeReadWrite(context, fptr, buffer, buf, count, READ_TASK); } catch (InterruptedException ie) { throw context.runtime.newConcurrencyError("IO operation interrupted"); } } - public static int preadInternal(ThreadContext context, ChannelFD fd, ByteBuffer buffer, int from, int length) { + public static int preadInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) { + // try scheduler first + IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent(); + if (!scheduler.isNil()) { + IRubyObject result = FiberScheduler.ioPReadMemory(context, scheduler, fptr.io, buffer, from, length, 0); + + if (result != null) { + return FiberScheduler.resultApply(context, result); + } + } + + // proceed to builtin pread logic try { return context.getThread().executeReadWrite(context, fd, buffer, from, length, PREAD_TASK); } catch (InterruptedException ie) { @@ -1513,10 +1530,21 @@ public static int preadInternal(ThreadContext context, ChannelFD fd, ByteBuffer } } - public static int pwriteInternal(ThreadContext context, ChannelFD fd, ByteBuffer bytes, int from, int length) { + public static int pwriteInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) { + // try scheduler first + IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent(); + if (!scheduler.isNil()) { + IRubyObject result = FiberScheduler.ioPWriteMemory(context, scheduler, fptr.io, buffer, from, length, 0); + + if (result != null) { + return FiberScheduler.resultApply(context, result); + } + } + + // proceed to builtin pread logic int written; try { - written = context.getThread().executeReadWrite(context, fd, bytes, from, length, PWRITE_TASK); + written = context.getThread().executeReadWrite(context, fd, buffer, from, length, PWRITE_TASK); } catch (InterruptedException ie) { throw context.runtime.newConcurrencyError("IO operation interrupted"); } @@ -2453,10 +2481,10 @@ public static int writeInternal(ThreadContext context, OpenFile fptr, byte[] buf public static int writeInternal(ThreadContext context, OpenFile fptr, ByteBuffer bufBytes, int buf, int count) { IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent(); if (!scheduler.isNil()) { - IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count); + IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.io, bufBytes, buf, count); if (result != null) { - FiberScheduler.resultApply(context, result); + return FiberScheduler.resultApply(context, result); } }