-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload.py
executable file
·66 lines (49 loc) · 1.72 KB
/
upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import os
import json
import requests
from pathlib import Path
# Connection settings are read from the environment once, at import time.
# Either value is None when its variable is not set.
SCHEMA_REGISTRY_TOKEN = os.environ.get("SCHEMA_REGISTRY_TOKEN")  # sent as the Authorization header when set
SCHEMA_REGISTRY_URL = os.environ.get("SCHEMA_REGISTRY_URL")  # prefix for every registry request URL
def upload_schema(file_path, timeout=30):
    """Upload a .proto file, and every .proto file it imports, to the registry.

    The subject name is the file name without its extension. Each imported
    file is uploaded first (recursively), then the last entry of that
    subject's version list is used to build the reference attached to this
    schema's payload.

    Args:
        file_path: Path to the .proto file (``pathlib.Path`` or ``str``).
            Imports are resolved relative to this file's directory.
        timeout: Seconds to wait on each HTTP request before giving up.

    Raises:
        OSError: If the file (or an imported file) cannot be read.
        requests.HTTPError: If the registry answers any request with an
            error status.
    """
    file_path = Path(file_path)
    subject = file_path.stem

    # Explicit encoding so the read does not depend on the platform locale.
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    payload = {
        "schemaType": "PROTOBUF",
        "schema": content,
    }

    # The Authorization header is only sent when a token is configured.
    auth_headers = (
        {"Authorization": SCHEMA_REGISTRY_TOKEN} if SCHEMA_REGISTRY_TOKEN else {}
    )

    # NOTE: only lines that start exactly with `import "` and end exactly with
    # `.proto";` are recognised; indented imports, `import public`, or lines
    # with trailing comments/whitespace are not picked up.
    imports = [
        line
        for line in content.splitlines()
        if line.startswith('import "') and line.endswith('.proto";')
    ]

    references = []
    for line in imports:
        import_file = line.split('"')[1]
        import_subject = Path(import_file).stem
        import_path = file_path.parent / import_file

        # Upload the import first, then look up its version for the reference.
        upload_schema(import_path, timeout=timeout)

        versions_response = requests.get(
            f"{SCHEMA_REGISTRY_URL}/subjects/{import_subject}/versions",
            headers=auth_headers,
            timeout=timeout,
        )
        # Fail with the HTTP error itself rather than a confusing
        # KeyError/TypeError from indexing an error body below.
        versions_response.raise_for_status()
        import_version = versions_response.json()[-1]

        references.append({
            "name": import_file,
            "subject": import_subject,
            "version": import_version,
        })

    if references:
        payload["references"] = references

    # debug
    print(json.dumps(payload, indent=2))

    headers = {
        "Content-Type": "application/vnd.schemaregistry.v1+json",
        **auth_headers,
    }

    response = requests.post(
        f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        headers=headers,
        data=json.dumps(payload),
        timeout=timeout,
    )
    response.raise_for_status()
# Upload every .proto file found directly in the current working directory.
for proto_file in Path(".").glob("*.proto"):
    upload_schema(proto_file)