Grapl

Grapl is a Graph Platform for Detection and Response with a focus on helping Detection Engineers and Incident Responders stop fighting their data and start connecting it. Find out more on our Github.

For now, our documentation primarily focuses on grapl_analyzerlib. grapl_analyzerlib provides a Python interface for end-users to interact with the data in Grapl.

Note

Grapl’s documentation is still a work in progress.

Queryables

Grapl provides powerful primitives for building graph based queries.

At the root of this query logic is the Queryable base class, though you shouldn’t ever have to work with that directly.

Queries are themselves Python classes that can be composed and constrained.

A simple query would look like this:

ProcessQuery()

This query describes a process - any process, it’s totally unconstrained.

We can execute this query in a few ways. Here are three examples,

mclient = MasterGraphClient()
    
all_processes = ProcessQuery().query(mclient)
one_process = ProcessQuery().query_first(mclient)
count = ProcessQuery().get_count(mclient)    

Queryable.query

Query the graph for all nodes that match.

graph_client - a GraphClient, which will determine which database to query

contains_node_key - a node_key that must exist somewhere in the query

first - return at most this many nodes. Defaults to 1000. When contains_node_key is set, first is set to 1.

returns - a list of nodes that matched your query

    def query(
        self,
        graph_client: GraphClient,
        contains_node_key: Optional[str] = None,
        first: Optional[int] = 1000,
    ) -> List["NV"]:
        pass

Queryable.query_first

Query the graph for the first node that matches.

graph_client - a GraphClient, which will determine which database to query

contains_node_key - a node_key that must exist somewhere in the query

returns - the first node that matched your query, or None if nothing matched

    def query_first(
        self, 
        graph_client: GraphClient, 
        contains_node_key: Optional[str] = None
    ) -> Optional["NV"]:
        pass

Queryable.get_count

Query the graph, counting all matches.

graph_client - a GraphClient, which will determine which database to query

first - count up to first, and then stop.

returns - the number of matches for this query. If first is set, only count up to first.

    def get_count(
        self,
        graph_client: GraphClient,
        first: Optional[int] = None,
    ) -> int:
        pass
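
As a rough illustration of the first parameter, here is a sketch that counts matches in a plain Python iterable and stops early once the cap is reached. The count_matches helper and the sample data are hypothetical stand-ins; this is not how grapl_analyzerlib talks to the graph database.

```python
from itertools import islice
from typing import Callable, Iterable, Optional


def count_matches(
    nodes: Iterable[dict],
    predicate: Callable[[dict], bool],
    first: Optional[int] = None,
) -> int:
    """Count nodes matching `predicate`, stopping once `first` matches are seen."""
    matches = (node for node in nodes if predicate(node))
    if first is not None:
        # islice stops pulling from the generator after `first` items
        matches = islice(matches, first)
    return sum(1 for _ in matches)


def is_svchost(node: dict) -> bool:
    return node["process_name"] == "svchost.exe"


# Hypothetical sample data standing in for nodes in the graph
processes = [
    {"process_name": "svchost.exe"},
    {"process_name": "cmd.exe"},
    {"process_name": "svchost.exe"},
]

total = count_matches(processes, is_svchost)
capped = count_matches(processes, is_svchost, first=1)
```

Capping the count is useful when we only care whether "at least N" matches exist and do not want to pay for a full scan.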

contains_node_key

In some cases, such as when writing Grapl Analyzers, we want to execute a query where a node’s node_key may be anywhere in that graph.

For example,

query = (
    ProcessQuery()  # A
    .with_bin_file(
        FileQuery()  # B
        .with_spawned_from(
            ProcessQuery()  # C
        )
    )
)

query.query_first(mclient, contains_node_key="node-key-to-query")

In this case, if our signature matches such that any of the nodes A, B, C, have the node_key “node-key-to-query”, we have a match - otherwise, no match.
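
To make the rule concrete, here is a small in-memory sketch of the same idea: a matched subgraph is kept only if the requested node_key appears on one of its nodes. The Node class and keep_match helper are hypothetical and exist only for this illustration; the real filtering happens inside the graph query.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class Node:
    node_key: str
    children: List["Node"] = field(default_factory=list)


def walk(node: Node) -> Iterator[Node]:
    """Yield `node` and every node reachable from it."""
    yield node
    for child in node.children:
        yield from walk(child)


def keep_match(root: Node, contains_node_key: Optional[str] = None) -> bool:
    """Keep a matched subgraph only if the key appears somewhere inside it."""
    if contains_node_key is None:
        return True
    return any(n.node_key == contains_node_key for n in walk(root))


# A -> B -> C, mirroring the process / bin_file / spawned_from shape above
c = Node("key-c")
b = Node("key-b", [c])
a = Node("key-a", [b])
```

Here keep_match(a, "key-c") is true because C is part of the match, while a key that belongs to none of A, B, or C rejects it.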

And, Or, Not

And

For a single predicate constraint (with_* method) all constraints are considered And’d.

This query matches a process name that contains both “foo” and “bar”.

ProcessQuery()
.with_process_name(contains=["foo", "bar"])

Or

Multiple predicate constraints are considered Or’d.

This query matches a process name that contains either “foo” or “bar”.

ProcessQuery()
.with_process_name(contains="foo")
.with_process_name(contains="bar")

Not

Any constraint can be wrapped in a Not to negate the constraint.

This query matches a process name that does not contain “foo”.

ProcessQuery()
.with_process_name(contains=Not("foo"))

All Together

This query matches a process whose process_name either does not contain “foo” and ends with “.exe”, or contains both “bar” and “baz”.

ProcessQuery()
.with_process_name(contains=Not("foo"), ends_with=".exe")
.with_process_name(contains=["bar", "baz"])
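
The combination rules can be modelled in a few lines of plain Python. This is only a sketch of the semantics described in this section, with a stand-in Not class and hypothetical matches helpers; it does not reflect how grapl_analyzerlib builds its queries internally.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Not:
    """Stand-in for the library's Not wrapper."""
    value: str


OPERATORS: Dict[str, Callable[[str, str], bool]] = {
    "eq": lambda name, value: name == value,
    "contains": lambda name, value: value in name,
    "starts_with": lambda name, value: name.startswith(value),
    "ends_with": lambda name, value: name.endswith(value),
}


def check(name: str, operator: str, cmp) -> bool:
    """Evaluate one comparison, negating it when wrapped in Not."""
    if isinstance(cmp, Not):
        return not OPERATORS[operator](name, cmp.value)
    return OPERATORS[operator](name, cmp)


def matches_call(name: str, call: dict) -> bool:
    """One with_* call: every comparison in it must hold (And)."""
    for operator, cmps in call.items():
        if not isinstance(cmps, list):
            cmps = [cmps]
        if not all(check(name, operator, cmp) for cmp in cmps):
            return False
    return True


def matches(name: str, calls: List[dict]) -> bool:
    """Several with_* calls on one field: any single call may hold (Or)."""
    return any(matches_call(name, call) for call in calls)


# The two calls from the query above, written as plain dictionaries
calls = [
    {"contains": Not("foo"), "ends_with": ".exe"},
    {"contains": ["bar", "baz"]},
]
```

With these rules, "svchost.exe" matches through the first call, "barbaz" matches through the second, and "foo.exe" matches neither.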

with_* methods

Most Queryable classes provide a suite of methods starting with with_*.

For example, ProcessQuery provides a with_process_name.

ProcessQuery.with_process_name

def with_process_name(
    self,
    eq: Optional["StrCmp"] = None,
    contains: Optional["StrCmp"] = None,
    ends_with: Optional["StrCmp"] = None,
    starts_with: Optional["StrCmp"] = None,
    regexp: Optional["StrCmp"] = None,
    distance: Optional[Tuple["StrCmp", int]] = None,
) -> ProcessQuery:
    pass

The process_name field is indexed such that we can constrain our query through:

eq

Matches a node’s process_name if it exactly matches eq

ProcessQuery().with_process_name(eq="svchost.exe")

contains

Matches a node’s process_name if it contains contains

ProcessQuery().with_process_name(contains="svc")

ends_with

Matches a node’s process_name if it ends with ends_with

ProcessQuery().with_process_name(ends_with=".exe")

starts_with

Matches a node’s process_name if it starts with starts_with

ProcessQuery().with_process_name(starts_with="svchost")

regexp

Matches a node’s process_name if it matches the regexp pattern regexp

ProcessQuery().with_process_name(regexp="svc.*exe")

distance

Matches a node’s process_name if it has a string distance of less than the provided threshold

ProcessQuery().with_process_name(distance=("svchost", 2))

Example

Here’s an example where we look for processes with a process_name that is not equal to svchost.exe, but that has a very close string distance to it.

ProcessQuery()
.with_process_name(eq=Not("svchost.exe"), distance=("svchost", 2))
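
To get a feel for what a distance threshold of 2 admits, here is a standalone sketch. It assumes the string distance is the Levenshtein edit distance, which this page does not state, and it follows the "less than the threshold" wording above. The helper names are hypothetical.

```python
def edit_distance(a: str, b: str) -> int:
    """Levenshtein distance using two rows of the dynamic-programming table."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # delete char_a
                current[j - 1] + 1,      # insert char_b
                previous[j - 1] + cost,  # substitute (or keep when equal)
            ))
        previous = current
    return previous[-1]


def is_near_miss(name: str, not_equal_to: str, near: str, threshold: int) -> bool:
    """Mirror eq=Not(not_equal_to) combined with distance=(near, threshold)."""
    return name != not_equal_to and edit_distance(name, near) < threshold
```

For example, is_near_miss("svch0st", "svchost.exe", "svchost", 2) holds because a single substitution separates "svch0st" from "svchost", which is the kind of lookalike name this query is meant to surface.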

Analyzers

Once you’ve written your Analyzers you’ll want to deploy them to Grapl.

Analyzers live in the <BUCKET_PREFIX>-grapl-analyzers bucket, so all we need to do is upload the files to that bucket. If you’re using a local version of Grapl, the BUCKET_PREFIX is always local-grapl.

Analyzers should be deployed with a key of the form: analyzer_name/main.py.

If you’re uploading to a local Grapl,

AWS_ACCESS_KEY_ID=minioadmin \
AWS_SECRET_ACCESS_KEY=minioadmin \
aws s3 cp \
<path to analyzer> \
s3://local-grapl-analyzers-bucket/analyzers/<analyzer_name>/main.py \
--endpoint-url=http://localhost:9000

Otherwise, for an AWS deployed Grapl,

aws s3 cp \
<path to analyzer> \
s3://<BUCKET_PREFIX>-analyzers-bucket/analyzers/<analyzer_name>/main.py
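
If you upload analyzers from a script, it can help to build the destination in one place. The sketch below assembles the same aws s3 cp invocation as the commands above; analyzer_s3_uri and upload_command are hypothetical helpers, and the URI layout is copied from those commands rather than from any Grapl API.

```python
from typing import List, Optional


def analyzer_s3_uri(bucket_prefix: str, analyzer_name: str) -> str:
    """Build the destination URI in the same shape as the commands above."""
    return (
        f"s3://{bucket_prefix}-analyzers-bucket"
        f"/analyzers/{analyzer_name}/main.py"
    )


def upload_command(
    path: str,
    bucket_prefix: str,
    analyzer_name: str,
    endpoint_url: Optional[str] = None,
) -> List[str]:
    """Return the argument list for `aws s3 cp`, ready for subprocess.run."""
    command = ["aws", "s3", "cp", path, analyzer_s3_uri(bucket_prefix, analyzer_name)]
    if endpoint_url is not None:
        # Only needed for a local Grapl, where S3 is served from localhost
        command.append(f"--endpoint-url={endpoint_url}")
    return command


local = upload_command(
    "./main.py", "local-grapl", "suspicious_svchost", "http://localhost:9000"
)
```

The resulting list can be passed to subprocess.run(local, check=True) once the AWS CLI is installed and the credentials are present in the environment.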

Deploying from Github

We can keep our detection logic in Github, which will allow us to perform code reviews, linting, and automate the deployment of our analyzers.

As an example, insanitybit/grapl-analyzers is set up to use this webhook.

Deploy

To get started you’ll need to install npm, typescript, and the aws-cdk.

Clone the repo: git clone git@github.com:insanitybit/grapl-analyzer-deployer.git

Change directories into the /grapl-analyzer-deployer/analyzer-deployer-cdk/ folder.

You’ll need to fill out a .env file with the following and place it in the analyzer-deployer-cdk folder.

Variables:

GITHUB_SHARED_SECRET - The secret used by the server to authenticate the client. Consider using the output of: ruby -rsecurerandom -e 'puts SecureRandom.hex(20)'

GITHUB_ACCESS_TOKEN - This is a “Personal Access Token” generated by GitHub.

BUCKET_PREFIX - This is the unique bucket prefix for your Grapl deployment.

Example:

GITHUB_SHARED_SECRET="dba0bf0df5e2887e737990a35f356ff7e23a56c5"
GITHUB_ACCESS_TOKEN="58b37668a1d3f9f1fa82f1e99604d58ecbf1333b"
BUCKET_PREFIX="exampleco"
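
If Ruby isn’t available, a shared secret of the same shape (20 random bytes rendered as 40 hexadecimal characters) can be produced with Python’s standard library. This is only an equivalent of the one-liner suggested above, not something the deployer requires.

```python
import secrets

# 20 random bytes from the OS's secure source, rendered as 40 hex characters
shared_secret = secrets.token_hex(20)

# Print a line that can be pasted into the .env file
print(f'GITHUB_SHARED_SECRET="{shared_secret}"')
```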

You may need to build dependencies first with npm i.

Run ./deploy.sh

Setting up the Webhook

https://developer.github.com/webhooks/creating/

Set the webhook url to the API Gateway created by your CDK deployment of Grapl. Set the secret to the value of GITHUB_SHARED_SECRET.
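
For background, GitHub signs each webhook delivery with an HMAC of the request body keyed by the shared secret, and sends the result in the X-Hub-Signature-256 header as sha256=<hex digest>. The sketch below shows how a receiver can check that header. The function names are hypothetical, and this page does not say how the deployer performs its own check.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Compute the header value GitHub would send for this body and secret."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def is_valid_signature(secret: str, body: bytes, header_value: str) -> bool:
    """Compare in constant time so the check does not leak partial matches."""
    return hmac.compare_digest(sign(secret, body), header_value)


# Hypothetical delivery, signed the way the sender would sign it
body = b'{"ref": "refs/heads/main"}'
header = sign("example-shared-secret", body)
```

A request whose header does not validate against GITHUB_SHARED_SECRET should be rejected before any analyzer is deployed.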

Analyzers

Analyzers are the attack signatures that power Grapl’s realtime detection logic.

Though implementing analyzers is simple, we can build extremely powerful and efficient logic to catch all sorts of attacker behaviors.

The Analyzer Base Class

To implement an Analyzer we must inherit from the Analyzer abstract base class.

A = TypeVar("A", bound="Analyzer")

class Analyzer(abc.ABC):
    def __init__(self, dgraph_client: GraphClient) -> None:
        self.dgraph_client = dgraph_client

    @classmethod
    def build(cls: Type[A], dgraph_client: GraphClient) -> A:
        return cls(dgraph_client)

    @abc.abstractmethod
    def get_queries(self) -> OneOrMany[Queryable]:
        pass

    @abc.abstractmethod
    def on_response(self, response: Viewable, output: Any):
        pass

Analyzer.build

Returns an instance of your analyzer. This allows you to move dependency management out of your __init__.

cls - the class for your analyzer, which you should use for construction.

dgraph_client - an instance of a GraphClient

@classmethod
def build(cls: Type[A], dgraph_client: GraphClient) -> A:
    return cls(dgraph_client)

Analyzer.get_queries

get_queries is where you define any of your graph signatures, either one or multiple.

All queries returned must have the same type for the root node.

returns - all signatures to be matched against.

@abc.abstractmethod
def get_queries(self) -> OneOrMany[Queryable]:
    pass

Analyzer.on_response

on_response is called if any of the signatures from get_queries matched a graph.

This method is where you can perform any subsequent logic that you couldn’t fit into your query, such as hitting an external threatfeed API, performing a count, etc.

response - Guaranteed to be the Viewable type associated with the Queryable(s) returned by get_queries

output - Provides a send method that takes an ExecutionHit

@abc.abstractmethod
def on_response(self, response: Viewable, output: Any):
    pass

SuspiciousSvchost Example

Here’s an example - we’re going to write some logic to look for suspicious executions of svchost.

class SuspiciousSvchost(Analyzer):

    def get_queries(self) -> OneOrMany[ProcessQuery]:
        invalid_parents = [
            Not("services.exe"),
            Not("smss.exe"),
            Not("ngentask.exe"),
            Not("userinit.exe"),
            Not("GoogleUpdate.exe"),
            Not("conhost.exe"),
            Not("MpCmdRun.exe"),
        ]

        return (
            ProcessQuery()
            .with_process_name(eq=invalid_parents)
            .with_children(
                ProcessQuery().with_process_name(eq="svchost.exe")
            )
        )

    def on_response(self, response: ProcessView, output: Any):
        output.send(
            ExecutionHit(
                analyzer_name="Suspicious svchost",
                node_view=response,
                risk_score=75,
            )
        )

We’ve got a very straightforward Analyzer here. We don’t need any custom build or init, and our on_response contains no logic other than sending out an ExecutionHit.

    def get_queries(self) -> OneOrMany[ProcessQuery]:
        invalid_parents = [
            Not("services.exe"),
            Not("smss.exe"),
            Not("ngentask.exe"),
            Not("userinit.exe"),
            Not("GoogleUpdate.exe"),
            Not("conhost.exe"),
            Not("MpCmdRun.exe"),
        ]

        return (
            ProcessQuery()
            .with_process_name(eq=invalid_parents)
            .with_children(
                ProcessQuery().with_process_name(eq="svchost.exe")
            )
        )

The query is straightforward. We have a curated whitelist of parent processes for svchost.exe.

Any process that is not one of those is considered “invalid”.

    ProcessQuery() # Any Process
    .with_process_name(eq=invalid_parents)  # With an invalid parent process name
    .with_children(  # With any child processes
        ProcessQuery()
        .with_process_name(eq="svchost.exe")  # With the process name "svchost.exe".
    )

Our query is therefore read as: any Process with a process_name that matches none of the names in invalid_parents, with any child process whose process_name exactly matches svchost.exe.

Adding Context

We may want to add some optional context to our query, without requiring that context for our Analyzer to match. We can do this easily in our on_response implementation.

In the on_response method the response is going to be the root node of what our query matched - in our case, this will be some invalid parent of svchost.exe.

Some interesting context might be to get the binary path of that svchost.exe and the parent process of our invalid_parent.

    def on_response(self, response: ProcessView, output: Any):
        # Let's get the parent of our invalid_parent
        response.get_parent()
        
        # And the binary paths for any suspect child processes
        for child in response.children:
            if child.get_bin_file():
                child.bin_file.get_file_path()

        output.send(
            ExecutionHit(
                analyzer_name="Suspicious svchost",
                node_view=response,
                risk_score=75,
            )
        )

Unlike the queries in get_queries, which have to be an exact match, our context is purely optional. We grab the information if it's available, but if it isn't we'll just move on.

If the information is there we’ll have so much more information when this triggers, almost certainly enough to triage this without much investigation.
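
Because on_response only relies on output having a send method, it is easy to exercise without a running Grapl. The sketch below uses stand-ins throughout: ExecutionHit is a minimal dataclass rather than the real class, CollectingOutput is a hypothetical test double, and the analyzer omits the Analyzer base class so that the example is self-contained.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class ExecutionHit:
    """Stand-in carrying the same three fields used in the example above."""
    analyzer_name: str
    node_view: Any
    risk_score: int


class CollectingOutput:
    """Hypothetical test double: records whatever on_response sends."""

    def __init__(self) -> None:
        self.hits: List[ExecutionHit] = []

    def send(self, hit: ExecutionHit) -> None:
        self.hits.append(hit)


class SuspiciousSvchost:
    """Only on_response is reproduced here; get_queries is left out."""

    def on_response(self, response: Any, output: Any) -> None:
        output.send(
            ExecutionHit(
                analyzer_name="Suspicious svchost",
                node_view=response,
                risk_score=75,
            )
        )


output = CollectingOutput()
# A plain dict stands in for the ProcessView that Grapl would pass in
SuspiciousSvchost().on_response({"process_name": "cmd.exe"}, output)
```

After the call, output.hits holds the single hit that would otherwise have been sent on to Grapl, which makes the risk score and analyzer name easy to assert on.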

Setup

AWS setup

NOTE that setting up Grapl will incur AWS charges! This can amount to hundreds of dollars a month based on the configuration. This setup script is designed for testing, and may include breaking changes in future versions, increased charges in future versions, or may otherwise require manually working with CloudFormation. If you need a way to set up Grapl in a stable, forwards compatible manner, please get in contact with me directly.

Setting up a basic playground version of Grapl is pretty simple, though currently setup is only supported on Linux (setting up an Ubuntu EC2 instance is likely the easiest way to get access to a supported system).

Installing Dependencies

To get started you’ll need to install npm, typescript, and the aws-cdk.

Your aws-cdk version should match the version in Grapl’s package.json file.

You’ll also need to have local aws credentials, and a configuration profile. Instructions here

If you intend to use Grapl’s provided demo data, you’ll also need some Python3 dependencies.

Clone, Configure, and Deploy

Grapl comes with binaries already in the repository.

Clone the repo, install the dependencies, and bootstrap the aws-cdk:

git clone https://github.com/insanitybit/grapl.git
cd ./grapl/grapl-cdk/
npm i  # install dependencies
cdk bootstrap  # set up aws-cdk

Add a .env file, and fill it in:

BUCKET_PREFIX="<unique prefix to differentiate your buckets>"

Run the deploy script: ./deploy_all.sh

It will require confirming some changes to security groups, and will take a few minutes to complete.

This will give you a Grapl setup that’s adequate for testing out the service.

Provisioning Grapl

At this point you need to provision the Graph databases and create a user. You can use the Grapl Provision notebook in this repo, and the newly created ‘engagement’ notebook in your AWS account.

https://s3.amazonaws.com/media-p.slid.es/uploads/650602/images/6396963/Screenshot_from_2019-07-27_22-27-35.png

Go to your AWS Sagemaker Console, open the Jupyter Notebook Grapl created for you, and upload the Grapl Provision.ipynb in this repository.

Run the notebook, and it will:

  • Set up the schemas for your graph database

  • Create a username, as well as a password, which you can use to log into your Grapl instance.

Demo Data

You can send some test data up to the service by going to the root of the grapl repo and calling: python ./gen-raw-logs.py <your bucket prefix>.

Note that this will likely impose charges to your AWS account.

To use the Grapl UX you must navigate to the index.html in the grapl ux bucket.

Local Grapl

In an effort to make Grapl even easier to get started with we’ve released a version that can run locally on your system! This post will walk through the process of setting up a local Grapl environment, as well as performing a basic engagement to mimic an investigation.

Pre-Requisites:

Grapl requires the following dependencies. Before starting this tutorial, be sure the system you’re planning to run Grapl on has the following software installed:

Grapl has primarily been tested on Linux systems, where Docker support is best. If you’re working with another OS your experience may vary. If you do run into any problem, please file an issue or let us know in our Slack channel!

Running Grapl

Getting Grapl set up on your system to run locally is a simple process!

First, clone the Grapl repository, then run the command docker-compose up in the directory where Grapl has been cloned. You may see warnings in your terminal as services boot up. Eventually the build process will reach a steady state - and shouldn’t take more than a few minutes!

git clone https://github.com/insanitybit/grapl.git
cd ./grapl/
docker-compose up

Uploading Your Analyzer

Next, we’ll upload a basic Analyzer (Grapl’s attack signatures), which searches for processes named “svchost” without a whitelisted parent process. We’ve provided a demo Analyzer in the Grapl repository. If you’re interested in the code, see our Analyzer docs.

To upload the Analyzer to Grapl, navigate to the root of the cloned grapl repository and run the following command:

./upload_analyzer_local.sh

Grapl may take a couple of minutes to get started, so if you get an error similar to “could not connect to the endpoint URL”, give Grapl a few more minutes to finish provisioning.


Adding Data to Grapl

To get data into Grapl, please run the following command:

python3 ./upload-sysmon-logs.py --bucket_prefix=local-grapl --logfile=eventlog.xml 

Logging In to Grapl:

When you navigate to localhost:3000/login, please enter the following credentials into the login form:

Username: grapluser
Password: graplpassword

Working With Grapl Data:

To analyze Grapl Data, open two browser windows in Google Chrome.

In the first window, navigate to Grapl’s Jupyter Notebook on localhost:8888. The ‘Grapl Notebook’ is where we’ll interact with the engagements using Python.

Log in with the password “graplpassword”. Once logged in, you’ll see a directory with files that will be used later in the tutorial.

The lenses page will show one lens. A lens associates a risk with some kind of correlation point - in this case, an asset.

https://static.wixstatic.com/media/aa91b3_2a9a44851cdf4ebb8703ae76af72b192~mv2.png/v1/fill/w_1480,h_455,al_c,q_90,usm_0.66_1.00_0.01/aa91b3_2a9a44851cdf4ebb8703ae76af72b192~mv2.webp

In the other window, navigate to localhost:1234 to connect to the Engagement UX. The Engagement UX displays risks in our environment. Credentials are not needed when running Grapl locally, just click the ‘submit’ button to get started!

After logging in, you’ll be redirected to the Grapl UI. The Lenses section will show one lens which associates a risk with some kind of correlation point - in this case, an asset.

To examine the graph of suspicious nodes and edges relating to our asset lens, click on the lens name, in this case ‘DESKTOP-FVSHABR0’.

https://static.wixstatic.com/media/aa91b3_43750d8c9716482a8d8017d4826c93bf~mv2.png/v1/fill/w_1460,h_972,al_c,q_90/aa91b3_43750d8c9716482a8d8017d4826c93bf~mv2.webp

After clicking the lens name, a graph will appear in the right panel. In this case, the graph has two nodes, “cmd.exe” and “svchost.exe”, with an edge between them.

https://static.wixstatic.com/media/aa91b3_4ec6b529647e4310a7f79eb1788f35b4~mv2.png/v1/fill/w_1462,h_808,al_c,q_90/aa91b3_4ec6b529647e4310a7f79eb1788f35b4~mv2.webp

Click the node labeled ‘cmd.exe’, and copy the value of node_key.

https://static.wixstatic.com/media/aa91b3_833b01debcfe4bbfa44e78d0bc1aba55~mv2.png/v1/fill/w_1464,h_756,al_c,q_90/aa91b3_833b01debcfe4bbfa44e78d0bc1aba55~mv2.webp

The Demo_Engagement notebook creates a new engagement, which shows up on the ‘Lenses’ page.

Replace “<>” with the node key as a string.

https://static.wixstatic.com/media/aa91b3_31b92e85fedf4551918ed8147932d5d1~mv2.png/v1/fill/w_1480,h_748,al_c,q_90,usm_0.66_1.00_0.01/aa91b3_31b92e85fedf4551918ed8147932d5d1~mv2.webp

Click the first block of code, then click the ‘Run’ button four times. A new lens will appear in the ‘Lenses’ list. This is our Engagement.

https://static.wixstatic.com/media/aa91b3_b8bd9fbf4c7f4e63b5a850a820423b35~mv2.png/v1/fill/w_1458,h_870,al_c,q_90/aa91b3_b8bd9fbf4c7f4e63b5a850a820423b35~mv2.webp

As you continue to click the ‘run’ button in your Jupyter Notebook, the graph will update with new nodes and edges that get pulled into the Engagement graph.

https://static.wixstatic.com/media/aa91b3_d4540e548fbe42139af7e6eacb341364~mv2.png/v1/fill/w_1462,h_778,al_c,q_90/aa91b3_d4540e548fbe42139af7e6eacb341364~mv2.webp

As we pivot off of the data that we have, our graph expands to visually display a ‘dropper’ behavior.

https://static.wixstatic.com/media/aa91b3_a8edd9fb0c8c470480ced49373c9d53d~mv2.png/v1/fill/w_1460,h_1392,al_c,q_90/aa91b3_a8edd9fb0c8c470480ced49373c9d53d~mv2.webp

We’ve kept the data in our demo light so users can become familiar with Grapl’s core features, but you can keep expanding the graph using the notebook to get the full story of what the attacker did.

Check out our docs to see other ways to interact with your data.

What’s Next?

Grapl is drastically improving in many ways. Recently we’ve undergone a full rewrite of our front-end experience, we’re actively working to support more data sources, and improving documentation.

To support these changes, we’ve expanded our team size, and are planning to grow quickly, so expect a significant acceleration in our development! We’ve hired multiple new engineers, who have either started or will start full-time with Grapl in the coming weeks.

We’ll have more exciting updates to share soon. Keep an eye out for more improvements to Grapl by following us @GraplSec or joining us on Slack!

Plugins

Implementing a Graph Generator

Graph Generators are Grapl’s parser services; they take in raw events and they produce a graph representation.

As an example, a generator for the OSQuery process_events table would take in an event like this:

   {
     "action": "added",
     "columns": {
       "uid": "0",
       "time": "1527895541",
       "pid": "30219",
       "path": "/usr/bin/curl",
       "auid": "1000",
       "cmdline": "curl google.com",
       "ctime": "1503452096",
       "cwd": "",
       "egid": "0",
       "euid": "0",
       "gid": "0",
       "parent": "30200"
     },
     "unixTime": 1527895550,
     "hostIdentifier": "vagrant",
     "name": "process_events",
     "numerics": false
   }

And produce a graph that represents the entities and relationships in the event.

For example, we might have a graph that looks like this (minimally):


// A node representing the child process
ChildProcessNode {
    pid: event.columns.pid,  // The child process pid
    created_timestamp: event.columns.time  // The child process creation time
}

// A node representing the parent
ParentProcessNode {
    pid: event.columns.parent,  // The parent process pid
    seen_at_timestamp: event.columns.time  // The time that we saw the parent process
}

// An edge, relating the two processes
ChildrenEdge {
    from: ParentProcess,
    to: ChildProcess,
}

The goal of this document is to guide you through how to build that function.
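
Before moving to Rust, it may help to see the transformation itself in a few lines. The sketch below is written in Python purely for illustration (real generators are written in Rust, as described next). The event_to_graph function and the dictionary layout of nodes and edges are hypothetical; only the field names come from the event and the graph shown above.

```python
import json
from typing import Any, Dict


def event_to_graph(raw_event: str) -> Dict[str, Any]:
    """Turn one process_events record into two nodes and one edge."""
    event = json.loads(raw_event)
    columns = event["columns"]
    timestamp = int(columns["time"])  # OSQuery reports column values as strings

    child = {
        "type": "ChildProcessNode",
        "pid": int(columns["pid"]),
        "created_timestamp": timestamp,
    }
    parent = {
        "type": "ParentProcessNode",
        "pid": int(columns["parent"]),
        "seen_at_timestamp": timestamp,
    }
    # The edge relates the parent process to the child it spawned
    edge = {"type": "ChildrenEdge", "from": parent["pid"], "to": child["pid"]}
    return {"nodes": [child, parent], "edges": [edge]}


# A trimmed copy of the event above, keeping only the fields the sketch reads
raw = json.dumps({
    "action": "added",
    "columns": {"time": "1527895541", "pid": "30219", "parent": "30200"},
    "name": "process_events",
})
graph = event_to_graph(raw)
```

The same event time is used for both nodes, but with different meanings: for the child it is the creation time, while for the parent it is only the time at which we saw it.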

Getting started

First off, Grapl’s graph generators are currently written in the Rust programming language. There are a number of benefits to using Rust for parsers, such as its high performance while retaining memory safety.

Don’t be intimidated if you don’t know Rust! You don’t have to be an expert to write a generator.

Installing Requirements

You can install Rust by running this script:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Creating the Generator Project

cargo new our-graph-generator
cd ./our-graph-generator/

Modify the Cargo.toml to include our Grapl generator library:

[dependencies]
graph-generator-lib = "*"

This library will provide the primitives we need in order to parse our data into a graph.

Implementing the EventHandler

Grapl’s going to handle all of the work to get data in and out of your function; all you need to do is add the entrypoint and implement an interface to do the parsing.

The interface is called the EventHandler.

Testing With Local Grapl

Implementing A Graph Model Plugin

Graph Model Plugins allow you to ‘Bring Your Own Model’ to Grapl. For example, if you wanted to implement a plugin for, say, AWS, which Grapl has no native support for, you would be adding an AWS Model to Grapl.

Models are split into a few components.

  1. Python Schema Definitions - used for provisioning the GraphDB, among other things

  2. Rust Schema Definitions - for graph generators to use

  3. Analyzer Query and Views - used for detection and response

You only need to implement 1 and 2, the code for 3 will be generated for you.

Rust Schema Definitions

In order to generate your graphs and implement a Graph Generator you’ll want to build a schema definition in Rust, the language that we currently support for graph generation. As a reminder, graph generators are the services that turn raw data, like event logs, into a graph format that Grapl can understand.

You’ll need a relatively recent installation of Rust: https://rustup.rs/

You can create a new Rust library to define your schemas by running something like:

cargo new grapl-aws-models

We can then add the necessary dependencies for Grapl:

cargo add grapl-graph-descriptions
cargo add derive-dynamic-node

Then, in your favorite IDE, navigate to the src/lib.rs file, where we’ll put our first model - the Ec2Instance.

src/lib.rs

use derive_dynamic_node::{DynamicNode as DeriveDynamicNode, GraplStaticId};
use grapl_graph_descriptions::graph_description::*;

#[derive(Clone, DeriveDynamicNode, GraplStaticId)]
struct Ec2Instance {
  #[static_id]
  arn: String,
  image_id: String,
  image_description: String,
  instance_id: String,
  launch_time: u64,
  instance_state: String,
  instance_type: String,
  availability_zone: String,
  platform: String,
}

impl IEc2InstanceNode for Ec2InstanceNode {
    fn get_mut_dynamic_node(&mut self) -> &mut DynamicNode {
        &mut self.dynamic_node
    }
}

  • Currently Grapl’s nodes must have only String, u64, or i64 properties.

The Ec2Instance struct is tagged with two important macros - DeriveDynamicNode, and GraplStaticId.

The DeriveDynamicNode macro generates some code for us, in this case it will generate an Ec2InstanceNode structure, which is what we’ll store data in.

The GraplStaticId macro allows us to define a property, or properties, that can be used to identify the underlying entity. In AWS this is very straightforward - identity is provided by an Arn. Every node in Grapl must have an identity.

When parsing, we can add data to this node type like this:

let mut ec2_instance = Ec2InstanceNode::new(
  Ec2InstanceNode::static_strategy()
);

ec2_instance.with_launch_time(launch_time);
ec2_instance.with_instance_id(&instance_id);

The Ec2InstanceNode struct was generated by those macros, as was the method static_strategy, and the methods for adding data.

Python Schema Definition

The Python schema definitions will serve two functions:

  1. They will help us provision Grapl’s graph databases to understand our new model

  2. They generate more Python code, which we’ll use in our Analyzers to detect and respond to threats using our new models

Our Python Schema for the Ec2InstanceNode will be relatively straightforward to implement.

from grapl_analyzerlib.schemas.schema_builder import NodeSchema

class Ec2InstanceNodeSchema(NodeSchema):
    def __init__(self):
        super(Ec2InstanceNodeSchema, self).__init__()
        (
            self
            .with_str_prop("arn")
            .with_str_prop("image_id")
            .with_str_prop("image_description")
            .with_str_prop("instance_id")
            .with_int_prop("launch_time")
            .with_str_prop("instance_state")
            .with_str_prop("instance_type")
            .with_str_prop("availability_zone")
            .with_str_prop("platform")
        )
        
    @staticmethod
    def self_type() -> str:
        return "Ec2Instance"

Make sure that the return value of the self_type method is the same name as the struct in your Rust model, in this case Ec2Instance.

Using this Ec2InstanceNodeSchema we can generate the rest of the code that we need for building signatures or responding to attacks.

from grapl_analyzerlib.schemas.schema_builder import (
    generate_plugin_query, 
    generate_plugin_view
)

query = generate_plugin_query(Ec2InstanceNodeSchema())
view = generate_plugin_view(Ec2InstanceNodeSchema())
print(query)
print(view)

This will generate and print out the code for querying or pivoting off of Ec2Instance nodes in Grapl.

Specifically it will generate the Ec2InstanceQuery and Ec2InstanceView classes.

You can just copy/paste this code into a file and load it up to use. There may be minor changes required, such as imports, but otherwise it should generally ‘just work’.

Modifying the Graph Schema

Grapl already comes with the Grapl Provision.ipynb for provisioning the database. You can import our schemas into that notebook and then just add them to the schema list, which is defined in a cell:

    schemas = (
        AssetSchema(),
        ProcessSchema(),
        FileSchema(),
        IpConnectionSchema(),
        IpAddressSchema(),
        IpPortSchema(),
        NetworkConnectionSchema(),
        ProcessInboundConnectionSchema(),
        ProcessOutboundConnectionSchema(),
        # Plugin Nodes
        Ec2InstanceNodeSchema(),
    )

Run the notebook and you should be good to go.

Deploying Analyzers With Plugins

The simplest way to use Plugins in your Analyzers is to publish them to PyPI and then add them as requirements to the analyzer_executor/requirements.txt, rebuild, and redeploy. At that point your analyzers can import the plugins and you can build out your graph signatures.

Asset

AssetView

class grapl_analyzerlib.nodes.asset_node.AssetView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: Optional[str] = None, hostname: Optional[str] = None, asset_processes: Optional[List[ProcessView]] = None, **kwargs)

Predicate        Type           Description
node_key         string         A unique identifier for this node.
hostname         string         The hostname of this asset.
asset_processes  List[Process]  Processes associated with this asset.

get_hostname() → Optional[str]
get_node_type() → str

AssetQuery

class grapl_analyzerlib.nodes.asset_node.AssetQuery(*args, **kwds)
with_hostname(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_processes(process_query: Optional[IProcessQuery] = None) → NQ

File

FileView

class grapl_analyzerlib.nodes.file_node.FileView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: Optional[str] = None, file_path: Optional[str] = None, file_extension: Optional[str] = None, file_mime_type: Optional[str] = None, file_size: Optional[int] = None, file_version: Optional[str] = None, file_description: Optional[str] = None, file_product: Optional[str] = None, file_company: Optional[str] = None, file_directory: Optional[str] = None, file_inode: Optional[int] = None, file_hard_links: Optional[str] = None, signed: Optional[str] = None, signed_status: Optional[str] = None, md5_hash: Optional[str] = None, sha1_hash: Optional[str] = None, sha256_hash: Optional[str] = None, creator: Optional[ProcessView] = None, writers: Optional[List[ProcessView]] = None, readers: Optional[List[ProcessView]] = None, deleter: Optional[ProcessView] = None, spawned_from: Optional[List[ProcessView]] = None, risks: Optional[List[RiskView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| asset_id | string | A unique identifier for an asset. |
| file_name | string | Bare name of the file, like “thing.txt”. |
| file_path | string | Fully qualified path, like “/home/person/thing.txt”. |
| file_extension | string | Extension of the file, like “txt”. |
| file_mime_type | string | todo: description |
| file_version | string | todo: description |
| file_description | string | todo: description |
| file_product | string | todo: description |
| file_company | string | todo: description |
| file_directory | string | todo: description |
| file_hard_links | string | todo: description |
| signed_status | string | todo: description |
| md5_hash | string | todo: description |
| sha1_hash | string | todo: description |
| sha256_hash | string | todo: description |
| file_size | int | todo: description |
| file_inode | int | todo: description |
| signed | bool | todo: description |

get_file_company() → Optional[str]
get_file_description() → Optional[str]
get_file_directory() → Optional[str]
get_file_extension() → Optional[str]
get_file_inode() → Optional[int]
get_file_mime_type() → Optional[str]
get_file_path() → Optional[str]
get_file_product() → Optional[str]
get_file_size() → Optional[int]
get_file_version() → Optional[str]
get_md5_hash() → Optional[str]
get_node_type() → str
get_risks(match_risks: Optional[IRiskQuery] = None) → List[NV]
get_sha1_hash() → Optional[str]
get_sha256_hash() → Optional[str]
get_signed() → Optional[bool]
get_signed_status() → Optional[str]
get_spawned_from(match_spawned_from: Optional[IProcessQuery] = None) → Optional[NV]
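
The file_name, file_path, and file_extension predicates describe parts of the same path. As a rough illustration, the sketch below derives the three values from one POSIX-style path using only the standard library. `split_file_path` is a hypothetical helper, and how Grapl itself derives these fields is not stated here.

```python
from pathlib import PurePosixPath
from typing import Dict


def split_file_path(file_path: str) -> Dict[str, str]:
    """Split a POSIX-style path into the parts described in the table."""
    path = PurePosixPath(file_path)
    return {
        "file_path": str(path),  # fully qualified path
        "file_name": path.name,  # bare name, e.g. "thing.txt"
        "file_extension": path.suffix.lstrip("."),  # e.g. "txt", no dot
    }
```

For Windows-style paths, `PureWindowsPath` from the same module would be the analogous choice.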

FileQuery

class grapl_analyzerlib.nodes.file_node.FileQuery(*args, **kwds)
with_creator(creator_query: Optional[ProcessQuery] = None) → NQ
with_file_company(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_description(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_directory(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_extension(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_inode(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_file_mime_type(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_path(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_product(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_file_size(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_file_version(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_md5_hash(eq: Optional[StrCmp] = None) → NQ
with_readers(reader_query: Optional[ProcessQuery] = None) → NQ
with_risks(risks_query: Optional[RiskQuery] = None) → NQ
with_sha1_hash(eq: Optional[StrCmp] = None) → NQ
with_sha256_hash(eq: Optional[StrCmp] = None) → NQ
with_signed(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_signed_status(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_spawned_from(spawned_from_query: Optional[ProcessQuery] = None) → NQ
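
The hash comparators (with_md5_hash, with_sha1_hash, with_sha256_hash) only accept an eq argument, so the value has to match exactly. The sketch below computes all three digests for a byte string with the standard hashlib module. It assumes hashes are compared as lowercase hexadecimal strings, which this reference does not confirm, and `file_hashes` is a hypothetical helper name.

```python
import hashlib
from typing import Dict


def file_hashes(data: bytes) -> Dict[str, str]:
    """Return hex digests keyed by the predicate names used above."""
    return {
        "md5_hash": hashlib.md5(data).hexdigest(),
        "sha1_hash": hashlib.sha1(data).hexdigest(),
        "sha256_hash": hashlib.sha256(data).hexdigest(),
    }
```

A digest produced this way could then be passed as the eq value, for example `FileQuery().with_sha256_hash(eq=digest)`.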

IpAddress

IpAddressView

class grapl_analyzerlib.nodes.ip_address_node.IpAddressView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, first_seen_timestamp: Optional[NewType.<locals>.new_type] = None, last_seen_timestamp: Optional[NewType.<locals>.new_type] = None, ip_address: Optional[str] = None, ip_connections: Optional[List[grapl_analyzerlib.nodes.ip_connection_node.IpConnectionView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| ip_address | string | The IP address that this node represents. |
| first_seen_timestamp | int | Time the address was first seen (in millis-since-epoch). |
| last_seen_timestamp | int | Time the address was last seen (in millis-since-epoch). |
| ip_connections | List[IpConnection] | Connections made from this address. |

get_bound_by() → List[grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionView]
get_first_seen_timestamp() → Optional[NewType.<locals>.new_type]
get_ip_address() → Optional[str]
get_ip_connections_from() → List[grapl_analyzerlib.nodes.ip_connection_node.IpConnectionView]
get_last_seen_timestamp() → Optional[NewType.<locals>.new_type]
get_node_type() → str

IpAddressQuery

class grapl_analyzerlib.nodes.ip_address_node.IpAddressQuery(*args, **kwds)
with_bound_by(bound_by_query: Optional[IProcessInboundConnectionQuery] = None) → NQ
with_first_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_ip_address(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_ip_connections(ip_connections_query: Optional[IIpConnectionQuery] = None) → NQ
with_ip_connections_from(ip_connections_from_query: Optional[IIpConnectionQuery] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
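
Timestamps throughout this reference are integers in millis-since-epoch. The sketch below shows one way to convert between timezone-aware datetimes and that representation using exact integer arithmetic. `to_millis` and `from_millis` are hypothetical helper names, not part of grapl_analyzerlib.

```python
from datetime import datetime, timedelta, timezone

# The Unix epoch as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def from_millis(millis: int) -> datetime:
    """Convert milliseconds since the epoch back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)
```

The resulting integers are the kind of value the gt and lt arguments of the timestamp comparators expect. Note that `to_millis` raises a TypeError for naive datetimes, since they cannot be subtracted from an aware one.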

IpConnection

IpConnectionView

class grapl_analyzerlib.nodes.ip_connection_node.IpConnectionView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, created_timestamp: Optional[int] = None, terminated_timestamp: Optional[int] = None, last_seen_timestamp: Optional[int] = None, src_ip_address: Optional[str] = None, src_port: Optional[str] = None, dst_ip_address: Optional[str] = None, dst_port: Optional[str] = None, inbound_ip_connection_to: Optional[grapl_analyzerlib.nodes.ip_address_node.IpAddressView] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| created_timestamp | int | Time the connection was created (in millis-since-epoch). |
| last_seen_timestamp | int | Time the connection was last seen (in millis-since-epoch). |
| terminated_timestamp | int | Time the connection was terminated (in millis-since-epoch). |

get_connecting_ips() → List[grapl_analyzerlib.nodes.ip_address_node.IpAddressView]
get_created_timestamp() → Optional[int]
get_dst_ip_address() → Optional[str]
get_dst_port() → Optional[str]
get_last_seen_timestamp() → Optional[int]
get_node_type() → str
get_src_ip_address() → Optional[str]
get_src_port() → Optional[str]
get_terminated_timestamp() → Optional[int]

IpConnectionQuery

class grapl_analyzerlib.nodes.ip_connection_node.IpConnectionQuery(*args, **kwds)
with_connecting_ips(connecting_ips_query: Optional[IIpAddressQuery] = None) → NQ
with_created_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_dst_ip_address(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_dst_port(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_inbound_ip_connection_to(inbound_ip_connection_to_query: Optional[IIpAddressQuery] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_src_ip_address(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_src_port(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_terminated_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ

IpPort

IpPortView

class grapl_analyzerlib.nodes.ip_port_node.IpPortView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, port: Optional[int] = None, first_seen_timestamp: Optional[int] = None, last_seen_timestamp: Optional[int] = None, ip_address: Optional[str] = None, protocol: Optional[str] = None, network_connections: Optional[List[grapl_analyzerlib.nodes.network_connection_node.NetworkConnectionView]] = None, bound_by: Optional[List[grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionView]] = None, process_connections: Optional[List[grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView]] = None, process_connects: Optional[List[grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| first_seen_timestamp | int | Time the IP Port was first seen (in millis-since-epoch). |
| last_seen_timestamp | int | Time the IP Port was last seen (in millis-since-epoch). |
| terminated_timestamp | int | Time the connection was terminated (in millis-since-epoch). |
| ip_address | string | The IP Address associated with this node. (TODO: v4? v6? both?) |
| protocol | string | todo: documentation |
| network_connections | List[NetworkConnection] | todo: documentation |
| bound_by | List[ProcessInboundConnection] | todo: documentation |
| process_connections | List[ProcessOutboundConnection] | todo: documentation |
| process_connects | List[ProcessOutboundConnection] | todo: documentation |

get_bound_by() → List[grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionView]
get_connections_from_processes() → List[grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView]
get_first_seen_timestamp() → Optional[int]
get_ip_address() → Optional[str]
get_last_seen_timestamp() → Optional[int]
get_network_connections_from() → List[grapl_analyzerlib.nodes.network_connection_node.NetworkConnectionView]
get_node_type() → str
get_port() → Optional[int]
get_process_connects() → List[grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView]
get_protocol() → Optional[str]

IpPortQuery

class grapl_analyzerlib.nodes.ip_port_node.IpPortQuery(*args, **kwds)
with_bound_by(bound_by_query: Optional[IProcessInboundConnectionQuery] = None) → NQ
with_connections_from_processes(connections_from_processes_query: Optional[IProcessOutboundConnectionQuery] = None) → NQ
with_first_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_ip_address(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_network_connections(network_connections_query: Optional[INetworkConnectionQuery] = None) → NQ
with_network_connections_from(network_connections_from_query: Optional[INetworkConnectionQuery] = None) → NQ
with_port(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_process_connections(process_connections_query: Optional[IProcessOutboundConnectionQuery] = None) → NQ
with_protocol(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
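
The ip_address predicate is a plain string, and the table above leaves open whether it holds IPv4, IPv6, or both. When preparing a value for with_ip_address, it can help to validate and normalize it first. The sketch below does that with the standard ipaddress module. `normalize_ip` is a hypothetical helper, and whether Grapl stores addresses in this canonical form is an assumption.

```python
import ipaddress
from typing import Tuple


def normalize_ip(text: str) -> Tuple[str, int]:
    """Return the canonical string form and IP version (4 or 6) of an address.

    Raises ValueError when the text is not a valid IP address.
    """
    address = ipaddress.ip_address(text)
    return str(address), address.version
```

IPv6 addresses are compressed to their shortest canonical spelling, so two different spellings of the same address normalize to one string.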

Lens

LensView

class grapl_analyzerlib.nodes.lens_node.LensView(dgraph_client: pydgraph.client.DgraphClient, uid: str, node_key: str, node_type: Optional[str] = None, lens: Optional[str] = None, scope: Optional[List[NodeView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| lens | string | The name of the lens this node represents. |
| scope | List[NodeView] | todo: documentation |

get_lens_name() → Optional[str]
get_node_type() → str
static get_or_create(gclient: grapl_analyzerlib.grapl_client.GraphClient, lens_name: str, lens_type: str) → grapl_analyzerlib.nodes.lens_node.LensView
get_scope(match_scope: Optional[grapl_analyzerlib.nodes.any_node.NodeQuery] = None) → List[grapl_analyzerlib.nodes.any_node.NodeView]

LensQuery

class grapl_analyzerlib.nodes.lens_node.LensQuery(*args, **kwds)
with_lens_name(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → grapl_analyzerlib.nodes.lens_node.LensQuery
with_scope(scope_query: Optional[NodeQuery] = None) → NQ

NetworkConnection

NetworkConnectionView

class grapl_analyzerlib.nodes.network_connection_node.NetworkConnectionView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, created_timestamp: Optional[int] = None, terminated_timestamp: Optional[int] = None, last_seen_timestamp: Optional[int] = None, src_ip_address: Optional[str] = None, src_port: Optional[str] = None, dst_ip_address: Optional[str] = None, dst_port: Optional[str] = None, inbound_network_connection_to: Optional[grapl_analyzerlib.nodes.ip_port_node.IpPortView] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| created_timestamp | int | Time the network connection was created (in millis-since-epoch). |
| terminated_timestamp | int | Time the network connection was terminated (in millis-since-epoch). |
| last_seen_timestamp | int | Time the network connection was last seen (in millis-since-epoch). |
| src_ip_address | string | IP Address of the network connection’s source. |
| src_port | string | Port of the network connection’s source. |
| dst_ip_address | string | IP Address of the network connection’s destination. |
| dst_port | string | Port of the network connection’s destination. |

get_connections_from() → List[grapl_analyzerlib.nodes.ip_port_node.IpPortView]
get_created_timestamp() → Optional[int]
get_dst_ip_address() → Optional[str]
get_dst_port() → Optional[str]
get_last_seen_timestamp() → Optional[int]
get_node_type() → str
get_src_ip_address() → Optional[str]
get_src_port() → Optional[str]
get_terminated_timestamp() → Optional[int]
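
Every timestamp getter above returns Optional[int], so arithmetic on the results needs to handle missing values. The sketch below computes a connection's lifetime from the two values, returning None when either one is absent. `connection_duration_ms` is a hypothetical helper written for illustration.

```python
from typing import Optional


def connection_duration_ms(
    created: Optional[int], terminated: Optional[int]
) -> Optional[int]:
    """Return the lifetime in milliseconds, or None if either bound is unknown."""
    if created is None or terminated is None:
        return None  # e.g. the connection has not been seen terminating
    return terminated - created
```

With a view in hand, the inputs would come from `get_created_timestamp()` and `get_terminated_timestamp()`.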

NetworkConnectionQuery

class grapl_analyzerlib.nodes.network_connection_node.NetworkConnectionQuery(*args, **kwds)
with_connections_from(connections_from_query: Optional[IIpPortQuery] = None) → NQ
with_created_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_dst_ip_address(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_dst_port(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_inbound_network_connection_to(inbound_network_connection_to_query: Optional[IIpPortQuery] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_src_ip_address(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_src_port(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None) → NQ
with_terminated_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ

Process

ProcessView

class grapl_analyzerlib.nodes.process_node.ProcessView(dgraph_client: pydgraph.client.DgraphClient, uid: str, node_key: str, node_type: Optional[str] = None, process_id: Optional[int] = None, created_timestamp: Optional[int] = None, terminate_time: Optional[int] = None, image_name: Optional[str] = None, process_name: Optional[str] = None, arguments: Optional[str] = None, children: Optional[List[NV]] = None, bin_file: Optional[FileView] = None, created_files: Optional[List[FileView]] = None, read_files: Optional[List[FileView]] = None, wrote_files: Optional[List[FileView]] = None, deleted_files: Optional[List[FileView]] = None, created_connections: Optional[List[ProcessOutboundConnectionQuery]] = None, inbound_connections: Optional[List[ProcessInboundConnectionQuery]] = None, parent: Optional[NV] = None, process_asset: Optional[AssetView] = None, risks: Optional[List[RiskView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| asset_id | string | A unique identifier for this asset. |
| image_name | string | The name of the binary that was loaded for this process. |
| process_name | string | The name of the process. |
| arguments | string | The arguments, as passed into the process. |
| process_id | int | The process id for this process. |
| created_timestamp | int | Time of the process creation (in millis-since-epoch). |
| terminate_time | int | Time of the process termination (in millis-since-epoch). |
| children | List[Process] | Child processes of this process. |
| bin_file | File | The file that was executed to create this process. |
| created_files | List[File] | Files created by this process. |
| deleted_files | List[File] | Files deleted by this process. |
| read_files | List[File] | Files read by this process. |
| wrote_files | List[File] | Files written by this process. |
| created_connections | List[ProcessOutboundConnection] | Outbound connections created by this process. |
| inbound_connections | List[ProcessInboundConnection] | Inbound connections created by this process. |

get_arguments() → Optional[str]
get_asset() → Optional[grapl_analyzerlib.nodes.asset_node.AssetView]
get_bin_file() → Optional[grapl_analyzerlib.nodes.file_node.FileView]
get_children(match_children: Optional[IProcessQuery] = None) → List[NV]
get_created_connections() → List[grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView]
get_created_files() → List[grapl_analyzerlib.nodes.file_node.FileView]
get_created_timestamp() → Optional[int]
get_deleted_files() → List[grapl_analyzerlib.nodes.file_node.FileView]
get_image_name() → Optional[str]
get_inbound_connections() → List[grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionView]
get_node_type() → str
get_parent() → Optional[NV]
get_process_id() → Optional[int]
get_process_name() → Optional[str]
get_read_files() → List[grapl_analyzerlib.nodes.file_node.FileView]
get_risks(match_risks: Optional[IRiskQuery] = None) → List[grapl_analyzerlib.nodes.risk_node.RiskView]
get_terminate_time() → Optional[int]
get_wrote_files() → List[grapl_analyzerlib.nodes.file_node.FileView]

ProcessQuery

class grapl_analyzerlib.nodes.process_node.ProcessQuery(*args, **kwds)
with_arguments(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_asset(asset_query: Optional[IAssetQuery] = None) → NQ
with_bin_file(bin_file_query: Optional[IFileQuery] = None) → NQ
with_children(child_query: Optional[IProcessQuery] = None) → NQ
with_created_connections(created_connection_query: Optional[IProcessOutboundConnectionQuery] = None) → NQ
with_created_files(created_files_query: Optional[IFileQuery] = None) → NQ
with_created_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_deleted_files(deleted_files_query: Optional[IFileQuery] = None) → NQ
with_image_name(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_inbound_connections(inbound_connection_query: Optional[IProcessInboundConnectionQuery] = None) → NQ
with_parent(parent_query: Optional[IProcessQuery] = None) → NQ
with_process_id(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_process_name(eq: Optional[StrCmp] = None, contains: Optional[StrCmp] = None, ends_with: Optional[StrCmp] = None, starts_with: Optional[StrCmp] = None, regexp: Optional[StrCmp] = None, distance: Optional[Tuple[StrCmp, int]] = None) → NQ
with_read_files(read_files_query: Optional[IFileQuery] = None) → NQ
with_risks(risks_query: Optional[RiskQuery] = None) → NQ
with_terminate_time(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_wrote_files(wrote_files_query: Optional[IFileQuery] = None) → NQ
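
Each with_* method above returns NQ, the query type itself, which is what lets constraints be chained in a single expression. The toy class below illustrates that pattern in isolation. `ToyProcessQuery` is a hypothetical stand-in that filters plain dictionaries; it is not how grapl_analyzerlib is implemented, and its strict gt/lt semantics are an assumption.

```python
from typing import Any, Callable, Dict, List, Optional

Node = Dict[str, Any]


class ToyProcessQuery:
    """Hypothetical stand-in for a fluent query; not grapl_analyzerlib code."""

    def __init__(self) -> None:
        # Each constraint is stored as a predicate over a plain dict "node".
        self._checks: List[Callable[[Node], bool]] = []

    def with_process_name(
        self, eq: Optional[str] = None, contains: Optional[str] = None
    ) -> "ToyProcessQuery":
        if eq is not None:
            self._checks.append(lambda node: node.get("process_name") == eq)
        if contains is not None:
            self._checks.append(
                lambda node: contains in (node.get("process_name") or "")
            )
        return self  # returning the query itself is what allows chaining

    def with_process_id(
        self,
        eq: Optional[int] = None,
        gt: Optional[int] = None,
        lt: Optional[int] = None,
    ) -> "ToyProcessQuery":
        def pid(node: Node) -> Optional[int]:
            return node.get("process_id")

        if eq is not None:
            self._checks.append(lambda node: pid(node) == eq)
        if gt is not None:
            # Assumption for this toy: gt and lt are strict inequalities.
            self._checks.append(lambda node: pid(node) is not None and pid(node) > gt)
        if lt is not None:
            self._checks.append(lambda node: pid(node) is not None and pid(node) < lt)
        return self

    def matches(self, node: Node) -> bool:
        # A node matches only when every accumulated constraint holds.
        return all(check(node) for check in self._checks)
```

An unconstrained toy query matches every node, which mirrors the description of a bare `ProcessQuery()` as totally unconstrained.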

ProcessInboundConnection

ProcessInboundConnectionView

class grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, created_timestamp: Optional[int] = None, terminated_timestamp: Optional[int] = None, last_seen_timestamp: Optional[int] = None, port: Optional[int] = None, ip_address: Optional[str] = None, protocol: Optional[str] = None, bound_port: Optional[List[grapl_analyzerlib.nodes.ip_port_node.IpPortView]] = None, bound_by: Optional[List[grapl_analyzerlib.nodes.process_node.ProcessView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| created_timestamp | int | Time the process inbound network connection was created (in millis-since-epoch). |
| terminated_timestamp | int | Time the process inbound network connection was terminated (in millis-since-epoch). |
| last_seen_timestamp | int | Time the process inbound network connection was last seen (in millis-since-epoch). |
| port | int | Port of the inbound process network connection. |
| ip_address | string | IP Address of the inbound process network connection. |
| protocol | string | Network protocol of the inbound process network connection. |
| bound_port | List[IpPort] | todo: documentation |
| bound_by | List[Process] | todo: documentation |

get_created_timestamp() → Optional[int]
get_ip_address() → Optional[str]
get_last_seen_timestamp() → Optional[int]
get_node_type() → str
get_port() → Optional[int]
get_protocol() → Optional[str]
get_terminated_timestamp() → Optional[int]

ProcessInboundConnectionQuery

class grapl_analyzerlib.nodes.process_inbound_network_connection.ProcessInboundConnectionQuery(*args, **kwds)
with_bound_by(bound_by_query: Optional[IProcessQuery] = None) → NQ
with_bound_port(bound_port_query: Optional[IIpPortQuery] = None) → NQ
with_created_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_ip_address(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_port(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_protocol(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_terminated_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ

ProcessOutboundConnection

ProcessOutboundConnectionView

class grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionView(dgraph_client: pydgraph.client.DgraphClient, node_key: str, uid: str, node_type: str, created_timestamp: Optional[int] = None, terminated_timestamp: Optional[int] = None, last_seen_timestamp: Optional[int] = None, port: Optional[int] = None, ip_address: Optional[str] = None, protocol: Optional[str] = None, connecting_processes: Optional[IProcessView] = None, connected_over: Optional[grapl_analyzerlib.nodes.ip_port_node.IpPortView] = None, connected_to: Optional[grapl_analyzerlib.nodes.ip_port_node.IpPortView] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| created_timestamp | int | Time the process outbound network connection was created (in millis-since-epoch). |
| terminated_timestamp | int | Time the process outbound network connection was terminated (in millis-since-epoch). |
| last_seen_timestamp | int | Time the process outbound network connection was last seen (in millis-since-epoch). |
| port | int | Port of the outbound process network connection. |
| ip_address | string | IP Address of the outbound process network connection. |
| protocol | string | Network protocol of the outbound process network connection. |
| connecting_processes | Process | todo: documentation |
| connected_over | IpPort | todo: documentation |
| connected_to | IpPort | todo: documentation |

get_connected_over() → Optional[grapl_analyzerlib.nodes.ip_port_node.IpPortView]
get_connected_to() → Optional[grapl_analyzerlib.nodes.ip_port_node.IpPortView]
get_connecting_processes() → List[grapl_analyzerlib.nodes.process_node.ProcessView]
get_created_timestamp() → Optional[int]
get_ip_address() → Optional[str]
get_last_seen_timestamp() → Optional[int]
get_node_type() → str
get_port() → Optional[int]
get_protocol() → Optional[str]
get_terminated_timestamp() → Optional[int]

ProcessOutboundConnectionQuery

class grapl_analyzerlib.nodes.process_outbound_network_connection.ProcessOutboundConnectionQuery(*args, **kwds)
with_connected_over(connected_over_query: Optional[IpPortQuery] = None) → NQ
with_connected_to(connected_to_query: Optional[IpPortQuery] = None) → NQ
with_connecting_processess(connecting_processess_query: Optional[ProcessQuery] = None) → NQ
with_created_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_ip_address(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_last_seen_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_port(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_protocol(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_terminated_timestamp(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ

Risk

RiskView

class grapl_analyzerlib.nodes.risk_node.RiskView(dgraph_client: grapl_analyzerlib.grapl_client.GraphClient, node_key: str, uid: str, node_type: str, risk_score: Optional[int] = None, analyzer_name: Optional[str] = None, risky_nodes: Optional[List[NodeView]] = None)

| Predicate | Type | Description |
| --- | --- | --- |
| node_key | string | A unique identifier for this node. |
| risk_score | int | todo: documentation |
| analyzer_name | string | The name of the analyzer that spawned this risk. |
| risky_nodes | List[Node] | todo: documentation |

get_analyzer_name() → Optional[str]
get_node_type() → str
get_risk_score() → Optional[int]
get_risky_nodes(match_risky_nodes: Optional[grapl_analyzerlib.nodes.queryable.Queryable] = None) → Optional[str]

RiskQuery

class grapl_analyzerlib.nodes.risk_node.RiskQuery(*args, **kwds)
with_analyzer_name(eq: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, contains: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, ends_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, starts_with: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, regexp: Optional[Union[str, Not[str], List[str], List[Union[str, Not[str]]]]] = None, distance: Optional[Tuple[Union[str, Not[str], List[str], List[Union[str, Not[str]]]], int]] = None) → NQ
with_risk_score(eq: Optional[IntCmp] = None, gt: Optional[IntCmp] = None, lt: Optional[IntCmp] = None) → NQ
with_risky_nodes(risky_nodes_query: Optional[NodeQuery] = None) → NQ

Queries and Views

Queries and Views are the main constructs for working with the graph.

Queries allow you to pull data from the graph that matches a structure.

Views represent an existing graph, which you can expand by pivoting off of its edges.

Let’s query for some processes with the name “svchost.exe”.

from grapl_analyzerlib.prelude import *

# Create a client to talk to Grapl
mclient = MasterGraphClient()

svchosts = (
    ProcessQuery()
    .with_process_name(eq="svchost.exe")
    .query(mclient)  # Execute the query
)  # type: List[ProcessView]

Now we can pivot around that data. Let’s look at the parent processes of these svchosts:

for svchost in svchosts:
    parent = svchost.get_parent()  # None when no parent process is found
    if parent:
        print(parent.get_process_name())
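
Pivoting can be repeated: each parent is itself a view with its own get_parent(). The sketch below walks that chain to collect a process's ancestry. `ToyProcessView` and `ancestry` are hypothetical stand-ins that hold their data in memory, so the example runs without a graph; a real ProcessView would be obtained from a query instead.

```python
from typing import List, Optional


class ToyProcessView:
    """Hypothetical in-memory stand-in exposing two ProcessView-style getters."""

    def __init__(
        self, process_name: str, parent: Optional["ToyProcessView"] = None
    ) -> None:
        self._process_name = process_name
        self._parent = parent

    def get_process_name(self) -> str:
        return self._process_name

    def get_parent(self) -> Optional["ToyProcessView"]:
        return self._parent


def ancestry(view: ToyProcessView) -> List[str]:
    """Collect ancestor process names, nearest parent first."""
    names: List[str] = []
    current = view.get_parent()
    while current is not None:  # stop once a process has no known parent
        names.append(current.get_process_name())
        current = current.get_parent()
    return names
```

The loop relies only on get_parent() and get_process_name(), so the same shape of traversal should carry over to real views, keeping in mind that real getters may return None for the name as well.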

Installation

Install grapl_analyzerlib by running:

pip install --user grapl_analyzerlib

License

The project is licensed under the Apache 2.0 license.