Skip to content

Commit

Permalink
EQL - Endpoint (#38)
Browse files Browse the repository at this point in the history
New features:
- Handle `/:index/_eql/seach` endpoint (see
`http_requests/search_eql.http`)
- Support for SQL query parameters. 

Refactor:
- Extracted interface `IQueryTranslator` from the
`ClickhouseQueryTranslator` type. It needs attention.
  • Loading branch information
nablaone authored May 7, 2024
1 parent 8772943 commit b13632a
Show file tree
Hide file tree
Showing 17 changed files with 409 additions and 51 deletions.
12 changes: 11 additions & 1 deletion docker/device-log-generator/windows_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"
)

const elasticSearchBulkUrl = "http://elasticsearch:9200/_bulk"
const windowsJsonFile = "assets/windows_logs.json"

const windowsBulkJson = `{"create":{"_index":"windows_logs"}}`
Expand Down Expand Up @@ -248,7 +249,16 @@ func toBulk(serialized []byte) (logBytes []byte) {
}

func sendToWindowsLog(logBytes []byte) {
targetUrl := configureTargetUrl()

// We need the same data in both places for manual testing purposes.
// This is temporary and will be removed when we'll have end-to-end tests.
//

sendToWindowsLogTo(elasticSearchBulkUrl, logBytes)
sendToWindowsLogTo(configureTargetUrl(), logBytes)
}

func sendToWindowsLogTo(targetUrl string, logBytes []byte) {

if resp, err := http.Post(targetUrl, "application/json", bytes.NewBuffer(logBytes)); err != nil {
log.Printf("Failed to send windows logs: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions http_requests/search_eql.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
GET http://localhost:8080/windows_logs/_eql/search
Content-type: application/json

{
"query": "process where true"
}
25 changes: 17 additions & 8 deletions quesma/eql/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ func main() {
break
}

query := translate(cmd)
query, parameters := translate(cmd)
if query == "" {
continue
}

execute(db, query)
execute(db, query, parameters)
}
}

func translate(cmd string) string {
func translate(cmd string) (string, map[string]interface{}) {
translateName := func(name *transform.Symbol) (*transform.Symbol, error) {
res := strings.ReplaceAll(name.Name, ".", "::")
res = "\"" + res + "\"" // TODO proper escaping
Expand All @@ -82,19 +82,20 @@ func translate(cmd string) string {

trans := eql.NewTransformer()
trans.FieldNameTranslator = translateName
where, err := trans.TransformQuery(cmd)
trans.ExtractParameters = true
where, parameters, err := trans.TransformQuery(cmd)

if err != nil {
fmt.Println("tranform erors:")
fmt.Println(err)
return ""
return "", nil
}

fmt.Printf("where clause: '%s'\n", where)

sql := `select "@timestamp", "event::category", "process::name", "process::pid", "process::executable" from windows_logs where ` + where
fmt.Println("SQL: \n" + sql)
return sql
return sql, parameters
}

func cellValue(a interface{}) string {
Expand Down Expand Up @@ -132,9 +133,17 @@ func cellValue(a interface{}) string {

}

func execute(db *sql.DB, sql string) {
func execute(db *sql.DB, query string, parameters map[string]interface{}) {

rows, err := db.Query(sql)
fmt.Println("executing query:", query, parameters)

var args []any

for k, v := range parameters {
args = append(args, sql.Named(k, v))
}

rows, err := db.Query(query, args...)

if err != nil {
fmt.Println("query error:")
Expand Down
154 changes: 154 additions & 0 deletions quesma/eql/query_translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package eql

import (
"context"
"encoding/json"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/eql/transform"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/queryparser"
"strconv"
"strings"
)

// It implements quesma.IQueryTranslator for EQL queries.

type ClickhouseEQLQueryTranslator struct {
ClickhouseLM *clickhouse.LogManager
Table *clickhouse.Table
Ctx context.Context
}

func (cw *ClickhouseEQLQueryTranslator) BuildNRowsQuery(fieldName string, simpleQuery queryparser.SimpleQuery, limit int) *model.Query {

return &model.Query{
Fields: []string{fieldName},
NonSchemaFields: []string{},
WhereClause: simpleQuery.Sql.Stmt,
SuffixClauses: []string{},
FromClause: cw.Table.FullTableName(),
CanParse: true,
}
}

func (cw *ClickhouseEQLQueryTranslator) MakeSearchResponse(ResultSet []model.QueryResultRow, typ model.SearchQueryType, highlighter queryparser.Highlighter) (*model.SearchResp, error) {

// This shares a lot of code with the ClickhouseQueryTranslator
//

hits := make([]model.SearchHit, len(ResultSet))
for i := range ResultSet {
resultRow := ResultSet[i]

hits[i].Fields = make(map[string][]interface{})
hits[i].Highlight = make(map[string][]string)
hits[i].Source = []byte(resultRow.String(cw.Ctx))
if typ == model.ListAllFields {
hits[i].ID = strconv.Itoa(i + 1)
hits[i].Index = cw.Table.Name
hits[i].Score = 1
hits[i].Version = 1
hits[i].Sort = []any{
"2024-01-30T19:38:54.607Z",
2944,
}
}
}

return &model.SearchResp{
Hits: model.SearchHits{
Total: &model.Total{
Value: len(ResultSet),
Relation: "eq",
},
Events: hits,
},
Shards: model.ResponseShards{
Total: 1,
Successful: 1,
Failed: 0,
},
}, nil
}

func (cw *ClickhouseEQLQueryTranslator) ParseQuery(queryAsJson string) (query queryparser.SimpleQuery, searchQueryInfo model.SearchQueryInfo, highlighter queryparser.Highlighter) {

// no highlighting here
highlighter = queryparser.NewEmptyHighlighter()

searchQueryInfo.Typ = model.ListAllFields
query.Sql = queryparser.Statement{}

queryAsMap := make(map[string]interface{})
err := json.Unmarshal([]byte(queryAsJson), &queryAsMap)
if err != nil {
logger.ErrorWithCtx(cw.Ctx).Err(err).Msg("error parsing query request's JSON")

query.CanParse = false
query.Sql.Stmt = "Invalid JSON"
return query, model.NewSearchQueryInfoNone(), highlighter
}

var eqlQuery string

if queryAsMap["query"] != nil {
eqlQuery = queryAsMap["query"].(string)
}

if eqlQuery == "" {
query.CanParse = false
query.Sql.Stmt = "Empty query"
return query, model.NewSearchQueryInfoNone(), highlighter
}

// FIXME this is a naive translation.
// It should use the table schema to translate field names
translateName := func(name *transform.Symbol) (*transform.Symbol, error) {
res := strings.ReplaceAll(name.Name, ".", "::")
res = "\"" + res + "\"" // TODO proper escaping
return transform.NewSymbol(res), nil
}

trans := NewTransformer()
trans.FieldNameTranslator = translateName

// We don't extract parameters for now.
// Query execution does not support parameters yet.
trans.ExtractParameters = false
where, _, err := trans.TransformQuery(eqlQuery)

if err != nil {
logger.ErrorWithCtx(cw.Ctx).Err(err).Msg("error transforming EQL query")
query.CanParse = false
query.Sql.Stmt = "Invalid EQL query"
return query, model.NewSearchQueryInfoNone(), highlighter
}

query.Sql.Stmt = where
query.CanParse = true

return query, searchQueryInfo, highlighter
}

// These methods are not supported by EQL. They are here to satisfy the interface.

func (cw *ClickhouseEQLQueryTranslator) BuildSimpleCountQuery(whereClause string) *model.Query {
panic("EQL does not support count")
}

func (cw *ClickhouseEQLQueryTranslator) BuildSimpleSelectQuery(whereClause string) *model.Query {
panic("EQL does not support this method")
}

func (cw *ClickhouseEQLQueryTranslator) MakeResponseAggregation(aggregations []model.QueryWithAggregation, aggregationResults [][]model.QueryResultRow) *model.SearchResp {
panic("EQL does not support aggregations")
}

func (cw *ClickhouseEQLQueryTranslator) BuildFacetsQuery(fieldName string, simpleQuery queryparser.SimpleQuery, limit int) *model.Query {
panic("EQL does not support facets")
}

func (cw *ClickhouseEQLQueryTranslator) ParseAggregationJson(aggregationJson string) ([]model.QueryWithAggregation, error) {
panic("EQL does not support aggregations")
}
24 changes: 16 additions & 8 deletions quesma/eql/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

type Transformer struct {
FieldNameTranslator func(*transform.Symbol) (*transform.Symbol, error)
ExtractParameters bool
}

func NewTransformer() *Transformer {
Expand All @@ -27,47 +28,54 @@ func NewTransformer() *Transformer {
// 4. Replace the field names with clickhouse field names
// 5. Render the expression as WHERE clause

func (t *Transformer) TransformQuery(query string) (string, error) {
func (t *Transformer) TransformQuery(query string) (string, map[string]interface{}, error) {

// 1. parse EQL
p := NewEQL()
ast, err := p.Parse(query)
if err != nil {
return "", err
return "", nil, err
}

if !p.IsSupported(ast) {
return "", fmt.Errorf("unsupported query type") // TODO proper error message
return "", nil, fmt.Errorf("unsupported query type") // TODO proper error message
}

// 2. Convert EQL to Exp model
eql2ExpTransformer := transform.NewEQLParseTreeToExpTransformer()
var exp transform.Exp
exp = ast.Accept(eql2ExpTransformer).(transform.Exp)
if len(eql2ExpTransformer.Errors) > 0 {
return "", fmt.Errorf("eql2exp conversion errors: count=%d, %v", len(eql2ExpTransformer.Errors), eql2ExpTransformer.Errors)
return "", nil, fmt.Errorf("eql2exp conversion errors: count=%d, %v", len(eql2ExpTransformer.Errors), eql2ExpTransformer.Errors)
}

// exp can be null if query is empty
// we return empty as well
if exp == nil {
return "", nil
return "", nil, nil
}

// 3. Replace operators with clickhouse operators
transOp := &transform.ClickhouseTransformer{}
exp = exp.Accept(transOp).(transform.Exp)

if len(transOp.Errors) > 0 {
return "", fmt.Errorf("transforming opertators failed: errors: count=%d message: %v", len(transOp.Errors), transOp.Errors)
return "", nil, fmt.Errorf("transforming opertators failed: errors: count=%d message: %v", len(transOp.Errors), transOp.Errors)
}

transFieldName := &transform.FieldNameTransformer{
Translate: t.FieldNameTranslator,
}
exp = exp.Accept(transFieldName).(transform.Exp)
if len(transFieldName.Errors) > 0 {
return "", fmt.Errorf("transforming field names failed: errors: count=%d message: %v", len(transFieldName.Errors), transFieldName.Errors)
return "", nil, fmt.Errorf("transforming field names failed: errors: count=%d message: %v", len(transFieldName.Errors), transFieldName.Errors)
}

parameters := make(map[string]interface{})
if t.ExtractParameters {
constTransformer := transform.NewParametersExtractorTransformer()
exp = exp.Accept(constTransformer).(transform.Exp)
parameters = constTransformer.Parameters
}

// 6. Render the expression as WHERE clause
Expand All @@ -76,5 +84,5 @@ func (t *Transformer) TransformQuery(query string) (string, error) {
renderer := &transform.Renderer{}
whereClause := exp.Accept(renderer).(string)

return whereClause, nil
return whereClause, parameters, nil
}
1 change: 1 addition & 0 deletions quesma/eql/transform/renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Renderer struct {

func (v *Renderer) VisitConst(e *Const) interface{} {
switch e.Value.(type) {
// TODO proper escaping here
case string:
// TODO add proper escaping
return fmt.Sprintf("'%v'", e.Value.(string))
Expand Down
Loading

0 comments on commit b13632a

Please sign in to comment.