-
Notifications
You must be signed in to change notification settings - Fork 91
/
run.csx
149 lines (121 loc) · 5.46 KB
/
run.csx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#r "System.Configuration"
using System;
using System.Configuration;
using Elasticsearch.Net;
using Nest;
public static async Task Run(string myEventHubMessage, TraceWriter log)
{
await SendEventToElasticSearch(myEventHubMessage, log);
}
// Required application setting - target Elastic Search endpoint.
private const string Setting_Es_Uri = "es_uri";
// Required application setting - username for writing to ES.
private const string Setting_Es_Username = "es_username";
// Required application setting - password for writing to ES.
private const string Setting_Es_Password = "es_password";
// Optional application setting - ES index name prefix.
private const string Setting_Es_Index_Name_Prefix = "es_index_name_prefix";
private const string Error_Es_Uri_Missing = "Setting \"" + Setting_Es_Uri + "\" is missing.";
private const string Error_Es_Username_Missing = "Setting \"" + Setting_Es_Username + "\" is missing.";
private const string Error_Es_Password_Missing = "Setting \"" + Setting_Es_Password + "\" is missing.";
private const string DefaultIndexNamePrefix = "wadeventhub";
private const string EventDocumentTypeName = "event";
// Caches the last index name used so we don't have to do the check each time before sending data to ES.
private static string LastIndexName = null;
private static async Task SendEventToElasticSearch(string myEventHubMessage, TraceWriter log)
{
string esUri = ConfigurationManager.AppSettings[Setting_Es_Uri];
string username = ConfigurationManager.AppSettings[Setting_Es_Username];
string password = ConfigurationManager.AppSettings[Setting_Es_Password];
string indexNamePrefix = ConfigurationManager.AppSettings[Setting_Es_Index_Name_Prefix];
if (string.IsNullOrWhiteSpace(esUri))
{
ReportError(log, Error_Es_Uri_Missing, throwException: true);
}
if (string.IsNullOrWhiteSpace(username))
{
ReportError(log, Error_Es_Username_Missing, throwException: true);
}
if (string.IsNullOrWhiteSpace(password))
{
ReportError(log, Error_Es_Password_Missing, throwException: true);
}
if (string.IsNullOrWhiteSpace(indexNamePrefix))
{
log.Warning($"Setting \"{Setting_Es_Index_Name_Prefix}\" not specified, using default value \"{DefaultIndexNamePrefix}\".");
indexNamePrefix = DefaultIndexNamePrefix;
}
log.Verbose($"Sending Event Hub data to ElasticSearch at {esUri} ...");
ConnectionSettings connectionSettings = new ConnectionSettings(new Uri(esUri)).BasicAuthentication(username, password);
ElasticClient client = new ElasticClient(connectionSettings);
string currentIndexName = GetIndexName(indexNamePrefix);
if (!string.Equals(LastIndexName, currentIndexName, StringComparison.Ordinal))
{
await EnsureIndexExists(currentIndexName, client, log);
LastIndexName = currentIndexName;
}
var data = new PostData<string>(myEventHubMessage);
var result = await client.LowLevel.IndexAsync<string>(currentIndexName, EventDocumentTypeName, data);
if (result.Success)
{
log.Info("Data successfully sent.");
log.Verbose(result.Body);
}
else
{
ReportError(log, $"Failed to send data.{Environment.NewLine}{result.DebugInformation}", throwException: true);
}
}
/// <summary>
/// Generates an index name.
/// </summary>
/// <param name="indexNamePrefix">Specifies a string prefix of the index name.</param>
/// <returns>An index name in the form of <prefix>-YYYY.MM.DD.</returns>
private static string GetIndexName(string indexNamePrefix)
{
DateTimeOffset now = DateTimeOffset.UtcNow;
string retval = $"{indexNamePrefix}-{now.Year.ToString("0000")}.{now.Month.ToString("00")}.{now.Day.ToString("00")}";
return retval;
}
/// <summary>
/// Makes sure the specified index name has been created.
/// </summary>
/// <param name="currentIndexName">The index name to check.</param>
/// <param name="esClient">An Elastic client instance.</param>
/// <param name="log">A Trace writer instance.</param>
/// <returns>A Task representing the async operation.</returns>
private static async Task EnsureIndexExists(string currentIndexName, ElasticClient esClient, TraceWriter log)
{
IExistsResponse existsResult = await esClient.IndexExistsAsync(currentIndexName);
if (!existsResult.IsValid)
{
ReportError(log, $"Index exists check failed.{Environment.NewLine}{existsResult.DebugInformation}", throwException: true);
}
if (existsResult.Exists)
{
return;
}
// TODO: allow the consumer to fine-tune index settings
IndexState indexState = new IndexState();
indexState.Settings.NumberOfReplicas = 1;
indexState.Settings.NumberOfShards = 5;
indexState.Settings.Add("refresh_interval", "15s");
ICreateIndexResponse createIndexResult = await esClient.CreateIndexAsync(currentIndexName, c => c.InitializeUsing(indexState));
if (!createIndexResult.IsValid)
{
if (string.Equals(createIndexResult.ServerError?.Error?.Type, "IndexAlreadyExistsException", StringComparison.OrdinalIgnoreCase))
{
// This is fine, someone just beat us to create a new index.
return;
}
ReportError(log, $"Create index failed.{Environment.NewLine}{createIndexResult.DebugInformation}", throwException: true);
}
}
private static void ReportError(TraceWriter log, string errorMessage, bool throwException)
{
log.Error(errorMessage);
if (throwException)
{
throw new ApplicationException(errorMessage);
}
}