diff --git a/connection.go b/connection.go index a3c193d..6172ac5 100644 --- a/connection.go +++ b/connection.go @@ -375,18 +375,35 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { } // Write the supplied message to the kernel. -func (c *Connection) writeMessage(msg []byte) error { - // Avoid the retry loop in os.File.Write. - n, err := syscall.Write(int(c.dev.Fd()), msg) - if err != nil { - return err +func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error { + var err error + var n int + expectedLen := outMsg.Len() + if outMsg.Sglist != nil { + if fusekernel.IsPlatformFuseT { + // writev is not atomic on macos, restrict to fuse-t platform + writeLock.Lock() + defer writeLock.Unlock() + } + n, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + // Avoid the retry loop in os.File.Write. + n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes()) } - - if n != len(msg) { - return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg)) + // A short write is an error even when the syscall itself succeeded. + if err == nil && n != expectedLen { + err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen) } - - return nil + if err != nil { + writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) + if c.errorLogger != nil { + c.errorLogger.Print(writeErrMsg) + } + // The message is already formatted; never use it as a format string. + return fmt.Errorf("%s", writeErrMsg) + } + outMsg.Sglist = nil + return nil } // ReadOp consumes the next op from the kernel process, returning the op and a @@ -527,25 +544,10 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - var err error - if outMsg.Sglist != nil { - if fusekernel.IsPlatformFuseT { - // writev is not atomic on macos, restrict to fuse-t platform - writeLock.Lock() - defer writeLock.Unlock() - } - _, err = writev(int(c.dev.Fd()), outMsg.Sglist) - } else { - err = c.writeMessage(outMsg.OutHeaderBytes()) - } - if err != nil { - writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, 
outMsg.OutHeaderBytes()) - if c.errorLogger != nil { - c.errorLogger.Print(writeErrMsg) - } - return fmt.Errorf(writeErrMsg) - } - outMsg.Sglist = nil + // Report a failed write to the caller, as the code replaced here did. + if err := c.writeMessage(outMsg); err != nil { + return err + } } return nil @@ -561,6 +563,17 @@ func (c *Connection) callbackForOp(op interface{}) func() { return nil } +// Notify sends a notification to the kernel. +// notification must be a pointer to one of the fuseops.NotifyXXX structures; any other value makes kernelNotification panic. +// To avoid a deadlock, Notify must not be called in the execution path of a related filesystem operation or within any code that could hold a lock that could be needed to execute such an operation. As of kernel 4.18, a "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), rename(), link() or create() request for the parent, and a setattr(), unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or readdirplus() request for the inode itself. +func (c *Connection) Notify(notification interface{}) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + c.kernelNotification(outMsg, notification) + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeMessage(outMsg) +} + // Close the connection. Must not be called until operations that were read // from the connection have been responded to. 
func (c *Connection) close() error { diff --git a/conversions.go b/conversions.go index 6c2ea89..a77ced0 100644 --- a/conversions.go +++ b/conversions.go @@ -731,6 +731,42 @@ func convertInMessage( }, } + case fusekernel.OpPoll: + type input fusekernel.PollIn + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) + if in == nil { + return nil, errors.New("Corrupt OpPoll") + } + + o = &fuseops.PollOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Handle: fuseops.HandleID(in.Fh), + Kh: in.Kh, + Flags: fusekernel.PollFlags(in.Flags), + Events: fusekernel.PollEvents(in.Events), + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + + case fusekernel.OpNotifyReply: + type input fusekernel.NotifyRetrieveIn + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) + if in == nil { + return nil, errors.New("Corrupt OpNotifyReply") + } + + buf := inMsg.ConsumeBytes(inMsg.Len()) // only length-checked below; the bytes are not copied into the op + if len(buf) < int(in.Size) { + return nil, errors.New("Corrupt OpNotifyReply") + } + + o = &fuseops.NotifyRetrieveReplyOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Unique: inMsg.Header().Unique, + Offset: in.Offset, + Length: in.Size, + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + default: o = &unknownOp{ OpCode: inMsg.Header().Opcode, @@ -764,6 +800,9 @@ func (c *Connection) kernelResponse( case *fuseops.BatchForgetOp: return true + case *fuseops.NotifyRetrieveReplyOp: + return true + case *interruptOp: return true } @@ -998,6 +1037,13 @@ func (c *Connection) kernelResponseForOp( out.TimeGran = 1 out.MaxPages = o.MaxPages + case *fuseops.PollOp: + out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{})))) + out.Revents = uint32(o.Revents) + + case *fuseops.NotifyRetrieveReplyOp: + // Empty response (kernelResponse returns true for this op) + default: panic(fmt.Sprintf("Unexpected op: %#v", op)) } @@ -1005,6 +1051,69 @@ func (c *Connection) kernelResponseForOp( return } +// kernelNotification fills m with the notification described by op: Unique is zeroed, the notification code goes into the header's Error field, and the payload follows. It panics on an unknown notification type. 
+func (c *Connection) kernelNotification( + m *buffer.OutMessage, + op interface{}) { + + h := m.OutHeader() + h.Unique = 0 + + // Set the notification code (in the header's Error field) and build the payload for this notification type + switch o := op.(type) { + case *fuseops.NotifyPollWakeup: + h.Error = fusekernel.NotifyCodePoll + out := (*fusekernel.NotifyPollWakeupOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{})))) + out.Kh = o.Kh + + case *fuseops.NotifyInvalInode: + h.Error = fusekernel.NotifyCodeInvalInode + out := (*fusekernel.NotifyInvalInodeOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{})))) + out.Ino = uint64(o.Inode) + out.Off = o.Offset + out.Len = o.Length + + case *fuseops.NotifyInvalEntry: + h.Error = fusekernel.NotifyCodeInvalEntry + out := (*fusekernel.NotifyInvalEntryOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{})))) + out.Parent = uint64(o.Parent) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") // NUL terminator; not counted in Namelen + + case *fuseops.NotifyDelete: + h.Error = fusekernel.NotifyCodeDelete + out := (*fusekernel.NotifyDeleteOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{})))) + out.Parent = uint64(o.Parent) + out.Child = uint64(o.Child) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") // NUL terminator; not counted in Namelen + + case *fuseops.NotifyStore: + h.Error = fusekernel.NotifyCodeStore + out := (*fusekernel.NotifyStoreOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})))) + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + m.Append(o.Data...) 
+ m.ShrinkTo(buffer.OutMessageHeaderSize + int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})) + int(o.Length)) // NOTE(review): presumably needs Data to hold at least o.Length bytes - confirm against OutMessage.ShrinkTo + + case *fuseops.NotifyRetrieve: + h.Error = fusekernel.NotifyCodeRetrieve + out := (*fusekernel.NotifyRetrieveOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{})))) + out.Unique = o.Unique + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + + default: + panic(fmt.Sprintf("Unexpected notification: %#v", op)) + } + + return +} + //////////////////////////////////////////////////////////////////////// // General conversions //////////////////////////////////////////////////////////////////////// diff --git a/fuseops/ops.go b/fuseops/ops.go index 9f75541..00fdd6f 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -1009,3 +1009,127 @@ type SyncFSOp struct { Inode InodeID OpContext OpContext } + +// Request notifications when the file system user calls poll/select or +// similar operations on a file. +type PollOp struct { + // The inode and handle the user wants to poll + Inode InodeID + Handle HandleID + + // Kh is the "kernel handle". The reason behind it is that it's allocated + // by the kernel on file allocation and guaranteed to be unique as opposed + // to regular file handles (HandleID) generated by the userland server + // (by us). Kh has to be passed back in NotifyPollWakeup notifications. + Kh uint64 + + // Poll flags + Flags fusekernel.PollFlags + + // Requested events + Events fusekernel.PollEvents + + // Set by the file system: the actual events that have happened + // since the last poll + Revents fusekernel.PollEvents + OpContext OpContext +} + +// Notify consumers waiting for poll/epoll that events are incoming +// for the specified kernel handle. The kernel will send a PollOp request +// to get the event mask after receiving this notification +type NotifyPollWakeup struct { + Kh uint64 +} + +// Notify to invalidate cache for an inode. 
+// +// If the filesystem has writeback caching enabled, invalidating an inode +// will first trigger a writeback of all dirty pages. The call will block +// until all writeback requests have completed and the inode has been +// invalidated. It will, however, not wait for completion of pending writeback +// requests that have been issued before. +type NotifyInvalInode struct { + Inode InodeID + Offset int64 + Length int64 +} + +// Notify to invalidate parent attributes and the dentry matching parent/name +// +// To avoid a deadlock this request must not be sent in the execution path +// of a related filesystem operation or within any code that could hold a lock +// that could be needed to execute such an operation. As of kernel 4.18, a +// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), +// rename(), link() or create() request for the parent, and a setattr(), +// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or +// readdirplus() request for the inode itself. +// +// When called correctly, it will never block. +type NotifyInvalEntry struct { + Parent InodeID + Name string +} + +// This request behaves like NotifyInvalEntry with the following additional +// effect (at least as of Linux kernel 4.8): +// +// If the provided child inode matches the inode that is currently associated +// with the cached dentry, and if there are any inotify watches registered for +// the dentry, then the watchers are informed that the dentry has been deleted. +// +// To avoid a deadlock this request must not be sent while executing a +// related filesystem operation or while holding a lock that could be needed to +// execute such an operation. +type NotifyDelete struct { + Parent InodeID + Child InodeID + Name string +} + +// Store data to the kernel buffers +// +// Synchronously store data in the kernel buffers belonging to the given inode. 
+// The stored data is marked up-to-date (no read will be performed against it, +// unless it's invalidated or evicted from the cache). +// +// If the stored data overflows the current file size, then the size is extended, +// similarly to a write(2) on the filesystem. +// +// If this request returns an error, then the store wasn't fully completed, but +// it may have been partially completed. +type NotifyStore struct { + Inode InodeID + Offset uint64 // sent to the kernel as NotifyStoreOut.Offset + Length uint32 // sent as NotifyStoreOut.Size; the outgoing message is shrunk to this many payload bytes + Data [][]byte // buffers appended to the message as the payload +} + +// Retrieve data from the kernel buffers belonging to the given inode +// +// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply. +// Only present pages are returned in the retrieve reply. Retrieving stops when it +// finds a non-present page and only data prior to that is returned. +// +// If this request returns an error, then the retrieve will not be completed and +// no reply will be sent. +// +// This request doesn't change the dirty state of pages in the kernel buffer. For +// dirty pages the write() method will be called regardless of having been retrieved +// previously. 
+type NotifyRetrieve struct { + Inode InodeID + Unique uint64 // sent to the kernel as NotifyRetrieveOut.Unique + Offset uint64 + Length uint32 +} + +// NotifyRetrieveReplyOp is the kernel's reply to a NotifyRetrieve. Only Offset and Length are exposed here, not the retrieved bytes, and no response is written back for it. +type NotifyRetrieveReplyOp struct { + Inode InodeID + Unique uint64 // taken from the reply's header; presumably the NotifyRetrieve.Unique it answers - confirm + Offset uint64 + Length uint32 + + OpContext OpContext +} diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index 107e427..9aee3ac 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -64,6 +64,9 @@ type FileSystem interface { SetXattr(context.Context, *fuseops.SetXattrOp) error Fallocate(context.Context, *fuseops.FallocateOp) error SyncFS(context.Context, *fuseops.SyncFSOp) error + Poll(context.Context, *fuseops.PollOp) error + + SetConnection(*fuse.Connection) // Regard all inodes (including the root inode) as having their lookup counts // decremented to zero, and clean up any resources associated with the file @@ -96,6 +99,8 @@ type fileSystemServer struct { } func (s *fileSystemServer) ServeOps(c *fuse.Connection) { + s.fs.SetConnection(c) + // When we are done, we clean up by waiting for all in-flight ops then // destroying the file system. 
defer func() { @@ -240,6 +245,9 @@ func (s *fileSystemServer) handleOp( case *fuseops.SyncFSOp: err = s.fs.SyncFS(ctx, typed) + + case *fuseops.PollOp: + err = s.fs.Poll(ctx, typed) } c.Reply(ctx, err) diff --git a/fuseutil/not_implemented_file_system.go b/fuseutil/not_implemented_file_system.go index e47e02b..d55ab72 100644 --- a/fuseutil/not_implemented_file_system.go +++ b/fuseutil/not_implemented_file_system.go @@ -210,5 +210,14 @@ func (fs *NotImplementedFileSystem) SyncFS( return fuse.ENOSYS } +func (fs *NotImplementedFileSystem) Poll( + ctx context.Context, + op *fuseops.PollOp) error { + return fuse.ENOSYS +} + +func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) { +} + func (fs *NotImplementedFileSystem) Destroy() { } diff --git a/internal/fusekernel/fuse_kernel.go b/internal/fusekernel/fuse_kernel.go index 8e90815..f468e75 100644 --- a/internal/fusekernel/fuse_kernel.go +++ b/internal/fusekernel/fuse_kernel.go @@ -349,6 +349,34 @@ var releaseFlagNames = []flagName{ {uint32(ReleaseFlush), "ReleaseFlush"}, } +// PollFlags holds the flags of a poll request (PollIn.Flags); PollEvents, declared below, holds requested and returned event masks. +type PollFlags uint32 + +const ( + // From the kernel source: + // Ask for notification if there's someone waiting for it. + // The client may ignore the flag and always notify. + PollScheduleNotify PollFlags = 1 << 0 +) + +type PollEvents uint32 // bit mask built from the Poll*Event constants below + +const ( + PollInEvent PollEvents = 0x0001 + PollPriEvent PollEvents = 0x0002 + PollOutEvent PollEvents = 0x0004 + PollErrEvent PollEvents = 0x0008 + PollHupEvent PollEvents = 0x0010 + PollNvalEvent PollEvents = 0x0020 + PollRdNormEvent PollEvents = 0x0040 + PollRdBandEvent PollEvents = 0x0080 + PollWrNormEvent PollEvents = 0x0100 + PollWrBandEvent PollEvents = 0x0200 + PollMsgEvent PollEvents = 0x0400 + PollRemoveEvent PollEvents = 0x1000 + PollRdHupEvent PollEvents = 0x2000 +) + // Opcodes const ( OpLookup = 1 @@ -389,6 +417,7 @@ const ( OpDestroy = 38 OpIoctl = 39 // Linux? OpPoll = 40 // Linux? 
+ OpNotifyReply = 41 OpBatchForget = 42 OpFallocate = 43 OpReaddirplus = 44 @@ -563,6 +592,18 @@ func CreateInSize(p Protocol) uintptr { } } +type PollIn struct { // payload of an OpPoll request + Fh uint64 + Kh uint64 + Flags uint32 + Events uint32 +} + +type PollOut struct { // payload of the response to an OpPoll request + Revents uint32 + padding uint32 +} + type ReleaseIn struct { Fh uint64 Flags uint32 @@ -798,8 +839,15 @@ const ( NotifyCodePoll int32 = 1 NotifyCodeInvalInode int32 = 2 NotifyCodeInvalEntry int32 = 3 + NotifyCodeStore int32 = 4 + NotifyCodeRetrieve int32 = 5 + NotifyCodeDelete int32 = 6 ) +type NotifyPollWakeupOut struct { + Kh uint64 +} + type NotifyInvalInodeOut struct { Ino uint64 Off int64 @@ -815,3 +863,35 @@ type NotifyInvalEntryOut struct { type SyncFSIn struct { Padding uint64 } + +type NotifyDeleteOut struct { + Parent uint64 + Child uint64 + Namelen uint32 + padding uint32 +} + +type NotifyStoreOut struct { + Nodeid uint64 + Offset uint64 + Size uint32 + padding uint32 +} + +type NotifyRetrieveOut struct { + Unique uint64 + Nodeid uint64 + Offset uint64 + Size uint32 + padding uint32 +} + +// NotifyRetrieveIn is the fixed-size header of an OpNotifyReply message. Matches the size of WriteIn +type NotifyRetrieveIn struct { + dummy1 uint64 + Offset uint64 + Size uint32 + dummy2 uint32 + dummy3 uint64 + dummy4 uint64 +}