Skip to content

Commit

Permalink
restrict concurrency in labelmap /specificblocks to transcoding
Browse files Browse the repository at this point in the history
Testing shows that excessive concurrency on basholeveldb store requests
leads to increasing delays at high /specificblocks usage.  Allow
database reads to proceed serially while reserving goroutines for
the transcoding portion of block pipeline.
  • Loading branch information
DocSavage committed Mar 25, 2022
1 parent 4607321 commit 663a9ef
Showing 1 changed file with 87 additions and 48 deletions.
135 changes: 87 additions & 48 deletions datatype/labelmap/labelmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,67 @@ func (d *Data) convertTo64bit(geom dvid.Geometry, data []uint8, bytesPerVoxel, s
return data64, nil
}

// convert a slice of 3 integer strings into a coordinate
func strArrayToBCoord(coordarray []string) (bcoord dvid.ChunkPoint3d, err error) {
var xloc, yloc, zloc int
if xloc, err = strconv.Atoi(coordarray[0]); err != nil {
return
}
if yloc, err = strconv.Atoi(coordarray[1]); err != nil {
return
}
if zloc, err = strconv.Atoi(coordarray[2]); err != nil {
return
}
return dvid.ChunkPoint3d{int32(xloc), int32(yloc), int32(zloc)}, nil
}

type blockTiming struct {
readT, transcodeT, writeT time.Duration
readN, transcodeN int64
sync.RWMutex
}

func (bt *blockTiming) writeDone(t0 time.Time) {
bt.Lock()
bt.writeT += time.Since(t0)
bt.Unlock()
}

func (bt *blockTiming) readDone(t0 time.Time) {
bt.Lock()
bt.readT += time.Since(t0)
bt.readN++
bt.Unlock()
}

func (bt *blockTiming) transcodeDone(t0 time.Time) {
bt.Lock()
bt.transcodeT += time.Since(t0)
bt.transcodeN++
bt.Unlock()
}

func (bt *blockTiming) String() string {
var readAvgT, transcodeAvgT, writeAvgT time.Duration
if bt.readN == 0 {
readAvgT = 0
} else {
readAvgT = bt.readT / time.Duration(bt.readN)
}
if bt.transcodeN == 0 {
transcodeAvgT = 0
} else {
transcodeAvgT = bt.transcodeT / time.Duration(bt.transcodeN)
}
if bt.readN == 0 {
writeAvgT = 0
} else {
writeAvgT = bt.writeT / time.Duration(bt.readN)
}
return fmt.Sprintf("read %s (%s), transcode %s (%s), write %s (%s)", bt.readT, readAvgT, bt.transcodeT, transcodeAvgT, bt.writeT, writeAvgT)
}

type blockSend struct {
bcoord dvid.ChunkPoint3d
value []byte
Expand Down Expand Up @@ -2440,6 +2501,7 @@ func (d *Data) sendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
}

w.Header().Set("Content-type", "application/octet-stream")

// extract querey string
if blockstring == "" {
return
Expand All @@ -2450,8 +2512,7 @@ func (d *Data) sendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
}

var store storage.KeyValueDB
store, err = datastore.GetKeyValueDB(d)
if err != nil {
if store, err = datastore.GetKeyValueDB(d); err != nil {
return
}

Expand All @@ -2462,8 +2523,7 @@ func (d *Data) sendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
ch := make(chan blockSend, numBlocks)
var sendErr error
var startBlock dvid.ChunkPoint3d
var readT, transcodeT, writeT time.Duration
var readNum, transcodeNum int64
var timing blockTiming
go func() {
for data := range ch {
if data.err != nil && sendErr == nil {
Expand All @@ -2474,57 +2534,38 @@ func (d *Data) sendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
if err != nil && sendErr == nil {
sendErr = err
}
writeT += time.Now().Sub(t0)
timing.writeDone(t0)
}
wg.Done()
}
timedLog.Infof("labelmap %q specificblocks - finished sending %d blocks starting with %s", d.DataName(), numBlocks, startBlock)
if transcodeNum == 0 {
transcodeNum = 1
}
if readNum == 0 {
readNum = 1
}
dvid.Infof("labelmap %q specificblocks - %d blocks starting with %s: read %s (%s), transcode %s (%s), write %s (%s)\n", d.DataName(), numBlocks, startBlock,
readT, readT/time.Duration(readNum), transcodeT, transcodeT/time.Duration(transcodeNum), writeT, writeT/time.Duration(readNum))
}()

var timeMutex sync.Mutex
// iterate through each block, get data from store, and transcode based on request parameters
for i := 0; i < len(coordarray); i += 3 {
var xloc, yloc, zloc int
xloc, err = strconv.Atoi(coordarray[i])
if err != nil {
return
}
yloc, err = strconv.Atoi(coordarray[i+1])
if err != nil {
return
}
zloc, err = strconv.Atoi(coordarray[i+2])
if err != nil {
var bcoord dvid.ChunkPoint3d
if bcoord, err = strArrayToBCoord(coordarray[i : i+3]); err != nil {
return
}
bcoord := dvid.ChunkPoint3d{int32(xloc), int32(yloc), int32(zloc)}
if i == 0 {
startBlock = bcoord
}
wg.Add(1)
go func(bcoord dvid.ChunkPoint3d) {
t0 := time.Now()
indexBeg := dvid.IndexZYX(bcoord)
keyBeg := NewBlockTKey(scale, &indexBeg)

value, err := store.Get(ctx, keyBeg)
timeMutex.Lock()
readNum++
readT += time.Now().Sub(t0)
timeMutex.Unlock()
if err != nil {
ch <- blockSend{err: err}
return
}
if len(value) > 0 {
t0 := time.Now()
indexBeg := dvid.IndexZYX(bcoord)
keyBeg := NewBlockTKey(scale, &indexBeg)

var value []byte
value, err = store.Get(ctx, keyBeg)
timing.readDone(t0)

if err != nil {
ch <- blockSend{err: err}
return
}

if len(value) > 0 {
go func(bcoord dvid.ChunkPoint3d) {
b := blockData{
bcoord: bcoord,
v: ctx.VersionID(),
Expand All @@ -2534,19 +2575,17 @@ func (d *Data) sendBlocksSpecific(ctx *datastore.VersionedCtx, w http.ResponseWr
}
t0 := time.Now()
out, err := d.transcodeBlock(b)
timeMutex.Lock()
transcodeNum++
transcodeT += time.Now().Sub(t0)
timeMutex.Unlock()
timing.transcodeDone(t0)
ch <- blockSend{bcoord: bcoord, value: out, err: err}
return
}
}(bcoord)
} else {
ch <- blockSend{value: nil}
}(bcoord)
}
}
timedLog.Infof("labelmap %q specificblocks - launched concurrent reads of %d blocks starting with %s", d.DataName(), numBlocks, startBlock)
wg.Wait()
close(ch)
dvid.Infof("labelmap %q specificblocks - %d blocks starting with %s: %s\n", d.DataName(), numBlocks, startBlock, &timing)
return numBlocks, sendErr
}

Expand Down

0 comments on commit 663a9ef

Please sign in to comment.