-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathexternal-sort.go
142 lines (132 loc) · 2.84 KB
/
external-sort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"github.com/golang/glog"
"os"
)
// Package-level tuning and naming constants. None of them is referenced in
// the part of the file reviewed here, so the notes below are assumptions.
const (
	// OutChannelSize is presumably the buffer capacity used for a job
	// manager's output channel — TODO confirm at the use site.
	OutChannelSize = 10

	// SplitPrefix is presumably the file-name prefix of temporary files
	// written by the split phase — TODO confirm at the use site.
	SplitPrefix = "split."

	// MergePrefix is presumably the file-name prefix of temporary files
	// written by the merge phase — TODO confirm at the use site.
	MergePrefix = "merge."
)
// errMsg is a minimal error value that carries nothing but a message string.
type errMsg struct{ msg string }

// Error returns the stored message, which makes *errMsg satisfy the built-in
// error interface.
func (e *errMsg) Error() string {
	message := e.msg
	return message
}
// watchError drains jm.err and reports every error it receives through
// glog.Fatalf, tagged with the manager's name. It returns once jm.err has
// been closed. It is meant to run in its own goroutine.
func watchError(jm *jobManager) {
	for {
		err, open := <-jm.err
		if !open {
			return
		}
		glog.Fatalf("[%s] %s", jm.name, err)
	}
}
// sortFiles is the main entry point of the external sort.
//
// It hands every path in inFiles to the "Split" job manager, collects the
// files arriving on splitter.out and merger.out, sends them two at a time to
// the "Merge" job manager, and finally renames the single remaining file to
// outFile. With no files to merge it only logs a warning.
//
// Errors sent on either manager's err channel are picked up by watchError.
func sortFiles(outFile string, inFiles ...string) {
	splitter := newJobManager("Split", newResourcePool().init())
	go watchError(splitter)
	merger := newJobManager("Merge", newResourcePool().init())
	go watchError(merger)

	// One wait-group slot per input file; presumably released by the split
	// jobs themselves (not visible here).
	splitter.wg.Add(len(inFiles))

	// Split phase: runs concurrently with the merge loop below. Once all split
	// work has finished it closes the splitter's channels, which is how the
	// merge loop learns that no more split files will arrive.
	runSplitPhase := func() {
		glog.V(1).Infof("[%s] Started.", splitter.name)
		for _, name := range inFiles {
			worker := splitter.getGenWorker()
			worker.jobChan <- &splitJob{
				filename: name,
				manager:  splitter,
			}
		}
		splitter.wg.Wait()
		glog.V(1).Infof("[%s] Done.", splitter.name)
		close(splitter.out)
		close(splitter.err)
		close(splitter.quit)
		splitter.res.destroy()
	}
	go runSplitPhase()

	// Merge phase: kept on the calling goroutine.
	glog.V(1).Infof("[%s] Started.", merger.name)

	var (
		pending       [2]*os.File // files waiting to be paired for the next merge
		held          int         // number of occupied slots in pending
		outstanding   int         // split files received minus merges dispatched
		splitterAlive = true      // turns false once splitter.out is closed
	)

mergeLoop:
	for {
		if !splitterAlive {
			glog.V(2).Infof(
				"[%s] Split phase is done. Files remained to be merged: %d",
				merger.name,
				outstanding,
			)
			if outstanding == 0 {
				glog.Warningf("[%s] Nothing merged.", merger.name)
				break mergeLoop
			}
			if outstanding == 1 {
				// The splitter produces no new output any more, so the count
				// cannot grow again, and one merge was dispatched for every
				// other file received. The one file left is therefore the
				// final result: either it is already held in slot 0, or it is
				// still on its way from the merge phase.
				if held == 1 {
					held = 0
				} else {
					pending[held] = <-merger.out
				}
				result := pending[held]
				result.Close()
				glog.V(2).Infof(
					"[%s] Renaming temp result %s to %s",
					merger.name,
					result.Name(), outFile,
				)
				if err := os.Rename(result.Name(), outFile); err != nil {
					merger.err <- err
				}
				// Releases one of the slots added per split file; presumably
				// the merge jobs release the others (not visible here).
				merger.wg.Done()
				break mergeLoop
			}
			// More than one file is still outstanding: wait for the next
			// merge result and pair it up below.
			pending[held] = <-merger.out
			held++
		} else {
			select {
			case pending[held], splitterAlive = <-splitter.out:
				if !splitterAlive {
					// Channel closed: re-enter the loop on the drained path.
					continue
				}
				glog.V(2).Infof(
					"[%s] Recevied from split phase: %s",
					merger.name,
					pending[held].Name(),
				)
				held++
				outstanding++
				merger.wg.Add(1)
			case pending[held] = <-merger.out:
				glog.V(2).Infof(
					"[%s] Recevied from merge phase: %s",
					merger.name,
					pending[held].Name(),
				)
				held++
			}
		}

		// Two files in hand: dispatch them as one merge job. Two inputs go in
		// for one dispatched job, so the outstanding count drops by one.
		if held == 2 {
			held = 0
			outstanding--
			worker := merger.getGenWorker()
			worker.jobChan <- &mergeJob{
				file1:   pending[0],
				file2:   pending[1],
				manager: merger,
			}
		}
	}

	merger.wg.Wait()
	close(merger.out)
	close(merger.err)
	close(merger.quit)
	merger.res.destroy()
	glog.V(1).Infof("[%s] Done.", merger.name)
}