-
Notifications
You must be signed in to change notification settings - Fork 4
/
coarse.go
500 lines (445 loc) · 13.9 KB
/
coarse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
package cablastp
import (
"encoding/binary"
"fmt"
"os"
"sync"
)
// Hard-coded file names for different pieces of a cablastp database.
// All of these name files belonging to the "coarse" part of the database.
const (
// The coarse sequences in FASTA format, and the index into that file.
FileCoarseFasta = "coarse.fasta"
FileCoarseFastaIndex = "coarse.fasta.index"
// The coarse links, the file used for the human readable ('plain') copy of
// the links, and the index into the links file.
FileCoarseLinks = "coarse.links"
FileCoarsePlainLinks = "coarse.links.plain"
FileCoarseLinksIndex = "coarse.links.index"
// The seeds table, and the file used for its human readable ('plain') copy.
FileCoarseSeeds = "coarse.seeds"
FileCoarsePlainSeeds = "coarse.seeds.plain"
)
// CoarseDB represents a set of unique sequences that comprise the "coarse"
// database. Sequences in the coarse database, combined with information in the
// compressed database, are used to re-create the original sequences.
type CoarseDB struct {
// The coarse sequences held in memory. Add appends to this slice and uses
// the index of the new element as the sequence identifier.
Seqs []*CoarseSeq
// The seeds table. Every sequence given to Add is also added to it.
Seeds Seeds
// The fastaCache is used during decompression. Namely, once a coarse
// sequence is decompressed, it is cached into this map.
// It is keyed by coarse sequence identifier and filled by ReadCoarseSeq.
fastaCache map[int]*CoarseSeq
// The size of the coarse database index in bytes. This can be used to
// quickly compute the number of sequences in the coarse database.
// (Since each sequence is represented by a 64-bit integer offset, simply
// divide by 8.)
fastaIndexSize int64
// File pointers to each file in the "coarse" part of a cablastp database.
FileFasta *os.File
FileFastaIndex *os.File
FileSeeds *os.File
FileLinks *os.File
FileLinksIndex *os.File
// Ensures that adding a sequence to the coarse database is atomic.
// It is also held for reading by CoarseSeqGet and save.
// NOTE: newReadCoarseDB leaves this nil, so the methods that lock it must
// not be called on a database opened for reading.
seqLock *sync.RWMutex
// If a database is *created* without the read only flag set, then we have
// to save the seeds table.
// (We may want to remove this feature.)
readOnly bool
// If read only is not set, then this is used to track how many sequences
// were already in the coarse database. (So that when we add more, we only
// write the new ones.)
seqsRead int
// plain is a debugging feature that writes the links and seeds table (when
// not read only) in a human readable format, rather than the default binary
// format.
plain bool
// File pointers to use when 'plain' is true.
plainLinks *os.File
plainSeeds *os.File
}
// newWriteCoarseDB sets up a new coarse database to be written to (or opens
// an existing one ready for writing when 'appnd' is set).
//
// Every file of the coarse database is opened with db.openWriteFile. The
// plain links and seeds files are only opened when db.SavePlain is set. When
// 'appnd' is set, the existing database is loaded into memory and the seeds,
// links and links index files (and their plain counterparts) are truncated
// afterwards, since they are rewritten from scratch.
//
// If any step fails, all files opened up to that point are closed before the
// error is returned, so a failed open does not leak file descriptors.
func newWriteCoarseDB(appnd bool, db *DB) (*CoarseDB, error) {
	var err error

	Vprintln("\tOpening coarse database...")

	coarsedb := &CoarseDB{
		Seqs:           make([]*CoarseSeq, 0, 10000000),
		seqsRead:       0,
		Seeds:          NewSeeds(db.MapSeedSize, db.SeedLowComplexity),
		FileFasta:      nil,
		FileFastaIndex: nil,
		fastaIndexSize: 0,
		FileSeeds:      nil,
		FileLinks:      nil,
		FileLinksIndex: nil,
		seqLock:        &sync.RWMutex{},
		readOnly:       db.ReadOnly,
		plain:          db.SavePlain,
		plainSeeds:     nil,
	}

	// Until the database is fully set up, any return is a failure and the
	// files opened so far must be released.
	ready := false
	defer func() {
		if ready {
			return
		}
		opened := []*os.File{
			coarsedb.FileFasta,
			coarsedb.FileFastaIndex,
			coarsedb.FileSeeds,
			coarsedb.FileLinks,
			coarsedb.FileLinksIndex,
			coarsedb.plainLinks,
			coarsedb.plainSeeds,
		}
		for _, f := range opened {
			if f != nil {
				// The error that caused the failure is the one worth
				// reporting; a close error here is deliberately dropped.
				f.Close()
			}
		}
	}()

	coarsedb.FileFasta, err = db.openWriteFile(appnd, FileCoarseFasta)
	if err != nil {
		return nil, err
	}
	coarsedb.FileFastaIndex, err = db.openWriteFile(appnd, FileCoarseFastaIndex)
	if err != nil {
		return nil, err
	}
	coarsedb.FileSeeds, err = db.openWriteFile(appnd, FileCoarseSeeds)
	if err != nil {
		return nil, err
	}
	coarsedb.FileLinks, err = db.openWriteFile(appnd, FileCoarseLinks)
	if err != nil {
		return nil, err
	}
	coarsedb.FileLinksIndex, err = db.openWriteFile(appnd, FileCoarseLinksIndex)
	if err != nil {
		return nil, err
	}

	// The size of the fasta index tells us how many sequences already exist.
	info, err := coarsedb.FileFastaIndex.Stat()
	if err != nil {
		return nil, err
	}
	coarsedb.fastaIndexSize = info.Size()

	if coarsedb.plain {
		coarsedb.plainLinks, err = db.openWriteFile(appnd, FileCoarsePlainLinks)
		if err != nil {
			return nil, err
		}
		coarsedb.plainSeeds, err = db.openWriteFile(appnd, FileCoarsePlainSeeds)
		if err != nil {
			return nil, err
		}
	}

	if appnd {
		if err = coarsedb.load(); err != nil {
			return nil, err
		}

		// After we've loaded the coarse database, the file offset should be
		// at the end of each file. For the coarse fasta file, this is
		// exactly what we want. But for the links and seeds files, we need
		// to clear the file and start over (since they are not amenable to
		// appending like the coarse fasta file is).
		// Do the same for plain files.
		trunc := func(f *os.File) (err error) {
			if err = f.Truncate(0); err != nil {
				return
			}
			if _, err = f.Seek(0, os.SEEK_SET); err != nil {
				return
			}
			return nil
		}
		if err = trunc(coarsedb.FileSeeds); err != nil {
			return nil, err
		}
		if err = trunc(coarsedb.FileLinks); err != nil {
			return nil, err
		}
		if err = trunc(coarsedb.FileLinksIndex); err != nil {
			return nil, err
		}
		if coarsedb.plain {
			if err = trunc(coarsedb.plainSeeds); err != nil {
				return nil, err
			}
			if err = trunc(coarsedb.plainLinks); err != nil {
				return nil, err
			}
		}
	}

	Vprintln("\tDone opening coarse database.")
	ready = true
	return coarsedb, nil
}
// newReadCoarseDB opens a coarse database and prepares it for reading. This
// is typically called before decompression.
//
// The fasta, fasta index, links and links index files are opened with
// db.openReadFile; the seeds file is not opened. The sequence lock is left
// nil, and an empty cache for decompressed coarse sequences is created.
//
// If any step fails, all files opened up to that point are closed before the
// error is returned, so a failed open does not leak file descriptors.
func newReadCoarseDB(db *DB) (*CoarseDB, error) {
	var err error

	Vprintln("\tOpening coarse database...")

	coarsedb := &CoarseDB{
		Seqs:           make([]*CoarseSeq, 0, 100000),
		Seeds:          NewSeeds(db.MapSeedSize, db.SeedLowComplexity),
		FileFasta:      nil,
		fastaCache:     make(map[int]*CoarseSeq, 200),
		FileFastaIndex: nil,
		fastaIndexSize: 0,
		FileSeeds:      nil,
		FileLinks:      nil,
		FileLinksIndex: nil,
		seqLock:        nil,
		readOnly:       false,
		plain:          db.SavePlain,
	}

	// Until the database is fully set up, any return is a failure and the
	// files opened so far must be released.
	ready := false
	defer func() {
		if ready {
			return
		}
		opened := []*os.File{
			coarsedb.FileFasta,
			coarsedb.FileFastaIndex,
			coarsedb.FileLinks,
			coarsedb.FileLinksIndex,
		}
		for _, f := range opened {
			if f != nil {
				// The error that caused the failure is the one worth
				// reporting; a close error here is deliberately dropped.
				f.Close()
			}
		}
	}()

	coarsedb.FileFasta, err = db.openReadFile(FileCoarseFasta)
	if err != nil {
		return nil, err
	}
	coarsedb.FileFastaIndex, err = db.openReadFile(FileCoarseFastaIndex)
	if err != nil {
		return nil, err
	}
	coarsedb.FileLinks, err = db.openReadFile(FileCoarseLinks)
	if err != nil {
		return nil, err
	}
	coarsedb.FileLinksIndex, err = db.openReadFile(FileCoarseLinksIndex)
	if err != nil {
		return nil, err
	}

	// The size of the fasta index is what NumSequences is computed from.
	info, err := coarsedb.FileFastaIndex.Stat()
	if err != nil {
		return nil, err
	}
	coarsedb.fastaIndexSize = info.Size()

	Vprintln("\tDone opening coarse database.")
	ready = true
	return coarsedb, nil
}
// Add turns the original sequence 'oseq' into a new coarse sequence and
// appends it to the coarse database. The identifier of the new sequence is
// its index in Seqs. The new sequence is also added to the seeds table.
//
// It returns the identifier together with the coarse sequence itself.
func (coarsedb *CoarseDB) Add(oseq []byte) (int, *CoarseSeq) {
	// Picking the identifier and appending must happen as one step, so both
	// are done while holding the write lock.
	coarsedb.seqLock.Lock()
	nextID := len(coarsedb.Seqs)
	added := NewCoarseSeq(nextID, "", oseq)
	coarsedb.Seqs = append(coarsedb.Seqs, added)
	coarsedb.seqLock.Unlock()

	// The seeds table is updated after the sequence lock has been released.
	coarsedb.Seeds.Add(nextID, added)

	return nextID, added
}
// CoarseSeqGet is a thread-safe way to retrieve a sequence with index `i`
// from the coarse database.
//
// The read lock is released with defer, so it is not left held if `i` is out
// of range and the resulting panic is recovered by a caller.
func (coarsedb *CoarseDB) CoarseSeqGet(i uint) *CoarseSeq {
	coarsedb.seqLock.RLock()
	defer coarsedb.seqLock.RUnlock()

	return coarsedb.Seqs[i]
}
// Expand will follow all links to compressed sequences for the coarse
// sequence at index `id` and return a slice of decompressed sequences.
//
// Only links whose coarse range overlaps [start, end] are followed, and each
// original sequence is decompressed at most once, even when several links
// point to it.
func (coarsedb *CoarseDB) Expand(
	comdb *CompressedDB, id, start, end int) ([]OriginalSeq, error) {

	// Look up the byte offset into the coarse links file where the links
	// for the coarse sequence `id` begin.
	linksAt, err := coarsedb.linkOffset(id)
	if err != nil {
		return nil, fmt.Errorf("Could not get link offset: %s", err)
	}

	// Move the links file to that offset and make sure we really got there.
	landed, err := coarsedb.FileLinks.Seek(linksAt, os.SEEK_SET)
	if err != nil {
		return nil, fmt.Errorf("Could not seek: %s", err)
	}
	if landed != linksAt {
		return nil,
			fmt.Errorf("Tried to seek to offset %d in the coarse links, "+
				"but seeked to %d instead.", linksAt, landed)
	}

	// The links of a sequence are preceded by their count.
	// Each link corresponds to a single original sequence.
	var numLinks uint32
	err = binary.Read(coarsedb.FileLinks, binary.BigEndian, &numLinks)
	if err != nil {
		return nil, fmt.Errorf("Could not read number of links: %s", err)
	}

	// 'seen' is a set of original sequence ids. It removes duplicates, since
	// a coarse sequence can point to different pieces of the same compressed
	// sequence.
	seen := make(map[uint32]bool, numLinks)
	expanded := make([]OriginalSeq, 0, numLinks)

	lo, hi := uint16(start), uint16(end)
	for remaining := numLinks; remaining > 0; remaining-- {
		link, err := coarsedb.readLink()
		if err != nil {
			return nil, fmt.Errorf("Could not read link: %s", err)
		}

		// Skip links outside of the requested range, as well as links to
		// an original sequence that has already been decompressed.
		inRange := hi >= link.CoarseStart && lo <= link.CoarseEnd
		if !inRange || seen[link.OrgSeqId] {
			continue
		}

		original, err := comdb.ReadSeq(coarsedb, int(link.OrgSeqId))
		if err != nil {
			return nil, fmt.Errorf(
				"Could not read compressed sequence: %s", err)
		}
		seen[link.OrgSeqId] = true
		expanded = append(expanded, original)
	}

	return expanded, nil
}
// NumSequences returns the number of sequences in the coarse database based
// on the file size of the coarse database index.
func (coarsedb *CoarseDB) NumSequences() int {
	// Each sequence takes up one 64-bit (8 byte) offset in the index.
	const bytesPerEntry = 8

	return int(coarsedb.fastaIndexSize / bytesPerEntry)
}
// ReadCoarseSeq returns the coarse sequence with identifier 'id'. The first
// request for a sequence reads it from the coarse fasta file, using the fasta
// index to find it; the result is stored in the cache, and later requests for
// the same identifier are answered from the cache without touching the disk.
//
// An error is returned when the offset cannot be determined, the seek fails
// or lands elsewhere, the record cannot be scanned, or the identifier found
// in the file is not 'id'.
//
// TODO: Note that this does *not* recover links typically found in a coarse
// sequence, although it probably should to avoid doing it in CoarseDB.Expand.
func (coarsedb *CoarseDB) ReadCoarseSeq(id int) (*CoarseSeq, error) {
	// Serve repeated requests from the cache.
	if cached, hit := coarsedb.fastaCache[id]; hit {
		return cached, nil
	}

	want, err := coarsedb.coarseOffset(id)
	if err != nil {
		return nil, fmt.Errorf("Could not get coarse offset: %s", err)
	}

	got, err := coarsedb.FileFasta.Seek(want, os.SEEK_SET)
	switch {
	case err != nil:
		return nil, fmt.Errorf("Could not seek in coarse fasta: %s", err)
	case got != want:
		return nil,
			fmt.Errorf("Tried to seek to offset %d in the coarse fasta file, "+
				"but seeked to %d instead.", want, got)
	}

	// A record is a header line holding the identifier, followed by a line
	// holding the residues.
	var (
		scannedID int
		residues  string
	)
	n, err := fmt.Fscanf(coarsedb.FileFasta, "> %d\n%s\n", &scannedID, &residues)
	switch {
	case err != nil:
		return nil, fmt.Errorf("Could not scan coarse sequence %d: %s", id, err)
	case n != 2:
		return nil, fmt.Errorf("Expected to read in two values for coarse "+
			"sequence %d, but read %d values instead.", id, n)
	case scannedID != id:
		return nil, fmt.Errorf("Expected to read coarse sequence %d but read "+
			"coarse sequence %d instead.", id, scannedID)
	}

	fresh := NewCoarseSeq(id, "", []byte(residues))
	coarsedb.fastaCache[id] = fresh
	return fresh, nil
}
// coarseOffset returns the integer byte offset into the coarse database of
// a particular coarse sequence. The offset is read from the coarse database
// index.
//
// An error is returned if the seek in the index fails or lands on the wrong
// offset, or if the offset cannot be read. The returned offset is 0 whenever
// an error is returned.
func (coarsedb *CoarseDB) coarseOffset(id int) (seqOff int64, err error) {
	return readOffsetFromIndex(coarsedb.FileFastaIndex, "coarse index", id)
}

// linkOffset returns the integer byte offset into the coarse links database
// of a particular coarse sequence. The offset is read from the coarse links
// database index.
//
// An error is returned if the seek in the index fails or lands on the wrong
// offset, or if the offset cannot be read. The returned offset is 0 whenever
// an error is returned.
func (coarsedb *CoarseDB) linkOffset(id int) (seqOff int64, err error) {
	return readOffsetFromIndex(coarsedb.FileLinksIndex, "coarse links index", id)
}

// readOffsetFromIndex reads entry `id` from an index file in which every
// entry is a big-endian 64-bit integer, so entry `id` starts at byte id*8.
// `name` describes the index in the error reported when the seek ends up at
// an unexpected position.
func readOffsetFromIndex(index *os.File, name string, id int) (int64, error) {
	tryOff := int64(id) * 8
	realOff, err := index.Seek(tryOff, os.SEEK_SET)
	if err != nil {
		return 0, err
	}
	if tryOff != realOff {
		return 0,
			fmt.Errorf("Tried to seek to offset %d in the %s, "+
				"but seeked to %d instead.", tryOff, name, realOff)
	}

	var seqOff int64
	if err := binary.Read(index, binary.BigEndian, &seqOff); err != nil {
		return 0, err
	}
	return seqOff, nil
}
// readClose closes the files used when reading the coarse database: the
// fasta file, the fasta index, the links file and the links index. Errors
// from closing are not reported.
func (coarsedb *CoarseDB) readClose() {
	files := []*os.File{
		coarsedb.FileFasta,
		coarsedb.FileFastaIndex,
		coarsedb.FileLinks,
		coarsedb.FileLinksIndex,
	}
	for _, f := range files {
		f.Close()
	}
}
// writeClose closes the files used when writing the coarse database: the
// fasta file, the fasta index, the seeds file, the links file and the links
// index. When 'plain' is set, the plain links and plain seeds files are
// closed as well. Errors from closing are not reported.
func (coarsedb *CoarseDB) writeClose() {
	files := []*os.File{
		coarsedb.FileFasta,
		coarsedb.FileFastaIndex,
		coarsedb.FileSeeds,
		coarsedb.FileLinks,
		coarsedb.FileLinksIndex,
	}
	if coarsedb.plain {
		files = append(files, coarsedb.plainLinks, coarsedb.plainSeeds)
	}
	for _, f := range files {
		f.Close()
	}
}
// load reads the entire coarse database (sequences and links) into memory.
// When a seeds file is open (FileSeeds is not nil), the seeds table is read
// into memory too.
//
// The first step that fails stops the load and its error is returned.
//
// (This is only called when a coarse database is being appended to.)
func (coarsedb *CoarseDB) load() (err error) {
	err = coarsedb.readFasta()
	if err != nil {
		return err
	}

	err = coarsedb.readLinks()
	if err != nil {
		return err
	}

	// Without a seeds file there is nothing left to read.
	if coarsedb.FileSeeds == nil {
		return nil
	}
	return coarsedb.readSeeds()
}
// save will save the coarse database as a FASTA file and a binary
// encoding of all coarse links. Seeds are also saved if this is not a read
// only database. When 'plain' is set, the links (and, if not read only, the
// seeds) are additionally saved in their plain form.
//
// Every piece is written in its own goroutine while the sequence read lock
// is held. save waits for all of them to finish and returns one of the
// errors they reported, or nil when none failed.
func (coarsedb *CoarseDB) save() error {
	coarsedb.seqLock.RLock()
	defer coarsedb.seqLock.RUnlock()

	// Buffered so that a failing step never blocks on reporting its error.
	errc := make(chan error, 20)
	wg := &sync.WaitGroup{}

	// spawn runs a single save step concurrently and collects its error.
	spawn := func(step func() error) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := step(); err != nil {
				errc <- err
			}
		}()
	}

	spawn(coarsedb.saveFasta)
	spawn(coarsedb.saveLinks)
	if !coarsedb.readOnly {
		spawn(coarsedb.saveSeeds)
	}
	if coarsedb.plain {
		spawn(coarsedb.saveLinksPlain)
		if !coarsedb.readOnly {
			spawn(coarsedb.saveSeedsPlain)
		}
	}
	wg.Wait()

	// All steps are done, so the channel holds every reported error. Hand
	// back the first one, if there is any.
	select {
	case err := <-errc:
		return err
	default:
		return nil
	}
}