Skip to content

Commit

Permalink
DAOS-14834 control: Enable parallel server->engine dRPCs (#14193)
Browse files Browse the repository at this point in the history
The idea here is to remove the bottleneck in daos_server that serializes
dRPC calls, to enable daos_server to pass along multiple dRPC calls even
if the first one hasn't yet returned. In the current master branch, we
have a single dRPC client structure that uses RW locks to control access
to its internals. dRPC calls that take a long time can potentially impede
other commands.

My proposed solution is to create a new drpc.ClientConnection for each
command that needs to be sent to the daos_engine. Each command is handled
on its own connection. We were using a connect->send->disconnect pattern
on the client connection anyway.

Required-githooks: true
Change-Id: Ibe5a03d28ecc5099b5827ef22fbbace9e3d8b963
Signed-off-by: Kris Jacque <[email protected]>
  • Loading branch information
kjacque authored and mjmac committed May 14, 2024
1 parent f16a7dd commit 3190405
Show file tree
Hide file tree
Showing 15 changed files with 268 additions and 119 deletions.
2 changes: 1 addition & 1 deletion src/control/cmd/daos_agent/security_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func setupTestUnixConn(t *testing.T) (*net.UnixConn, func()) {
return newConn, cleanup
}

func getClientConn(t *testing.T, path string) *drpc.ClientConnection {
func getClientConn(t *testing.T, path string) drpc.DomainSocketClient {
client := drpc.NewClientConnection(path)
if err := client.Connect(test.Context(t)); err != nil {
t.Fatalf("Failed to connect: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion src/control/drpc/drpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *ClientConnection) GetSocketPath() string {
}

// NewClientConnection creates a new dRPC client
func NewClientConnection(socket string) *ClientConnection {
func NewClientConnection(socket string) DomainSocketClient {
return &ClientConnection{
socketPath: socket,
dialer: &clientDialer{},
Expand Down
5 changes: 3 additions & 2 deletions src/control/drpc/drpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ func TestNewClientConnection(t *testing.T) {
t.Fatal("Expected a real client")
return
}
test.AssertEqual(t, client.socketPath, testSockPath,
clientConn := client.(*ClientConnection)
test.AssertEqual(t, clientConn.socketPath, testSockPath,
"Should match the path we passed in")
test.AssertFalse(t, client.IsConnected(), "Shouldn't be connected yet")

// Dialer should be the private implementation type
_ = client.dialer.(*clientDialer)
_ = clientConn.dialer.(*clientDialer)
}

func TestClient_Connect_Success(t *testing.T) {
Expand Down
12 changes: 9 additions & 3 deletions src/control/server/ctl_ranks_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestServer_CtlSvc_PrepShutdownRanks(t *testing.T) {
cfg.setResponseDelay(tc.responseDelay)
}
}
srv.setDrpcClient(newMockDrpcClient(cfg))
srv.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
return newMockDrpcClient(cfg)
}
}

var cancel context.CancelFunc
Expand Down Expand Up @@ -580,7 +582,9 @@ func TestServer_CtlSvc_PingRanks(t *testing.T) {
cfg.setResponseDelay(tc.responseDelay)
}
}
srv.setDrpcClient(newMockDrpcClient(cfg))
srv.getDrpcClientFn = func(string) drpc.DomainSocketClient {
return newMockDrpcClient(cfg)
}
}

ctx, outerCancel := context.WithCancel(test.Context(t))
Expand Down Expand Up @@ -1092,7 +1096,9 @@ func TestServer_CtlSvc_SetEngineLogMasks(t *testing.T) {
cfg.setResponseDelay(tc.responseDelay)
}
}
srv.setDrpcClient(newMockDrpcClient(cfg))
srv.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
return newMockDrpcClient(cfg)
}
}

gotResp, gotErr := svc.SetEngineLogMasks(test.Context(t), tc.req)
Expand Down
10 changes: 8 additions & 2 deletions src/control/server/ctl_smd_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,10 @@ func TestServer_CtlSvc_SmdQuery(t *testing.T) {
cfg.setSendMsgResponseList(t, mock)
}
}
srv.setDrpcClient(newMockDrpcClient(cfg))
cli := newMockDrpcClient(cfg)
srv.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
return cli
}
srv.ready.SetTrue()
}
if tc.harnessStopped {
Expand Down Expand Up @@ -1627,7 +1630,10 @@ func TestServer_CtlSvc_SmdManage(t *testing.T) {
cfg.setSendMsgResponseList(t, mock)
}
}
srv.setDrpcClient(newMockDrpcClient(cfg))
cli := newMockDrpcClient(cfg)
srv.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
return cli
}
srv.ready.SetTrue()
}
if tc.harnessStopped {
Expand Down
5 changes: 4 additions & 1 deletion src/control/server/ctl_storage_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,10 @@ func TestServer_CtlSvc_StorageScan_PostEngineStart(t *testing.T) {
} else {
t.Fatal("drpc response mocks unpopulated")
}
te.setDrpcClient(newMockDrpcClient(dcc))
cli := newMockDrpcClient(dcc)
te.getDrpcClientFn = func(string) drpc.DomainSocketClient {
return cli
}
te._superblock.Rank = ranklist.NewRankPtr(uint32(idx + 1))
for _, tc := range te.storage.GetBdevConfigs() {
tc.Bdev.DeviceRoles.OptionBits = storage.OptionBits(storage.BdevRoleAll)
Expand Down
3 changes: 2 additions & 1 deletion src/control/server/ctl_svc_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2019-2023 Intel Corporation.
// (C) Copyright 2019-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -66,6 +66,7 @@ func mockControlService(t *testing.T, log logging.Logger, cfg *config.Server, bm
})
if started {
ei.ready.SetTrue()
ei.setDrpcSocket("/dontcare")
}
if err := cs.harness.AddInstance(ei); err != nil {
t.Fatal(err)
Expand Down
19 changes: 10 additions & 9 deletions src/control/server/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,17 @@ func TestServer_Harness_Start(t *testing.T) {
}

instances := harness.Instances()

mockDrpcClients := make([]*mockDrpcClient, 0, len(instances))
// set mock dRPC client to record call details
for _, e := range instances {
ei := e.(*EngineInstance)
ei.setDrpcClient(newMockDrpcClient(&mockDrpcClientConfig{
cli := newMockDrpcClient(&mockDrpcClientConfig{
SendMsgResponse: &drpc.Response{},
}))
})
mockDrpcClients = append(mockDrpcClients, cli)
ei.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
return cli
}
}

ctx, cancel := context.WithCancel(test.Context(t))
Expand Down Expand Up @@ -417,13 +421,10 @@ func TestServer_Harness_Start(t *testing.T) {
defer joinMu.Unlock()
// verify expected RPCs were made, ranks allocated and
// members added to membership
for _, e := range instances {
for i, e := range instances {
ei := e.(*EngineInstance)
dc, err := ei.getDrpcClient()
if err != nil {
t.Fatal(err)
}
gotDrpcCalls := dc.(*mockDrpcClient).CalledMethods()
dc := mockDrpcClients[i]
gotDrpcCalls := dc.CalledMethods()
AssertEqual(t, tc.expDrpcCalls[ei.Index()], gotDrpcCalls,
fmt.Sprintf("%s: unexpected dRPCs for instance %d", name, ei.Index()))

Expand Down
9 changes: 3 additions & 6 deletions src/control/server/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ type EngineInstance struct {
onStorageReady []onStorageReadyFn
onReady []onReadyFn
onInstanceExit []onInstanceExitFn
getDrpcClientFn func(string) drpc.DomainSocketClient

sync.RWMutex
// these must be protected by a mutex in order to
// avoid racy access.
_drpcSocket string
_cancelCtx context.CancelFunc
_drpcClient drpc.DomainSocketClient
_superblock *Superblock
_lastErr error // populated when harness receives signal
}
Expand Down Expand Up @@ -162,11 +163,7 @@ func (ei *EngineInstance) Index() uint32 {
func (ei *EngineInstance) removeSocket() error {
fMsg := fmt.Sprintf("removing instance %d socket file", ei.Index())

dc, err := ei.getDrpcClient()
if err != nil {
return errors.Wrap(err, fMsg)
}
engineSock := dc.GetSocketPath()
engineSock := ei.getDrpcSocket()

if err := checkDrpcClientSocketPath(engineSock); err != nil {
return errors.Wrap(err, fMsg)
Expand Down
37 changes: 23 additions & 14 deletions src/control/server/instance_drpc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -28,32 +28,37 @@ import (
)

var (
errDRPCNotReady = errors.New("no dRPC client set (data plane not started?)")
errDRPCNotReady = errors.New("dRPC socket not ready (data plane not started?)")
errEngineNotReady = errors.New("engine not ready yet")
)

func (ei *EngineInstance) setDrpcClient(c drpc.DomainSocketClient) {
func (ei *EngineInstance) setDrpcSocket(sock string) {
ei.Lock()
defer ei.Unlock()
ei._drpcClient = c
ei._drpcSocket = sock
}

func (ei *EngineInstance) getDrpcClient() (drpc.DomainSocketClient, error) {
func (ei *EngineInstance) getDrpcSocket() string {
ei.RLock()
defer ei.RUnlock()
if ei._drpcClient == nil {
return nil, errDRPCNotReady
return ei._drpcSocket
}

func (ei *EngineInstance) getDrpcClient() drpc.DomainSocketClient {
ei.Lock()
defer ei.Unlock()
if ei.getDrpcClientFn == nil {
ei.getDrpcClientFn = drpc.NewClientConnection
}
return ei._drpcClient, nil
return ei.getDrpcClientFn(ei._drpcSocket)
}

// NotifyDrpcReady receives a ready message from the running Engine
// instance.
func (ei *EngineInstance) NotifyDrpcReady(msg *srvpb.NotifyReadyReq) {
ei.log.Debugf("%s instance %d drpc ready: %v", build.DataPlaneName, ei.Index(), msg)

// activate the dRPC client connection to this engine
ei.setDrpcClient(drpc.NewClientConnection(msg.DrpcListenerSock))
ei.setDrpcSocket(msg.DrpcListenerSock)

go func() {
ei.drpcReady <- msg
Expand All @@ -67,11 +72,12 @@ func (ei *EngineInstance) awaitDrpcReady() chan *srvpb.NotifyReadyReq {
return ei.drpcReady
}

func (ei *EngineInstance) isDrpcSocketReady() bool {
return ei.getDrpcSocket() != ""
}

func (ei *EngineInstance) callDrpc(ctx context.Context, method drpc.Method, body proto.Message) (*drpc.Response, error) {
dc, err := ei.getDrpcClient()
if err != nil {
return nil, err
}
dc := ei.getDrpcClient()

rankMsg := ""
if sb := ei.getSuperblock(); sb != nil && sb.Rank != nil {
Expand All @@ -94,6 +100,9 @@ func (ei *EngineInstance) CallDrpc(ctx context.Context, method drpc.Method, body
if !ei.IsReady() {
return nil, errEngineNotReady
}
if !ei.isDrpcSocketReady() {
return nil, errDRPCNotReady
}

return ei.callDrpc(ctx, method, body)
}
Expand Down
Loading

0 comments on commit 3190405

Please sign in to comment.