-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Memgraph integration SDK CLI #941
Open
matea16
wants to merge
2
commits into
JupiterOne:main
Choose a base branch
from
matea16:add-memgraph-integration
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import * as commander from 'commander'; | ||
import path from 'path'; | ||
|
||
import dotenv from 'dotenv'; | ||
import dotenvExpand from 'dotenv-expand'; | ||
|
||
import * as log from '../log'; | ||
import { uploadToMemgraph, wipeMemgraphByID, wipeAllMemgraph } from '../memgraph'; | ||
|
||
export function memgraph() { | ||
dotenvExpand(dotenv.config()); | ||
|
||
const program = new commander.Command(); | ||
program.description( | ||
`Suite of memgraph commands. Options are currently 'memgraph push', 'memgraph wipe', and 'memgraph wipe-all'`, | ||
); | ||
const memgraphCommand = program.command('memgraph'); | ||
memgraphCommand | ||
.command('push') | ||
.description('upload collected entities and relationships to local Memgraph') | ||
.option( | ||
'-d, --data-dir <directory>', | ||
'path to collected entities and relationships', | ||
path.resolve(process.cwd(), '.j1-integration'), | ||
) | ||
.option( | ||
'-i, --integration-instance-id <id>', | ||
'_integrationInstanceId assigned to uploaded entities', | ||
'defaultLocalInstanceID', | ||
) | ||
.option( | ||
'-db, --database-name <database>', | ||
'optional database to push data to (only available for enterprise Memgraph databases)', | ||
'memgraph', | ||
) | ||
.action(async (options) => { | ||
log.info(`Beginning data upload to local Memgraph instance`); | ||
// Point `fileSystem.ts` functions to expected location relative to | ||
// integration project path. | ||
const finalDir = path.resolve(process.cwd(), options.dataDir); | ||
process.env.JUPITERONE_INTEGRATION_STORAGE_DIRECTORY = finalDir; | ||
|
||
await uploadToMemgraph({ | ||
pathToData: finalDir, | ||
integrationInstanceID: options.integrationInstanceId, | ||
memgraphDatabase: options.databaseName, | ||
}); | ||
log.info(`Data uploaded to local Memgraph instance`); | ||
}); | ||
|
||
memgraphCommand | ||
.command('wipe') | ||
.description( | ||
'wipe entities and relationships for a given integrationInstanceID in the Memgraph database', | ||
) | ||
.option( | ||
'-i, --integration-instance-id <id>', | ||
'_integrationInstanceId assigned to uploaded entities', | ||
'defaultLocalInstanceID', | ||
) | ||
.option( | ||
'-db, --database-name <database>', | ||
'optional database to wipe data from (only available for enterprise Memgraph databases)', | ||
'memgraph', | ||
) | ||
.action(async (options) => { | ||
await wipeMemgraphByID({ | ||
integrationInstanceID: options.integrationInstanceId, | ||
memgraphDatabase: options.databaseName, | ||
}); | ||
}); | ||
|
||
memgraphCommand | ||
.command('wipe-all') | ||
.description('wipe all entities and relationships in the Memgraph database') | ||
.option( | ||
'-db, --database-name <database>', | ||
'optional database to wipe data from (only available for enterprise Memgraph databases)', | ||
'memgraph', | ||
) | ||
.action(async (options) => { | ||
await wipeAllMemgraph({ | ||
memgraphDatabase: options.databaseName, | ||
}); | ||
}); | ||
|
||
return memgraphCommand; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Memgraph JupiterOne CLI Command | ||
|
||
## Installation | ||
|
||
This command assumes you have three additional values stored in your local .env | ||
file: MEMGRAPH_URI MEMGRAPH_USER MEMGRAPH_PASSWORD | ||
|
||
This can be used for uploading to local or remote Memgraph databases. For | ||
easy access to a local Memgraph instance, you can launch one via a Memgraph provided | ||
Docker image with the command: | ||
|
||
``` | ||
docker run \ | ||
-p 3000:3000 -p 7444:7444 -p 7687:7687 \ | ||
-d \ | ||
-v mg_lib:/var/lib/memgraph \ | ||
-v mg_log:/var/log/memgraph \ | ||
-v mg_etc:/etc/memgraph \ | ||
memgraph/memgraph-platform | ||
``` | ||
|
||
## Usage | ||
|
||
Data is still collected in the same way as before with a call to `yarn start`. | ||
|
||
Once data has been collected, you can run `j1-integration memgraph push`. This will | ||
push data to the Memgraph server listed in the MEMGRAPH_URI .env parameter. If running | ||
locally, you can then access data in the Memgraph database by visiting | ||
http://localhost:3000. Alternatively, you can download Memgraph Lab at the | ||
https://memgraph.com/lab. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
export * from './uploadToMemgraph'; | ||
export * from './wipeMemgraph'; |
181 changes: 181 additions & 0 deletions
181
packages/integration-sdk-cli/src/memgraph/memgraphGraphStore.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
import { Entity, Relationship } from '@jupiterone/integration-sdk-core'; | ||
import { | ||
sanitizeValue, | ||
buildPropertyParameters, | ||
sanitizePropertyName, | ||
getFromTypeLabel, | ||
getToTypeLabel, | ||
} from './memgraphUtilities'; | ||
|
||
import * as memgraph from 'neo4j-driver'; | ||
|
||
export interface MemgraphGraphObjectStoreParams { | ||
uri: string; | ||
username: string; | ||
password: string; | ||
integrationInstanceID: string; | ||
session?: memgraph.Session; | ||
database?: string; | ||
} | ||
|
||
export class MemgraphGraphStore { | ||
private memgraphDriver: memgraph.Driver; | ||
private persistedSession: memgraph.Session; | ||
private databaseName = 'memgraph'; | ||
private typeList = new Set<string>(); | ||
private integrationInstanceID: string; | ||
|
||
constructor(params: MemgraphGraphObjectStoreParams) { | ||
if (params.session) { | ||
this.persistedSession = params.session; | ||
} else { | ||
this.memgraphDriver = memgraph.driver( | ||
params.uri, | ||
memgraph.auth.basic(params.username, params.password), | ||
); | ||
} | ||
this.integrationInstanceID = params.integrationInstanceID; | ||
if (params.database) { | ||
this.databaseName = params.database; | ||
} | ||
} | ||
|
||
private async runCypherCommand( | ||
cypherCommand: string, | ||
cypherParameters?: any, | ||
): Promise<memgraph.Result> { | ||
if (this.persistedSession) { | ||
const result = await this.persistedSession.run(cypherCommand); | ||
return result; | ||
} else { | ||
const session = this.memgraphDriver.session({ | ||
database: this.databaseName, | ||
defaultAccessMode: memgraph.session.WRITE, | ||
}); | ||
const result = await session.run(cypherCommand, cypherParameters); | ||
await session.close(); | ||
return result; | ||
} | ||
} | ||
|
||
async addEntities(newEntities: Entity[]) { | ||
const nodeAlias: string = 'entityNode'; | ||
const promiseArray: Promise<memgraph.Result>[] = []; | ||
for (const entity of newEntities) { | ||
let classLabels = ''; | ||
if (entity._class) { | ||
if (typeof entity._class === 'string') { | ||
classLabels += `:${sanitizePropertyName(entity._class)}`; | ||
} else { | ||
for (const className of entity._class) { | ||
classLabels += `:${sanitizePropertyName(className)}`; | ||
} | ||
} | ||
} | ||
if (!this.typeList.has(entity._type)) { | ||
await this.runCypherCommand(`CREATE INDEX ON :${entity._type}(_key);`); | ||
await this.runCypherCommand(`CREATE INDEX ON :${entity._type}(_integrationInstanceID);`); | ||
this.typeList.add(entity._type); | ||
} | ||
const sanitizedType = sanitizePropertyName(entity._type); | ||
const propertyParameters = buildPropertyParameters(entity); | ||
const finalKeyValue = sanitizeValue(entity._key.toString()); | ||
const buildCommand = ` | ||
MERGE (${nodeAlias} {_key: $finalKeyValue, _integrationInstanceID: $integrationInstanceID}) | ||
SET ${nodeAlias} += $propertyParameters | ||
SET ${nodeAlias}:${sanitizedType}${classLabels};`; | ||
promiseArray.push( | ||
this.runCypherCommand(buildCommand, { | ||
propertyParameters: propertyParameters, | ||
finalKeyValue: finalKeyValue, | ||
integrationInstanceID: this.integrationInstanceID, | ||
}), | ||
); | ||
} | ||
await Promise.all(promiseArray); | ||
} | ||
|
||
async addRelationships(newRelationships: Relationship[]) { | ||
const promiseArray: Promise<memgraph.Result>[] = []; | ||
for (const relationship of newRelationships) { | ||
const relationshipAlias: string = 'relationship'; | ||
const propertyParameters = buildPropertyParameters(relationship); | ||
|
||
let startEntityKey = ''; | ||
let endEntityKey = ''; | ||
|
||
if (relationship._fromEntityKey) { | ||
startEntityKey = sanitizeValue(relationship._fromEntityKey.toString()); | ||
} | ||
if (relationship._toEntityKey) { | ||
endEntityKey = sanitizeValue(relationship._toEntityKey.toString()); | ||
} | ||
|
||
//Attempt to get start and end types | ||
const startEntityTypeLabel = getFromTypeLabel(relationship); | ||
const endEntityTypeLabel = getToTypeLabel(relationship); | ||
|
||
if (relationship._mapping) { | ||
//Mapped Relationship | ||
if (relationship._mapping['skipTargetCreation'] === false) { | ||
const targetEntity = relationship._mapping['targetEntity']; | ||
//Create target entity first | ||
const tempEntity: Entity = { | ||
...targetEntity, | ||
_class: targetEntity._class, | ||
_key: sanitizeValue( | ||
relationship._key.replace( | ||
relationship._mapping['sourceEntityKey'], | ||
'', | ||
), | ||
), | ||
_type: targetEntity._type, | ||
}; | ||
await this.addEntities([tempEntity]); | ||
} | ||
startEntityKey = sanitizeValue( | ||
relationship._mapping['sourceEntityKey'], | ||
); | ||
endEntityKey = sanitizeValue( | ||
relationship._key.replace( | ||
relationship._mapping['sourceEntityKey'], | ||
'', | ||
), | ||
); | ||
} | ||
|
||
const sanitizedRelationshipClass = sanitizePropertyName( | ||
relationship._class, | ||
); | ||
|
||
const buildCommand = ` | ||
MERGE (start${startEntityTypeLabel} {_key: $startEntityKey, _integrationInstanceID: $integrationInstanceID}) | ||
MERGE (end${endEntityTypeLabel} {_key: $endEntityKey, _integrationInstanceID: $integrationInstanceID}) | ||
MERGE (start)-[${relationshipAlias}:${sanitizedRelationshipClass}]->(end) | ||
SET ${relationshipAlias} += $propertyParameters;`; | ||
promiseArray.push( | ||
this.runCypherCommand(buildCommand, { | ||
propertyParameters: propertyParameters, | ||
startEntityKey: startEntityKey, | ||
endEntityKey: endEntityKey, | ||
integrationInstanceID: this.integrationInstanceID, | ||
}), | ||
); | ||
} | ||
await Promise.all(promiseArray); | ||
} | ||
|
||
async wipeInstanceIdData() { | ||
const wipeCypherCommand = `MATCH (n {_integrationInstanceID: '${this.integrationInstanceID}'}) DETACH DELETE n`; | ||
await this.runCypherCommand(wipeCypherCommand); | ||
} | ||
|
||
async wipeDatabase() { | ||
const wipeCypherCommand = `MATCH (n) DETACH DELETE n`; | ||
await this.runCypherCommand(wipeCypherCommand); | ||
} | ||
|
||
async close() { | ||
await this.memgraphDriver.close(); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Memgraph use the same Javascript driver as Neo4j? If so, would we be able to share some of the same codebase with our existing Neo4j graph storage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Memgraph is compatible with Neo4j and uses the same JS driver so you can share some of the codebase. I separated everything to preserve the integrity of your code, but feel free to make the necessary changes