Skip to content

Commit

Permalink
Add executeWriteTask to avoid allocating struct
Browse files Browse the repository at this point in the history
This eliminates allocation of the struct-like object for every
write operation, which should improve the performance of all IO
writes.
  • Loading branch information
headius committed Mar 2, 2021
1 parent 67fd7fc commit 67918b7
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,7 @@ public IRubyObject syswrite(ThreadContext context, IRubyObject str) {
}

ByteList strByteList = ((RubyString) str).getByteList();
n = OpenFile.writeInternal(context, fptr, fptr.fd(), strByteList.unsafeBytes(), strByteList.begin(), strByteList.getRealSize());
n = OpenFile.writeInternal(context, fptr, strByteList.unsafeBytes(), strByteList.begin(), strByteList.getRealSize());

if (n == -1) throw runtime.newErrnoFromErrno(fptr.errno(), fptr.getPath());
} finally {
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,47 @@ public <Data, Return> Return executeTask(ThreadContext context, Data data, Statu
}
}

/**
* Execute an interruptible write operation with the given byte range and data object.
*
* @param context the current context
* @param data a data object
* @param bytes the bytes to write
* @param start start range of bytes to write
* @param length length of bytes to write
* @param task the write task
* @param <Data> the type of the data object
* @return the number of bytes written
* @throws InterruptedException
*/
public <Data> int executeWriteTask(
ThreadContext context,
Data data, byte[] bytes, int start, int length,
WriteTask<Data> task) throws InterruptedException {
Status oldStatus = STATUS.get(this);
try {
this.unblockArg = data;
this.unblockFunc = task;

// check for interrupt before going into blocking call
pollThreadEvents(context);

STATUS.set(this, Status.SLEEP);

return task.run(context, data, bytes, start, length);
} finally {
STATUS.set(this, oldStatus);
this.unblockFunc = null;
this.unblockArg = null;
pollThreadEvents(context);
}
}

public interface WriteTask<Data> extends Unblocker<Data> {
public int run(ThreadContext context, Data data, byte[] bytes, int start, int length) throws InterruptedException;
public void wakeup(RubyThread thread, Data data);
}

public void enterSleep() {
STATUS.set(this, Status.SLEEP);
}
Expand Down
36 changes: 8 additions & 28 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -1354,23 +1354,21 @@ public void wakeup(RubyThread thread, InternalReadStruct data) {
}
};

final static RubyThread.Task<InternalWriteStruct, Integer> writeTask = new RubyThread.Task<InternalWriteStruct, Integer>() {
final static RubyThread.WriteTask<OpenFile> WRITE_TASK = new RubyThread.WriteTask<OpenFile>() {
@Override
public Integer run(ThreadContext context, InternalWriteStruct iis) throws InterruptedException {
OpenFile fptr = iis.fptr;

public int run(ThreadContext context, OpenFile fptr, byte[] bytes, int start, int length) throws InterruptedException {
assert fptr.lockedByMe();

fptr.unlock();
try {
return iis.fptr.posix.write(iis.fd, iis.bufBytes, iis.buf, iis.capa, iis.fptr.nonblock);
return fptr.posix.write(fptr.fd, bytes, start, length, fptr.nonblock);
} finally {
fptr.lock();
}
}

@Override
public void wakeup(RubyThread thread, InternalWriteStruct data) {
public void wakeup(RubyThread thread, OpenFile data) {
// FIXME: NO! This will kill many native channels. Must be nonblocking to interrupt.
thread.getNativeThread().interrupt();
}
Expand Down Expand Up @@ -2290,7 +2288,7 @@ public int binwriteInt(ThreadContext context, byte[] ptrBytes, int ptr, int len,
}
} else {
int l = writableLength(n);
r = writeInternal(context, this, fd, ptrBytes, ptr + offset, l);
r = writeInternal(context, this, ptrBytes, ptr + offset, l);
}
/* xxx: other threads may modify given string. */
if (r == n) return len;
Expand Down Expand Up @@ -2327,28 +2325,10 @@ static int binwriteString(OpenFile fptr, byte[] bytes, int start, int length) {
return fptr.writeInternal2(fptr.fd, bytes, start, l);
}

public static class InternalWriteStruct {
InternalWriteStruct(OpenFile fptr, ChannelFD fd, byte[] bufBytes, int buf, int count) {
this.fptr = fptr;
this.fd = fd;
this.bufBytes = bufBytes;
this.buf = buf;
this.capa = count;
}

public final OpenFile fptr;
public final ChannelFD fd;
public final byte[] bufBytes;
public final int buf;
public final int capa;
}

// rb_write_internal
public static int writeInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, byte[] bufBytes, int buf, int count) {
InternalWriteStruct iis = new InternalWriteStruct(fptr, fd, bufBytes, buf, count);

public static int writeInternal(ThreadContext context, OpenFile fptr, byte[] bufBytes, int buf, int count) {
try {
return context.getThread().executeTask(context, iis, writeTask);
return context.getThread().executeWriteTask(context, fptr, bufBytes, buf, count, WRITE_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newConcurrencyError("IO operation interrupted");
}
Expand Down Expand Up @@ -2423,7 +2403,7 @@ IRubyObject finishWriteconv(ThreadContext context, boolean noalloc) {
if (write_lock != null && write_lock.isWriteLockedByCurrentThread())
r = writeInternal2(fd, dsBytes, ds, dpPtr.p - ds);
else
r = writeInternal(context, this, fd, dsBytes, ds, dpPtr.p - ds);
r = writeInternal(context, this, dsBytes, ds, dpPtr.p - ds);
if (r == dpPtr.p - ds)
break outer;
if (0 <= r) {
Expand Down

0 comments on commit 67918b7

Please sign in to comment.