
The role of asset.py is to implement workflows.

The previous file, tool.py, defines the basic way to invoke a tool. In practical applications, however, the output of one tool usually needs to be passed to the next tool. That is where workflows come in, and they live in the asset.py file.

The following example is a workflow that invokes the nmap_python tool; nmap.py has already defined three modes for invocation: raw, alive, and port_os. (A sketch that chains two of these modes appears after the examples below.)

First, import the dependency packages:

```python
import asyncio as aio
from levrt import ctx, annot, Concurrent
from levrt.annot.cats import Attck, BlackArch

# Import the nmap_python tool so its modes can be invoked below.
from . import nmap
```

## Multiple practices of workflow

The basic workflow is the simplest possible invocation of a single mode of a tool (Note 7). There are many ways a workflow can invoke a tool, and the following shows several common ways to write one. Tool authors are also required to write the workflow annotations in the prescribed format.

````python
@annot.meta(
    desc="ICMP echo request to determine whether hosts are alive",
    params=[annot.Param("ip", "target ip or ip segment for scanning", holder="192.168.1.1/24")],
    cats={
        Attck: [Attck.Reconnaissance],
        BlackArch: [BlackArch.Scanner]
    }
)
# Leviathan is built on an asynchronous engine, and its automation system
# depends on coroutines, so workflows must use Python async programming.
async def alive_nmap_python(ip: str):
    # `async def` declares an asynchronous function; the alive mode takes
    # the string parameter `ip`.
    """
    Send an ICMP echo request to the target host and judge whether the host
    is alive from the dropped packets, the response time, the response
    content, the banner information, and matches on specific service keywords.
    ```
    await alive_nmap_python("192.168.1.1/24")
    ```
    """
    # `await tool.mode(args)` invokes the tool in the specified mode, waits
    # for the tool to finish executing, and first saves the result to MongoDB.
    # The call returns a handle to that MongoDB data set, a levrt.Document
    # object; see the API documentation for details.
    # `await` suspends the current function until the asynchronous alive()
    # call finishes and returns; see Python async programming for details.
    result = await nmap.alive(ip)
    # The get() method of a Document is also asynchronous, so it must be
    # awaited as well; it fetches the data from the result data set.
    data = await result.get()
    print(data)
````
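For quick local testing, a workflow coroutine like this one can be driven with the asyncio module imported above as `aio`. This is a minimal sketch, assuming the module is executed in a context where the relative import of nmap resolves; in normal operation the Leviathan engine schedules workflows itself.

```python
# Minimal local driver sketch; in production the engine runs the workflow.
if __name__ == "__main__":
    aio.run(alive_nmap_python("192.168.1.1/24"))
```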
    
    
````python
@annot.meta(
    desc="SYN scan for the target host's open ports, host names and operating systems",
    params=[annot.Param("ip", "target ip or ip segment for scanning", holder="192.168.1.1/24")],
    cats={
        Attck: [Attck.Reconnaissance],
        BlackArch: [BlackArch.Scanner]
    }
)
async def port_os_nmap_python(ip: str):
    """
    Send a TCP SYN probe to the target host and, from the dropped packets,
    the response time, the response content, the banner information, and
    matches on specific service keywords, identify the target host's open
    ports, host names, related services, and operating system.
    ```
    await port_os_nmap_python("192.168.1.1")
    ```
    """
    result = await nmap.port_os(ip)
    data = await result.get()
    print(data)
````
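As mentioned at the start of this chapter, the point of a workflow is to pass one tool's output to the next tool. The following is a minimal chaining sketch built from the two modes above; the shape of the alive result data (an "alive" key holding live hosts) is an assumption, so check the actual layout of the data set that nmap.alive produces.

```python
async def alive_then_port_os(ip: str):
    result = await nmap.alive(ip)
    data = await result.get()
    # Hypothetical field name: assume the alive data set lists live hosts
    # under an "alive" key. Adjust to the real layout of the result.
    for host in data.get("alive", []):
        # Feed each live host into the port/OS scan defined above.
        await port_os_nmap_python(host)
```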

## Background execution

By default, await waits for the tool to finish executing its command in the container before the next step runs. When a tool invocation takes a long time, the workflow can place the invocation in the background with .start() and continue with the next steps immediately:

```python
async def background(ip: str):
    # Assuming this scan runs in the container for a long time, it can be
    # executed in the background. Once started, the task runs in the
    # background, so you can continue launching other tasks and block only
    # when you need the results.
    task = await nmap.port_os(ip).start()

    # When a tool runs in the background, signals can be exchanged with it
    # during execution, e.g. to interrupt the tool in the container
    # immediately. signal() blocks until the task has executed successfully.
    await task.signal()
    result = await task()
    # Fetch the data set from the database; get() is asynchronous.
    print(await result.get())
```
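Building on that, here is a minimal sketch of the fan-out pattern the comments describe: start several long-running scans, then block only when all the results are needed. It assumes .start() and the task call behave exactly as in background() above.

```python
async def background_pair(ip1: str, ip2: str):
    # Launch both scans; their containers now run in the background.
    t1 = await nmap.port_os(ip1).start()
    t2 = await nmap.port_os(ip2).start()
    # Block only now, when both results are actually needed.
    r1 = await t1()
    r2 = await t2()
    print(await r1.get(), await r2.get())
```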

## Multi-thread execution

When a task does not return its result immediately and you have to wait for it, and there are many similar tasks to run, consider executing them in parallel.

Workflows support concurrent tool invocation; the Concurrent() helper is used in this scenario:

```python
# checkdmarc is another tool module; it is assumed to be imported the same
# way as nmap above, e.g. `from . import checkdmarc`.
task_list = []
docs = []

async def concurrent(domains):
    # `domains` is an iterable of domain names passed in by the caller.
    # Concurrent() executes tasks concurrently. The optional `limit`
    # parameter caps how many tasks run at the same time; the default is
    # no limit. Concurrent is imported from the levrt package above.
    async with Concurrent(limit=2) as c:
        # Concurrently invoke checkdmarc's dmarc mode for each domain name.
        for domain in domains:
            task = c.start(checkdmarc.dmarc(domain))
            task_list.append(task)

    for t in task_list:
        t_result = await t
        # Each finished task yields a Document; fetch its data with the
        # asynchronous get() method.
        data = await t_result.get()
        docs.append(data)
    print(docs)
```
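The same pattern works with the nmap tool imported at the top of this file, which avoids the extra checkdmarc import. A minimal sketch, assuming c.start() accepts any tool-mode invocation just as in the example above:

```python
async def concurrent_nmap(ips):
    # Scan several targets concurrently, at most two at a time.
    tasks = []
    async with Concurrent(limit=2) as c:
        for ip in ips:
            tasks.append(c.start(nmap.port_os(ip)))
    for t in tasks:
        doc = await t
        print(await doc.get())
```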

## Extra space

When a tool invocation ends and the data is returned, the workflow can also specify where the data is stored within the database storage space. For example, when you use Nmap to scan two network segments, the results of the two segments may need to be saved under different namespaces.

The method below can be reused:

```python
async def extra_namespace(ip: str):
    with ctx / "demo1":
        # Under szczecin/nmap_python.port_os, the namespace already specified
        # by the tool's Cr() definition, create an additional demo1 space.
        result = await nmap.port_os(ip)
        data = await result.get()
        print(data)

async def extra_namespace2(ip: str):
    with ctx / "demo2":
        # Reusing the method above nests its demo1 namespace one layer
        # below the demo2 namespace created here.
        await extra_namespace(ip)
```
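As a concrete version of the two-segment scenario mentioned above, the following sketch saves each segment's scan results in its own namespace. The segment_a/segment_b names and the target ranges are illustrative.

```python
async def scan_two_segments():
    # Each `with ctx / ...` block stores its results in a separate namespace.
    with ctx / "segment_a":
        result = await nmap.port_os("192.168.1.0/24")
        print(await result.get())
    with ctx / "segment_b":
        result = await nmap.port_os("192.168.2.0/24")
        print(await result.get())
```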