Skip to content

Commit

Permalink
Minor UI fixes (#515)
Browse files Browse the repository at this point in the history
@avelanarius requested change, be able to search by `quesma_async_...`.
Now you can check the dashboard ID of the response and then find a
matching one in Quesma.

While there, I added two minor styles, one in the dashboard and the
other in total time.
  • Loading branch information
jakozaur authored Jul 12, 2024
1 parent 340a550 commit e42d684
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 70 deletions.
8 changes: 4 additions & 4 deletions quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func EmptyAsyncSearchResponse(id string, isPartial bool, completionStatus int) (
return asyncSearchResp.Marshal() // error should never ever happen here
}

func (cw *ClickhouseQueryTranslator) MakeAsyncSearchResponse(ResultSet []model.QueryResultRow, query *model.Query, asyncRequestIdStr string, isPartial bool) (*model.AsyncSearchEntireResp, error) {
func (cw *ClickhouseQueryTranslator) MakeAsyncSearchResponse(ResultSet []model.QueryResultRow, query *model.Query, asyncId string, isPartial bool) (*model.AsyncSearchEntireResp, error) {
searchResponse := cw.MakeSearchResponse([]*model.Query{query}, [][]model.QueryResultRow{ResultSet})
id := new(string)
*id = asyncRequestIdStr
*id = asyncId
response := model.AsyncSearchEntireResp{
Response: *searchResponse,
ID: id,
Expand Down Expand Up @@ -355,9 +355,9 @@ func (cw *ClickhouseQueryTranslator) MakeSearchResponse(queries []*model.Query,
return response
}

func SearchToAsyncSearchResponse(searchResponse *model.SearchResp, asyncRequestIdStr string, isPartial bool, completionStatus int) *model.AsyncSearchEntireResp {
func SearchToAsyncSearchResponse(searchResponse *model.SearchResp, asyncId string, isPartial bool, completionStatus int) *model.AsyncSearchEntireResp {
id := new(string)
*id = asyncRequestIdStr
*id = asyncId
response := model.AsyncSearchEntireResp{
Response: *searchResponse,
ID: id,
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"quesma/quesma/config"
"quesma/quesma/mux"
"quesma/quesma/types"
"quesma/tracing"
"strings"
)

func matchedAgainstAsyncId() mux.RequestMatcher {
return mux.RequestMatcherFunc(func(req *mux.Request) bool {
if !strings.HasPrefix(req.Params["id"], quesmaAsyncIdPrefix) {
if !strings.HasPrefix(req.Params["id"], tracing.AsyncIdPrefix) {
logger.Debug().Msgf("async query id %s is forwarded to Elasticsearch", req.Params["id"])
return false
}
Expand Down
3 changes: 1 addition & 2 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (
)

const (
httpOk = 200
quesmaAsyncIdPrefix = "quesma_async_search_id_"
httpOk = 200
)

func configureRouter(cfg config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner) *mux.PathRouter {
Expand Down
66 changes: 31 additions & 35 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"quesma/tracing"
"quesma/util"
"slices"
"strconv"
"strings"
"sync/atomic"
"time"
Expand All @@ -36,8 +35,6 @@ const (
asyncQueriesLimitBytes = 1024 * 1024 * 500 // 500MB
)

var asyncRequestId atomic.Int64

type AsyncRequestResult struct {
responseBody []byte
added time.Time
Expand Down Expand Up @@ -122,13 +119,13 @@ func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string,
func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON,
waitForResultsMs int, keepOnCompletion bool) ([]byte, error) {
async := AsyncQuery{
asyncRequestIdStr: generateAsyncRequestId(),
waitForResultsMs: waitForResultsMs,
keepOnCompletion: keepOnCompletion,
startTime: time.Now(),
asyncId: tracing.GetAsyncId(),
waitForResultsMs: waitForResultsMs,
keepOnCompletion: keepOnCompletion,
startTime: time.Now(),
}
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncRequestIdStr)
logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncRequestIdStr)
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncId)
logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncId)
return q.handleSearchCommon(ctx, indexPattern, body, &async, QueryLanguageDefault)
}

Expand All @@ -139,10 +136,10 @@ type AsyncSearchWithError struct {
}

type AsyncQuery struct {
asyncRequestIdStr string
waitForResultsMs int
keepOnCompletion bool
startTime time.Time
asyncId string
waitForResultsMs int
keepOnCompletion bool
startTime time.Time
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body types.JSON, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {
Expand All @@ -155,15 +152,15 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

var resp []byte
if optAsync != nil {
resp, _ = queryparser.EmptyAsyncSearchResponse(optAsync.asyncRequestIdStr, false, 200)
resp, _ = queryparser.EmptyAsyncSearchResponse(optAsync.asyncId, false, 200)
} else {
resp = queryparser.EmptySearchResponse(ctx)
}
return resp, err
case sourceNone:
if elasticsearch.IsIndexPattern(indexPattern) {
if optAsync != nil {
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncRequestIdStr, false, 200)
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncId, false, 200)
} else {
return queryparser.EmptySearchResponse(ctx), nil
}
Expand All @@ -184,7 +181,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
if len(sourcesClickhouse) == 0 {
if elasticsearch.IsIndexPattern(indexPattern) {
if optAsync != nil {
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncRequestIdStr, false, 200)
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncId, false, 200)
} else {
return queryparser.EmptySearchResponse(ctx), nil
}
Expand Down Expand Up @@ -286,7 +283,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
responseBody = []byte(fmt.Sprintf("Invalid Queries: %s, err: %v", queriesBody, err))
logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(queriesBodyConcat)
bodyAsBytes, _ := body.Bytes()
pushSecondaryInfo(q.quesmaManagementConsole, id, path, bodyAsBytes, queriesBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, "", path, bodyAsBytes, queriesBody, responseBody, startTime)
return responseBody, errors.New(string(responseBody))
}

Expand All @@ -303,19 +300,19 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
} else {
responseBody, err = response.response.Marshal()
}
pushSecondaryInfo(q.quesmaManagementConsole, id, path, bodyAsBytes, response.translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, "", path, bodyAsBytes, response.translatedQueryBody, responseBody, startTime)
return responseBody, err
} else {
select {
case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond):
go func() { // Async search takes longer. Return partial results and wait for
recovery.LogPanicWithCtx(ctx)
res := <-doneCh
q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, true)
q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncId, optAsync.startTime, path, body, res, true)
}()
return q.handlePartialAsyncSearch(ctx, optAsync.asyncRequestIdStr)
return q.handlePartialAsyncSearch(ctx, optAsync.asyncId)
case res := <-doneCh:
responseBody, err = q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res,
responseBody, err = q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncId, optAsync.startTime, path, body, res,
optAsync.keepOnCompletion)

return responseBody, err
Expand All @@ -334,19 +331,20 @@ func (q *QueryRunner) removeNotExistingTables(sourcesClickhouse []string) []stri
})
}

func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncRequestIdStr string,
func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string,
startTime time.Time, path string, body types.JSON, result AsyncSearchWithError, keep bool) (responseBody []byte, err error) {
took := time.Since(startTime)
if result.err != nil {
if keep {
q.AsyncRequestStorage.Store(asyncRequestIdStr, AsyncRequestResult{err: result.err, added: time.Now(),
q.AsyncRequestStorage.Store(asyncId, AsyncRequestResult{err: result.err, added: time.Now(),
isCompressed: false})
}
responseBody, _ = queryparser.EmptyAsyncSearchResponse(asyncRequestIdStr, false, 503)
responseBody, _ = queryparser.EmptyAsyncSearchResponse(asyncId, false, 503)
err = result.err
bodyAsBytes, _ := body.Bytes()
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
Path: path,
IncomingQueryBody: bodyAsBytes,
QueryBodyTranslated: result.translatedQueryBody,
Expand All @@ -355,11 +353,12 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn
})
return
}
asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncRequestIdStr, false, 200)
asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, 200)
responseBody, err = asyncResponse.Marshal()
bodyAsBytes, _ := body.Bytes()
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
Path: path,
IncomingQueryBody: bodyAsBytes,
QueryBodyTranslated: result.translatedQueryBody,
Expand All @@ -375,7 +374,7 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn
isCompressed = true
}
}
q.AsyncRequestStorage.Store(asyncRequestIdStr,
q.AsyncRequestStorage.Store(asyncId,
AsyncRequestResult{responseBody: compressedBody, added: time.Now(), err: err, isCompressed: isCompressed})
}
return
Expand All @@ -390,10 +389,6 @@ func (q *QueryRunner) asyncQueriesCumulatedBodySize() int {
return size
}

func generateAsyncRequestId() string {
return "quesma_async_search_id_" + strconv.FormatInt(asyncRequestId.Add(1), 10)
}

func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) {
if !strings.Contains(id, "quesma_async_search_id_") {
logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id)
Expand Down Expand Up @@ -437,12 +432,12 @@ func (q *QueryRunner) deleteAsyncSeach(id string) ([]byte, error) {
return []byte{}, nil
}

func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncRequestIdStr string, doneCh chan<- AsyncSearchWithError) bool {
func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- AsyncSearchWithError) bool {
if q.AsyncRequestStorage.Size() < asyncQueriesLimit && q.asyncQueriesCumulatedBodySize() < asyncQueriesLimitBytes {
return false
}
err := errors.New("too many async queries")
logger.ErrorWithCtx(ctx).Msgf("cannot handle %s, too many async queries", asyncRequestIdStr)
logger.ErrorWithCtx(ctx).Msgf("cannot handle %s, too many async queries", asyncId)
doneCh <- AsyncSearchWithError{err: err}
return true
}
Expand Down Expand Up @@ -620,12 +615,12 @@ func (q *QueryRunner) searchWorker(ctx context.Context,
doneCh chan<- AsyncSearchWithError,
optAsync *AsyncQuery) (translatedQueryBody [][]byte, resultRows [][]model.QueryResultRow, err error) {
if optAsync != nil {
if q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, doneCh) {
if q.reachedQueriesLimit(ctx, optAsync.asyncId, doneCh) {
return
}
// We need different ctx as our cancel is no longer tied to HTTP request, but to overall timeout.
dbQueryCtx, dbCancel := context.WithCancel(tracing.NewContextWithRequest(ctx))
q.addAsyncQueryContext(dbQueryCtx, dbCancel, optAsync.asyncRequestIdStr)
q.addAsyncQueryContext(dbQueryCtx, dbCancel, optAsync.asyncId)
ctx = dbQueryCtx
}

Expand Down Expand Up @@ -675,9 +670,10 @@ func (q *QueryRunner) postProcessResults(table *clickhouse.Table, results [][]mo
return geoIpTransformer.Transform(res)
}

func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, Path string, IncomingQueryBody []byte, QueryBodyTranslated [][]byte, QueryTranslatedResults []byte, startTime time.Time) {
func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, Path string, IncomingQueryBody []byte, QueryBodyTranslated [][]byte, QueryTranslatedResults []byte, startTime time.Time) {
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: Id,
AsyncId: AsyncId,
Path: Path,
IncomingQueryBody: IncomingQueryBody,
QueryBodyTranslated: QueryBodyTranslated,
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/ui/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (qmc *QuesmaManagementConsole) generateDashboardPanel() []byte {
buffer.Html(fmt.Sprintf(`<div class="status">Host uptime: %s</div>`, secondsToTerseString(h.Uptime)))
}

buffer.Html("<div>Version: ")
buffer.Html(`<div class="status">Version: `)
buffer.Text(buildinfo.Version)
buffer.Html("</div>")

Expand Down
41 changes: 25 additions & 16 deletions quesma/quesma/ui/live_tail_drilldown.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,34 @@ import (
"fmt"
"gopkg.in/yaml.v3"
"quesma/quesma/ui/internal/builder"
"quesma/tracing"
"quesma/util"
"strings"
)

func (qmc *QuesmaManagementConsole) generateReportForRequestId(requestId string) []byte {
var request queryDebugInfo
var requestFound bool
qmc.mutex.Lock()
request, requestFound := qmc.debugInfoMessages[requestId]
if strings.HasPrefix(requestId, tracing.AsyncIdPrefix) {
for _, debugInfo := range qmc.debugInfoMessages {
if debugInfo.AsyncId == requestId {
request = debugInfo
requestFound = true
break
}
}
} else {
request, requestFound = qmc.debugInfoMessages[requestId]
}
qmc.mutex.Unlock()

logMessages, optAsyncId := generateLogMessages(request.logMessages, []string{})
logMessages := generateLogMessages(request.logMessages, []string{})

buffer := newBufferWithHead()
if requestFound {
if optAsyncId != nil {
buffer.Write(generateSimpleTop("Report for request id " + requestId + " and async id " + *optAsyncId))
if len(request.AsyncId) > 0 {
buffer.Write(generateSimpleTop("Report for request id " + requestId + " and async id " + request.AsyncId))
} else {
buffer.Write(generateSimpleTop("Report for request id " + requestId))
}
Expand Down Expand Up @@ -76,8 +89,7 @@ func (qmc *QuesmaManagementConsole) generateReportForRequestId(requestId string)

buffer.Html(`<div class="quesma-response">` + "\n")
if len(request.QueryDebugSecondarySource.QueryTranslatedResults) > 0 {
tookStr := fmt.Sprintf(" took %d ms:", request.SecondaryTook.Milliseconds())
buffer.Html("<p class=\"title\">Quesma response").Text(tookStr).Html("</p>\n")
buffer.Html("<p class=\"title\">Quesma response").Html("</p>\n")
buffer.Html(`<pre>`)
buffer.Text(string(request.QueryDebugSecondarySource.QueryTranslatedResults))
buffer.Html("\n</pre>")
Expand Down Expand Up @@ -113,12 +125,14 @@ func (qmc *QuesmaManagementConsole) generateReportForRequestId(requestId string)
buffer.Html("<ul>\n")
buffer.Html("<li>").Text("Request id: ").Text(requestId).Html("</li>\n")
buffer.Html("<li>").Text("Path: ").Text(request.Path).Html("</li>\n")
if optAsyncId != nil {
buffer.Html("<li>").Text("Async id: ").Text(*optAsyncId).Html("</li>\n")
if len(request.AsyncId) > 0 {
buffer.Html("<li>").Text("Async id: ").Text(request.AsyncId).Html("</li>\n")
}
if request.unsupported != nil {
buffer.Html("<li>").Text("Unsupported: ").Text(*request.unsupported).Html("</li>\n")
}
tookStr := fmt.Sprintf("Took: %d ms", request.SecondaryTook.Milliseconds())
buffer.Html("<li>").Text(tookStr).Html("</li>")
buffer.Html("</ul>\n")
}

Expand Down Expand Up @@ -147,7 +161,7 @@ func (qmc *QuesmaManagementConsole) generateReportForRequestId(requestId string)

// links might be empty, then table won't have any links within.
// if i < len(logMessages) && i < len(links) then logMessages[i] will have link links[i]
func generateLogMessages(logMessages []string, links []string) ([]byte, *string) {
func generateLogMessages(logMessages []string, links []string) []byte {
// adds a link to the table row if there is a link for it
addOpeningLink := func(row, column int) string {
if row < len(links) {
Expand Down Expand Up @@ -178,8 +192,6 @@ func generateLogMessages(logMessages []string, links []string) ([]byte, *string)
buffer.Html("</thead>\n")
buffer.Html("<tbody>\n")

var asyncId *string

for i, logMessage := range logMessages {
buffer.Html("<tr>\n")

Expand Down Expand Up @@ -221,10 +233,7 @@ func generateLogMessages(logMessages []string, links []string) ([]byte, *string)

// get rid of request_id and async_id
delete(fields, "request_id")
if id, ok := fields["async_id"].(string); ok {
asyncId = &id
delete(fields, "async_id")
}
delete(fields, "async_id")

// message
buffer.Html(`<td class="message">` + addOpeningLink(i, 2))
Expand All @@ -244,7 +253,7 @@ func generateLogMessages(logMessages []string, links []string) ([]byte, *string)

buffer.Html("</tbody>\n")
buffer.Html("</table>\n")
return buffer.Bytes(), asyncId
return buffer.Bytes()
}

func (qmc *QuesmaManagementConsole) generateReportForRequestsWithStr(requestStr string) []byte {
Expand Down
4 changes: 3 additions & 1 deletion quesma/quesma/ui/management_console.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type QueryDebugPrimarySource struct {
}

type QueryDebugSecondarySource struct {
Id string
Id string
AsyncId string

Path string
IncomingQueryBody []byte
Expand Down Expand Up @@ -179,6 +180,7 @@ func (qmc *QuesmaManagementConsole) processChannelMessage() {
// fmt.Println(msg.IncomingQueryBody)
secondaryDebugInfo := QueryDebugSecondarySource{
msg.Id,
msg.AsyncId,
msg.Path,
[]byte(util.JsonPrettify(string(msg.IncomingQueryBody), true)),
msg.QueryBodyTranslated,
Expand Down
Loading

0 comments on commit e42d684

Please sign in to comment.