-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbridge.cs
261 lines (228 loc) · 9.87 KB
/
bridge.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
namespace Nodebridge
{
public sealed class InvokeOptions
{
/// <summary>
/// Working directory for your node scripts
/// Default: Current working directory
/// </summary>
public string Workingdirectory { get; set; } = "";
/// <summary>
/// Port for the bridge to listen on, only change this if you know what you are doing.
/// Default: Random port
/// </summary>
public int? Port { get; set; }
/// <summary>
/// Number of instances worker instances the bridge should span.
/// Default: Number of cores of the system
/// </summary>
public int? Instances { get; set; }
/// <summary>
/// Logger implementation
/// Default: Built in logger
/// </summary>
public ILogger Logger { get; set; } = null;
}
internal class StartMessage
{
public int Port { get; set; }
public string Addr { get; set; }
}
internal class ErrorResponse
{
public string Error { get; set; }
public string Message { get; set; }
}
internal class InvokeParams
{
public string Module { get; set; }
public string Opt { get; set; }
public object[] Data { get; set; }
}
public sealed class Bridge
{
private readonly HttpClient _client;
private bool _started;
private readonly InvokeOptions _options;
private Process _nodeProcess;
private readonly ILogger _logger;
private readonly CancellationToken _stoppingToken;
private int _port;
private string _addr;
private string HttpAddr { get => $"http://{_addr}:{_port}"; }
/// <summary>
/// Create a new bridge
/// </summary>
/// <param name="options"></param>
public Bridge(InvokeOptions options, CancellationToken stoppingToken)
{
_client = new HttpClient();
_options = options;
_stoppingToken = stoppingToken;
_logger = options.Logger;
// Start the bridge.
Start();
}
/// <summary>
/// Json settings
/// </summary>
private static readonly JsonSerializerSettings jsonSettings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
TypeNameHandling = TypeNameHandling.None
};
/// <summary>
/// Invoke a new function
/// </summary>
/// <param name="module">file to use</param>
/// <param name="opt">function to invoke</param>
/// <param name="args">arguments for the function</param>
/// <typeparam name="T">return type, either string or an object</typeparam>
/// <returns>Task<T></returns>
public Task<T> Invoke<T>(string module, string opt, params object[] args)
{
return DoInvoke<T>(new CancellationToken(), module, opt, args);
}
/// <summary>
/// Invoke a new function
/// </summary>
/// <param name="cancellationToken">cancellationToken</param>
/// <param name="module">file to use</param>
/// <param name="opt">function to invoke</param>
/// <param name="args">arguments for the function</param>
/// <typeparam name="T">return type, either string or an object</typeparam>
/// <returns>Task<T></returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "<Pending>")]
public Task<T> Invoke<T>(CancellationToken cancellationToken, string module, string opt, params object[] args)
{
return DoInvoke<T>(cancellationToken, module, opt, args);
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "<Pending>")]
internal async Task<T> DoInvoke<T>(CancellationToken cancellationToken, string module, string opt, params object[] args)
{
StringContent post = new StringContent(JsonConvert.SerializeObject(new InvokeParams { Data = args, Module = module, Opt = opt }, jsonSettings), Encoding.UTF8, "application/json");
HttpResponseMessage response = await _client.PostAsync(HttpAddr, post, cancellationToken);
string readResponse = await response.Content.ReadAsStringAsync().WithCancellation(cancellationToken);
// Bridge gave us an error, lets rethrow in the .net process.
if (!response.IsSuccessStatusCode)
{
var error = JsonConvert.DeserializeObject<ErrorResponse>(readResponse, jsonSettings);
throw new InvokeException(error.Message + Environment.NewLine + error.Error);
}
var responseContentType = response.Content.Headers.ContentType;
switch (responseContentType.MediaType)
{
case "text/plain":
if (typeof(T) != typeof(string))
{
throw new ArgumentException($"Unable to convert to: {typeof(T).FullName} got string response from the module.");
}
var responseString = await response.Content.ReadAsStringAsync().WithCancellation(cancellationToken);
return (T)(object)responseString;
case "application/json":
var json = await response.Content.ReadAsStringAsync().WithCancellation(cancellationToken);
return JsonConvert.DeserializeObject<T>(json, jsonSettings);
default:
throw new InvalidOperationException("Unexpected content type: " + responseContentType.MediaType);
}
}
/// <summary>
/// Start the process and start listening to the output.
/// </summary>
/// <param name="module">worker implementation</param>
internal void Start()
{
if (!_started)
{
var pid = Process.GetCurrentProcess().Id;
string extra = null;
string workerpath = new StringAsTempFile(EmbeddedResourceReader.Read(typeof(Bridge), "/node/dist/worker.js"), _stoppingToken).FileName;
string modulePath = new StringAsTempFile(EmbeddedResourceReader.Read(typeof(Bridge), "/node/dist/index.js"), _stoppingToken).FileName;
if (_options.Port != null)
{
extra += $"--port {_options.Port}";
}
if (_options.Instances != null)
{
extra += $"--instances {_options.Instances}";
}
if (string.IsNullOrEmpty(_options.Workingdirectory))
{
_options.Workingdirectory = Directory.GetCurrentDirectory();
}
// Start the node process
// Using experimental-worker flag since it's in beta.
var proc = new ProcessStartInfo("node")
{
Arguments = $"--experimental-worker {modulePath} --pid {pid} --workerpath {workerpath} --workingdir {_options.Workingdirectory} {extra ?? string.Empty}",
UseShellExecute = false,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
WorkingDirectory = Directory.GetCurrentDirectory()
};
_nodeProcess = Process.Start(proc);
ReadOutputStreams();
}
}
/// <summary>
/// Read output of the node process.
/// </summary>
private void ReadOutputStreams()
{
var pid = Process.GetCurrentProcess().Id;
// listen for everything here.
_nodeProcess.OutputDataReceived += (sender, evt) =>
{
if (evt.Data != null)
{
// This only happens once, listen for the specific message that tells us the port an addr it's listening on.
if (!_started && evt.Data.IndexOf($"[{pid}]") > -1)
{
var systemOutput = JsonConvert.DeserializeObject<StartMessage>(evt.Data.Split(new string[] { $"[{pid}] " }, StringSplitOptions.None)[1]);
_port = systemOutput.Port;
_addr = systemOutput.Addr;
_started = true;
}
else
{
// Log output of the node process to the logger
var decoded = UnencodeNewlines(evt.Data);
_logger.LogInformation(decoded);
}
}
};
// Listening for errors here
_nodeProcess.ErrorDataReceived += (sender, evt) =>
{
if (evt.Data != null)
{
// Log output of the node process to the logger
var decoded = UnencodeNewlines(evt.Data);
_logger.LogError(decoded);
}
};
_nodeProcess.BeginOutputReadLine();
_nodeProcess.BeginErrorReadLine();
}
// https://github.com/aspnet/aspnetcore/blob/master/src/Middleware/NodeServices/src/HostingModels/OutOfProcessNodeInstance.cs
private static string UnencodeNewlines(string str)
{
if (str != null)
{
str = str.Replace("__ns_newline__", Environment.NewLine);
}
return str;
}
}
}