job_export.py
import json
import os
import sys
import tempfile
import hashlib
from datetime import datetime

import requests
from pfb.importers.gen3dict import _from_dict
from pfb.reader import PFBReader
from pfb.writer import PFBWriter
from pyspark import SparkConf
from pyspark.sql import SparkSession

from pelican.dictionary import init_dictionary, DataDictionaryTraversal
from pelican.graphql.guppy_gql import GuppyGQL
from pelican.jobs import export_pfb_job
from pelican.s3 import s3upload_file
from pelican.indexd import indexd_submit
from pelican.mds import metadata_submit_expiration
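
# Pelican PFB export job: query Guppy for the records that match the user's
# filter, build a PFB file from the graph database, upload it to S3 and, when
# ACCESS_FORMAT is "guid", register it in indexd and the metadata service so
# the export can be retrieved by GUID.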
if __name__ == "__main__":
    node = os.environ["ROOT_NODE"]
    access_token = os.environ["ACCESS_TOKEN"]
    input_data = os.environ["INPUT_DATA"]
    access_format = os.environ["ACCESS_FORMAT"]
    # the PFB file and indexd/mds records expire after 14 days by default
    record_expiration_days = os.environ.get("RECORD_EXPIRATION_DAYS", 14)

    print(f"Access format: {access_format}")

    with open("/pelican-creds.json") as pelican_creds_file:
        pelican_creds = json.load(pelican_creds_file)

    required_keys = [
        "manifest_bucket_name",
        "aws_access_key_id",
        "aws_secret_access_key",
    ]
    if access_format == "guid":
        required_keys += ["fence_client_id", "fence_client_secret"]
    for key in required_keys:
        assert pelican_creds.get(key), f"No '{key}' in config"

    input_data = json.loads(input_data)
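
    # Ask Guppy (through the revproxy) for the IDs of the root-node records
    # that match the user's filter; these are the cases the export starts from.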
    gql = GuppyGQL(
        node=node, hostname="http://revproxy-service", access_token=access_token
    )
    filters = json.dumps({"filter": input_data.get("filter", {})})
    case_ids = gql.execute(filters=filters)

    try:
        with open("/peregrine-creds.json") as peregrine_creds_file:
            peregrine_creds = json.load(peregrine_creds_file)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"Failed to load peregrine credentials file: {e}")
        peregrine_creds = {}

    # Set variables, prioritizing environment variables
    DB_HOST = os.getenv("DB_HOST", peregrine_creds.get("db_host"))
    DB_DATABASE = os.getenv("DB_DATABASE", peregrine_creds.get("db_database"))
    DB_USER = os.getenv("DB_USER", peregrine_creds.get("db_username"))
    DB_PASS = os.getenv("DB_PASS", peregrine_creds.get("db_password"))

    # Construct the database URL if possible
    if DB_HOST and DB_DATABASE:
        DB_URL = f"jdbc:postgresql://{DB_HOST}/{DB_DATABASE}"
    else:
        raise RuntimeError(
            "DB_HOST or DB_DATABASE is missing. DB_URL cannot be constructed."
        )

    dictionary_url = os.environ["DICTIONARY_URL"]
    dictionary, model = init_dictionary(url=dictionary_url)
    ddt = DataDictionaryTraversal(model)

    # EXTRA_NODES is an optional comma-delimited list of nodes to additionally include in the PFB.
    if os.environ.get("EXTRA_NODES") is not None:
        # Allow the user to request "no extra nodes" by passing an empty string.
        # This is so that BioDataCatalyst PFB exports can specify no extra nodes.
        if os.environ["EXTRA_NODES"].strip() == "":
            extra_nodes = None
        else:
            extra_nodes = os.environ["EXTRA_NODES"].split(",")
    else:
        # Preserved for backwards compatibility:
        # if EXTRA_NODES is not specified, add the 'reference_file' nodes
        # when exporting PFBs from BioDataCatalyst (aka STAGE aka gtex).
        if "gtex" in dictionary_url:
            extra_nodes = ["reference_file", "reference_file_index"]
        else:
            extra_nodes = None
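
    # The export reads the graph data from the metadata (sheepdog/peregrine)
    # PostgreSQL database through Spark's JDBC data source.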
    conf = (
        SparkConf()
        .set("spark.jars", os.environ["POSTGRES_JAR_PATH"])
        .set("spark.driver.memory", "10g")
        .set("spark.executor.memory", "10g")
        .setAppName("pelican")
    )
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    db = spark.read.format("jdbc").options(
        url=DB_URL, user=DB_USER, password=DB_PASS, driver="org.postgresql.Driver"
    )
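
    # Write an empty PFB whose Avro schema is generated from the data dictionary.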
    with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as avro_output:
        with PFBWriter(avro_output) as pfb_file:
            _from_dict(pfb_file, dictionary_url)
            filename = pfb_file.name
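
    # Copy that schema into the file that will hold the actual export.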
    with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as avro_output:
        with PFBReader(filename) as reader:
            with PFBWriter(avro_output) as pfb_file:
                pfb_file.copy_schema(reader)
                pfb_file.write()
                fname = pfb_file.name

    # If the input data specifies the root node to use, use that root node.
    # Otherwise fall back to the $ROOT_NODE environment variable.
    root_node = input_data.get("root_node")
    if root_node is None:
        root_node = node

    # Add aligned_reads_index to the extra nodes on VCF files to bring them
    # in line with CRAM files.
    if root_node == "simple_germline_variation":
        if extra_nodes is None:
            extra_nodes = ["aligned_reads_index"]
        else:
            extra_nodes.append("aligned_reads_index")
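
    # Walk the graph from the root node and append the matching records
    # (plus any extra nodes) to the PFB.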
    with open(fname, "a+b") as avro_output:
        with PFBReader(filename) as reader:
            with PFBWriter(avro_output) as pfb_file:
                pfb_file.copy_schema(reader)
                export_pfb_job(
                    db,
                    pfb_file,
                    ddt,
                    case_ids,
                    root_node,
                    extra_nodes,
                    True,  # include upward nodes: project, program, etc.
                )
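
    # Upload the finished PFB to the manifest bucket under a timestamped name.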
    avro_filename = "{}.avro".format(
        datetime.now().strftime("export_%Y-%m-%dT%H:%M:%S")
    )

    s3file = s3upload_file(
        pelican_creds["manifest_bucket_name"],
        avro_filename,
        pelican_creds["aws_access_key_id"],
        pelican_creds["aws_secret_access_key"],
        fname,
    )
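
    # In "guid" mode, register the file in indexd and the metadata service and
    # print its GUID as the job output; otherwise print the S3 upload result.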
    if access_format == "guid":
        # calculate the md5 checksum of the PFB file
        md5_sum = (
            hashlib.md5()
            if sys.version_info < (3, 9)
            else hashlib.md5(usedforsecurity=False)
        )  # nosec
        chunk_size = 8192
        with open(fname, "rb") as f:
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                md5_sum.update(data)
        md5_digest = md5_sum.hexdigest()

        # get authz fields
        auth_paths = gql._graphql_auth_resource_path(filters=filters)
        authz = []
        for path in auth_paths:
            if path["auth_resource_path"] not in authz:
                authz.append(path["auth_resource_path"])

        hostname = os.environ["GEN3_HOSTNAME"]
        COMMONS = "https://" + hostname + "/"

        # load the indexd credential from the creds file
        try:
            with open("/indexd-creds.json") as indexd_creds_file:
                indexd_creds = json.load(indexd_creds_file)
                gdcapi_credential = indexd_creds["user_db"]["gdcapi"]
        except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
            print(f"Failed to load indexd credentials file or missing keys: {e}")
            gdcapi_credential = None

        # indexd password: prefer the SHEEPDOG environment variable, fall back
        # to the gdcapi credential from the creds file
        indexd_password = os.getenv("SHEEPDOG", gdcapi_credential)

        s3_url = "s3://" + pelican_creds["manifest_bucket_name"] + "/" + avro_filename

        # exchange the client ID and secret for an access token
        r = requests.post(
            f"{COMMONS}user/oauth2/token?grant_type=client_credentials&scope=openid",
            auth=(
                pelican_creds["fence_client_id"],
                pelican_creds["fence_client_secret"],
            ),
        )
        if r.status_code != 200:
            raise Exception(
                f"Failed to obtain access token using OIDC client credentials - {r.status_code}:\n{r.text}"
            )
        client_access_token = r.json()["access_token"]
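
        # Create the indexd record for the uploaded file, then set an
        # expiration on it through the metadata service.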
        indexd_record = indexd_submit(
            COMMONS,
            indexd_password,
            avro_filename,
            os.stat(fname).st_size,
            [s3_url],
            {"md5": str(md5_digest)},
            authz,
        )
        metadata_submit_expiration(
            hostname=COMMONS,
            guid=indexd_record["did"],
            access_token=client_access_token,
            record_expiration_days=record_expiration_days,
        )

        # output the GUID of the new indexd record
        print("[out] {}".format(indexd_record["did"]))
    else:
        print("[out] {}".format(s3file))