-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearchClient.py
142 lines (129 loc) · 5.25 KB
/
searchClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Module metadata: author, contact address, license and version label.
__author__ = "Maysam Mokarian"
# NOTE(review): this value looks like an e-mail redaction placeholder rather
# than a real address -- confirm and restore the intended contact address.
__email__ = "[email protected]"
__license__ = "MIT"
# The version is a release-date label, not a numeric version string.
__version__ = "February 2022"
import json
import os
import requests
from client.clientabstract import ClientAbstract
from parser.parser import Parser
from client.storageClient import StorageClient
class SearchClient(ClientAbstract):
    """Client that creates a search index and uploads insights documents to it.

    Requests go to ``https://<service-name>.search.windows.net/`` and are
    authenticated with an ``api-key`` header.  Every setting is read from
    ``self.config`` when that mapping is truthy and from an environment
    variable otherwise.

    ``self.config``, ``read_file_from_directory``, ``read_files_from_directory``
    and ``write_status_file`` are not defined in this class; presumably they
    are provided by ``ClientAbstract`` -- confirm against that class.
    """

    def __init__(self):
        """Resolve all settings and build the endpoint, headers and storage client.

        Raises:
            KeyError: a config mapping is loaded but lacks a required entry.
            TypeError: the service name or API version resolves to ``None``
                (e.g. an unset environment variable), because they are
                concatenated into strings here.
        """
        # NOTE(review): super().__init__() is not called here; self.config is
        # presumably available as a class-level attribute -- confirm.
        search_service_name = self._setting(
            "search", "service-name", "SEARCH_SERVICE_NAME"
        )
        api_version = self._setting("search", "api-version", "SEARCH_API_VERSION")
        search_api_key = self._setting("search", "api-key", "SEARCH_API_KEY")

        self.endpoint = "https://" + search_service_name + ".search.windows.net/"
        # Stored as a ready-to-append query string, not as the bare version.
        self.api_version = "?api-version=" + api_version
        self.headers = {"Content-Type": "application/json", "api-key": search_api_key}
        self.storage_client = StorageClient()

        self.index_name = self._setting("search", "index-name", "SEARCH_INDEX_NAME")
        self.index_schema_path = self._setting(
            "search", "index-schema-path", "INDEX_SCHEMA_PATH"
        )
        self.insights_container = self._setting(
            "storage", "container", "INSIGHTS_CONTAINER_NAME"
        )
        self.ingest_log_filename = self._setting(
            "files", "ingested-file", "INGEST_LOG_FILENAME"
        )
        self.ingest_failure_log_filename = self._setting(
            "files", "failed-to-ingest-file", "INGEST_FAILURE_LOG_FILENAME"
        )
        self.vi_output_directory = self._setting(
            "files", "vi-output-directory", "VI_OUTPUT_DIRECTORY"
        )

    def _setting(self, section, key, env_var):
        """Return ``config[section][key]`` if a config is loaded, else ``env_var``.

        The environment lookup yields ``None`` when the variable is unset.
        """
        if self.config:
            return self.config[section][key]
        return os.getenv(env_var)

    def upload_to_search(self, docs):
        """POST ``docs`` to the index's ``docs/index`` endpoint and print the reply.

        Args:
            docs: JSON-serialisable request body; the callers in this class
                pass ``{"value": [...]}`` where each item carries an
                ``"@search.action"`` key.

        The HTTP status code is not inspected; the decoded JSON body is only
        printed.  ``response.json()`` raises if the body is not JSON (a
        ``ValueError`` subclass in ``requests``), which the bulk-upload
        methods treat as a failed document.
        """
        url = (
            self.endpoint
            + "indexes/"
            + self.index_name
            + "/docs/index"
            + self.api_version
        )
        response = requests.post(url, headers=self.headers, json=docs)
        print(response.json())

    def create_index(self):
        """Create the index from the schema file, named after ``self.index_name``.

        The schema is loaded with ``read_file_from_directory`` and its
        ``"name"`` entry is overwritten before it is POSTed; the decoded JSON
        response is printed.
        """
        url = self.endpoint + "indexes" + self.api_version
        index_schema = self.read_file_from_directory(self.index_schema_path)
        index_schema["name"] = self.index_name
        response = requests.post(url, headers=self.headers, json=index_schema)
        print(response.json())

    def _index_insights(self, insights, source_name, position):
        """Parse one insights document and upload its intervals.

        Args:
            insights: decoded insights JSON; must be a dict with a ``"state"``.
            source_name: display name used in the progress message.
            position: 1-based position of the document in the current run.

        Returns:
            ``True`` if the document was uploaded, ``False`` if it was skipped
            because its state is not ``"Processed"``.

        Raises:
            ValueError: the document has no ``"state"`` entry, so callers log
                it as a failure instead of aborting the whole run.
        """
        if not isinstance(insights, dict) or "state" not in insights:
            raise ValueError("insights document has no 'state' entry")
        if insights["state"] != "Processed":
            return False

        intervals = list(Parser().parse_vi_json(insights).values())
        for item in intervals:
            item["@search.action"] = "upload"
        print(f"{position}: uploading {source_name} to search index")
        self.upload_to_search({"value": intervals})
        return True

    def upload_files_from_storage_to_search(self):
        """Upload every processed insights blob in the insights container.

        Uploaded blob names are appended to the ingest log via
        ``write_status_file``; blobs that raise ``ValueError`` (invalid JSON,
        missing state, non-JSON search response) go to the failure log and the
        run continues with the next blob.
        """
        print("uploading files from storage account to search")
        vi_output_files = self.storage_client.list_files_in_container(
            self.insights_container
        )
        for position, blob in enumerate(vi_output_files, start=1):
            # Both log files record the blob name so their entries are comparable.
            blob_name = str(blob.name)
            try:
                blob_text = self.storage_client.get_blob_string(
                    self.insights_container, blob.name
                )
                # json.loads rejects a str starting with a BOM; given bytes it
                # detects the encoding itself and tolerates a UTF-8 BOM.
                insights = json.loads(blob_text.encode())
                if self._index_insights(insights, blob_name, position):
                    self.write_status_file(blob_name, self.ingest_log_filename)
            except ValueError:
                print("could not process " + blob_name)
                self.write_status_file(blob_name, self.ingest_failure_log_filename)

    def upload_local_files_to_search(self):
        """Upload every processed insights file found in ``vi_output_directory``.

        File names come from ``read_files_from_directory``.  Uploaded files are
        recorded in the ingest log; files that raise ``ValueError`` (undecodable
        text, invalid JSON, missing state, non-JSON search response) go to the
        failure log and the run continues with the next file.
        """
        print("uploading local files to search")
        files = self.read_files_from_directory(self.vi_output_directory)
        for position, file in enumerate(files, start=1):
            path = os.path.join(self.vi_output_directory, file)
            try:
                # utf-8-sig decodes plain UTF-8 and also strips a leading BOM,
                # matching the BOM tolerance of the storage upload path.
                with open(path, encoding="utf-8-sig") as f:
                    insights = json.load(f)
                if self._index_insights(insights, str(file), position):
                    self.write_status_file(file, self.ingest_log_filename)
            except ValueError:
                print("could not process " + str(file))
                self.write_status_file(file, self.ingest_failure_log_filename)