forked from NaicheD/datahub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathingest_small.py
69 lines (59 loc) · 2.03 KB
/
ingest_small.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import random
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.common import (
BrowsePaths,
Owner,
Ownership,
OwnershipType,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from locust import HttpUser, constant, task
class IngestUser(HttpUser):
    """
    Locust user that repeatedly ingests dataset snapshots for a small ID pool.

    Same as the Ingest test, except that dataset IDs are drawn from a small
    range so that the same datasets are ingested over and over and we hit the
    "update existing aspect" cases.
    """

    wait_time = constant(1)

    # Inclusive bounds of the dataset ID pool sampled by ``ingest``. Keeping
    # the pool small is what makes repeated ingests collide on the same URNs;
    # subclasses may override these to widen or narrow the pool.
    min_dataset_id = 1
    max_dataset_id = 100

    @task
    def ingest(self) -> None:
        """POST one randomly chosen dataset snapshot to the ingest endpoint."""
        dataset_id = random.randint(self.min_dataset_id, self.max_dataset_id)
        proposed_snapshot = self._build_snapshot(dataset_id)

        # The snapshot is keyed by its fully qualified record name inside the
        # "value" union of the request body.
        snapshot_fqn = (
            f"com.linkedin.metadata.snapshot.{proposed_snapshot.RECORD_SCHEMA.name}"
        )
        payload = json.dumps(
            {
                "entity": {
                    "value": {
                        snapshot_fqn: pre_json_transform(proposed_snapshot.to_obj())
                    }
                }
            }
        )

        # The body is already serialized, so it is sent as raw ``data`` (the
        # same parameter the original positional argument was bound to).
        self.client.post("/entities?action=ingest", data=payload)

    def _build_snapshot(self, id: int) -> DatasetSnapshot:
        """Return a DatasetSnapshot for dataset ``id`` with its three aspects.

        The aspects are, in order: properties, ownership and browse paths.
        """
        urn = self._build_urn(id)
        aspects = [
            self._build_properties(),
            self._build_ownership(id),
            self._build_browsepaths(id),
        ]
        return DatasetSnapshot(urn, aspects)

    def _build_urn(self, id: int) -> str:
        """Return the dataset URN string used for test dataset ``id``."""
        return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,test_dataset_{id},PROD)"

    def _build_properties(self) -> DatasetProperties:
        """Return the fixed DatasetProperties aspect shared by every dataset."""
        return DatasetProperties(description="This is a great dataset")

    def _build_browsepaths(self, id: int) -> BrowsePaths:
        """Return a BrowsePaths aspect holding a single path ending in ``id``."""
        return BrowsePaths([f"/perf/testing/path/{id}"])

    def _build_ownership(self, id: int) -> Ownership:
        """Return an Ownership aspect with a per-dataset owner and a shared one.

        Both owners are created with ``OwnershipType.DATAOWNER``.
        """
        return Ownership(
            [
                Owner(f"urn:li:corpuser:test_{id}", OwnershipType.DATAOWNER),
                # Plain literal: there is nothing to interpolate here.
                Owner("urn:li:corpuser:common", OwnershipType.DATAOWNER),
            ]
        )