Skip to content

Commit

Permalink
determine store inited with informer hassynced (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored Nov 25, 2024
1 parent 4b0d204 commit 1536c39
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
20 changes: 12 additions & 8 deletions pkg/cloudevents/work/store/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
// It is used for building ManifestWork source client.
type SourceInformerWatcherStore struct {
baseSourceStore
watcher *workWatcher
watcher *workWatcher
informer cache.SharedIndexInformer
}

var _ WorkClientWatcherStore = &SourceInformerWatcherStore{}
Expand Down Expand Up @@ -57,7 +58,7 @@ func (s *SourceInformerWatcherStore) Delete(work *workv1.ManifestWork) error {
}

func (s *SourceInformerWatcherStore) HasInitiated() bool {
return s.initiated
return s.initiated && s.informer.HasSynced()
}

func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
Expand All @@ -68,8 +69,9 @@ func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.Li
return s.watcher, nil
}

func (s *SourceInformerWatcherStore) SetStore(store cache.Store) {
s.store = store
func (s *SourceInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) {
s.informer = informer
s.store = informer.GetStore()
s.initiated = true
}

Expand All @@ -80,7 +82,8 @@ func (s *SourceInformerWatcherStore) SetStore(store cache.Store) {
// It is used for building ManifestWork agent client.
type AgentInformerWatcherStore struct {
baseStore
watcher *workWatcher
informer cache.SharedIndexInformer
watcher *workWatcher
}

var _ WorkClientWatcherStore = &AgentInformerWatcherStore{}
Expand Down Expand Up @@ -157,10 +160,11 @@ func (s *AgentInformerWatcherStore) GetWatcher(namespace string, opts metav1.Lis
}

func (s *AgentInformerWatcherStore) HasInitiated() bool {
return s.initiated
return s.initiated && s.informer.HasSynced()
}

func (s *AgentInformerWatcherStore) SetStore(store cache.Store) {
s.store = store
func (s *AgentInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) {
s.informer = informer
s.store = informer.GetStore()
s.initiated = true
}
2 changes: 1 addition & 1 deletion test/integration/cloudevents/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func StartWorkAgent(ctx context.Context,
workinformers.WithNamespace(clusterName),
)
informer := factory.Work().V1().ManifestWorks()
watcherStore.SetStore(informer.Informer().GetStore())
watcherStore.SetInformer(informer.Informer())

go informer.Informer().Run(ctx.Done())

Expand Down
4 changes: 2 additions & 2 deletions test/integration/cloudevents/cloudevents_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - Kafka", func() {
workinformers.WithNamespace(clusterName),
)
informer := factory.Work().V1().ManifestWorks()
watcherStore.SetStore(informer.Informer().GetStore())
watcherStore.SetInformer(informer.Informer())
go informer.Informer().Run(ctx.Done())

agentManifestClient := agentClientHolder.ManifestWorks(clusterName)
Expand Down Expand Up @@ -218,7 +218,7 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - Kafka", func() {
)
informer = factory.Work().V1().ManifestWorks()

watcherStore.SetStore(informer.Informer().GetStore())
watcherStore.SetInformer(informer.Informer())

// case1: wait until the consumer is ready
time.Sleep(5 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/cloudevents/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func StartManifestWorkSourceClient(

factory := workinformers.NewSharedInformerFactoryWithOptions(clientHolder.WorkInterface(), 5*time.Minute)
informer := factory.Work().V1().ManifestWorks()
watcherStore.SetStore(informer.Informer().GetStore())
watcherStore.SetInformer(informer.Informer())

go informer.Informer().Run(ctx.Done())

Expand Down

0 comments on commit 1536c39

Please sign in to comment.