diff --git a/go.mod b/go.mod index 5afb1b5..c269c0b 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module github.com/fullstorydev/gosolr go 1.13 require github.com/fullstorydev/zk v1.0.3-0.20200828191825-edfcb5d63fdd + +replace github.com/fullstorydev/zk => github.com/patsonluk/zk v1.0.3-0.20230620222602-ecd88884e248 diff --git a/go.sum b/go.sum index 2b7d942..dac8781 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/fullstorydev/zk v1.0.3-0.20200828191825-edfcb5d63fdd h1:0lubUEXk7T4Z4fJtam94F9IVGK/k/wFw7jjTMpX0Ke8= -github.com/fullstorydev/zk v1.0.3-0.20200828191825-edfcb5d63fdd/go.mod h1:8fIOiu6/LYedHuzFUvyuLg0K2kFGSWGrEnsBytyZ9N4= +github.com/patsonluk/zk v1.0.3-0.20230620222602-ecd88884e248 h1:m4pApDjJN6FgotGGoDIbPIPaMl4JYVmNm6S9cSXXtZ8= +github.com/patsonluk/zk v1.0.3-0.20230620222602-ecd88884e248/go.mod h1:ssbnU3H1lBdE6eTeQ4pyY53X+kHUNqwongLRP8AYygo= diff --git a/solrmonitor/main/solrmonitor/solrmonitor.go b/solrmonitor/main/solrmonitor/solrmonitor.go index 7d4ab8e..80a7866 100644 --- a/solrmonitor/main/solrmonitor/solrmonitor.go +++ b/solrmonitor/main/solrmonitor/solrmonitor.go @@ -237,3 +237,24 @@ func (s *sexPantherZkCli) State() zk.State { func (s *sexPantherZkCli) Close() { s.delegate.Close() } + +func (s *sexPantherZkCli) Children(path string) ([]string, *zk.Stat, error) { + if s.isFlaky() && s.rnd.Float32() > flakeChance { + return nil, nil, errors.New("flaky error") + } + return s.delegate.Children(path) +} + +func (s *sexPantherZkCli) AddPersistentWatch(path string, mode zk.AddWatchMode) (ch zk.EventQueue, err error) { + if s.isFlaky() && s.rnd.Float32() > flakeChance { + return nil, errors.New("flaky error") + } + return s.delegate.AddPersistentWatch(path, mode) +} + +func (s *sexPantherZkCli) RemoveAllPersistentWatches(path string) (err error) { + if s.isFlaky() && s.rnd.Float32() > flakeChance { + return errors.New("flaky error") + } + return s.delegate.RemoveAllPersistentWatches(path) +} diff --git a/solrmonitor/mock.go b/solrmonitor/mock.go index 2f19fa6..966fb68 100644 --- a/solrmonitor/mock.go +++ b/solrmonitor/mock.go @@ -23,6 +23,19 @@ func newMockZkClient() ZkCli { type mockZkClient struct { } +func (m mockZkClient) Children(path string) ([]string, *zk.Stat, error) { + //TODO implement me + panic("implement me") +} + +func (m mockZkClient) AddPersistentWatch(path string, mode zk.AddWatchMode) (ch zk.EventQueue, err error) { + panic("implement me") +} + +func (m mockZkClient) RemoveAllPersistentWatches(path string) (err error) { + panic("implement me") +} + func (m mockZkClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) { panic("Not implemented") } diff --git a/solrmonitor/solrmonitor.go b/solrmonitor/solrmonitor.go index dffc376..f38669e 100644 --- a/solrmonitor/solrmonitor.go +++ b/solrmonitor/solrmonitor.go @@ -54,11 +54,14 @@ type SolrMonitor struct { // Minimal interface solrmonitor needs (allows for mock ZK implementations). type ZkCli interface { ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) + Children(path string) ([]string, *zk.Stat, error) Get(path string) ([]byte, *zk.Stat, error) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) State() zk.State Close() + AddPersistentWatch(path string, mode zk.AddWatchMode) (ch zk.EventQueue, err error) + RemoveAllPersistentWatches(path string) (err error) } // Create a new solrmonitor. 
Solrmonitor takes ownership of the provided zkCli and zkWatcher-- they @@ -107,6 +110,12 @@ func (c callbacks) ChildrenChanged(path string, children []string) error { } func (c callbacks) DataChanged(path string, data string, stat *zk.Stat) error { + if c.isPrsPath(path) { //special handling for PRS entry, since we are not fetching data on them + if stat.Version > -1 { //ignore deletion + return c.SolrMonitor.updateCollectionWithPrsPath(path) + } + } + version := int32(-1) if stat != nil { version = stat.Version @@ -123,6 +132,10 @@ func (c callbacks) ShouldWatchData(path string) bool { return c.SolrMonitor.shouldWatchPath(path) || c.SolrMonitor.shouldWatchCollection(path) } +func (c callbacks) ShouldFetchData(path string) bool { + return c.SolrMonitor.shouldFetchData(path) +} + func (c *SolrMonitor) Close() { c.zkCli.Close() c.zkWatcher.Close() @@ -216,11 +229,13 @@ func (c *SolrMonitor) childrenChanged(path string, children []string) error { case c.solrRoot + liveQueryNodesPath: return c.updateLiveQueryNodes(children) default: - //collectionsPath + "/" + coll.name + "/state.json": we want to state.json children - if !strings.HasPrefix(path, c.solrRoot+"/collections/") || !strings.HasSuffix(path, "/state.json") { - return fmt.Errorf("solrmonitor: unknown childrenChanged: %s", path) + //since we use watch on /collections/ and recursive watch on /collections//state.json, + //the ONLY change should be on init (for PRS entries) + if strings.Contains(path, c.solrRoot+"/collections/") { + return c.initCollectionWithPrsStrings(c.getCollFromPath(path), children) + //return fmt.Errorf("solrmonitor: unexpected childrenChanged on collections: %s", path) } - return c.updateCollection(path, children) + return nil } } @@ -269,98 +284,111 @@ func (c *SolrMonitor) clusterPropsChanged(data string) error { return nil } -func (c *SolrMonitor) updateCollection(path string, children []string) error { - rs, err := c.updateCollectionState(path, children) - - if err == nil && rs != nil && len(rs) > 0 { - coll := c.getCollFromPath(path) - if coll != nil { - c.mu.RLock() - defer c.mu.RUnlock() - if c.solrEventListener != nil { - collName := c.getCollNameFromPath(path) - c.solrEventListener.SolrCollectionReplicaStatesChanged(collName, rs) - } - } - } +// updateCollectionWithPrsPath update with a single PRS entry change. 
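+// For example (illustrative path only; "c1" and the replica entry are hypothetical): a creation event for +// /solr/collections/c1/state.json/R1:2:A yields the PRS string "R1:2:A", which is handed to +// updateCollectionWithPrsStrings(coll, []string{"R1:2:A"}).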
It should only be called for PRS path creation. +func (c *SolrMonitor) updateCollectionWithPrsPath(path string) error { + coll := c.getCollFromPath(path) + prsString := path[strings.Index(path, "/state.json/")+len("/state.json/"):] + return c.updateCollectionWithPrsStrings(coll, []string{prsString}) +} - return err +func (c *SolrMonitor) initCollectionWithPrsStrings(coll *collection, children []string) error { + c.logger.Printf("\nInitializing with PRS strings\n: %v", children) + return c.updateCollectionWithPrsStrings(coll, children) } -func (c *SolrMonitor) updateCollectionState(path string, children []string) (map[string]*PerReplicaState, error) { - c.logger.Printf("updateCollectionState: path %s, children %d", path, len(children)) - coll := c.getCollFromPath(path) - if coll == nil || len(children) == 0 { - //looks like we have not got the collection event yet; it should be safe to ignore it - return nil, nil +// updateCollectionWithPrsStrings does two things: +// 1. updates the collection with the supplied PRS update(s) +// 2. notifies the solrEventListener of the PRS change, providing the full set of PRS entries including the new updates +// +// Take note that the PRS strings might not cover the full PRS state of all replicas; they could be an update for a +// single replica. Each PRS entry supplied is expected to carry a new version (i.e. an update); this should not be called for +// PRS entry deletion. +func (c *SolrMonitor) updateCollectionWithPrsStrings(coll *collection, prsStrings []string) error { + allPrs := make(map[string]*PerReplicaState) + + for _, prsString := range prsStrings { + prs := c.parsePerReplicaState(prsString) + allPrs[prs.Name] = prs } - rmap := make(map[string]*PerReplicaState) + c.logger.Printf("updateCollectionState on collection %s: updating prs state %s\n", coll.name, allPrs) + coll.mu.Lock() + defer coll.mu.Unlock() + for _, shard := range coll.collectionState.Shards { + for rname, rstate := range shard.Replicas { + isUpdate := false + if prs, exists := allPrs[rname]; exists { + if prs.Version <= rstate.Version { //keep the latest PRS state + c.logger.Printf("WARNING: PRS update with lower/same version than received previously. Existing: %v, Incoming: %v", rstate, prs) + } else { + isUpdate = true + rstate.Version = prs.Version + rstate.Leader = prs.Leader + rstate.State = prs.State + allPrs[prs.Name] = prs + } + } - for _, r := range children { - replicaParts := strings.Split(r, ":") - if len(replicaParts) < 3 || len(replicaParts) > 4 { - c.logger.Printf("PRS protocol is wrong %s ", r) - panic(fmt.Sprintf("PRS protocol is in wrong format %s ", r)) - } - version, err := strconv.ParseInt(replicaParts[1], 10, 32) - if err != nil { - c.logger.Printf("PRS protocol has wrong version %s ", r) - panic(fmt.Sprintf("PRS protocol has wrong version %s ", r)) + //not an update, just copy the existing entry. We assume that the ReplicaStates within + //collectionState are in sync with the existing PRS entries + if !isUpdate { + allPrs[rname] = &PerReplicaState{ + Name: rname, + Version: rstate.Version, + State: rstate.State, + Leader: rstate.Leader, + } + } } + } - prs := &PerReplicaState{ - Name: replicaParts[0], - Version: int32(version), - } + c.mu.RLock() //not totally sure we need this lock, as we are not mutating anything here
+ defer c.mu.RUnlock() + if c.solrEventListener != nil { + c.solrEventListener.SolrCollectionReplicaStatesChanged(coll.name, allPrs) + } - switch replicaParts[2] { - case "A": - prs.State = "active" - case "D": - prs.State = "down" - case "R": - prs.State = "recovering" - case "F": - prs.State = "RECOVERY_FAILED" - default: - // marking inactive - as it should be recoverable error - c.logger.Printf("ERROR: PRS protocol UNKNOWN state %s ", replicaParts[2]) - prs.State = "inactive" - } + return nil +} - prs.Leader = "false" - if len(replicaParts) == 4 { - prs.Leader = "true" - } +func (c *SolrMonitor) parsePerReplicaState(child string) *PerReplicaState { + replicaParts := strings.Split(child, ":") + if len(replicaParts) < 3 || len(replicaParts) > 4 { + c.logger.Printf("PRS protocol is wrong %s ", child) + panic(fmt.Sprintf("PRS protocol is in wrong format %s ", child)) + } + version, err := strconv.ParseInt(replicaParts[1], 10, 32) + if err != nil { + c.logger.Printf("PRS protocol has wrong version %s ", child) + panic(fmt.Sprintf("PRS protocol has wrong version %s ", child)) + } - //keep ths latest prs - if currentPrs, found := rmap[prs.Name]; found { - if currentPrs.Version >= prs.Version { - continue - } - } + prs := &PerReplicaState{ + Name: replicaParts[0], + Version: int32(version), + } - rmap[prs.Name] = prs + switch replicaParts[2] { + case "A": + prs.State = "active" + case "D": + prs.State = "down" + case "R": + prs.State = "recovering" + case "F": + prs.State = "RECOVERY_FAILED" + default: + // marking inactive - as it should be recoverable error + c.logger.Printf("ERROR: PRS protocol UNKNOWN state %s ", replicaParts[2]) + prs.State = "inactive" } - c.logger.Printf("updateCollectionState on collection %s: updating prs state %s", coll.name, rmap) - coll.mu.Lock() - defer coll.mu.Unlock() - //update the collection state based on new PRS (per replica state) - for _, shard := range coll.collectionState.Shards { - for rname, rstate := range shard.Replicas { - if prs, found := rmap[rname]; found { - if prs.Version < rstate.Version { - c.logger.Printf("WARNING: PRS update with lower version than received previously. Existing: %v, Update: %v", rstate, prs) - } - rstate.Version = prs.Version - rstate.Leader = prs.Leader - rstate.State = prs.State - } - } + + prs.Leader = "false" + if len(replicaParts) == 4 { + prs.Leader = "true" } - return rmap, nil + return prs } func (c *SolrMonitor) shouldWatchChildren(path string) bool { @@ -372,17 +400,24 @@ func (c *SolrMonitor) shouldWatchChildren(path string) bool { case c.solrRoot + liveQueryNodesPath: return true default: - // watch coll/state.json childrens for replica status - if strings.HasPrefix(path, c.solrRoot+"/collections/") && strings.HasSuffix(path, "/state.json") { - coll := c.getCollFromPath(path) - if coll != nil { - return coll.isPRSEnabled() - } - } + //for PRS entries in /solr/collections//state.json ... 
it's now using recursive watch + //There should not be any child watches used per collection anymore return false } } +func (c *SolrMonitor) shouldFetchData(path string) bool { + //PRS update, we don't care about the data/stat + if c.isPrsPath(path) { + return false + } + return true +} + +func (c *SolrMonitor) isPrsPath(path string) bool { + return strings.HasPrefix(path, c.solrRoot+collectionsPath) && strings.Contains(path, "/state.json/") +} + func (c *SolrMonitor) dataChanged(path string, data string, version int32) error { if strings.HasSuffix(path, rolesPath) { return c.rolesChanged(data) @@ -404,13 +439,14 @@ func (c *SolrMonitor) dataChanged(path string, data string, version int32) error return fmt.Errorf("unknown dataChanged: %s", path) } - if strings.HasSuffix(path, "/state.json") { + if strings.HasSuffix(path, "/state.json") { //change on state.json // common state.json change state := coll.setStateData(data, version) c.callSolrListener(coll.name, state) - if coll.isPRSEnabled() { - coll.startMonitoringReplicaStatus() - } + + //No need to start monitoring replica status, as the recursive watch on state.json is already doing that + } else if coll.isPRSEnabled() && c.isPrsPath(path) { //change on PRS entry + return fmt.Errorf("unexpected data change on PRS entry: %s", path) } else { // less common (usually just initialization) collection change state := coll.setCollectionData(data) @@ -449,8 +485,17 @@ func (c *SolrMonitor) getCollFromPath(path string) *collection { } func (c *SolrMonitor) getCollNameFromPath(path string) string { + //example inputs: + // /collections/myCollection + // /collections/myCollection/state.json + // /collections/myCollection/state.json/some_prs + name := strings.TrimPrefix(path, c.solrRoot+"/collections/") - name = strings.TrimSuffix(name, "/state.json") + stateStartIndex := strings.Index(name, "/state.json") + if stateStartIndex > -1 { + name = name[:stateStartIndex] + } + return name } @@ -515,6 +560,9 @@ func (c *SolrMonitor) updateCollections(collections []string) error { // Now remove any collections that disappeared. 
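+ // A removed collection must also stop watching: collection.stop (added below) calls StopMonitorData on its paths, which removes the persistent watches so ZooKeeper no longer delivers events for a collection that no longer exists.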
for name := range c.collections { if !collectionExists[name] { + if collection, _ := c.collections[name]; collection != nil { + collection.stop() + } delete(c.collections, name) logRemoved = append(logRemoved, name) } @@ -657,15 +705,24 @@ func parseStateData(name string, data []byte, version int32) (*CollectionState, func (coll *collection) start() error { collPath := coll.parent.solrRoot + "/collections/" + coll.name statePath := collPath + "/state.json" - if err := coll.parent.zkWatcher.MonitorData(collPath); err != nil { + if err := coll.parent.zkWatcher.MonitorData(collPath); err != nil { //for init return err } - if err := coll.parent.zkWatcher.MonitorData(statePath); err != nil { + + if err := coll.parent.zkWatcher.MonitorDataRecursive(statePath, 1); err != nil { return err } + return nil } +func (coll *collection) stop() { + collPath := coll.parent.solrRoot + "/collections/" + coll.name + statePath := collPath + "/state.json" + coll.parent.zkWatcher.StopMonitorData(collPath) + coll.parent.zkWatcher.StopMonitorData(statePath) +} + func (coll *collection) setCollectionData(data string) *CollectionState { coll.parent.logger.Printf("setCollectionData:updating the collection %s ", coll.name) if data == "" { @@ -743,19 +800,6 @@ func (coll *collection) carryOverConfigName(newState *CollectionState) { } } -func (coll *collection) startMonitoringReplicaStatus() { - path := coll.parent.solrRoot + "/collections/" + coll.name + "/state.json" - - // TODO: need to revisit coll.isWatched flag(if zk disconnects?). we need to create watch once only Scott? - if !coll.hasWatch() { - err := coll.parent.zkWatcher.MonitorChildren(path) - if err == nil { - coll.parent.logger.Printf("startMonitoringReplicaStatus: watching collection [%s] children for PRS", coll.name) - coll.watchAdded() - } - } -} - func (coll *collection) watchAdded() { coll.mu.Lock() defer coll.mu.Unlock() diff --git a/solrmonitor/solrmonitor_test.go b/solrmonitor/solrmonitor_test.go index 1f91bd1..e9477ba 100644 --- a/solrmonitor/solrmonitor_test.go +++ b/solrmonitor/solrmonitor_test.go @@ -22,6 +22,7 @@ import ( "fmt" "runtime" "strings" + "sync/atomic" "testing" "time" @@ -31,12 +32,13 @@ import ( ) type testutil struct { - t *testing.T - conn *zk.Conn - root string - sm *SolrMonitor - logger *smtestutil.ZkTestLogger - solrEventListener *SEListener + t *testing.T + conn *zk.Conn + root string + sm *SolrMonitor + logger *smtestutil.ZkTestLogger + solrEventListener *SEListener + collectionStateFetchCount *int32 } func (tu *testutil) teardown() { @@ -85,18 +87,26 @@ func setup(t *testing.T) (*SolrMonitor, *testutil) { collStateEvents: 0, collectionStates: make(map[string]*CollectionState), } - sm, err := NewSolrMonitorWithRoot(conn, watcher, logger, root, l) + + collectionStateFetchCount := int32(0) + proxyZkClient := &proxyZkCli{ + delegate: conn, + collectionStateFetchCount: &collectionStateFetchCount, + } + + sm, err := NewSolrMonitorWithRoot(proxyZkClient, watcher, logger, root, l) if err != nil { conn.Close() t.Fatal(err) } return sm, &testutil{ - t: t, - conn: conn, - root: root, - sm: sm, - logger: logger, - solrEventListener: l, + t: t, + conn: conn, + root: root, + sm: sm, + logger: logger, + solrEventListener: l, + collectionStateFetchCount: &collectionStateFetchCount, } } @@ -226,20 +236,22 @@ func TestCollectionChanges(t *testing.T) { } func TestPRSProtocol(t *testing.T) { - sm, testutil := setup(t) - defer testutil.teardown() + sm, testSetup := setup(t) + defer testSetup.teardown() shouldNotExist(t, sm, "c1") - zkCli := 
testutil.conn + zkCli := testSetup.conn zkCli.Create(sm.solrRoot+"/collections", nil, 0, zk.WorldACL(zk.PermAll)) _, err := zkCli.Create(sm.solrRoot+"/collections/c1", []byte(`{"configName":"_FS4"}`), 0, zk.WorldACL(zk.PermAll)) if err != nil { t.Fatal(err) } + currentFetchCount := testSetup.collectionStateFetchCount shouldNotExist(t, sm, "c1") - checkCollectionStateCallback(t, 1, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 1, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) + checkFetchCount(t, currentFetchCount, 2) //state.json fetch and 1 children fetch on PRS from coll.start() _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json", nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { @@ -247,9 +259,10 @@ func TestPRSProtocol(t *testing.T) { } shouldNotExist(t, sm, "c1") - checkCollectionStateCallback(t, 2, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 2, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) + checkFetchCount(t, currentFetchCount, 3) //state.json fetch from new state.json - _, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\"}}}}}}"), -1) + _, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\", \"state\":\"down\"}}}}}}"), -1) if err != nil { t.Fatal(err) } @@ -262,7 +275,8 @@ func TestPRSProtocol(t *testing.T) { } shouldExist(t, sm, "c1", collectionAssertions) - checkCollectionStateCallback(t, 3, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 3, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) + checkFetchCount(t, currentFetchCount, 4) //state.json fetch from updated state.json // 1. adding PRS for replica R1, version 1, state down _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1:1:D", nil, 0, zk.WorldACL(zk.PermAll)) @@ -270,38 +284,51 @@ func TestPRSProtocol(t *testing.T) { t.Fatal(err) } prsShouldExist(t, sm, "c1", "shard_1", "R1", "down", "false", 1) - checkCollectionStateCallback(t, 4, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 4, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) // 2. adding PRS for replica R1, version 1 -same, state active => should ignore as same version _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1:1:R", nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { t.Fatal(err) } prsShouldExist(t, sm, "c1", "shard_1", "R1", "down", "false", 1) - checkCollectionStateCallback(t, 5, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 5, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) // 3. 
adding PRS for replica R1, version 2, state active _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1:2:A", nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { t.Fatal(err) } + err = zkCli.Delete(sm.solrRoot+"/collections/c1/state.json/R1:1:D", int32(-1)) + if err != nil { + t.Fatal(err) + } + err = zkCli.Delete(sm.solrRoot+"/collections/c1/state.json/R1:1:R", int32(-1)) + if err != nil { + t.Fatal(err) + } prsShouldExist(t, sm, "c1", "shard_1", "R1", "active", "false", 2) - checkCollectionStateCallback(t, 6, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 6, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) // 4. adding PRS for replica R1, version 3, state active and leader _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1:3:A:L", nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { t.Fatal(err) } + err = zkCli.Delete(sm.solrRoot+"/collections/c1/state.json/R1:2:A", int32(-1)) + if err != nil { + t.Fatal(err) + } prsShouldExist(t, sm, "c1", "shard_1", "R1", "active", "true", 3) - checkCollectionStateCallback(t, 7, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 7, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) //5. split shard - _, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\"}}}, \"shard_1_0\":{\"replicas\":{\"R1_0\":{\"core\":\"core1\"}}}, \"shard_1_1\":{\"replicas\":{\"R1_1\":{\"core\":\"core1\"}}}}}}"), -1) + _, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\", \"state\":\"down\"}}}, \"shard_1_0\":{\"replicas\":{\"R1_0\":{\"core\":\"core1\", \"state\":\"down\"}}}, \"shard_1_1\":{\"replicas\":{\"R1_1\":{\"core\":\"core1\", \"state\":\"down\"}}}}}}"), -1) if err != nil { t.Fatal(err) } time.Sleep(5000 * time.Millisecond) - checkCollectionStateCallback(t, 8, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 8, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) + checkFetchCount(t, currentFetchCount, 5) //state.json fetch from updated state.json // 6. replica R1_0 should exist _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1_0:1:A:L", nil, 0, zk.WorldACL(zk.PermAll)) @@ -309,7 +336,7 @@ func TestPRSProtocol(t *testing.T) { t.Fatal(err) } prsShouldExist(t, sm, "c1", "shard_1_0", "R1_0", "active", "true", 1) - checkCollectionStateCallback(t, 9, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 9, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) // 7. 
replica R1_1 should exist _, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1_1:1:A:L", nil, 0, zk.WorldACL(zk.PermAll)) @@ -317,14 +344,52 @@ func TestPRSProtocol(t *testing.T) { t.Fatal(err) } prsShouldExist(t, sm, "c1", "shard_1_1", "R1_1", "active", "true", 1) - checkCollectionStateCallback(t, 10, testutil.solrEventListener.collStateEvents+testutil.solrEventListener.collReplicaChangeEvents) + checkCollectionStateCallback(t, 10, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents) - if testutil.solrEventListener.collStateEvents != 4 || testutil.solrEventListener.collections != 1 { - t.Fatalf("Event listener didn't not get event for collection = %d, collectionstateEvents = %d", testutil.solrEventListener.collections, testutil.solrEventListener.collStateEvents) + if testSetup.solrEventListener.collStateEvents != 4 || testSetup.solrEventListener.collections != 1 { + t.Fatalf("Event listener didn't not get event for collection = %d, collectionstateEvents = %d", testSetup.solrEventListener.collections, testSetup.solrEventListener.collStateEvents) } // and after all of the updates, should still exist with same config name shouldExist(t, sm, "c1", collectionAssertions) + + //test on a brand new solrmonitor/conn with an existing collection, it should load correctly + logger := smtestutil.NewZkTestLogger(t) + watcher := NewZkWatcherMan(logger) + connOption := func(c *zk.Conn) { c.SetLogger(logger) } + fetchCount := int32(0) + + conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5, connOption, zk.WithEventCallback(watcher.EventCallback)) + if err != nil { + t.Fatal(err) + } + sm, err = NewSolrMonitorWithRoot(&proxyZkCli{conn, &fetchCount}, watcher, logger, sm.solrRoot, nil) //make a new solrmonitor + if err != nil { + conn.Close() + t.Fatal(err) + } + collState, err := sm.GetCollectionState("c1") + if err != nil { + t.Fatal(err) + } + replicaState := collState.Shards["shard_1"].Replicas["R1"] + if replicaState.State != "active" { + t.Fatalf("Expected replica R1 state active but found %s", replicaState.State) + } + if replicaState.Leader != "true" { + t.Fatalf("Expected replica R1 leadership but found %s", replicaState.Leader) + } + if replicaState.Version != 3 { + t.Fatalf("Expected replica R1 version 3 but found %v", replicaState.Version) + } + //For brevity, just check states for child shards + if collState.Shards["shard_1_0"].Replicas["R1_0"].State != "active" { + t.Fatalf("Expected replica R1_0 state active, but it's not") + } + if collState.Shards["shard_1_1"].Replicas["R1_1"].State != "active" { + t.Fatalf("Expected replica R1_1 state active, but it's not") + } + checkFetchCount(t, &fetchCount, 2) //state.json fetch and 1 children fetch on PRS from coll.start() } func checkCollectionStateCallback(t *testing.T, expected int, found int) { @@ -393,3 +458,59 @@ func (l *SEListener) SolrCollectionReplicaStatesChanged(name string, replicaStat func (l *SEListener) SolrClusterPropsChanged(clusterprops map[string]string) { l.clusterPropChangeEvents++ } + +// proxyZkCli to ensure the zk collection state fetch count is as expected +type proxyZkCli struct { + delegate ZkCli + collectionStateFetchCount *int32 //fetches on collection state.json including PRS entries +} + +var _ ZkCli = &proxyZkCli{} + +func (p *proxyZkCli) Children(path string) ([]string, *zk.Stat, error) { + if strings.Contains(path, "/state.json") { + atomic.AddInt32(p.collectionStateFetchCount, 1) + } + return p.delegate.Children(path) +} + +func (p *proxyZkCli) 
ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) { + if strings.Contains(path, "/state.json") { + atomic.AddInt32(p.collectionStateFetchCount, 1) + } + return p.delegate.ChildrenW(path) +} + +func (p *proxyZkCli) Get(path string) ([]byte, *zk.Stat, error) { + if strings.Contains(path, "/state.json") { + atomic.AddInt32(p.collectionStateFetchCount, 1) + } + return p.delegate.Get(path) +} + +func (p *proxyZkCli) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) { + if strings.Contains(path, "/state.json") { + atomic.AddInt32(p.collectionStateFetchCount, 1) + } + return p.delegate.GetW(path) +} + +func (p *proxyZkCli) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) { + return p.delegate.ExistsW(path) +} + +func (p *proxyZkCli) State() zk.State { + return p.delegate.State() +} + +func (p *proxyZkCli) Close() { + p.delegate.Close() +} + +func (p *proxyZkCli) AddPersistentWatch(path string, mode zk.AddWatchMode) (ch zk.EventQueue, err error) { + return p.delegate.AddPersistentWatch(path, mode) +} + +func (p *proxyZkCli) RemoveAllPersistentWatches(path string) (err error) { + return p.delegate.RemoveAllPersistentWatches(path) +} diff --git a/solrmonitor/util_test.go b/solrmonitor/util_test.go index e7c1d5b..7160bd3 100644 --- a/solrmonitor/util_test.go +++ b/solrmonitor/util_test.go @@ -1,6 +1,7 @@ package solrmonitor import ( + "sync/atomic" "testing" "time" ) @@ -109,6 +110,25 @@ func prsShouldExist(t *testing.T, sm *SolrMonitor, name string, shard string, re t.Fatalf("expected collection %s 's replica updated", name) } +func checkFetchCount(t *testing.T, currentFetchCount *int32, expectedFetchCount int32) { + t.Helper() + // Wait a moment before checking, otherwise it might not flag when there are too many fetches + time.Sleep(200 * time.Millisecond) + var count int32 + for end := time.Now().Add(checkTimeout); time.Now().Before(end); { + count = atomic.LoadInt32(currentFetchCount) + if count > expectedFetchCount { + t.Fatalf("fetch count %v exceeded expected count %v", count, expectedFetchCount) + return + } + if count == expectedFetchCount { + return + } + time.Sleep(checkInterval) + } + t.Fatalf("fetch count %v is not equal to expected count %v", count, expectedFetchCount) +} + func shouldNotExist(t *testing.T, sm *SolrMonitor, name string) { t.Helper() diff --git a/solrmonitor/zkwatcherman.go b/solrmonitor/zkwatcherman.go index c23661d..6d4b62c 100644 --- a/solrmonitor/zkwatcherman.go +++ b/solrmonitor/zkwatcherman.go @@ -28,6 +28,11 @@ type Callbacks interface { DataChanged(path string, data string, stat *zk.Stat) error ShouldWatchChildren(path string) bool ShouldWatchData(path string) bool + + //ShouldFetchData indicates on DataChanged, whether we need to fetch the data from the node along with the stat, + //in some cases knowing just the path and whether it's a creation/deletion are good enough + //If this is false, DataChanged will be notified with data "" and stat with version -1 as deletion or 1 as creation + ShouldFetchData(path string) bool } // A MonitorChildren request whose last attempt to set a watch failed. @@ -37,7 +42,15 @@ type deferredChildrenTask struct { // A MonitorData request whose last attempt to set a watch failed. 
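+// With persistent watches this task also records the triggering event type, so that for paths where ShouldFetchData returns false (PRS entries) a creation or deletion can be reported to DataChanged with a synthetic stat (Version 1 or -1) instead of an extra Get.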
type deferredDataTask struct { - path string + path string + eventType zk.EventType +} + +// Init on a data path, which optionally install a persistent watch and +// fetch the data (recursively if needed) +type deferredInitDataTask struct { + path string + installWatch bool } // Helper class to continuously monitor nodes for state or data changes. @@ -52,6 +65,10 @@ type ZkWatcherMan struct { deferredTasksNotEmpty chan struct{} // signals the monitor loop that a new task was enqueued deferredTaskMu sync.Mutex // guards deferredRecoveryTasks deferredRecoveryTasks fifoTaskQueue + + //on init fetch, the depth to fetch recursive child paths, 1 means fetch the immediate children and stop. + //This is necessary for re-connect + initFetchDepths map[string]int } // Create a ZkWatcherMan to continuously monitor nodes for state and data changes. @@ -65,6 +82,7 @@ func NewZkWatcherMan(logger zk.Logger) *ZkWatcherMan { cancel: cancel, logger: logger, deferredTasksNotEmpty: make(chan struct{}, 1), + initFetchDepths: make(map[string]int), } return ret } @@ -76,18 +94,18 @@ func (m *ZkWatcherMan) EventCallback(evt zk.Event) { case zk.EventNodeCreated, zk.EventNodeDeleted: // Just enqueue both kinds of tasks, we might throw one away later. m.logger.Printf("ZkWatcherMan %s: %s", evt.Type, evt.Path) - m.enqueueDeferredTask(deferredDataTask{evt.Path}) + m.enqueueDeferredTask(deferredDataTask{evt.Path, evt.Type}) m.enqueueDeferredTask(deferredChildrenTask{evt.Path}) case zk.EventNodeDataChanged: m.logger.Printf("ZkWatcherMan data %s: %s", evt.Type, evt.Path) - m.enqueueDeferredTask(deferredDataTask{evt.Path}) + m.enqueueDeferredTask(deferredDataTask{evt.Path, evt.Type}) case zk.EventNodeChildrenChanged: m.logger.Printf("ZkWatcherMan children %s: %s", evt.Type, evt.Path) m.enqueueDeferredTask(deferredChildrenTask{evt.Path}) case zk.EventNotWatching: // Lost ZK session; we'll need to re-register all watches when it comes back. // Just enqueue both kinds of tasks, we might throw them away later. 
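+ // On reconnect the data task is a deferredInitDataTask with installWatch=true: the persistent watch is re-added and, for paths registered via MonitorDataRecursive, the recorded initFetchDepths entry is used to re-read the PRS children.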
- m.enqueueDeferredTask(deferredDataTask{evt.Path}) + m.enqueueDeferredTask(deferredInitDataTask{evt.Path, true}) m.enqueueDeferredTask(deferredChildrenTask{evt.Path}) default: if evt.Err == zk.ErrClosing { @@ -155,11 +173,29 @@ func (m *ZkWatcherMan) Start(zkCli ZkCli, callbacks Callbacks) { } case deferredDataTask: if callbacks.ShouldWatchData(task.path) { - zkErr, cbErr := m.fetchData(task.path) + if m.callbacks.ShouldFetchData(task.path) { + zkErr, cbErr := m.fetchData(task.path) + if zkErr != nil { + m.logger.Printf("zkwatcherman: error fetching data for %s: %s", task.path, zkErr) + } else if cbErr != nil { + m.logger.Printf("zkwatcherman: error in data callback for %s: %s", task.path, zkErr) + } + success = zkErr == nil + } else { //all data watches are persistent, all we need to do is notify the callback + if task.eventType == zk.EventNodeCreated { + m.callbacks.DataChanged(task.path, "", &zk.Stat{Version: 1}) + } else { + m.callbacks.DataChanged(task.path, "", &zk.Stat{Version: -1}) + } + } + } + case deferredInitDataTask: + if callbacks.ShouldWatchData(task.path) { + zkErr, cbErr := m.initFetchData(task.path, polled.(deferredInitDataTask).installWatch) if zkErr != nil { - m.logger.Printf("zkwatcherman: error fetching data for %s: %s", task.path, zkErr) + m.logger.Printf("zkwatcherman: error init fetching data for %s: %s", task.path, zkErr) } else if cbErr != nil { - m.logger.Printf("zkwatcherman: error in data callback for %s: %s", task.path, zkErr) + m.logger.Printf("zkwatcherman: error in init data callback for %s: %s", task.path, zkErr) } success = zkErr == nil } @@ -226,34 +262,142 @@ func (m *ZkWatcherMan) fetchChildren(path string) (zkErr, cbErr error) { } } -// Begin monitoring the data at the given path, will resolve the current data before returning. +func (m *ZkWatcherMan) MonitorData(path string) error { + return m.monitorData(path, 0) +} + +func (m *ZkWatcherMan) MonitorDataRecursive(path string, initFetchDepth int) error { + return m.monitorData(path, initFetchDepth) +} + +// MonitorData begins monitoring the data at the given path, will resolve the current data before returning. // // Will return either a ZK error if the fetch failed, or propagate any errors returned from // synchronously callbacks. // // Even if this method returns an error, ZkWacherMan will continuously attempt to monitor the given path. -func (m *ZkWatcherMan) MonitorData(path string) error { - zkErr, cbErr := m.fetchData(path) +// fetchDepth is only used when recursive is true, fetch children (and notify cbs) will only go up to this depth. 
+// for example, if fetchDepth = 1, this will only fetch the immediate children of path, +// notify via m.callbacks.ChildrenChanged and stop there +func (m *ZkWatcherMan) monitorData(path string, initFetchDepth int) error { + if initFetchDepth > 0 { //keep track of this, as for disconnect, we need this piece of info to re-init the watch + m.initFetchDepths[path] = initFetchDepth + } + + zkErr, cbErr := m.initFetchData(path, true) if zkErr != nil { return zkErr } return cbErr } -func (m *ZkWatcherMan) fetchData(path string) (zkErr, cbErr error) { - if data, stat, _, err := getDataAndWatch(m.zkCli, path); err != nil { - if err == zk.ErrClosing { - return nil, nil +func (m *ZkWatcherMan) initFetchRecursively(path string, currentDepth, maxDepth int) (zkErr, cbErr error) { + children, _, err := m.zkCli.Children(path) + if err == zk.ErrNoNode || err == zk.ErrClosing { + return nil, nil + } else if err != nil { + return err, nil + } + if len(children) == 0 { //no need to notify empty children since this is the init fetch + return nil, nil + } + err = m.callbacks.ChildrenChanged(path, children) + if err != nil { + return nil, err + } + + if currentDepth >= maxDepth { + return nil, nil + } + + for _, child := range children { + zkErr, cbErr = m.initFetchRecursively(path+child, currentDepth+1, maxDepth) + if zkErr != nil || cbErr != nil { + return zkErr, cbErr } - // We failed to set a watch; add a task for the recovery thread to keep trying + } + + return nil, nil +} + +// initFetchData first installs a persistent watch (if installWatch is true and fetch the data. +// take note that this might fetch recursively on the children if initFetchDepths is defined for such path +func (m *ZkWatcherMan) initFetchData(path string, installWatch bool) (zkErr, cbErr error) { + fetchDepth, isRecursive := m.initFetchDepths[path] + + //can ignore the returned channel as zkwatcherman relies on its EventCallback being wired up + //to the zk.Conn as a global callback option + if installWatch { + if _, err := m.addPersistentWatch(path, isRecursive); err != nil { + // We failed to set a watch; add a task for the recovery thread to keep trying + m.logger.Printf("ZkWatcherMan %s: error installing watch: %s", path, err) + m.enqueueDeferredTask(deferredInitDataTask{path, installWatch}) + return err, nil + } + } + + //TODO ordering matter? what if child changes come in between the watch and fetch? or the other way around? + dataBytes, stat, err := m.zkCli.Get(path) + if err == zk.ErrClosing { //if closing simply return nil and do nothing + return nil, nil + } + var data string + //for ErrNoNode, we use data = "". This is to maintain same behavior as previous getDataAndWatch + if err == zk.ErrNoNode { + data = "" + } else if err != nil { + m.logger.Printf("ZkWatcherMan %s: error fetching data on init: %s", path, err) + m.enqueueDeferredTask(deferredInitDataTask{path, false}) + return err, nil + } + data = string(dataBytes) + err = m.callbacks.DataChanged(path, data, stat) + if err != nil { + return nil, err + } + + if isRecursive { + zkErr, cbErr = m.initFetchRecursively(path, 1, fetchDepth) + //try again if it's zk error + if zkErr != nil { + m.logger.Printf("ZkWatcherMan %s: error fetching recursive data on init: %s", path, err) + m.enqueueDeferredTask(deferredInitDataTask{path, false}) + } + } + return zkErr, cbErr +} + +// fetchData fetches the data of the path and then notifies the +// callbacks registered. No watch will be installed during this process. 
+// +// Take note that if such path does not exist, it will still notify the callbacks +// with empty "" +func (m *ZkWatcherMan) fetchData(path string) (zkErr, cbErr error) { + dataBytes, stat, err := m.zkCli.Get(path) + if err == zk.ErrClosing { //if closing simply return nil and do nothing + return nil, nil + } + + var data string + //for ErrNoNode, we use data = "". This is to maintain same behavior as previous getDataAndWatch + if err == zk.ErrNoNode { + data = "" + } else if err != nil { //unexpected error, try again m.logger.Printf("ZkWatcherMan %s: error getting data: %s", path, err) m.enqueueDeferredTask(deferredDataTask{path: path}) return err, nil } else { - return nil, m.callbacks.DataChanged(path, data, stat) + data = string(dataBytes) } + return nil, m.callbacks.DataChanged(path, data, stat) } +func (m *ZkWatcherMan) StopMonitorData(path string) { + m.zkCli.RemoveAllPersistentWatches(path) + delete(m.initFetchDepths, path) +} + +// TODO flag on permanent or not func getChildrenAndWatch(zkCli ZkCli, path string) ([]string, <-chan zk.Event, error) { for { children, _, childrenWatch, err := zkCli.ChildrenW(path) @@ -281,28 +425,18 @@ func getChildrenAndWatch(zkCli ZkCli, path string) ([]string, <-chan zk.Event, e } } -func getDataAndWatch(zkCli ZkCli, path string) (string, *zk.Stat, <-chan zk.Event, error) { - for { - data, stat, dataWatch, err := zkCli.GetW(path) - if err == nil { - // Success, we're done. - return string(data), stat, dataWatch, nil - } - - if err == zk.ErrNoNode { - // Node doesn't exist; add an existence watch. - exists, _, existsWatch, err := zkCli.ExistsW(path) - if err != nil { - return "", nil, nil, err - } - if exists { - // Improbable, but possible; first we checked and it wasn't there, then we checked and it was. - // Just loop and try again. - continue - } - return "", nil, existsWatch, nil - } - - return "", nil, nil, err +func (m *ZkWatcherMan) addPersistentWatch(path string, recursive bool) (zk.EventQueue, error) { + var addWatchMode zk.AddWatchMode + if recursive { + addWatchMode = zk.AddWatchModePersistentRecursive + m.logger.Printf("Adding persistent watch (recursive) on %s", path) + } else { + addWatchMode = zk.AddWatchModePersistent + m.logger.Printf("Adding persistent watch (non-recursive) on %s", path) + } + eventQueue, err := m.zkCli.AddPersistentWatch(path, addWatchMode) + if err != nil { + return nil, err } + return eventQueue, err }
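For reference, a minimal self-contained sketch of the PRS child-node format this patch parses in parsePerReplicaState (name:version:state[:L], where A/D/R/F map to active/down/recovering/RECOVERY_FAILED and a trailing ":L" segment marks the leader). The package wrapper, the parsePrsEntry name, and the sample entry are illustrative only and not part of the patch:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// PerReplicaState mirrors the fields solrmonitor keeps per replica in this patch.
type PerReplicaState struct {
	Name    string
	Version int32
	State   string
	Leader  string
}

// parsePrsEntry decodes a PRS child node name such as "R1:3:A:L".
func parsePrsEntry(entry string) (*PerReplicaState, error) {
	parts := strings.Split(entry, ":")
	if len(parts) < 3 || len(parts) > 4 {
		return nil, fmt.Errorf("PRS entry %q is not in name:version:state[:L] form", entry)
	}
	version, err := strconv.ParseInt(parts[1], 10, 32)
	if err != nil {
		return nil, fmt.Errorf("PRS entry %q has a bad version: %v", entry, err)
	}
	states := map[string]string{"A": "active", "D": "down", "R": "recovering", "F": "RECOVERY_FAILED"}
	state, ok := states[parts[2]]
	if !ok {
		// unknown state letter: treat as recoverable, as the monitor does
		state = "inactive"
	}
	leader := "false"
	if len(parts) == 4 { // a fourth segment marks the leader
		leader = "true"
	}
	return &PerReplicaState{Name: parts[0], Version: int32(version), State: state, Leader: leader}, nil
}

func main() {
	prs, err := parsePrsEntry("R1:3:A:L")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *prs) // {Name:R1 Version:3 State:active Leader:true}
}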