forked from petabridge/akka-bootcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTailActor.cs
129 lines (112 loc) · 4.01 KB
/
TailActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
using System.IO;
using System.Text;
using Akka.Actor;
namespace WinTail
{
/// <summary>
/// Monitors the file at <see cref="_filePath"/> for changes and forwards the file's
/// initial contents and every subsequent update to the reporter actor.
/// </summary>
public class TailActor : UntypedActor
{
    #region Message types

    /// <summary>
    /// Signal that the file has changed, and we need to read the newly appended text.
    /// </summary>
    public class FileWrite
    {
        /// <param name="fileName">Name of the file that changed.</param>
        public FileWrite(string fileName)
        {
            FileName = fileName;
        }

        /// <summary>Name of the file that changed, as supplied by the sender.</summary>
        public string FileName { get; private set; }
    }

    /// <summary>
    /// Signal that the OS had an error accessing the file.
    /// </summary>
    public class FileError
    {
        /// <param name="fileName">Name of the file the error relates to.</param>
        /// <param name="reason">Description of the error; forwarded to the reporter actor.</param>
        public FileError(string fileName, string reason)
        {
            FileName = fileName;
            Reason = reason;
        }

        /// <summary>Name of the file the error relates to.</summary>
        public string FileName { get; private set; }

        /// <summary>Description of the error.</summary>
        public string Reason { get; private set; }
    }

    /// <summary>
    /// Signal to read the initial contents of the file at actor startup.
    /// </summary>
    public class InitialRead
    {
        /// <param name="fileName">Name of the file that was read.</param>
        /// <param name="text">Text that was read from the file at startup.</param>
        public InitialRead(string fileName, string text)
        {
            FileName = fileName;
            Text = text;
        }

        /// <summary>Name of the file that was read.</summary>
        public string FileName { get; private set; }

        /// <summary>Text that was read from the file at startup.</summary>
        public string Text { get; private set; }
    }

    #endregion

    private readonly string _filePath;
    private readonly IActorRef _reporterActor;
    private FileObserver _observer;
    private Stream _fileStream;
    private StreamReader _fileStreamReader;

    /// <summary>
    /// Creates the actor. No OS resources are acquired here; that happens in <see cref="PreStart"/>.
    /// </summary>
    /// <param name="reporterActor">Actor that receives the text read from the file and any error reports.</param>
    /// <param name="filePath">Path of the file to tail.</param>
    public TailActor(IActorRef reporterActor, string filePath)
    {
        _reporterActor = reporterActor;
        _filePath = filePath;
    }

    /// <summary>
    /// Initialization logic for actor that will tail changes to a file.
    /// </summary>
    /// <remarks>
    /// If any step fails, everything acquired so far is released before the exception
    /// is rethrown, so a failed start does not leave the observer or the file handle open.
    /// </remarks>
    protected override void PreStart()
    {
        var fullPath = Path.GetFullPath(_filePath);

        try
        {
            // start watching file for changes
            _observer = new FileObserver(Self, fullPath);
            _observer.Start();

            // open the file stream with shared read/write permissions (so file can be written to while open)
            _fileStream = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            _fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);

            // read the initial contents of the file and send it to console as first message
            var text = _fileStreamReader.ReadToEnd();
            Self.Tell(new InitialRead(_filePath, text));
        }
        catch
        {
            // e.g. the file does not exist: do not leave a running observer or an open handle behind
            ReleaseResources();
            throw;
        }
    }

    /// <summary>
    /// Cleanup OS handles for <see cref="_fileStreamReader"/> and <see cref="FileObserver"/>.
    /// </summary>
    protected override void PostStop()
    {
        try
        {
            ReleaseResources();
        }
        finally
        {
            // always let the base class run its own shutdown logic, even if a dispose call threw
            base.PostStop();
        }
    }

    /// <summary>
    /// Handles <see cref="FileWrite"/>, <see cref="FileError"/> and <see cref="InitialRead"/>;
    /// any other message is passed to <c>Unhandled</c> instead of being silently dropped.
    /// </summary>
    /// <param name="message">The message received by the actor.</param>
    protected override void OnReceive(object message)
    {
        if (message is FileWrite)
        {
            // move file cursor forward
            // pull results from cursor to end of file and write to output
            // (this is assuming a log file type format that is append-only)
            var appended = _fileStreamReader.ReadToEnd();
            if (!string.IsNullOrEmpty(appended))
            {
                _reporterActor.Tell(appended);
            }

            return;
        }

        var fileError = message as FileError;
        if (fileError != null)
        {
            _reporterActor.Tell(string.Format("Tail error: {0}", fileError.Reason));
            return;
        }

        var initialRead = message as InitialRead;
        if (initialRead != null)
        {
            _reporterActor.Tell(initialRead.Text);
            return;
        }

        Unhandled(message);
    }

    /// <summary>
    /// Disposes the observer and the file reader/stream if they were created, and clears the fields.
    /// Safe to call when only some (or none) of the resources have been acquired.
    /// </summary>
    private void ReleaseResources()
    {
        try
        {
            if (_observer != null)
            {
                _observer.Dispose();
            }
        }
        finally
        {
            _observer = null;

            if (_fileStreamReader != null)
            {
                // disposing the reader also closes the underlying stream
                _fileStreamReader.Dispose();
            }
            else if (_fileStream != null)
            {
                // the stream was opened but the reader was never created
                _fileStream.Dispose();
            }

            _fileStreamReader = null;
            _fileStream = null;
        }
    }
}
}