Update FalconHookIT.java
mneethiraj authored Feb 19, 2025
1 parent b7149b5 commit cf7d7c3
Showing 1 changed file with 67 additions and 47 deletions.
@@ -62,24 +62,28 @@
public class FalconHookIT {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);

public static final String CLUSTER_RESOURCE = "/cluster.xml";
public static final String FEED_RESOURCE = "/feed.xml";
public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml";
public static final String PROCESS_RESOURCE = "/process.xml";

private static final ConfigurationStore STORE = ConfigurationStore.get();

private AtlasClientV2 atlasClient;

@BeforeClass
public void setUp() throws Exception {
Configuration atlasProperties = ApplicationProperties.get();

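// use basic auth ("admin"/"admin") when Kerberos authentication is not enabled; otherwise rely on the Kerberos ticket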
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"});
} else {
atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
}

AtlasService service = new AtlasService();

service.init();
STORE.registerListener(service);
CurrentUser.authenticate(System.getProperty("user.name"));
@@ -88,25 +92,26 @@ public void setUp() throws Exception {
@Test
public void testCreateProcess() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());

STORE.publish(EntityType.CLUSTER, cluster);
assertClusterIsRegistered(cluster);

Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid();
Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());

process.getClusters().getClusters().get(0).setName(cluster.getName());
process.getInputs().getInputs().get(0).setFeed(infeed.getName());
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());

STORE.publish(EntityType.PROCESS, process);

String pid = assertProcessIsRegistered(process, cluster.getName());
AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();

assertNotNull(processEntity);
assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
@@ -116,50 +121,52 @@ public void testCreateProcess() throws Exception {
@Test
public void testReplicationFeed() throws Exception {
Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());

STORE.publish(EntityType.CLUSTER, srcCluster);

assertClusterIsRegistered(srcCluster);

Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());

STORE.publish(EntityType.CLUSTER, targetCluster);

assertClusterIsRegistered(targetCluster);

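// a replication feed spans a source and a target cluster; Atlas registers a separate falcon_feed entity for each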
Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid();
String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid();
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity();

assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("inputs")).get(0)), inId);
assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("outputs")).get(0)), outId);
}

@Test
public void testCreateProcessWithHDFSFeed() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());

STORE.publish(EntityType.CLUSTER, cluster);

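// input feed is backed by an HDFS path, output feed by a Hive table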
TypesUtil.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
Feed infeed = result.right;
String infeedId = result.left;

Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());

process.getClusters().getClusters().get(0).setName(cluster.getName());
process.getInputs().getInputs().get(0).setFeed(infeed.getName());
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());

STORE.publish(EntityType.PROCESS, process);

String pid = assertProcessIsRegistered(process, cluster.getName());
AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();

assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outfeedId);
}
@@ -172,6 +179,7 @@ public void testCreateProcessWithHDFSFeed() throws Exception {
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");

long mustEnd = System.currentTimeMillis() + timeout;

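// retry the predicate every 400 ms until it passes or the timeout elapses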
while (true) {
@@ -182,14 +190,17 @@ protected void waitFor(int timeout, Predicate predicate) throws Exception {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}

LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);

Thread.sleep(400);
}
}
}

private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));

switch (entity.getEntityType()) {
case CLUSTER:
((Cluster) entity).setName(name);
@@ -203,6 +214,7 @@ private <T extends Entity> T loadEntity(EntityType type, String resource, String
((Process) entity).setName(name);
break;
}

return (T) entity;
}

@@ -215,38 +227,38 @@ private String getTableUri(String dbName, String tableName) {
}

private String assertProcessIsRegistered(Process process, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
}

private String assertClusterIsRegistered(Cluster cluster) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
}

private TypesUtil.Pair<String, Feed> getHDFSFeed(String feedResource, String clusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);

feedCluster.setName(clusterName);
STORE.publish(EntityType.FEED, feed);

String feedId = assertFeedIsRegistered(feed, clusterName);

assertFeedAttributes(feedId);

String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();

assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);

String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
AtlasEntity pathEntity = atlasClient.getEntityByGuid(inputId).getEntity();

assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH);

List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);

assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.normalize(dataLocation.getPath()));

return TypesUtil.Pair.of(feedId, feed);
}
@@ -258,70 +270,78 @@ private Feed getTableFeed(String feedResource, String clusterName) throws Except
private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);

feedCluster.setName(clusterName);

String dbName = "db" + random();
String tableName = "table" + random();

feedCluster.getTable().setUri(getTableUri(dbName, tableName));

String dbName2 = "db" + random();
String tableName2 = "table" + random();

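// configure the optional second cluster with its own db/table so lineage can be verified per cluster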
if (secondClusterName != null) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1);

feedCluster2.setName(secondClusterName);
feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
}

STORE.publish(EntityType.FEED, feed);

String feedId = assertFeedIsRegistered(feed, clusterName);

assertFeedAttributes(feedId);
verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);

if (secondClusterName != null) {
String feedId2 = assertFeedIsRegistered(feed, secondClusterName);

assertFeedAttributes(feedId2);
verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
}

return feed;
}

private void assertFeedAttributes(String feedId) throws Exception {
AtlasEntity feedEntity = atlasClient.getEntityByGuid(feedId).getEntity();

assertEquals(feedEntity.getAttribute(AtlasClient.OWNER), "testuser");
assertEquals(feedEntity.getAttribute(FalconBridge.FREQUENCY), "hours(1)");
assertEquals(feedEntity.getAttribute(AtlasClient.DESCRIPTION), "test input");
}

private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) throws Exception {
// verify that lineage from the Hive table to the Falcon feed is created
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feedName, clusterName));
AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();

assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);

String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
AtlasEntity tableEntity = atlasClient.getEntityByGuid(inputId).getEntity();

assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
}

private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
}

private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception {
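// poll Atlas (for up to 80 seconds) until an entity of the given type with the given attribute value exists, then return its GUID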
waitFor(80000, new Predicate() {
@Override
public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));

assertNotNull(entity);
assertNotNull(entity.getEntity());
}
});

return atlasClient.getEntityHeaderByAttribute(typeName, Collections.singletonMap(property, value)).getGuid();
}
