Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

registry: remove units from etcd registry upon DestroyUnit() #1510

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build-env
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ export PATH="${GOROOT}/bin:${PATH}"
export FLEETD_BIN="${CDIR}/bin/fleetd"
export FLEETCTL_BIN="${CDIR}/bin/fleetctl"
export FLEETD_TEST_ENV="enable_grpc=false"
export ETCDCTL_BIN="/usr/bin/etcdctl"
2 changes: 2 additions & 0 deletions functional/platform/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Cluster interface {
WaitForNActiveUnits(Member, int) (map[string][]util.UnitState, error)
WaitForNUnitFiles(Member, int) (map[string][]util.UnitFileState, error)
WaitForNMachines(Member, int) ([]string, error)

Keyspace() string
}

func CreateNClusterMembers(cl Cluster, count int) ([]Member, error) {
Expand Down
6 changes: 3 additions & 3 deletions functional/platform/nspawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (nc *nspawnCluster) nextID() string {
return strconv.Itoa(nc.maxID)
}

func (nc *nspawnCluster) keyspace() string {
func (nc *nspawnCluster) Keyspace() string {
// TODO(jonboulle): generate this dynamically with atomic in order keys?
return fmt.Sprintf("/fleet_functional/%s", nc.name)
}
Expand Down Expand Up @@ -367,7 +367,7 @@ func (nc *nspawnCluster) buildConfigDrive(dir, ip string) error {
defer userFile.Close()

etcd := "http://172.18.0.1:4001"
return util.BuildCloudConfig(userFile, ip, etcd, nc.keyspace())
return util.BuildCloudConfig(userFile, ip, etcd, nc.Keyspace())
}

func (nc *nspawnCluster) Members() []Member {
Expand Down Expand Up @@ -601,7 +601,7 @@ func (nc *nspawnCluster) Destroy(t *testing.T) error {
// TODO(bcwaldon): This returns 4 on success, but we can't easily
// ignore just that return code. Ignore the returned error
// altogether until this is fixed.
run("etcdctl rm --recursive " + nc.keyspace())
run("etcdctl rm --recursive " + nc.Keyspace())

run("ip link del fleet0")

Expand Down
148 changes: 148 additions & 0 deletions functional/unit_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package functional

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -26,6 +27,7 @@ import (

"github.com/coreos/fleet/functional/platform"
"github.com/coreos/fleet/functional/util"
"github.com/coreos/fleet/unit"
)

const (
Expand Down Expand Up @@ -678,3 +680,149 @@ func waitForNUnitsCmd(cl platform.Cluster, m platform.Member, cmd string, nu int
// frequently than before, it's a huge pain to make it succeed every time.
// The failure brings a negative impact on productivity. So remove the entire
// test for now. - dpark 20160829

// TestUnitDestroyFromRegistry() checks for a submitted unit being removed
// from the etcd registry. It compares a local unit body with the unit in
// the etcd registry, to verify the body is identical.
func TestUnitDestroyFromRegistry(t *testing.T) {
cluster, err := platform.NewNspawnCluster("smoke")
if err != nil {
t.Fatal(err)
}
defer cluster.Destroy(t)

m, err := cluster.CreateMember()
if err != nil {
t.Fatal(err)
}
_, err = cluster.WaitForNMachines(m, 1)
if err != nil {
t.Fatal(err)
}

// submit a unit and assert it shows up
if _, _, err := cluster.Fleetctl(m, "submit", "fixtures/units/hello.service"); err != nil {
t.Fatalf("Unable to submit fleet unit: %v", err)
}
stdout, _, err := cluster.Fleetctl(m, "list-units", "--no-legend")
if err != nil {
t.Fatalf("Failed to run list-units: %v", err)
}
units := strings.Split(strings.TrimSpace(stdout), "\n")
if len(units) != 1 {
t.Fatalf("Did not find 1 unit in cluster: \n%s", stdout)
}

// cat the unit and compare it with the value in etcd registry
unitBody, _, err := cluster.Fleetctl(m, "cat", "hello.service")
if err != nil {
t.Fatalf("Unable to retrieve the fleet unit: %v", err)
}

var hashUnit string
if hashUnit, err = retrieveJobObjectHash(cluster, "hello.service"); err != nil {
t.Fatalf("Failed to retrieve hash of job object hello.service: %v", err)
}

var regBody string
if regBody, err = retrieveUnitBody(cluster, hashUnit); err != nil {
t.Fatalf("Failed to retrieve unit body for hello.service: %v", err)
}

// compare it with unitBody
if regBody != unitBody {
t.Fatalf("Failed to verify fleet unit: %v", err)
}

// destroy the unit again
if _, _, err := cluster.Fleetctl(m, "destroy", "hello.service"); err != nil {
t.Fatalf("Failed to destroy unit: %v", err)
}

stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend")
if err != nil {
t.Fatalf("Failed to run list-units: %v", err)
}
units = strings.Split(strings.TrimSpace(stdout), "\n")
if len(stdout) != 0 && len(units) != 1 {
t.Fatalf("Did not find 1 unit in cluster: \n%s", stdout)
}

// check for the unit being destroyed from the etcd registry,
// /fleet_functional/smoke/unit/.
// NOTE: do not check error of etcdctl, as it returns 4 on an empty list.
etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit")
etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit)
stdout, _, _ = util.RunEtcdctl("ls", etcdUnitPath)
units = strings.Split(strings.TrimSpace(stdout), "\n")
if len(stdout) != 0 && len(units) != 1 {
t.Fatalf("The unit still remains in the registry: %v")
}
}

// retrieveJobObjectHash fetches the job hash value from
// /fleet_functional/smoke/job/<jobName>/object in the etcd registry.
func retrieveJobObjectHash(cluster platform.Cluster, jobName string) (hash string, err error) {
etcdJobPrefix := path.Join(cluster.Keyspace(), "job")
etcdJobPath := path.Join(etcdJobPrefix, jobName, "object")

var stdout string
if stdout, _, err = util.RunEtcdctl("ls", etcdJobPath); err != nil {
return "", fmt.Errorf("Failed to list a unit from the registry: %v", err)
}
units := strings.Split(strings.TrimSpace(stdout), "\n")
if len(stdout) == 0 || len(units) == 0 {
return "", fmt.Errorf("No such unit in the registry: %v", err)
}

stdout, _, err = util.RunEtcdctl("get", etcdJobPath)
stdout = strings.TrimSpace(stdout)
objectBody := strings.Split(stdout, "\n")
if err != nil || len(stdout) == 0 || len(objectBody) == 0 {
return "", fmt.Errorf("Failed to get unit from the registry: %v", err)
}

type jobModel struct {
Name string
UnitHash unit.Hash
}
var jm jobModel
if err = json.Unmarshal([]byte(stdout), &jm); err != nil {
return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err)
}

return jm.UnitHash.String(), nil
}

// retrieveUnitBody fetches unit body from /fleet_functional/smoke/unit/<hash>
// in the etcd registry.
func retrieveUnitBody(cluster platform.Cluster, hashUnit string) (regBody string, err error) {
etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit")
etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit)

var stdout string
if stdout, _, err = util.RunEtcdctl("ls", etcdUnitPath); err != nil {
return "", fmt.Errorf("Failed to list a unit from the registry: %v", err)
}

units := strings.Split(strings.TrimSpace(stdout), "\n")
if len(stdout) == 0 || len(units) == 0 {
return "", fmt.Errorf("No such unit in the registry: %v", err)
}
stdout, _, err = util.RunEtcdctl("get", etcdUnitPath)
stdout = strings.TrimSpace(stdout)
unitBody := strings.Split(stdout, "\n")
if err != nil || len(stdout) == 0 || len(unitBody) == 0 {
return "", fmt.Errorf("Failed to get unit from the registry: %v", err)
}

type rawModel struct {
Raw string
}

var rm rawModel
if err = json.Unmarshal([]byte(stdout), &rm); err != nil {
return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err)
}
return rm.Raw, nil
}
42 changes: 42 additions & 0 deletions functional/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

var fleetctlBinPath string
var etcdctlBinPath string

func init() {
fleetctlBinPath = os.Getenv("FLEETCTL_BIN")
Expand All @@ -39,6 +40,15 @@ func init() {
os.Exit(1)
}

etcdctlBinPath = os.Getenv("ETCDCTL_BIN")
if etcdctlBinPath == "" {
fmt.Println("ETCDCTL_BIN environment variable must be set")
os.Exit(1)
} else if _, err := os.Stat(etcdctlBinPath); err != nil {
fmt.Printf("%v\n", err)
os.Exit(1)
}

if os.Getenv("SSH_AUTH_SOCK") == "" {
fmt.Println("SSH_AUTH_SOCK environment variable must be set")
os.Exit(1)
Expand Down Expand Up @@ -79,6 +89,38 @@ func RunFleetctlWithInput(input string, args ...string) (string, string, error)
return stdoutBytes.String(), stderrBytes.String(), err
}

func RunEtcdctl(args ...string) (string, string, error) {
log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " "))
var stdoutBytes, stderrBytes bytes.Buffer
cmd := exec.Command(etcdctlBinPath, args...)
cmd.Stdout = &stdoutBytes
cmd.Stderr = &stderrBytes
err := cmd.Run()
return stdoutBytes.String(), stderrBytes.String(), err
}

func RunEtcdctlWithInput(input string, args ...string) (string, string, error) {
log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " "))
var stdoutBytes, stderrBytes bytes.Buffer
cmd := exec.Command(etcdctlBinPath, args...)
cmd.Stdout = &stdoutBytes
cmd.Stderr = &stderrBytes
stdin, err := cmd.StdinPipe()
if err != nil {
return "", "", err
}

if err = cmd.Start(); err != nil {
return "", "", err
}

stdin.Write([]byte(input))
stdin.Close()
err = cmd.Wait()

return stdoutBytes.String(), stderrBytes.String(), err
}

// ActiveToSingleStates takes a map of active states (such as that returned by
// WaitForNActiveUnits) and ensures that each unit has at most a single active
// state. It returns a mapping of unit name to a single UnitState.
Expand Down
Loading