Skip to content

Commit

Permalink
refactor: refactor field mapping and search mode
Browse files Browse the repository at this point in the history
  • Loading branch information
N3kox committed Jan 13, 2025
1 parent 25badfb commit da0e449
Show file tree
Hide file tree
Showing 27 changed files with 398 additions and 689 deletions.
19 changes: 0 additions & 19 deletions components/indexer/es8/field_mapping/consts.go

This file was deleted.

81 changes: 0 additions & 81 deletions components/indexer/es8/field_mapping/field_mapping.go

This file was deleted.

2 changes: 1 addition & 1 deletion components/indexer/es8/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/bytedance/mockey v1.2.13
github.com/cloudwego/eino v0.3.5
github.com/cloudwego/eino v0.3.6
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/smartystreets/goconvey v1.8.1
)
Expand Down
4 changes: 2 additions & 2 deletions components/indexer/es8/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4
github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/eino v0.3.5 h1:9PkAOX/phFifrGXkfl4L9rdecxOQJBJY1FtZqF4bz3c=
github.com/cloudwego/eino v0.3.5/go.mod h1:+kmJimGEcKuSI6OKhet7kBedkm1WUZS3H1QRazxgWUo=
github.com/cloudwego/eino v0.3.6 h1:3yfdKKxMVWefdOyGXHuqUMM5cc9iioijj2mpPsDZKIg=
github.com/cloudwego/eino v0.3.6/go.mod h1:+kmJimGEcKuSI6OKhet7kBedkm1WUZS3H1QRazxgWUo=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
92 changes: 25 additions & 67 deletions components/indexer/es8/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ import (
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/components/indexer"
"github.com/cloudwego/eino/schema"

"github.com/cloudwego/eino-ext/components/indexer/es8/field_mapping"
)

type IndexerConfig struct {
ESConfig elasticsearch.Config `json:"es_config"`
Index string `json:"index"`
BatchSize int `json:"batch_size"`

// VectorFields dense_vector field mappings
VectorFields []field_mapping.FieldKV `json:"vector_fields"`
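// FieldMapping customizes how an eino document is converted into es fields. It returns:
// fields, the fields to be saved, and needEmbeddingFields, the texts to be vectorized, keyed by field name.
// needEmbeddingFields are embedded by Embedding first, then the resulting vectors are merged into fields under the same keys,
// and the merged fields are saved as the bulk item.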
FieldMapping func(ctx context.Context, doc *schema.Document) (fields map[string]any, needEmbeddingFields map[string]string, err error)
// Embedding is the vectorization method; it must be provided in two cases:
// 1. VectorFields contains fields except doc Content
// 2. VectorFields contains doc Content and vector not provided in doc extra (see Document.Vector method)
Expand All @@ -58,13 +58,8 @@ func NewIndexer(_ context.Context, conf *IndexerConfig) (*Indexer, error) {
return nil, fmt.Errorf("[NewIndexer] new es client failed, %w", err)
}

if conf.Embedding == nil {
for _, kv := range conf.VectorFields {
if kv.FieldName != field_mapping.DocFieldNameContent {
return nil, fmt.Errorf("[NewIndexer] Embedding not provided in config, but field kv[%s]-[%s] requires",
kv.FieldNameVector, kv.FieldName)
}
}
if conf.FieldMapping == nil {
return nil, fmt.Errorf("[NewIndexer] field mapping method not provided")
}

if conf.BatchSize == 0 {
Expand Down Expand Up @@ -99,13 +94,7 @@ func (i *Indexer) Store(ctx context.Context, docs []*schema.Document, opts ...in
}

for _, slice := range chunk(docs, i.config.BatchSize) {
var items []esutil.BulkIndexerItem

if len(i.config.VectorFields) == 0 {
items, err = i.defaultQueryItems(ctx, slice, options)
} else {
items, err = i.vectorQueryItems(ctx, slice, options)
}
items, err := i.makeBulkItems(ctx, slice, options)
if err != nil {
return nil, err
}
Expand All @@ -128,73 +117,42 @@ func (i *Indexer) Store(ctx context.Context, docs []*schema.Document, opts ...in
return ids, nil
}

func (i *Indexer) defaultQueryItems(_ context.Context, docs []*schema.Document, _ *indexer.Options) (items []esutil.BulkIndexerItem, err error) {
items, err = iterWithErr(docs, func(doc *schema.Document) (item esutil.BulkIndexerItem, err error) {
b, err := json.Marshal(toESDoc(doc))
if err != nil {
return item, err
}

return esutil.BulkIndexerItem{
Index: i.config.Index,
Action: "index",
DocumentID: doc.ID,
Body: bytes.NewReader(b),
}, nil
})

if err != nil {
return nil, err
}

return items, nil
}

func (i *Indexer) vectorQueryItems(ctx context.Context, docs []*schema.Document, options *indexer.Options) (items []esutil.BulkIndexerItem, err error) {
func (i *Indexer) makeBulkItems(ctx context.Context, docs []*schema.Document, options *indexer.Options) (items []esutil.BulkIndexerItem, err error) {
emb := options.Embedding

items, err = iterWithErr(docs, func(doc *schema.Document) (item esutil.BulkIndexerItem, err error) {
mp := toESDoc(doc)
texts := make([]string, 0, len(i.config.VectorFields))
for _, kv := range i.config.VectorFields {
str, ok := kv.FieldName.Find(doc)
if !ok {
return item, fmt.Errorf("[vectorQueryItems] field name not found or type incorrect, name=%s, doc=%v", kv.FieldName, doc)
}

if kv.FieldName == field_mapping.DocFieldNameContent && len(doc.DenseVector()) > 0 {
mp[string(kv.FieldNameVector)] = doc.DenseVector()
} else {
texts = append(texts, str)
}
fields, needEmbeddingFields, err := i.config.FieldMapping(ctx, doc)
if err != nil {
return item, fmt.Errorf("[makeBulkItems] FieldMapping failed, %w", err)
}

if len(texts) > 0 {
if len(needEmbeddingFields) > 0 {
if emb == nil {
return item, fmt.Errorf("[vectorQueryItems] embedding not provided")
return item, fmt.Errorf("[makeBulkItems] embedding method not provided")
}

tuples := make([]tuple[string, int], 0, len(fields))
texts := make([]string, 0, len(fields))
for k, text := range needEmbeddingFields {
tuples = append(tuples, tuple[string, int]{k, len(texts)})
texts = append(texts, text)
}

vectors, err := emb.EmbedStrings(i.makeEmbeddingCtx(ctx, emb), texts)
if err != nil {
return item, fmt.Errorf("[vectorQueryItems] embedding failed, %w", err)
return item, fmt.Errorf("[makeBulkItems] embedding failed, %w", err)
}

if len(vectors) != len(texts) {
return item, fmt.Errorf("[vectorQueryItems] invalid vector length, expected=%d, got=%d", len(texts), len(vectors))
return item, fmt.Errorf("[makeBulkItems] invalid vector length, expected=%d, got=%d", len(texts), len(vectors))
}

vIdx := 0
for _, kv := range i.config.VectorFields {
if kv.FieldName == field_mapping.DocFieldNameContent && len(doc.DenseVector()) > 0 {
continue
}

mp[string(kv.FieldNameVector)] = vectors[vIdx]
vIdx++
for _, t := range tuples {
fields[t.A] = vectors[t.B]
}
}

b, err := json.Marshal(mp)
b, err := json.Marshal(fields)
if err != nil {
return item, err
}
Expand Down
Loading

0 comments on commit da0e449

Please sign in to comment.