diff --git a/.clang-format b/.clang-format new file mode 100644 index 00000000..c22a34c0 --- /dev/null +++ b/.clang-format @@ -0,0 +1,95 @@ +--- +Language: Cpp +BasedOnStyle: LLVM +AccessModifierOffset: -4 +AlignAfterOpenBracket: Align +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignEscapedNewlinesLeft: true +AlignOperands: true +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Inline +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: true +AlwaysBreakTemplateDeclarations: true +BinPackArguments: true +BinPackParameters: true +BraceWrapping: + AfterClass: false + AfterControlStatement: false + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + BeforeCatch: false + BeforeElse: false + IndentBraces: false +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Attach +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakAfterJavaFieldAnnotations: false +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerAllOnOneLineOrOnePerLine: true +ConstructorInitializerIndentWidth: 8 +ContinuationIndentWidth: 8 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +IncludeCategories: + - Regex: '^<.*\.h>' + Priority: 1 + - Regex: '^<.*' + Priority: 2 + - Regex: '.*' + Priority: 3 +IncludeIsMainRegex: '([-_](test|unittest))?$' +IndentCaseLabels: true +IndentWidth: 4 +IndentWrappedFunctionNames: false +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: false +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBlockIndentWidth: 2 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: false +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PointerAlignment: Left +ReflowComments: true +SortIncludes: true +SpaceAfterCStyleCast: false +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 2 +SpacesInAngles: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: Auto +TabWidth: 8 +UseTab: Never +... \ No newline at end of file diff --git a/.gitignore b/.gitignore index 2ccd54ac..6d944114 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ goofys goofys.test xout s3proxy.jar +/s3proxy.sh +/mnt \ No newline at end of file diff --git a/Makefile b/Makefile index a43d9d2b..ce51cfae 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ check: staticcheck check-fmt check-gomod .PHONY: staticcheck staticcheck: - @staticcheck -checks 'all,-ST1000,-U1000,-ST1020,-ST1001' ./... + @staticcheck -checks 'all,-ST1000,-U1000,-ST1020,-ST1001,-SA1019' ./... 
.PHONY: unparam unparam: diff --git a/api/api.go b/api/api.go index 47ac2a74..c97daa60 100644 --- a/api/api.go +++ b/api/api.go @@ -40,6 +40,8 @@ func Mount( fuseLog.Level = logrus.DebugLevel log.Level = logrus.DebugLevel mountCfg.DebugLogger = GetStdLogger(fuseLog, logrus.DebugLevel) + } else { + GetLogger("fuse").Level = logrus.InfoLevel } if flags.Backend == nil { diff --git a/api/common/config.go b/api/common/config.go index 17ec5124..51e79e87 100644 --- a/api/common/config.go +++ b/api/common/config.go @@ -47,11 +47,14 @@ type FlagStorage struct { Backend interface{} // Tuning - Cheap bool - ExplicitDir bool - StatCacheTTL time.Duration - TypeCacheTTL time.Duration - HTTPTimeout time.Duration + Cheap bool + ExplicitDir bool + BlockReadCache bool + BlockReadCacheSize uint64 + BlockReadCacheMemRatio float64 + StatCacheTTL time.Duration + TypeCacheTTL time.Duration + HTTPTimeout time.Duration // Debugging DebugFuse bool diff --git a/go.mod b/go.mod index 6a543646..08362b3b 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/jacobsa/fuse v0.0.0-20240909130001-a1c7c8268f12 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/mitchellh/go-homedir v1.1.0 + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/sevlyar/go-daemon v0.1.5 github.com/shirou/gopsutil/v3 v3.24.5 github.com/sirupsen/logrus v1.8.1 @@ -52,6 +53,7 @@ require ( github.com/google/s2a-go v0.1.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect github.com/googleapis/gax-go/v2 v2.11.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/kr/text v0.1.0 // indirect @@ -59,7 +61,6 @@ require ( github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect - github.com/smartystreets/goconvey v1.8.1 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect @@ -79,6 +80,7 @@ require ( ) require ( - golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 // indirect + github.com/smartystreets/goconvey v1.8.1 // indirect + golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 28daf79f..26c46ed5 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,8 @@ github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5i github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jacobsa/fuse v0.0.0-20240909130001-a1c7c8268f12 h1:PIkShcSGp+IJB3h3Du/hNtrvug0b0o88p9pW8GF9xcg= github.com/jacobsa/fuse v0.0.0-20240909130001-a1c7c8268f12/go.mod h1:JYi9iIxdYNgxmMgLwtSHO/hmVnP2kfX1oc+mtx+XWLA= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -149,6 +151,8 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf github.com/mattn/go-ieproxy v0.0.1/go.mod 
h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -212,8 +216,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 h1:1P7xPZEwZMoBoz0Yze5Nx2/4pxj6nw9ZqHWXqP0iRgQ= -golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= +golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a h1:Jw5wfR+h9mnIYH+OtGT2im5wV1YGGDora5vTv/aa5bE= +golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/internal/backend_s3.go b/internal/backend_s3.go index b64945bd..4f56f6d9 100644 --- a/internal/backend_s3.go +++ b/internal/backend_s3.go @@ -22,6 +22,7 @@ import ( "net/url" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -53,18 +54,28 @@ type S3Backend struct { aws bool gcs bool v2Signer bool + + // get stats + getStatsLock *sync.Mutex + totalGetCnt uint64 + totalGetBytes uint64 + prevGetPrintCnt uint64 + prevGetPrintTime time.Time } +const s3PricePer1kGet = 0.0004 + func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, error) { awsConfig, err := config.ToAwsConfig(flags) if err != nil { return nil, err } s := &S3Backend{ - bucket: bucket, - awsConfig: awsConfig, - flags: flags, - config: config, + bucket: bucket, + awsConfig: awsConfig, + flags: flags, + config: config, + getStatsLock: &sync.Mutex{}, cap: Capabilities{ Name: "s3", MaxMultipartSize: 5 * 1024 * 1024 * 1024, @@ -748,6 +759,7 @@ func (s *S3Backend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) { if err != nil { return nil, mapAwsError(err) } + s.updateGetStats(uint64(*resp.ContentLength)) return &GetBlobOutput{ HeadBlobOutput: HeadBlobOutput{ @@ -766,6 +778,33 @@ func (s *S3Backend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) { }, nil } +func (s *S3Backend) updateGetStats(size uint64) { + getCnt := atomic.AddUint64(&s.totalGetCnt, 1) + getBytes := atomic.AddUint64(&s.totalGetBytes, size) + + if getCnt < atomic.LoadUint64(&s.prevGetPrintCnt)+5000 { + return + } + + curTime := time.Now() + s.getStatsLock.Lock() + if curTime.Sub(s.prevGetPrintTime).Seconds() >= 5 { + s.prevGetPrintTime = curTime + s.getStatsLock.Unlock() + 
atomic.StoreUint64(&s.prevGetPrintCnt, getCnt) + + cost := float64(getCnt) / 1000 * s3PricePer1kGet + log.Infof("S3: GET stats: req=%v avg_size=%.3fMiB"+ " total_cost=$%.3f avg_cost=$%.3f/GiB", + getCnt, + float64(getBytes)/float64(getCnt)/1024/1024, + cost, + cost/(float64(getBytes)/1024/1024/1024)) + } else { + s.getStatsLock.Unlock() + } +} + func getDate(resp *http.Response) *time.Time { date := resp.Header.Get("Date") if date != "" { diff --git a/internal/block_cache.go b/internal/block_cache.go new file mode 100644 index 00000000..6b3a9b67 --- /dev/null +++ b/internal/block_cache.go @@ -0,0 +1,517 @@ +package internal + +import ( + . "github.com/voyvodov/goofys/api/common" + + "fmt" + "io" + "math" + "sync" + "sync/atomic" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/jacobsa/fuse/fuseops" + "github.com/pbnjay/memory" +) + +// a file smaller than this will be read at once +const blockCacheReadAllSize = 20 * 1024 * 1024 // 20MiB + +// maximum number of threads working on prefetch or readahead +// note: prefetch is used to hide latency and match throughput; readahead +// aggressively reads data expected by future reads +const maxPrefetchThreads = int32(16) + +type cacheKey struct { + inode fuseops.InodeID + blkID uint64 // ID of the block (offset divided by blockSize) +} + +type CacheValue struct { + mu sync.RWMutex + Data []byte // nil if error is present + Err error // should be checked before trying to access the value +} + +// BlockCache is a cache for read-only files based on large blocks +type BlockCache struct { + mu sync.RWMutex + lru *lru.Cache[cacheKey, *CacheValue] + blockSize uint64 + + // number of alive prefetch threads + numPrefetchThreads int32 + + nrLatencyBlock int // max blocks that can be read without more latency + nrThroughputBlock int // min blocks that can be read with max throughput + benchmarkDone sync.WaitGroup + benchmarkOnce sync.Once +} + +// result of reading the cache +type cacheReadResult struct { + blkID uint64 // block ID + block *CacheValue // the cache block; guaranteed to be non-nil + + // whether this block is newly created; if it is true, the write lock of + // the block is acquired + isNewAlloc bool +} + +func NewBlockCache(flags *FlagStorage) *BlockCache { + size := int(float64(memory.TotalMemory()/flags.BlockReadCacheSize) * + flags.BlockReadCacheMemRatio) + size = MaxInt(size, + (blockCacheReadAllSize*2-1)/int(flags.BlockReadCacheSize)+1) + cache, err := lru.New[cacheKey, *CacheValue](size) + if err != nil { + log.Panic("failed to allocate LRU cache", err) + } + log.Infof("block cache: size=%d mem=%.2fGiB", size, + float64(size)*float64(flags.BlockReadCacheSize)/1024/1024/1024) + ret := &BlockCache{ + lru: cache, + blockSize: flags.BlockReadCacheSize, + } + ret.benchmarkDone.Add(1) + return ret +} + +// Remove cache entries for a specific inode +func (bc *BlockCache) RemoveCache(inode *Inode) { + var keys []cacheKey + var key cacheKey + bc.mu.Lock() + defer bc.mu.Unlock() + + for _, key = range bc.lru.Keys() { + if key.inode == inode.ID { + keys = append(keys, key) + } + } + + for _, key = range keys { + bc.lru.Remove(key) + } +} + +// Get a cache block containing the given offset (which must be within the +// file size); this function fetches the remote data if it is not cached yet. +// Remember to check the error marker before accessing the data. +// Depending on the file size and benchmark results, further blocks may be +// prefetched.
+func (bc *BlockCache) Get(fh *FileHandle, offset uint64) ( + cacheBlock *CacheValue, cacheOffset uint64) { + + fileSize := fh.inode.Attributes.Size + assert(offset < fileSize, "invalid Get") + + blkID := offset / bc.blockSize + cacheOffset = blkID * bc.blockSize + + headRead := bc.getOrAllocate(fh.inode, blkID) + cacheBlock = headRead.block + + if headRead.isNewAlloc { + var blocks []cacheReadResult + + if fileSize <= blockCacheReadAllSize { + numBlocks := int( + DivUpUint64(fileSize, bc.blockSize) - headRead.blkID) + blocks = make([]cacheReadResult, 0, numBlocks) + blocks = append(blocks, headRead) + blocks = append(blocks, + bc.getOrAllocateMany(fh.inode, blkID+1, numBlocks-1)...) + } else { + bc.ensureBenchmark(fh) + blocks = bc.getReadAheadBlocks(fh, headRead) + } + + assert(len(blocks) > 0, "invalid read ahead blocks") + if len(blocks) == 1 { + bc.readMultipleBlocks(fh, blocks, false) + } else { + go bc.readMultipleBlocks( + fh, blocks, + fileSize > blockCacheReadAllSize && + len(blocks) > bc.nrLatencyBlock) + } + } + + cacheBlock.mu.RLock() // wait for the writer to finish + defer cacheBlock.mu.RUnlock() + assert((cacheBlock.Err == nil) == (cacheBlock.Data != nil), + "block not read") + return +} + +// Start a thread to do read ahead at the given location. Return false if +// prefetch threads are exhausted, and true otherwise. +func (bc *BlockCache) StartReadAhead( + fh *FileHandle, offset uint64, size uint64) bool { + + assert(size > 0 && offset < fh.inode.Attributes.Size, "invalid readahead") + + if !bc.acquirePrefetchThread() { + return false + } + + head := bc.getOrAllocate(fh.inode, offset/bc.blockSize) + if !head.isNewAlloc { + // already cached (probably being worked on by an existing read ahead), + // do not try to read again + bc.releasePrefetchThread() + return true + } + + go func() { + nrBlock := int(DivUpUint64( + MinUInt64(fh.inode.Attributes.Size, offset+size), + bc.blockSize) - head.blkID) + assert(nrBlock > 0, "invalid nrBlock") + blocks := make([]cacheReadResult, 0, nrBlock) + blocks = append(blocks, head) + blocks = append(blocks, + bc.getOrAllocateMany(fh.inode, head.blkID+1, nrBlock-1)...) + bc.readMultipleBlocks(fh, blocks, true) + }() + return true +} + +// Read into consecutive blocks. +// Write locks will be released after the read is finished.
+// If ownPrefetchThread is true, the prefetch thread will be released +func (bc *BlockCache) readMultipleBlocks( + fh *FileHandle, blocks []cacheReadResult, ownPrefetchThread bool) { + + assert(len(blocks) > 0 && blocks[0].isNewAlloc, + "empty blocks for readMultipleBlocks") + + for i := 0; i < len(blocks); i++ { + assert(blocks[i].blkID-blocks[0].blkID == uint64(i), + "block not consecutive") + } + + // remove trailing blocks that are already prepared + for !blocks[len(blocks)-1].isNewAlloc { + blocks = blocks[:len(blocks)-1] + } + + totSize := MinUInt64( + bc.blockSize*uint64(len(blocks)), + fh.inode.Attributes.Size-blocks[0].blkID*bc.blockSize) + resp, err := fh.cloud.GetBlob(&GetBlobInput{ + Key: fh.key, + Start: blocks[0].blkID * bc.blockSize, + Count: totSize, + }) + finished := 0 + + // release the thread, and set error markers + defer func() { + if ownPrefetchThread { + bc.releasePrefetchThread() + } + + for i := finished; i < len(blocks); i++ { + assert(err != nil, "unhandled block read") + if blocks[i].isNewAlloc { + c := blocks[i].block + c.Err = err + c.mu.Unlock() + } + } + }() + + if err != nil { + return + } + + reader := resp.Body + defer reader.Close() + + remainSize := totSize + + for ; finished < len(blocks); finished++ { + blkDone := 0 + curRead := 0 + assert(remainSize > 0, "remain_size=0: blocks exceed file size") + bufSize := int(MinUInt64(remainSize, bc.blockSize)) + remainSize -= uint64(bufSize) + buf := make([]byte, bufSize) + for blkDone < bufSize { + curRead, err = reader.Read(buf[blkDone:]) + blkDone += curRead + if err != nil && (err != io.EOF || blkDone != bufSize) { + // the deferred function will set error markers + return + } + } + if blocks[finished].isNewAlloc { + c := blocks[finished].block + c.Data = buf + c.mu.Unlock() + } + } +} + +// Get blocks for read ahead; blkID is the first block requested by the client. +// This function also acquires a prefetch thread if the returned length exceeds +// nrLatencyBlock +func (bc *BlockCache) getReadAheadBlocks( + fh *FileHandle, head cacheReadResult) (ret []cacheReadResult) { + + ret = make([]cacheReadResult, 0, bc.nrThroughputBlock) + ret = append(ret, head) + + // max number of blocks to read + maxBlocks := int(MinUInt64( + DivUpUint64(fh.inode.Attributes.Size, bc.blockSize)-head.blkID, + uint64(bc.nrThroughputBlock))) + + ret = append(ret, bc.getOrAllocateMany(fh.inode, head.blkID+1, + MinInt(maxBlocks, bc.nrLatencyBlock)-1)...) + + if bc.nrLatencyBlock >= maxBlocks { + return + } + + assert(len(ret) == bc.nrLatencyBlock, "ret size does not match") + + if !bc.acquirePrefetchThread() { + return + } + + // read ahead to get max throughput, but stop at the first existing block + ret = append(ret, + bc.getOrAllocateMany2( + fh.inode, + head.blkID+uint64(bc.nrLatencyBlock), + maxBlocks-bc.nrLatencyBlock, + true)...) + + if len(ret) == bc.nrLatencyBlock { + // no new blocks were added + bc.releasePrefetchThread() + } + + return +} + +// try to acquire a prefetch thread; return false if the limit is reached +func (bc *BlockCache) acquirePrefetchThread() bool { + num := atomic.AddInt32(&bc.numPrefetchThreads, 1) + if num > maxPrefetchThreads { + bc.releasePrefetchThread() + assert(num <= maxPrefetchThreads+10, + "num_prefetch_threads too high") + return false + } + return true +} + +func (bc *BlockCache) releasePrefetchThread() { + num := atomic.AddInt32(&bc.numPrefetchThreads, -1) + assert(num >= 0, "num_prefetch_threads < 0") +} + +// Get a cache block. isNewAlloc indicates whether this block is newly allocated.
If +// it is true, the write lock is acquired, and the caller should fill in the +// data. +func (bc *BlockCache) getOrAllocate( + inode *Inode, blkID uint64) (ret cacheReadResult) { + return bc.getOrAllocateMany(inode, blkID, 1)[0] +} + +// Get consecutive cache blocks starting at blkID +func (bc *BlockCache) getOrAllocateMany( + inode *Inode, blkID uint64, num int) (ret []cacheReadResult) { + return bc.getOrAllocateMany2(inode, blkID, num, false) +} + +// return immediately if requireNewAlloc is true but the block already exists +func (bc *BlockCache) getOrAllocateMany2( + inode *Inode, blkID uint64, num int, requireNewAlloc bool) ( + ret []cacheReadResult) { + + assert(num >= 0, "invalid num") + ret = make([]cacheReadResult, 0, num) + + if num == 0 { + return nil + } + + var ok bool + var block *CacheValue + + hasBcWriteLock := false + bc.mu.RLock() + + for i := 0; i < num; i++ { + key := cacheKey{inode.ID, blkID + uint64(i)} + assert(key.blkID*bc.blockSize < inode.Attributes.Size, + "block exceeds file size") + + block, ok = bc.lru.Get(key) + if ok { + if requireNewAlloc { + break + } + ret = append(ret, cacheReadResult{ + blkID: key.blkID, + block: block, + isNewAlloc: false, + }) + continue + } + + if !hasBcWriteLock { + bc.mu.RUnlock() + bc.mu.Lock() + i-- // try getting again with write lock + hasBcWriteLock = true + continue + } + + // create a new block now + block = &CacheValue{} + block.mu.Lock() // acquire the block write lock before returning + ret = append(ret, cacheReadResult{ + blkID: key.blkID, + block: block, + isNewAlloc: true, + }) + bc.lru.Add(key, block) + } + + if hasBcWriteLock { + bc.mu.Unlock() + } else { + bc.mu.RUnlock() + } + return +} + +// remove a cache entry, usually due to a read error +func (bc *BlockCache) Remove(inode *Inode, offset uint64) { + key := cacheKey{inode.ID, offset / bc.blockSize} + bc.mu.Lock() + bc.lru.Remove(key) + bc.mu.Unlock() +} + +func assert(cond bool, msg string) { + if !cond { + panic(msg) + } +} + +// Run the benchmark if it is not done; file size must be at least +// blockCacheReadAllSize +func (bc *BlockCache) ensureBenchmark(fh *FileHandle) { + + // run a single benchmark and return its execution time in seconds + benchmarkOne := func(size int) (secs float64) { + buf := make([]byte, size) + + do := func() float64 { + begin := time.Now() + resp, err := fh.cloud.GetBlob(&GetBlobInput{ + Key: fh.key, + Start: 0, + Count: uint64(size), + }) + if err != nil { + panic(fmt.Sprintf("failed to benchmark: %v", err)) + } + reader := resp.Body + defer reader.Close() + done := 0 + var n int + for done < size { + n, err = reader.Read(buf[done:]) + done += n + if err != nil && (err != io.EOF || done != size) { + panic(fmt.Sprintf("failed to benchmark: %v", err)) + } + } + return time.Since(begin).Seconds() + } + + do() // warm up + + nrun := 0 + totTime := float64(0) + totTime2 := float64(0) + var std float64 + for { + nrun++ + thisTime := do() + totTime += thisTime + totTime2 += thisTime * thisTime + if nrun >= 2 && totTime >= 0.1 { + n := float64(nrun) + mean := totTime / n + std = math.Sqrt(math.Max( + (totTime2/n-mean*mean)*(1+1/(n-1)), 0)) + if std/mean < 0.1 || totTime > 2 { + secs = mean + break + } + } + } + fuseLog.Infof("benchmark: size=%.2fMiB time=%.2fms"+ " (+-%.2f) thrpt=%.2fMiB/s (%d runs, tot %.2fs)", + float64(size)/1024/1024, secs*1000, std*1000, + float64(size)/1024/1024/secs, nrun, totTime) + return secs + } + + benchmark := func() { + blockSize := int(bc.blockSize) + latency := benchmarkOne(blockSize) + throughput :=
blockCacheReadAllSize / + benchmarkOne(blockCacheReadAllSize) + + maxNrBlock := blockCacheReadAllSize / blockSize + + nrBlock := 2 + bc.nrLatencyBlock = maxNrBlock + for nrBlock < maxNrBlock { + curLat := benchmarkOne(blockSize * nrBlock) + if curLat > latency*1.2 { + bc.nrLatencyBlock = nrBlock / 2 + break + } + nrBlock *= 2 + } + + nrBlock = maxNrBlock / 2 + bc.nrThroughputBlock = bc.nrLatencyBlock + for nrBlock > bc.nrLatencyBlock { + size := blockSize * nrBlock + curThrpt := float64(size) / benchmarkOne(size) + if curThrpt < throughput*0.8 { + bc.nrThroughputBlock = nrBlock * 2 + break + } + nrBlock /= 2 + } + bc.benchmarkDone.Done() + + blockSizeMb := float64(blockSize) / 1024 / 1024 + fuseLog.Infof( + "block cache benchmark: latency=%.2fms throughput=%.2fMiB/s"+ + " latency_blocks=%d(%.2fMiB) throughput_blocks=%d(%.2fMiB)", + latency*1000, throughput/1024/1024, + bc.nrLatencyBlock, + float64(bc.nrLatencyBlock)*blockSizeMb, + bc.nrThroughputBlock, + float64(bc.nrThroughputBlock)*blockSizeMb) + } + + bc.benchmarkOnce.Do(benchmark) + bc.benchmarkDone.Wait() + assert(0 < bc.nrLatencyBlock && bc.nrLatencyBlock <= bc.nrThroughputBlock, + "invalid benchmark result") +} diff --git a/internal/file.go b/internal/file.go index 4dd894e6..4c067f10 100644 --- a/internal/file.go +++ b/internal/file.go @@ -36,6 +36,8 @@ type FileHandle struct { writeInit sync.Once mpuWG sync.WaitGroup + writeCachePurgeInit sync.Once // purge the read cache of this file + mu sync.Mutex mpuID *MultipartBlobCommitInput nextWriteOffset int64 @@ -50,11 +52,16 @@ type FileHandle struct { reader io.ReadCloser readBufOffset int64 + blockCacheLastOffset int64 + blockCacheReadAheadEnabled bool + // parallel read - buffers []*S3ReadBuffer + buffers []*S3ReadBuffer // prefetch buffers existingReadahead int seqReadAmount uint64 numOOORead uint64 // number of out of order read + numOOOPrefetch uint64 // number of out of order read with prefetch + numOOOReadReq uint64 // number of out of order read requests // User space PID. All threads created by a process will have the same TGID, // but different PIDs[1]. // This value can be nil if we fail to get TGID from PID[2]. @@ -73,7 +80,7 @@ const readAheadChunk = uint32(20 * 1024 * 1024) func NewFileHandle(inode *Inode, opMetadata fuseops.OpContext) *FileHandle { tgid, err := GetTgid(opMetadata.Pid) if err != nil { - log.Debugf( + fuseLog.Debugf( "Failed to retrieve tgid for the given pid. 
pid: %v err: %v inode id: %v err: %v", opMetadata.Pid, err, inode.ID, err) } @@ -231,6 +238,13 @@ func (fh *FileHandle) uploadCurrentBuf(parallel bool) (err error) { func (fh *FileHandle) WriteFile(offset int64, data []byte) (err error) { fh.inode.logFuse("WriteFile", offset, len(data)) + fh.writeCachePurgeInit.Do(func() { + bc := fh.inode.fs.blockCache + if bc != nil { + go bc.RemoveCache(fh.inode) + } + }) + fh.mu.Lock() defer fh.mu.Unlock() @@ -440,6 +454,7 @@ func (fh *FileHandle) readFromReadAhead(offset uint64, buf []byte) (bytesRead in // we've exhausted the first buffer readAheadBuf.buf.Close() fh.buffers = fh.buffers[1:] + fh.numOOOPrefetch = 0 // clear the counter after reading one buffer } buf = buf[nread:] @@ -454,8 +469,16 @@ func (fh *FileHandle) readFromReadAhead(offset uint64, buf []byte) (bytesRead in } func (fh *FileHandle) readAhead(offset uint64, needAtLeast int) (err error) { + expectOffset := offset existingReadahead := uint32(0) - for _, b := range fh.buffers { + for bidx, b := range fh.buffers { + if b.offset != expectOffset { + err = fmt.Errorf( + "read ahead buffer offset mismatch: file=%s buf=%v expect=%v got=%v", + *fh.inode.FullName(), bidx, expectOffset, b.offset) + return + } + expectOffset += uint64(b.size) existingReadahead += b.size } @@ -526,17 +549,12 @@ func (fh *FileHandle) ReadFile(offset int64, buf []byte) (bytesRead int, err err return } -func (fh *FileHandle) readFile(offset int64, buf []byte) (bytesRead int, err error) { - defer func() { - if bytesRead > 0 { - fh.readBufOffset += int64(bytesRead) - fh.seqReadAmount += uint64(bytesRead) - } - - fh.inode.logFuse("< readFile", bytesRead, err) - }() +func (fh *FileHandle) readFileBlockCache( + offset int64, buf []byte) (bytesRead int, err error) { + bytesRead = 0 + err = nil - if uint64(offset) >= fh.inode.Attributes.Size { + if offset < 0 || uint64(offset) >= fh.inode.Attributes.Size { // nothing to read if fh.inode.Invalid { err = fuse.ENOENT @@ -548,35 +566,101 @@ func (fh *FileHandle) readFile(offset int64, buf []byte) (bytesRead int, err err return } - fs := fh.inode.fs - - if fh.poolHandle == nil { - fh.poolHandle = fs.bufferPool + bc := fh.inode.fs.blockCache + cache, cacheOff := bc.Get(fh, uint64(offset)) + if cache.Err != nil { + err = cache.Err + fuseLog.Errorf("readFileBlockCache (%s: %v(+%v)/%v): %v", + *fh.inode.FullName(), offset, len(buf), fh.inode.Attributes.Size, + err) + return } - if fh.readBufOffset != offset { - // XXX out of order read, maybe disable prefetching - fh.inode.logFuse("out of order read", offset, fh.readBufOffset) + cachedData := cache.Data[offset-int64(cacheOff):] + bytesRead = MinInt(len(cachedData), len(buf)) + assert(bytesRead > 0, "zero bytes read") + copy(buf[:bytesRead], cachedData[:bytesRead]) - fh.readBufOffset = offset + if AbsInt64(offset-fh.blockCacheLastOffset) < int64(bc.blockSize)*2 { + fh.seqReadAmount += uint64(bytesRead) + if fh.seqReadAmount >= uint64(readAheadChunk) { + if !fh.blockCacheReadAheadEnabled { + fh.blockCacheReadAheadEnabled = true + fuseLog.Infof("enable block cache read ahead for %s", + *fh.inode.FullName()) + } + } + } else { fh.seqReadAmount = 0 - if fh.reader != nil { - fh.reader.Close() - fh.reader = nil + if fh.blockCacheReadAheadEnabled { + fh.blockCacheReadAheadEnabled = false + fuseLog.Infof("disable block cache read ahead for %s", + *fh.inode.FullName()) } + } + fh.blockCacheLastOffset = offset + int64(bytesRead) + if fh.blockCacheReadAheadEnabled { + fh.startBlockCacheReadAhead(uint64(offset)) + } - if fh.buffers != nil { - 
// we misdetected - fh.numOOORead++ + return +} + +func (fh *FileHandle) startBlockCacheReadAhead(offset uint64) { + CHUNK := uint64(readAheadChunk) + offset = (offset + CHUNK - 1) / CHUNK * CHUNK + maxOffset := MinUInt64( + fh.inode.Attributes.Size, offset+uint64(maxReadAhead)) + bc := fh.inode.fs.blockCache + for offset+CHUNK <= maxOffset { + if !bc.StartReadAhead(fh, offset, CHUNK) { + return } + offset += CHUNK + } +} +// close all readahead buffers and clear the buffer variable +func (fh *FileHandle) closeAllBuffers() { + if fh.buffers != nil { for _, b := range fh.buffers { b.buf.Close() } fh.buffers = nil } +} - if !fs.flags.Cheap && fh.seqReadAmount >= uint64(readAheadChunk) && fh.numOOORead < 3 { +func (fh *FileHandle) readFile(offset int64, buf []byte) (bytesRead int, err error) { + fs := fh.inode.fs + + if fs.blockCache != nil { + return fh.readFileBlockCache(offset, buf) + } + + if fs.flags.DebugFuse { + defer func() { + fh.inode.logFuse("< readFile", bytesRead, err) + }() + } + + if uint64(offset) >= fh.inode.Attributes.Size { + // nothing to read + if fh.inode.Invalid { + err = fuse.ENOENT + } else if fh.inode.KnownSize == nil { + err = io.EOF + } else { + err = io.EOF + } + return + } + + if fh.poolHandle == nil { + fh.poolHandle = fs.bufferPool + } + + if !fs.flags.Cheap && fh.seqReadAmount >= uint64(readAheadChunk) && + fh.numOOOPrefetch < 3 { if fh.reader != nil { fh.inode.logFuse("cutover to the parallel algorithm") fh.reader.Close() @@ -589,26 +673,20 @@ func (fh *FileHandle) readFile(offset int64, buf []byte) (bytesRead int, err err return } else { // fall back to read serially - fh.inode.logFuse("not enough memory, fallback to serial read") + fuseLog.Errorf("read ahead failed: %v; fallback to serial read", err) fh.seqReadAmount = 0 - for _, b := range fh.buffers { - b.buf.Close() - } - fh.buffers = nil + fh.closeAllBuffers() } } - bytesRead, err = fh.readFromStream(offset, buf) + bytesRead, err = fh.readFromUpstream(offset, buf) return } func (fh *FileHandle) Release() { // read buffers - for _, b := range fh.buffers { - b.buf.Close() - } - fh.buffers = nil + fh.closeAllBuffers() if fh.reader != nil { fh.reader.Close() @@ -635,10 +713,17 @@ func (fh *FileHandle) Release() { } } -func (fh *FileHandle) readFromStream(offset int64, buf []byte) (bytesRead int, err error) { +// read from upstream, maintaining fh.reader-related state +func (fh *FileHandle) readFromUpstream( + offset int64, buf []byte) (bytesRead int, err error) { defer func() { if fh.inode.fs.flags.DebugFuse { - fh.inode.logFuse("< readFromStream", bytesRead) + fh.inode.logFuse("< readFromUpstream", bytesRead) + } + + if bytesRead > 0 { + fh.readBufOffset += int64(bytesRead) + fh.seqReadAmount += uint64(bytesRead) + } }() @@ -647,6 +732,22 @@ func (fh *FileHandle) readFromStream(offset int64, buf []byte) (bytesRead int, e return } + if fh.readBufOffset != offset && fh.reader != nil { + // XXX out of order read, maybe disable prefetching + fh.inode.logFuse("out of order read", offset, fh.readBufOffset) + + fh.reader.Close() + fh.reader = nil + + fh.numOOOReadReq++ + + if fh.buffers != nil { + // we misdetected + fh.numOOOPrefetch++ + fh.closeAllBuffers() + } + } + if fh.reader == nil { resp, err := fh.cloud.GetBlob(&GetBlobInput{ Key: fh.key, @@ -657,6 +758,8 @@ func (fh *FileHandle) readFromStream(offset int64, buf []byte) (bytesRead int, e } fh.reader = resp.Body + fh.readBufOffset = offset + fh.seqReadAmount = 0 } bytesRead, err = fh.reader.Read(buf) diff --git a/internal/flags.go b/internal/flags.go index
c6edc41a..3fae2b08 100644 --- a/internal/flags.go +++ b/internal/flags.go @@ -219,7 +219,20 @@ func NewApp() (app *cli.App) { Name: "cheap", Usage: "Reduce S3 operation costs at the expense of some performance (default: off)", }, - + cli.BoolFlag{ + Name: "block-read-cache", + Usage: "Enable block read cache, which aggressively reads ahead based on blocks", + }, + cli.Uint64Flag{ + Name: "block-read-cache-size", + Usage: "Size of each block of block read cache, in bytes", + Value: 1024 * 256, + }, + cli.Float64Flag{ + Name: "block-read-cache-mem-ratio", + Usage: "Ratio of total RAM to be reserved for block read cache", + Value: 0.1, + }, cli.BoolFlag{ Name: "no-implicit-dir", Usage: "Assume all directory objects (\"dir/\") exist (default: off)", @@ -280,7 +293,9 @@ func NewApp() (app *cli.App) { flagCategories[f] = "aws" } - for _, f := range []string{"cheap", "no-implicit-dir", "stat-cache-ttl", "type-cache-ttl", "http-timeout"} { + for _, f := range []string{"cheap", "block-read-cache", + "block-read-cache-size", "block-read-cache-mem-ratio", "no-implicit-dir", + "stat-cache-ttl", "type-cache-ttl", "http-timeout"} { flagCategories[f] = "tuning" } @@ -332,11 +347,14 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) { GID: uint32(c.Int("gid")), // Tuning, - Cheap: c.Bool("cheap"), - ExplicitDir: c.Bool("no-implicit-dir"), - StatCacheTTL: c.Duration("stat-cache-ttl"), - TypeCacheTTL: c.Duration("type-cache-ttl"), - HTTPTimeout: c.Duration("http-timeout"), + Cheap: c.Bool("cheap"), + BlockReadCache: c.Bool("block-read-cache"), + BlockReadCacheSize: c.Uint64("block-read-cache-size"), + BlockReadCacheMemRatio: c.Float64("block-read-cache-mem-ratio"), + ExplicitDir: c.Bool("no-implicit-dir"), + StatCacheTTL: c.Duration("stat-cache-ttl"), + TypeCacheTTL: c.Duration("type-cache-ttl"), + HTTPTimeout: c.Duration("http-timeout"), // Common Backend Config Endpoint: c.String("endpoint"), diff --git a/internal/goofys.go b/internal/goofys.go index 30458569..47a677c7 100644 --- a/internal/goofys.go +++ b/internal/goofys.go @@ -92,6 +92,8 @@ type Goofys struct { restorers *Ticket forgotCnt uint32 + + blockCache *BlockCache } var s3Log = GetLogger("s3") @@ -178,6 +180,10 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage, umask: 0122, } + if flags.BlockReadCache { + fs.blockCache = NewBlockCache(flags) + } + var prefix string colon := strings.Index(bucket, ":") if colon != -1 { diff --git a/internal/utils.go b/internal/utils.go index 6c824cee..94b40244 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -26,6 +26,17 @@ import ( var timeMax = time.Unix(1<<63-62135596801, 999999999) +func DivUpUint64(a, b uint64) uint64 { + return (a + b - 1) / b +} + +func AbsInt64(x int64) int64 { + if x < 0 { + return -x + } + return x +} + func MaxInt(a, b int) int { if a > b { return a diff --git a/test/mount_parallel_read.sh b/test/mount_parallel_read.sh new file mode 100644 index 00000000..caa4ee41 --- /dev/null +++ b/test/mount_parallel_read.sh @@ -0,0 +1,7 @@ +#!/bin/bash -e + +echo 'building ...' +go build + +echo 'Starting the server ...' 
+exec ./goofys --endpoint=http://127.0.0.1:9090 -f "$@" test mnt \ No newline at end of file diff --git a/test/prepare_parallel_read.sh b/test/prepare_parallel_read.sh new file mode 100644 index 00000000..9a33c710 --- /dev/null +++ b/test/prepare_parallel_read.sh @@ -0,0 +1,10 @@ +#!/bin/bash -e + +mkdir -p /tmp/s3proxy/test +for i in f1 f2 f3 f4; do + of=/tmp/s3proxy/test/$i + [ -f $of ] || dd if=/dev/urandom of=$of bs=1M count=1024 +done +of=/tmp/s3proxy/test/small +[ -f $of ] || dd if=/dev/urandom of=$of bs=1 count=1234513 +./s3proxy.sh --properties $(dirname $0)/test_parallel_read.properties \ No newline at end of file diff --git a/test/run_parallel_read.sh b/test/run_parallel_read.sh new file mode 100644 index 00000000..4beaf01f --- /dev/null +++ b/test/run_parallel_read.sh @@ -0,0 +1,25 @@ +#!/bin/bash -e + +set -u -e + +cd "$(dirname "$0")"/.. +rm -rf test/output +mkdir test/output +args='' + +files="f1 f2 f3 f4" + +for i in $files; do + args="$args mnt/$i test/output/$i" +done + +./test/test_parallel_read $args + +files="$files small" + +cp mnt/small test/output/small + +for i in $files; do + echo "checking $i" + cmp /tmp/s3proxy/test/$i test/output/$i +done \ No newline at end of file diff --git a/test/test_parallel_read.cpp b/test/test_parallel_read.cpp new file mode 100644 index 00000000..3049af7c --- /dev/null +++ b/test/test_parallel_read.cpp @@ -0,0 +1,148 @@ +// test parallel read from multiple threads on the same file; mainly used for +// testing and benchmarking the block read cache + +#include <assert.h> +#include <fcntl.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include <algorithm> +#include <random> +#include <thread> + +#include <atomic> +#include <memory> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +constexpr int NR_WORKERS = 4, NR_PARTS = 1024 * 1024 * 2; + +class ParallelReader { + std::string m_path; + std::unique_ptr<uint8_t[]> m_buf; + std::vector<std::pair<size_t, size_t>> m_parts; + std::atomic_uint32_t m_next_part{0}; + size_t m_file_size; + + static size_t get_file_size(const char* fpath) { + struct stat stat_buf; + int rc = stat(fpath, &stat_buf); + assert(rc == 0); + return stat_buf.st_size; + } + + // generate random m_parts + void gen_parts(std::default_random_engine& rng, size_t tot_size) { + // O(n) algorithm to permute objects and splits to generate a random + // partition + + std::uniform_real_distribution<double> uniform{0.0, 1.0}; + size_t this_part = 1, + nr_parts = std::min<size_t>(NR_PARTS, tot_size / 20), + nr_split = nr_parts - 1, nr_obj = tot_size - nr_split - 1; + + while (nr_split) { + if (uniform(rng) <= + static_cast<double>(nr_obj) / + static_cast<double>(nr_obj + nr_split)) { + ++this_part; + --nr_obj; + } else { + m_parts.emplace_back(0, this_part); + this_part = 1; + --nr_split; + } + } + m_parts.emplace_back(0, this_part + nr_obj); + assert(m_parts.size() == nr_parts); + + size_t off = 0; + for (size_t i = 0; i < nr_parts; ++i) { + m_parts[i].first = off; + assert(m_parts[i].second); + off += m_parts[i].second; + } + assert(off == tot_size); + + std::shuffle(m_parts.begin(), m_parts.end(), rng); + } + + void worker() { + int fd = open(m_path.c_str(), O_RDONLY); + assert(fd >= 0); + for (;;) { + size_t job = m_next_part.fetch_add(1, std::memory_order_relaxed); + if (job >= m_parts.size()) { + break; + } + size_t off, size; + std::tie(off, size) = m_parts[job]; + while (size > 0) { + ssize_t r = pread(fd, m_buf.get() + off, size, off); + assert(r > 0); + size -= r; + off += r; + } + } + close(fd); + } + +public: + ParallelReader(std::default_random_engine& rng, std::string path) + : m_path{std::move(path)} { + size_t fsize = get_file_size(m_path.c_str()); + m_file_size = fsize;
gen_parts(rng, fsize); + m_buf.reset(new uint8_t[fsize]); + + std::vector<std::thread> workers; + for (int i = 0; i < NR_WORKERS; ++i) { + workers.emplace_back([this]() { worker(); }); + } + for (size_t done; (done = m_next_part.load(std::memory_order_relaxed)) < + m_parts.size();) { + printf("\rreading: %zu/%zu ", done, m_parts.size()); + fflush(stdout); + usleep(500'000); + } + printf("\n"); + for (auto& i : workers) { + i.join(); + } + } + + void write(const char* path) { + FILE* fout = fopen(path, "wb"); + assert(fout); + size_t w = fwrite(m_buf.get(), 1, m_file_size, fout); + assert(w == m_file_size); + fclose(fout); + } +}; + +int main(int argc, const char* const argv[]) { + if (argc == 1 || argc % 2 != 1) { + fprintf(stderr, "usage: %s <input> <output>...\n", argv[0]); + return 1; + } + + std::vector<std::thread> workers; + std::random_device rd; + for (int i = 1; i < argc; i += 2) { + auto fn = [inp = argv[i], out = argv[i + 1], seed = rd()]() { + std::default_random_engine rng{seed}; + ParallelReader r{rng, inp}; + r.write(out); + }; + workers.emplace_back(fn); + } + + for (auto& i : workers) { + i.join(); + } +} + +// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}} \ No newline at end of file diff --git a/test/test_parallel_read.properties b/test/test_parallel_read.properties new file mode 100644 index 00000000..e4a0855e --- /dev/null +++ b/test/test_parallel_read.properties @@ -0,0 +1,4 @@ +s3proxy.authorization=none +s3proxy.endpoint=http://127.0.0.1:9090 +jclouds.provider=filesystem +jclouds.filesystem.basedir=/tmp/s3proxy \ No newline at end of file
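
A note on the arithmetic that internal/block_cache.go leans on throughout: a file offset maps to a block ID by integer division, the block ID maps back to the block's starting offset, and DivUpUint64 (added in internal/utils.go) counts the blocks covering a file. Below is a minimal standalone sketch of that mapping; the 256 KiB block size is only the --block-read-cache-size default, and the file size and offset are made-up inputs:

```go
package main

import "fmt"

// divUpUint64 mirrors the DivUpUint64 helper added in internal/utils.go.
func divUpUint64(a, b uint64) uint64 {
	return (a + b - 1) / b
}

func main() {
	const blockSize uint64 = 256 * 1024 // default --block-read-cache-size
	fileSize := uint64(1234513)         // e.g. the "small" test file
	offset := uint64(1000000)           // a read somewhere inside the file

	blkID := offset / blockSize                   // block holding this offset
	cacheOffset := blkID * blockSize              // where that block starts
	numBlocks := divUpUint64(fileSize, blockSize) // blocks covering the file

	// prints: blkID=3 cacheOffset=786432 numBlocks=5
	fmt.Printf("blkID=%d cacheOffset=%d numBlocks=%d\n",
		blkID, cacheOffset, numBlocks)
}
```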
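getOrAllocateMany2 takes the cache's read lock for the common hit path and only upgrades to the write lock when an entry must be created, re-checking the key after the upgrade. A minimal sketch of that pattern under assumed simplifications (a plain map instead of the LRU, one key at a time, no per-block lock):

```go
package main

import (
	"fmt"
	"sync"
)

type blockMap struct {
	mu sync.RWMutex
	m  map[uint64][]byte
}

// getOrAllocate returns the entry for key, creating it if needed.
// isNew tells the caller it must fill the freshly created entry.
func (c *blockMap) getOrAllocate(key uint64) (v []byte, isNew bool) {
	c.mu.RLock()
	v, ok := c.m[key]
	c.mu.RUnlock()
	if ok {
		return v, false
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// re-check: another goroutine may have created the entry while we
	// were swapping the read lock for the write lock
	if v, ok := c.m[key]; ok {
		return v, false
	}
	v = make([]byte, 8) // placeholder block the caller fills
	c.m[key] = v
	return v, true
}

func main() {
	c := &blockMap{m: map[uint64][]byte{}}
	_, isNew := c.getOrAllocate(7)
	fmt.Println("new entry:", isNew) // new entry: true
}
```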
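The GET-statistics log in internal/backend_s3.go amortizes the request count into dollars at the listed $0.0004 per 1000 requests and normalizes it by the volume of data fetched. The same arithmetic on made-up totals:

```go
package main

import "fmt"

const s3PricePer1kGet = 0.0004 // USD per 1000 GET requests, as in the patch

func main() {
	getCnt := uint64(250000)     // hypothetical total GET requests
	getBytes := uint64(64 << 30) // hypothetical total bytes fetched (64 GiB)

	cost := float64(getCnt) / 1000 * s3PricePer1kGet
	gib := float64(getBytes) / 1024 / 1024 / 1024

	// prints: total_cost=$0.100 avg_cost=$0.0016/GiB
	fmt.Printf("total_cost=$%.3f avg_cost=$%.4f/GiB\n", cost, cost/gib)
}
```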
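ensureBenchmark repeats a timed GET until the measurement stabilizes: it keeps running totals of the samples and their squares, derives the mean and the sample standard deviation from them, and stops once the relative deviation drops below 10% (or a 2 s budget is spent). A self-contained sketch of just that stopping rule, driven by a fake sampler instead of a real GetBlob call:

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
)

// measureUntilStable repeats sample() and returns the mean once the
// sample standard deviation is within 10% of the mean, or once more
// than 2 seconds of total sample time has accumulated.
func measureUntilStable(sample func() float64) (mean float64, runs int) {
	var tot, tot2 float64
	for {
		runs++
		t := sample()
		tot += t
		tot2 += t * t
		if runs >= 2 && tot >= 0.1 {
			n := float64(runs)
			mean = tot / n
			// unbiased sample variance: (E[x^2] - mean^2) * n/(n-1)
			std := math.Sqrt(math.Max((tot2/n-mean*mean)*(1+1/(n-1)), 0))
			if std/mean < 0.1 || tot > 2 {
				return mean, runs
			}
		}
	}
}

func main() {
	// fake sampler: ~50ms with a little jitter, standing in for a GET
	fake := func() float64 { return 0.05 + rand.Float64()*0.005 }
	mean, runs := measureUntilStable(fake)
	fmt.Printf("mean=%.2fms after %d runs\n", mean*1000, runs)
}
```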