Skip to content

Commit

Permalink
Merge pull request #54 from uselagoon/multiple-kibana-indices
Browse files Browse the repository at this point in the history
Handle multiple migration versions of index-pattern indices
  • Loading branch information
smlx authored May 30, 2023
2 parents b9a14ef + e226ea1 commit f804561
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 67 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/go-sql-driver/mysql v1.7.1
github.com/jmoiron/sqlx v1.3.5
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/oauth2 v0.8.0
)

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
Expand Down
146 changes: 84 additions & 62 deletions internal/opensearch/indexpatterns.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@ import (
"strings"

"go.uber.org/zap"
"golang.org/x/exp/slices"
)

// maximum size of search results returned by Opensearch
// https://opensearch.org/docs/latest/opensearch/ux/#scroll-search
const searchSize = 10000

var (
// globalTenantIndexName matches the name of the index that the global tenant
// index patterns are stored in.
globalTenantIndexName = regexp.MustCompile(`^\.kibana_[0-9]+$`)
// tenantIndexName matches the name of the index that regular tenant index
// patterns are stored in. The format of the match is
// <hashInt>_<sanitizedName>.
tenantIndexName = regexp.MustCompile(`^\.kibana_(.+)_[0-9]+$`)
// indexName matches the raw name of an index-pattern index, capturing the
// optional tenant-derived part of the name and the migration number
indexName = regexp.MustCompile(`^\.kibana(?:_(.+))?_([0-9]+)$`)
)

// SourceIndexPattern represents the index pattern definition inside the
Expand Down Expand Up @@ -136,53 +131,58 @@ func (c *Client) RawIndexPatterns(ctx context.Context,
return io.ReadAll(res.Body)
}

// IndexPatterns returns all Opensearch index patterns as a map of index names
// (which are derived from tenant names) to map of index pattern titles to
// index pattern IDs, which is set if the index pattern exists in the tenant.
//
// TODO:
// This function ignores migrated .kibana indices, so it may set the same index
// pattern to true in the map more than once if e.g. indices named
// .kibana_mytenant_{1,2,3} all exist. Instead it should figure out how to tell
// which of these indices represents the current index-pattern.
func (c *Client) IndexPatterns(ctx context.Context) (
map[string]map[string][]string, error) {
indexPatterns := map[string]map[string][]string{}
var after string
for {
rawIndexPatterns, err := c.RawIndexPatterns(ctx, after)
if err != nil {
return nil,
fmt.Errorf("couldn't get index patterns from Opensearch API: %v", err)
}
searchResultSize, lastUpdatedAt, err :=
parseIndexPatterns(rawIndexPatterns, indexPatterns)
// parseIndexName splits the raw name of an index-pattern index into its
// tenant-derived index name and its migration number.
//
// rawIndex is expected to have the form ".kibana_<n>" (the global tenant) or
// ".kibana_<name>_<n>" (any other tenant), where <n> is the migration number.
// For the global tenant the returned index name is "global_tenant"; otherwise
// it is <name>, i.e. rawIndex stripped of its ".kibana_" prefix and "_<n>"
// suffix.
//
// An error is returned if rawIndex does not have the expected form, or if the
// migration number cannot be parsed as an int or is less than 1. The
// underlying strconv error is wrapped, so callers can inspect it using
// errors.Is and errors.As.
func parseIndexName(rawIndex string) (string, int, error) {
	matches := indexName.FindStringSubmatch(rawIndex)
	// matches holds the full match plus the two capture groups; anything else
	// means rawIndex did not match at all.
	if len(matches) != 3 {
		return "", 0, fmt.Errorf("invalid index name: %s", rawIndex)
	}
	migration, err := strconv.Atoi(matches[2])
	if err != nil {
		// reachable for digit strings too large to fit in an int
		return "", 0, fmt.Errorf("couldn't parse migration number: %w", err)
	}
	if migration < 1 {
		return "", 0, fmt.Errorf("invalid migration number: %d", migration)
	}
	// an empty name capture means there is nothing between ".kibana" and the
	// migration suffix, which is how the global tenant's index is named.
	if matches[1] == "" {
		return "global_tenant", migration, nil
	}
	return matches[1], migration, nil
}

// indexMaxMigration iterates over hits and returns a map containing the unique
// index names found, mapped to the maximum migration number of each of those
// indices. The index names are stripped of their ".kibana_" prefix and their
// "_n" suffix, where "n" is the migration number.
func indexMaxMigration(hits []IndexPattern) (map[string]int, error) {
maxMigration := map[string]int{}
for _, hit := range hits {
index, migration, err := parseIndexName(hit.Index)
if err != nil {
return nil,
fmt.Errorf("couldn't parse index patterns: %v", err)
return nil, fmt.Errorf("couldn't parse index name %s: %v", hit.Index, err)
}
if searchResultSize < searchSize {
c.log.Debug("got all index patterns, returning result",
zap.Int("hits", searchResultSize))
break // we have got all the index patterns...
if maxMigration[index] < migration {
maxMigration[index] = migration
}
// ...otherwise we need to do another request
c.log.Debug("partial index pattern search response: scrolling results")
after = lastUpdatedAt
}
return indexPatterns, nil
return maxMigration, nil
}

// parseIndexPatterns takes the raw index patterns search results as a JSON
// blob, and a map to store results.
// It fills out the map according to the index patterns that it finds, and
// returns the number of search results found, the updated at date on the last
// search result, and an error (if any).
// returns the number of search results found in data, the updated at date on
// the last search result in data, and an error (if any).
func parseIndexPatterns(data []byte,
indexPatterns map[string]map[string][]string) (int, string, error) {
// unpack all index patterns
var s SearchResult
var index string
if err := json.Unmarshal(data, &s); err != nil {
return 0, "", fmt.Errorf(
"couldn't unmarshal index patterns search result: %v", err)
Expand All @@ -191,17 +191,18 @@ func parseIndexPatterns(data []byte,
if len(s.Hits.Hits) == 0 {
return 0, "1970-01-01T00:00:00Z", nil
}
maxMigration, err := indexMaxMigration(s.Hits.Hits)
if err != nil {
return 0, "", fmt.Errorf("couldn't get max migrations: %v", err)
}
for _, hit := range s.Hits.Hits {
if globalTenantIndexName.MatchString(hit.Index) {
index = "global_tenant"
} else {
matches := tenantIndexName.FindStringSubmatch(hit.Index)
// sanity-check the index pattern format and return an error if it is
// not as expected.
if len(matches) != 2 {
return 0, "", fmt.Errorf("unexpected index name: %v", hit.Index)
}
index = matches[1]
index, migration, err := parseIndexName(hit.Index)
if err != nil {
return 0, "", fmt.Errorf("couldn't parse index name %s: %v", hit.Index, err)
}
if maxMigration[index] != migration {
// ignore old migrations of indices
continue
}
// initialize the nested map
if indexPatterns[index] == nil {
Expand All @@ -211,20 +212,41 @@ func parseIndexPatterns(data []byte,
// because the prefix is not used when referring to the index pattern by ID
// in other API requests.
patternID := strings.TrimPrefix(hit.ID, "index-pattern:")
// check if the patternID is already in the slice. This happens when there
// are multiple versions of the same index pattern stored in opensearch as
// a result of a migration during version updates. For example, name_1,
// name_2 etc.
// If the patternID _is_ already in the slice, don't add it as the slice
// should contain unique IDs only.
if slices.Contains(indexPatterns[index][hit.Source.IndexPattern.Title],
patternID) {
continue
}
// Multiple identically named index patterns may be added to a single
// tenant, so map the index pattern names to a slice of IDs.
indexPatterns[index][hit.Source.IndexPattern.Title] =
append(indexPatterns[index][hit.Source.IndexPattern.Title], patternID)
}
return len(s.Hits.Hits), s.Hits.Hits[len(s.Hits.Hits)-1].Source.UpdatedAt, nil
}

// IndexPatterns returns all Opensearch index patterns, keyed first by index
// name (which is derived from the tenant name) and then by index pattern
// title. Each title maps to a slice of index pattern IDs, because multiple
// identically titled index patterns may exist in a single tenant.
//
// Results are retrieved page by page via RawIndexPatterns. A page containing
// fewer than searchSize hits is treated as the final page; otherwise the
// updated-at value of the last hit is passed as the cursor for the next
// request.
//
// Errors from the API request and from parsing are wrapped, so callers can
// inspect the underlying cause using errors.Is and errors.As.
//
// NOTE(review): parseIndexPatterns appears to determine the newest migration
// of each index from the hits of a single page only. Confirm that migrations
// of one index cannot be split across pages.
func (c *Client) IndexPatterns(ctx context.Context) (
	map[string]map[string][]string, error) {
	indexPatterns := map[string]map[string][]string{}
	// after is the search cursor; it is empty for the first request.
	var after string
	for {
		rawIndexPatterns, err := c.RawIndexPatterns(ctx, after)
		if err != nil {
			return nil,
				fmt.Errorf("couldn't get index patterns from Opensearch API: %w", err)
		}
		searchResultSize, lastUpdatedAt, err :=
			parseIndexPatterns(rawIndexPatterns, indexPatterns)
		if err != nil {
			return nil,
				fmt.Errorf("couldn't parse index patterns: %w", err)
		}
		if searchResultSize < searchSize {
			c.log.Debug("got all index patterns, returning result",
				zap.Int("hits", searchResultSize))
			break // we have got all the index patterns...
		}
		// ...otherwise we need to do another request
		c.log.Debug("partial index pattern search response: scrolling results")
		after = lastUpdatedAt
	}
	return indexPatterns, nil
}
20 changes: 18 additions & 2 deletions internal/opensearch/indexpatterns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func TestSearchBodyMarshal(t *testing.T) {
}
}

func TestIndexPatternsUnmarshal(t *testing.T) {

func TestParseIndexPatterns(t *testing.T) {
type parseIndexPatternsResponse struct {
indexPatterns map[string]map[string][]string
length int
Expand Down Expand Up @@ -275,6 +274,23 @@ func TestIndexPatternsUnmarshal(t *testing.T) {
lastUpdatedAt: "2022-12-02T17:18:31.585Z",
},
},
"handle multiple kibana indices": {
input: "testdata/indexpatterns3.json",
expect: parseIndexPatternsResponse{
indexPatterns: map[string]map[string][]string{
"global_tenant": {
"router-logs-*": []string{"router-logs-*"},
"lagoon-logs-*": []string{"lagoon-logs-*"},
"application-logs-*": []string{"9b7da830-d427-11ed-b326-3348256dd0e8"},
},
"-152937574_admintenant": {
"lagoon-logs-*": []string{"lagoon-logs-*"},
},
},
length: 9,
lastUpdatedAt: "2023-05-02T07:54:24.736Z",
},
},
}

for name, tc := range testCases {
Expand Down
Loading

0 comments on commit f804561

Please sign in to comment.