-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile.go
434 lines (382 loc) · 9.52 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
package pslog
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"gitee.com/xuesongtao/gotool/base"
plg "gitee.com/xuesongtao/ps-log/log"
fs "github.com/fsnotify/fsnotify"
)
const (
saveOffsetDir = ".pslog" // 保存偏移量的文件目录
cleanOffsetFileDayDur = 3 // 清理偏移量文件变动多少天之前的文件
)
type FileInfo struct {
Dir string // 文件目录路径
Name string // 文件名
Handler *Handler // 这里优先 PsLog.handler
mu sync.Mutex
watchChangeFilename string
filename string
// 父级特有参数
op fs.Op
children map[string]*FileInfo // 如果当前为目录的时候, 这里就有值, key: 文件名[这里不是全路径]
// 子级特有参数
fh *os.File // 存放的文件句柄, 只有 可读权限, key: filename
reader *bufio.Reader // fh 读
offsetChange int32 // 记录 offset 变化次数
offset int64 // 当前文件偏移量
beginOffset int64 // 记录最开始的偏移量
}
// NewFileInfo 初始化
func NewFileInfo(path string, handler *Handler) (*FileInfo, error) {
fileInfo := &FileInfo{
Dir: "",
Name: "",
Handler: handler,
mu: sync.Mutex{},
watchChangeFilename: "",
op: 0,
children: make(map[string]*FileInfo),
fh: nil,
offsetChange: 0,
offset: 0,
beginOffset: 0,
}
if !handler.initd {
handler.path = path
if err := handler.init(); err != nil {
return nil, err
}
}
fileInfo.init()
return fileInfo, nil
}
func (f *FileInfo) init() error {
// fmt.Println("-----", f.Handler.path, f.IsDir())
f.Parse(f.Handler.path)
if !f.IsDir() {
f.initFh()
f.initOffset()
return nil
}
// 如果是目录的话需要初始化 children
return f.loopDir(f.Handler.path, func(info os.FileInfo) error {
filename := filepath.Join(f.Dir, info.Name())
// fmt.Println("=====",filename)
if f.needCollect(filename) {
tmp, err := NewFileInfo(filename, f.Handler.copy())
if err != nil {
return err
}
f.children[info.Name()] = tmp
}
return nil
})
}
func isRename(op fs.Op) bool {
return op.Has(fs.Rename)
}
func isCreate(op fs.Op) bool {
return op.Has(fs.Create)
}
func (f *FileInfo) initFh() error {
if f.fh != nil {
return nil
}
ff, err := os.Open(f.FileName())
if err != nil {
return err
}
f.fh = ff
f.reader = bufio.NewReader(f.fh)
return nil
}
// ScanLinesOfInCr 对文件进行增量读取
func (f *FileInfo) ScanLinesOfInCr(fn func(row []byte) error) (int64, error) {
if f.offset > 0 {
_, err := f.fh.Seek(f.offset, io.SeekStart)
if err != nil {
return 0, err
}
}
offset := f.offset
for {
b, err := f.reader.ReadBytes('\n')
if e := fn(b); e != nil {
return 0, e
}
offset += int64(len(b))
if err != nil {
if err == io.EOF {
break
}
return 0, err
}
}
return offset, nil
}
func (f *FileInfo) resetFn() {
f.closeFileHandle()
f.offset = 0
f.saveOffset(true)
}
// Extension 延期
func (f *FileInfo) Extension() {
f.Handler.ExpireAt = time.Now().Add(f.Handler.ExpireDur)
}
// getFileInfo 根据文件全路径名获取 FileInfo
func (f *FileInfo) getFileInfo(filename string) (*FileInfo, error) {
// fmt.Println("getFileInfo:", filename, f.FileName())
if !f.IsDir() { // 当前 FileInfo 为文件
return f, nil
}
f.mu.Lock()
defer f.mu.Unlock()
// FileInfo 为目录
basename := filepath.Base(filename)
tmp, ok := f.children[basename]
if ok {
return tmp, nil
}
tmp, err := NewFileInfo(filename, f.Handler.copy())
if err != nil {
return nil, err
}
f.children[basename] = tmp
return tmp, nil
}
// HandlerIsNil
func (f *FileInfo) HandlerIsNil() bool {
return f.Handler == nil
}
// getFileHandle 获取文件 handle
func (f *FileInfo) getFileHandle() (*os.File, error) {
if err := f.initFh(); err != nil {
return nil, err
}
return f.fh, nil
}
// closeFileHandle 检查句柄账号
func (f *FileInfo) closeFileHandle() {
if f.fh == nil {
return
}
f.fh.Close()
f.fh = nil
}
// expireClose 到期关闭句柄
func (f *FileInfo) expireClose() {
if !f.IsExpire() {
return
}
if !f.IsDir() {
f.closeFileHandle()
return
}
for _, fileInfo := range f.children {
fileInfo.expireClose()
}
}
// NeedCollect 判断下是否需要被采集
func (f *FileInfo) needCollect(filename string) bool {
if f.HandlerIsNil() {
return false
}
return f.Handler.NeedCollect(filename)
}
// IsDir 是否为目录
func (f *FileInfo) IsDir() bool {
return f.Handler.isDir
}
// FileName 获取文件的全路径名
func (f *FileInfo) FileName() string {
if f.filename != "" {
return f.filename
}
f.filename = filepath.Join(f.Dir, f.Name)
return f.filename
}
// Parse 解析 path
func (f *FileInfo) Parse(path string) {
if f.IsDir() {
f.Dir = path
return
}
f.Dir, f.Name = filepath.Split(path)
}
// IsExpire 文件是否过期
func (f *FileInfo) IsExpire(t ...time.Time) bool {
defaultTime := time.Now()
if len(t) > 0 {
defaultTime = t[0]
}
if !f.HandlerIsNil() {
return f.Handler.ExpireAt.Before(defaultTime)
}
return false
}
// CleanNameFmt 清理 Name 中的格式, 如: test.log => test
func (f *FileInfo) CleanNameFmt() string {
if f.Name == "" {
return ""
}
tmpIndex := strings.Index(f.Name, ".")
return f.Name[:tmpIndex]
}
// PsLogDir 目录
func (f *FileInfo) PsLogDir() string {
return filepath.Join(f.Dir, saveOffsetDir)
}
// offsetDir 保存偏移量文件的目录
func (f *FileInfo) offsetDir() string {
return filepath.Join(f.PsLogDir(), "offset")
}
// offsetFilename 获取保存文件偏移量的名称
func (f *FileInfo) offsetFilename(suffix ...string) string {
// 处理为 xxx/.pslog/offset/_xxx.log.txt
name := f.Name
if len(suffix) > 0 && suffix[0] != "" {
name += "_" + suffix[0]
}
return filepath.Join(f.offsetDir(), "_"+name+".txt")
}
func (f *FileInfo) storeOffset(o int64) {
atomic.StoreInt64(&f.offset, o)
}
func (f *FileInfo) loadOffset() int64 {
return atomic.LoadInt64(&f.offset)
}
func (f *FileInfo) loadBeginOffset() int64 {
return atomic.LoadInt64(&f.beginOffset)
}
// initOffset 初始化文件 offset
func (f *FileInfo) initOffset() {
// 清理已过期文件偏移量文件
f.removeOffsetFile()
// 初次使用需要判断下是否需要清除偏移量
if f.cleanOffset() {
return
}
// 需要判断下是否已处理过
if f.offset > 0 {
return
}
// 从文件中读取偏移量
filename := f.offsetFilename()
offset, err := f.getContent(filename)
if err != nil {
plg.Errorf("f.getContent %q is failed, err: %v", filename, err)
return
}
offsetInt, _ := strconv.Atoi(offset)
f.offset = int64(offsetInt)
f.beginOffset = f.offset
}
func (f *FileInfo) cleanOffset() (skip bool) {
if !f.Handler.CleanOffset {
return
}
f.offset = 0
f.beginOffset = f.offset
f.putContent(f.offsetFilename(), "0")
f.Handler.CleanOffset = false
skip = true
return
}
// saveOffset 保存偏移量
// 通过隐藏文件来保存
func (f *FileInfo) saveOffset(mustSaveOffset bool) {
filename := f.offsetFilename()
// 判断下是否需要持久化
if mustSaveOffset || f.Handler.Change == -1 {
if _, err := f.putContent(filename, base.ToString(f.offset)); err != nil {
plg.Error("f.putContent is failed, err:", err)
}
return
}
f.offsetChange++
if f.offsetChange > f.Handler.Change {
if _, err := f.putContent(filename, base.ToString(f.offset)); err != nil {
plg.Error("f.putContent is failed, err:", err)
}
f.removeOffsetFile()
f.offsetChange = 0
}
}
// getContent 查询
func (f *FileInfo) getContent(path string) (string, error) {
fh, err := filePool.Get(path, os.O_RDWR)
if err != nil {
return "", fmt.Errorf("filePool.Get %q is failed, err: %v", path, err)
}
defer filePool.Put(fh)
data, err := fh.GetContent()
if err != nil {
return "", fmt.Errorf("getContent %q is failed, err: %v", path, err)
}
return data, nil
}
// putContent 覆写
func (f *FileInfo) putContent(path string, content string) (int, error) {
fh, err := filePool.Get(path, os.O_RDWR|os.O_TRUNC)
if err != nil {
return 0, fmt.Errorf("filePool.Get %q is failed, err: %v", path, err)
}
defer filePool.Put(fh)
return fh.PutContent(content)
}
// remove 移出
func (f *FileInfo) remove(path string) {
filePool.Remove(path)
}
// removeOffsetFile 移除保存文件偏移量的文件
func (f *FileInfo) removeOffsetFile(filename ...string) {
if len(filename) > 0 && filename[0] != "" {
if err := os.Remove(filename[0]); err != nil {
plg.Errorf("os.Remove %q is failed, err: %v", filename[0], err)
}
f.remove(filename[0])
return
}
// 移除当前目录下cleanOffsetFileDayDur天前的文件
curTime := time.Now()
f.loopDir(f.offsetDir(), func(info os.FileInfo) error {
if curTime.Sub(info.ModTime())/base.DayDur <= cleanOffsetFileDayDur {
return nil
}
delFilename := filepath.Join(f.offsetDir(), info.Name())
if err := os.Remove(delFilename); err != nil {
plg.Warningf("os.Remove %q is failed, err: %v", delFilename, err)
}
f.remove(delFilename)
return nil
})
}
// loopDir 遍历目录, 只会遍历一级子级
func (f *FileInfo) loopDir(path string, handle func(info os.FileInfo) error) error {
entrys, err := os.ReadDir(path)
if err != nil {
return fmt.Errorf("ioutil.ReadDir %q is failed, err: %v", path, err)
}
for _, entry := range entrys {
if entry.IsDir() {
continue
}
info, err := entry.Info()
if err != nil {
return err
}
if err := handle(info); err != nil {
return err
}
}
return nil
}