Skip to content

Commit

Permalink
[v. small] Persistent storage for asyncs 2 (#894)
Browse files Browse the repository at this point in the history
Just style:
* change some internal struct to small letter to not pollute global
symbols
* merge error and no-error flow into one.
  • Loading branch information
trzysiek authored Oct 16, 2024
1 parent 90400ea commit 6df9dc2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 38 deletions.
6 changes: 3 additions & 3 deletions quesma/quesma/async_search_storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,16 @@ func elapsedTime(t time.Time) time.Duration {
return time.Since(t)
}

type AsyncQueryIdWithTime struct {
type asyncQueryIdWithTime struct {
id string
time time.Time
}

func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) {
var ids []AsyncQueryIdWithTime
var ids []asyncQueryIdWithTime
e.AsyncRequestStorage.Range(func(key string, value *AsyncRequestResult) bool {
if timeFun(value.added) > EvictionInterval {
ids = append(ids, AsyncQueryIdWithTime{id: key, time: value.added})
ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added})
}
return true
})
Expand Down
58 changes: 23 additions & 35 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string
return q.handleSearchCommon(ctx, indexPattern, body, &async, QueryLanguageDefault)
}

type AsyncSearchWithError struct {
type asyncSearchWithError struct {
response *model.SearchResp
translatedQueryBody []types.TranslatedSQLQuery
err error
Expand Down Expand Up @@ -173,33 +173,33 @@ func (q *QueryRunner) checkProperties(ctx context.Context, plan *model.Execution
return nil, nil
}

func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, doneCh chan AsyncSearchWithError, optAsync *AsyncQuery) {
func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, doneCh chan asyncSearchWithError, optAsync *AsyncQuery) {
go func() {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
doneCh <- AsyncSearchWithError{err: err}
doneCh <- asyncSearchWithError{err: err}
})

translatedQueryBody, results, err := q.searchWorker(ctx, plan, table, doneCh, optAsync)
if err != nil {
doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
return
}

if len(plan.Queries) > 0 && len(results) == 0 {
// if there are no queries, empty results are fine
logger.ErrorWithCtx(ctx).Msgf("no hits, sqls: %v", translatedQueryBody)
doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: errors.New("no hits")}
doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: errors.New("no hits")}
return
}

results, err = q.postProcessResults(plan, results)
if err != nil {
doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
}

searchResponse := queryTranslator.MakeSearchResponse(plan.Queries, results)

doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err}
doneCh <- asyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err}
}()
}

Expand All @@ -209,7 +209,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
path := contextValues.RequestPath
opaqueId := contextValues.OpaqueId

doneCh := make(chan AsyncSearchWithError, 1)
doneCh := make(chan asyncSearchWithError, 1)

sendMainPlanResult := func(responseBody []byte, err error) {
if optComparePlansCh != nil {
Expand Down Expand Up @@ -460,32 +460,19 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}

func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string,
startTime time.Time, path string, body types.JSON, result AsyncSearchWithError, keep bool, opaqueId string) (responseBody []byte, err error) {
startTime time.Time, path string, body types.JSON, result asyncSearchWithError, keep bool, opaqueId string) (responseBody []byte, err error) {

took := time.Since(startTime)
if result.err != nil {
if keep {
q.AsyncRequestStorage.Store(asyncId, // maybe responseBody line below should be an empty array?
async_search_storage.NewAsyncRequestResult(nil, result.err, time.Now(), false))
}
bodyAsBytes, _ := body.Bytes()
if result.err == nil {
okStatus := 200
asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, &okStatus)
responseBody, err = asyncResponse.Marshal()
} else {
responseBody, _ = queryparser.EmptyAsyncSearchResponse(asyncId, false, 503)
err = result.err
bodyAsBytes, _ := body.Bytes()
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
OpaqueId: opaqueId,
Path: path,
IncomingQueryBody: bodyAsBytes,
QueryBodyTranslated: result.translatedQueryBody,
QueryTranslatedResults: responseBody,
SecondaryTook: took,
})
return
}
okStatus := 200
asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, &okStatus)
responseBody, err = asyncResponse.Marshal()
bodyAsBytes, _ := body.Bytes()

qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
Expand All @@ -496,6 +483,7 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn
QueryTranslatedResults: responseBody,
SecondaryTook: took,
})

if keep {
compressedBody := responseBody
isCompressed := false
Expand All @@ -505,9 +493,9 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn
isCompressed = true
}
}
q.AsyncRequestStorage.Store(asyncId,
async_search_storage.NewAsyncRequestResult(compressedBody, err, time.Now(), isCompressed))
q.AsyncRequestStorage.Store(asyncId, async_search_storage.NewAsyncRequestResult(compressedBody, err, time.Now(), isCompressed))
}

return
}

Expand Down Expand Up @@ -563,13 +551,13 @@ func (q *QueryRunner) deleteAsyncSeach(id string) ([]byte, error) {
return []byte(`{"acknowledged":true}`), nil
}

func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- AsyncSearchWithError) bool {
func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- asyncSearchWithError) bool {
if q.AsyncRequestStorage.Size() < asyncQueriesLimit && q.asyncQueriesCumulatedBodySize() < asyncQueriesLimitBytes {
return false
}
err := errors.New("too many async queries")
logger.ErrorWithCtx(ctx).Msgf("cannot handle %s, too many async queries", asyncId)
doneCh <- AsyncSearchWithError{err: err}
doneCh <- asyncSearchWithError{err: err}
return true
}

Expand Down Expand Up @@ -775,7 +763,7 @@ func (q *QueryRunner) searchWorkerCommon(
func (q *QueryRunner) searchWorker(ctx context.Context,
plan *model.ExecutionPlan,
table *clickhouse.Table,
doneCh chan<- AsyncSearchWithError,
doneCh chan<- asyncSearchWithError,
optAsync *AsyncQuery) (translatedQueryBody []types.TranslatedSQLQuery, resultRows [][]model.QueryResultRow, err error) {
if optAsync != nil {
if q.reachedQueriesLimit(ctx, optAsync.asyncId, doneCh) {
Expand Down

0 comments on commit 6df9dc2

Please sign in to comment.