Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kernel notification support #140

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,33 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
}

// Write the supplied message to the kernel.
func (c *Connection) writeMessage(msg []byte) error {
// Avoid the retry loop in os.File.Write.
n, err := syscall.Write(int(c.dev.Fd()), msg)
if err != nil {
return err
func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error {
var err error
var n int
expectedLen := outMsg.Len()
if outMsg.Sglist != nil {
if fusekernel.IsPlatformFuseT {
// writev is not atomic on macos, restrict to fuse-t platform
writeLock.Lock()
defer writeLock.Unlock()
}
n, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
// Avoid the retry loop in os.File.Write.
n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes())
}

if n != len(msg) {
return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg))
if err == nil && n != expectedLen {
err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen)
}

return nil
if err != nil {
writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
if c.errorLogger != nil {
c.errorLogger.Print(writeErrMsg)
}
return fmt.Errorf(writeErrMsg)
}
outMsg.Sglist = nil
return err
}

// ReadOp consumes the next op from the kernel process, returning the op and a
Expand Down Expand Up @@ -527,25 +542,7 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error {
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)

if !noResponse {
var err error
if outMsg.Sglist != nil {
if fusekernel.IsPlatformFuseT {
// writev is not atomic on macos, restrict to fuse-t platform
writeLock.Lock()
defer writeLock.Unlock()
}
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
err = c.writeMessage(outMsg.OutHeaderBytes())
}
if err != nil {
writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
if c.errorLogger != nil {
c.errorLogger.Print(writeErrMsg)
}
return fmt.Errorf(writeErrMsg)
}
outMsg.Sglist = nil
c.writeMessage(outMsg)
}

return nil
Expand All @@ -561,6 +558,17 @@ func (c *Connection) callbackForOp(op interface{}) func() {
return nil
}

// Notify sends a notification to the kernel.
//
// notification must be a pointer to one of the fuseops.NotifyXXX structures;
// kernelNotification panics on any other type.
//
// To avoid a deadlock, Notify must not be called in the execution path of a
// related filesystem operation, or within any code that could hold a lock
// that could be needed to execute such an operation. As of kernel 4.18, a
// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(),
// rename(), link() or create() request for the parent, and a setattr(),
// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or
// readdirplus() request for the inode itself.
func (c *Connection) Notify(notification interface{}) error {
	msg := c.getOutMessage()
	defer c.putOutMessage(msg)

	// Encode the notification, then record the final message length in the
	// header before handing the message to the kernel.
	c.kernelNotification(msg, notification)
	total := msg.Len()
	msg.OutHeader().Len = uint32(total)

	return c.writeMessage(msg)
}

// Close the connection. Must not be called until operations that were read
// from the connection have been responded to.
func (c *Connection) close() error {
Expand Down
109 changes: 109 additions & 0 deletions conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,42 @@ func convertInMessage(
},
}

case fusekernel.OpPoll:
type input fusekernel.PollIn
in := (*input)(inMsg.Consume(unsafe.Sizeof(input{})))
if in == nil {
return nil, errors.New("Corrupt OpPoll")
}

o = &fuseops.PollOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Kh: in.Kh,
Flags: fusekernel.PollFlags(in.Flags),
Events: fusekernel.PollEvents(in.Events),
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}

case fusekernel.OpNotifyReply:
type input fusekernel.NotifyRetrieveIn
in := (*input)(inMsg.Consume(unsafe.Sizeof(input{})))
if in == nil {
return nil, errors.New("Corrupt OpNotifyReply")
}

buf := inMsg.ConsumeBytes(inMsg.Len())
if len(buf) < int(in.Size) {
return nil, errors.New("Corrupt OpNotifyReply")
}

o = &fuseops.NotifyRetrieveReplyOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Unique: inMsg.Header().Unique,
Offset: in.Offset,
Length: in.Size,
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}

default:
o = &unknownOp{
OpCode: inMsg.Header().Opcode,
Expand Down Expand Up @@ -764,6 +800,9 @@ func (c *Connection) kernelResponse(
case *fuseops.BatchForgetOp:
return true

case *fuseops.NotifyRetrieveReplyOp:
return true

case *interruptOp:
return true
}
Expand Down Expand Up @@ -998,13 +1037,83 @@ func (c *Connection) kernelResponseForOp(
out.TimeGran = 1
out.MaxPages = o.MaxPages

case *fuseops.PollOp:
out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{}))))
out.Revents = uint32(o.Revents)

case *fuseops.NotifyRetrieveReplyOp:
// Empty response

default:
panic(fmt.Sprintf("Unexpected op: %#v", op))
}

return
}

// kernelNotification fills m with the wire form of the notification op.
//
// The header's Unique field is set to zero and its Error field to the
// fusekernel.NotifyCode* value matching op's type. The notification-specific
// fusekernel struct is then grown into the message, followed by any trailing
// name or data bytes. The header's Len field is not touched here.
//
// It panics if op is not one of the *fuseops.Notify* types handled below.
func (c *Connection) kernelNotification(
	m *buffer.OutMessage,
	op interface{}) {

	hdr := m.OutHeader()
	hdr.Unique = 0

	// Build the payload that corresponds to the concrete notification type.
	switch n := op.(type) {
	case *fuseops.NotifyPollWakeup:
		hdr.Error = fusekernel.NotifyCodePoll
		size := int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{}))
		wakeup := (*fusekernel.NotifyPollWakeupOut)(m.Grow(size))
		wakeup.Kh = n.Kh

	case *fuseops.NotifyInvalInode:
		hdr.Error = fusekernel.NotifyCodeInvalInode
		size := int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{}))
		inval := (*fusekernel.NotifyInvalInodeOut)(m.Grow(size))
		inval.Ino = uint64(n.Inode)
		inval.Off = n.Offset
		inval.Len = n.Length

	case *fuseops.NotifyInvalEntry:
		hdr.Error = fusekernel.NotifyCodeInvalEntry
		size := int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{}))
		entry := (*fusekernel.NotifyInvalEntryOut)(m.Grow(size))
		entry.Parent = uint64(n.Parent)
		entry.Namelen = uint32(len(n.Name))
		// The name follows the struct and is NUL-terminated; Namelen does
		// not count the terminator.
		m.AppendString(n.Name)
		m.AppendString("\x00")

	case *fuseops.NotifyDelete:
		hdr.Error = fusekernel.NotifyCodeDelete
		size := int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{}))
		del := (*fusekernel.NotifyDeleteOut)(m.Grow(size))
		del.Parent = uint64(n.Parent)
		del.Child = uint64(n.Child)
		del.Namelen = uint32(len(n.Name))
		// Same trailing NUL-terminated name layout as NotifyInvalEntry.
		m.AppendString(n.Name)
		m.AppendString("\x00")

	case *fuseops.NotifyStore:
		hdr.Error = fusekernel.NotifyCodeStore
		size := int(unsafe.Sizeof(fusekernel.NotifyStoreOut{}))
		store := (*fusekernel.NotifyStoreOut)(m.Grow(size))
		store.Nodeid = uint64(n.Inode)
		store.Offset = n.Offset
		store.Size = n.Length
		m.Append(n.Data...)
		// Cut the message to header + struct + exactly Length data bytes.
		m.ShrinkTo(buffer.OutMessageHeaderSize + size + int(n.Length))

	case *fuseops.NotifyRetrieve:
		hdr.Error = fusekernel.NotifyCodeRetrieve
		size := int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{}))
		retrieve := (*fusekernel.NotifyRetrieveOut)(m.Grow(size))
		retrieve.Unique = n.Unique
		retrieve.Nodeid = uint64(n.Inode)
		retrieve.Offset = n.Offset
		retrieve.Size = n.Length

	default:
		panic(fmt.Sprintf("Unexpected notification: %#v", op))
	}
}

////////////////////////////////////////////////////////////////////////
// General conversions
////////////////////////////////////////////////////////////////////////
Expand Down
124 changes: 124 additions & 0 deletions fuseops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,3 +1009,127 @@ type SyncFSOp struct {
Inode InodeID
OpContext OpContext
}

// PollOp requests notifications when the file system user calls poll/select
// or similar operations on a file.
//
// The file system answers by setting Revents; that value is what is copied
// into the reply sent to the kernel.
type PollOp struct {
// The inode and handle the user wants to poll
Inode InodeID
Handle HandleID

// Kh is the "kernel handle". The reason behind it is that it's allocated
// by the kernel on file allocation and guaranteed to be unique as opposed
// to regular file handles (HandleID) generated by the userland server
// (by us). Kh has to be passed back in NotifyPollWakeup.Kh when waking up
// pollers.
Kh uint64

// Poll flags
Flags fusekernel.PollFlags

// Requested events
Events fusekernel.PollEvents

// Set by the file system: the actual events that have happened
// since the last poll
Revents fusekernel.PollEvents

// Context of the request; filled with the PID taken from the request header.
OpContext OpContext
}

// NotifyPollWakeup notifies consumers waiting for poll/epoll that events are
// incoming for the specified kernel handle. The kernel will send a PollOp
// request to get the event mask after receiving this notification.
type NotifyPollWakeup struct {
// Kh is the kernel handle to wake up (see PollOp.Kh).
Kh uint64
}

// NotifyInvalInode is a notification to invalidate the cache for an inode.
//
// If the filesystem has writeback caching enabled, invalidating an inode
// will first trigger a writeback of all dirty pages. The call will block
// until all writeback requests have completed and the inode has been
// invalidated. It will, however, not wait for completion of pending writeback
// requests that have been issued before.
type NotifyInvalInode struct {
// Inode whose cache is to be invalidated.
Inode InodeID
// Offset and Length are passed to the kernel unchanged.
// NOTE(review): presumably they select the byte range to invalidate —
// confirm against the kernel's inode-invalidation notification docs.
Offset int64
Length int64
}

// NotifyInvalEntry is a notification to invalidate the parent's attributes
// and the dentry matching parent/name.
//
// To avoid a deadlock this request must not be sent in the execution path
// of a related filesystem operation or within any code that could hold a lock
// that could be needed to execute such an operation. As of kernel 4.18, a
// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(),
// rename(), link() or create() request for the parent, and a setattr(),
// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or
// readdirplus() request for the inode itself.
//
// When called correctly, it will never block.
type NotifyInvalEntry struct {
// Parent is the inode of the directory containing the entry.
Parent InodeID
// Name of the entry within Parent. It is sent NUL-terminated; the
// terminator is not counted in the name length given to the kernel.
Name string
}

// NotifyDelete behaves like NotifyInvalEntry with the following additional
// effect (at least as of Linux kernel 4.8):
//
// If the provided child inode matches the inode that is currently associated
// with the cached dentry, and if there are any inotify watches registered for
// the dentry, then the watchers are informed that the dentry has been deleted.
//
// To avoid a deadlock this request must not be sent while executing a
// related filesystem operation or while holding a lock that could be needed to
// execute such an operation.
type NotifyDelete struct {
// Parent is the inode of the directory containing the entry.
Parent InodeID
// Child is the inode expected to be associated with the entry.
Child InodeID
// Name of the entry within Parent. It is sent NUL-terminated; the
// terminator is not counted in the name length given to the kernel.
Name string
}

// NotifyStore stores data to the kernel buffers.
//
// Synchronously store data in the kernel buffers belonging to the given inode.
// The stored data is marked up-to-date (no read will be performed against it,
// unless it's invalidated or evicted from the cache).
//
// If the stored data overflows the current file size, then the size is extended,
// similarly to a write(2) on the filesystem.
//
// If this request returns an error, then the store wasn't fully completed, but
// it may have been partially completed.
type NotifyStore struct {
// Inode whose kernel buffers receive the data.
Inode InodeID
// Offset is passed to the kernel as the store offset.
Offset uint64
// Length is passed to the kernel as the store size; the outgoing message
// is cut to exactly Length bytes of data.
// NOTE(review): presumably the slices in Data must hold at least Length
// bytes in total — confirm against buffer.OutMessage.ShrinkTo.
Length uint32
// Data holds the bytes to store; the slices are appended to the message
// in order.
Data [][]byte
}

// NotifyRetrieve retrieves data from the kernel buffers belonging to the
// given inode.
//
// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply.
// Only present pages are returned in the retrieve reply. Retrieving stops when it
// finds a non-present page and only data prior to that is returned.
//
// If this request returns an error, then the retrieve will not be completed and
// no reply will be sent.
//
// This request doesn't change the dirty state of pages in the kernel buffer. For
// dirty pages the write() method will be called regardless of having been retrieved
// previously.
type NotifyRetrieve struct {
// Inode whose kernel buffers are read.
Inode InodeID
// Unique is passed to the kernel unchanged.
// NOTE(review): presumably the kernel echoes it back in
// NotifyRetrieveReplyOp.Unique so the reply can be matched — confirm.
Unique uint64
// Offset and Length are passed to the kernel as the offset and size of
// the retrieve request.
Offset uint64
Length uint32
}

// NotifyRetrieveReplyOp is the op the kernel sends as a reply to a
// NotifyRetrieve notification. No response is written back to the kernel
// for this op.
//
// NOTE(review): the previous comment here read "Matches the size of WriteIn";
// it presumably describes the kernel-side input struct rather than this
// type — confirm.
type NotifyRetrieveReplyOp struct {
// Inode is the node ID from the request header.
Inode InodeID
// Unique is the unique ID from the request header.
Unique uint64
// Offset and Length are the offset and size reported by the kernel in
// the reply.
Offset uint64
Length uint32

// Context of the request; filled with the PID taken from the request header.
OpContext OpContext
}
8 changes: 8 additions & 0 deletions fuseutil/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type FileSystem interface {
SetXattr(context.Context, *fuseops.SetXattrOp) error
Fallocate(context.Context, *fuseops.FallocateOp) error
SyncFS(context.Context, *fuseops.SyncFSOp) error
Poll(context.Context, *fuseops.PollOp) error

SetConnection(*fuse.Connection)

// Regard all inodes (including the root inode) as having their lookup counts
// decremented to zero, and clean up any resources associated with the file
Expand Down Expand Up @@ -96,6 +99,8 @@ type fileSystemServer struct {
}

func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
s.fs.SetConnection(c)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What’s this SetConnection call for? Is it needed? Can you add a comment to the source to explain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed to have something to call Notify() on :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think SetConnection should be added to this interface.

If you want to store a connection field in your FS implementation, that’s fine.

Personally, I would just add a conn: conn, field to where I construct the &fuseFS{…}.

But if it doesn’t absolutely need to be in the interface (which I don’t think it does?), it should not be added.

Copy link
Contributor Author

@vitalif vitalif Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be exported to the user FS code somehow. How to do it?
I mean, the thing that implements interface FileSystem must be able to call SetConnection() or Notify() on something.
Currently we do

fsint := NewGoofysFuse(fs)
server := fuseutil.NewFileSystemServer(fsint)
fuseMfs, err := fuse.Mount(fs.flags.MountPoint, server, mountCfg)

I think there's an option to add a Notify() method to the server — would that be OK? It would only work while ServeOps(connection) is active, though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I still don’t quite understand.

Can you show me the code that uses this feature? Without an example implementation it’s hard to think this through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the example. Seeing how the code is used makes me think that we should not pursue the SetConnection approach. It seems unclean to expose the internal connection and thereby let users call methods in any order (e.g. Connection.Reply, which the doc comment explicitly mentions should not be called).

@jacobsa Would you have a suggestion regarding your preferred way to model this? I’m thinking it would be cleaner to collect kernel notifications and then send them on the connection from an internal function. Perhaps that could also help in preventing the deadlock that is mentioned in the doc comment?


// When we are done, we clean up by waiting for all in-flight ops then
// destroying the file system.
defer func() {
Expand Down Expand Up @@ -240,6 +245,9 @@ func (s *fileSystemServer) handleOp(

case *fuseops.SyncFSOp:
err = s.fs.SyncFS(ctx, typed)

case *fuseops.PollOp:
err = s.fs.Poll(ctx, typed)
}

c.Reply(ctx, err)
Expand Down
9 changes: 9 additions & 0 deletions fuseutil/not_implemented_file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,14 @@ func (fs *NotImplementedFileSystem) SyncFS(
return fuse.ENOSYS
}

// Poll reports that polling is not implemented by returning fuse.ENOSYS.
func (fs *NotImplementedFileSystem) Poll(
ctx context.Context,
op *fuseops.PollOp) error {
return fuse.ENOSYS
}

// SetConnection is a no-op: the supplied connection is ignored and not stored.
func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) {
}

// Destroy is a no-op.
func (fs *NotImplementedFileSystem) Destroy() {
}
Loading
Loading