Commit

Merge pull request #108 from fengjingchao/master
Defragment framework interfaces into context based calls
Hongchao Deng committed Jan 23, 2015
2 parents 7e62eb3 + 2431536 commit 65e2e8c
Showing 10 changed files with 112 additions and 72 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -4,15 +4,15 @@ TaskGraph
[![Build Status](https://travis-ci.org/go-distributed/meritop.svg)](https://travis-ci.org/go-distributed/meritop)


TaskGraph is a framework for writing fault tolerant distributed applications. It assumes that an application consists of a network of tasks, which are interconnected based on some topology (hence the graph). TaskGraph assumes that each task (a logical unit of work) has one primary node and zero or more backup nodes. TaskGraph then helps with two types of node failure: failures of nodes belonging to different tasks, and failures of nodes belonging to the same task. The framework monitors task/node health, takes care of restarting failed tasks, and passes a standard set of events (parent/children fail/restart, primary/backup switch) on to the task implementation so that it can do application-dependent recovery.


- TaskGraph supports an event-driven pull model for communication. When one task wants to talk to another task, it sets a flag via the framework, and the framework notifies the recipient, which can decide whether and when to fetch the data via the framework. The framework handles communication failures via automatic retry, reconnecting to the new task node, etc. In other words, it provides fetch-exactly-once semantics on data requests.
+ TaskGraph supports an event-driven pull model for communication. When one task wants to talk to another task, it sets a flag via the framework, and the framework notifies the recipient, which can decide whether and when to fetch the data via the framework. The framework handles communication failures via automatic retry, reconnecting to the new task node, etc. In other words, it provides at-least-once semantics on data requests.
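The at-least-once semantics called out in the change above fall out of the retry behavior: the framework keeps re-issuing a fetch until it succeeds, so the serving side may see the same request more than once. A minimal self-contained sketch of that behavior (the `flaky` fetch here is a hypothetical stand-in; the real transport lives in `frameworkhttp`):

```go
package main

import (
	"errors"
	"fmt"
)

// flaky returns a fetch function that fails twice before succeeding,
// simulating transient network errors.
func flaky() func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("connection reset")
		}
		return "payload", nil
	}
}

// fetchWithRetry keeps retrying until the fetch succeeds or the retry
// budget is exhausted. Because failed attempts may still have reached
// the server, the request is delivered at least once, not exactly once.
func fetchWithRetry(fetch func() (string, error), maxTries int) (string, int) {
	for i := 1; i <= maxTries; i++ {
		if data, err := fetch(); err == nil {
			return data, i
		}
	}
	return "", maxTries
}

func main() {
	data, tries := fetchWithRetry(flaky(), 5)
	fmt.Printf("got %q after %d tries\n", data, tries) // got "payload" after 3 tries
}
```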


A TaskGraph application usually has three layers, and an application implementation needs to configure TaskGraph in the driver layer and also implement Task/TaskBuilder/Topology based on application logic.

1. In the driver (main function), the application needs to configure the task graph. This includes setting up a TaskBuilder, which specifies which task needs to run on each node. One also needs to specify the network topology, which determines who connects to whom at each iteration. Then Framework.Start is called so that every node enters the event loop.

2. The TaskGraph framework handles fault tolerance within the framework. It uses etcd and/or Kubernetes for this purpose. It also handles the communication between logical tasks so that it can hide the communication between the master and hot standbys.
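The driver/TaskBuilder split described in the list above can be sketched as follows. All type and function names here are hypothetical stand-ins to make the layering concrete; the real interfaces live in the meritop package, and framework startup is elided:

```go
package main

import "fmt"

// Task is a hypothetical stand-in for the framework's task interface.
type Task interface{ Name() string }

// TaskBuilder decides which task implementation runs on a given node.
type TaskBuilder interface {
	GetTask(taskID uint64) Task
}

type masterTask struct{}
type slaveTask struct{}

func (masterTask) Name() string { return "master" }
func (slaveTask) Name() string  { return "slave" }

// treeBuilder puts the master on node 0 and slaves everywhere else.
type treeBuilder struct{}

func (treeBuilder) GetTask(taskID uint64) Task {
	if taskID == 0 {
		return masterTask{}
	}
	return slaveTask{}
}

func main() {
	// Driver layer: configure the builder; the framework would then be
	// started with it so every node enters the event loop.
	b := treeBuilder{}
	for id := uint64(0); id < 3; id++ {
		fmt.Printf("task %d runs %s\n", id, b.GetTask(id).Name())
	}
}
```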

17 changes: 11 additions & 6 deletions framework/bootstrap.go
@@ -108,7 +108,11 @@ func (f *framework) run() {
if meta.epoch != f.epoch {
break
}
go f.handleMetaChange(meta.who, meta.from, meta.meta)
// We need to create a context before handling the next event. The context
// saves the epoch that was current when this event fired. It is passed to
// the user's event handler functions and later used to ask the framework
// to do work on behalf of that earlier epoch.
go f.handleMetaChange(f.createContext(), meta.who, meta.from, meta.meta)
case req := <-f.dataReqtoSendChan:
if req.epoch != f.epoch {
f.log.Printf("epoch mismatch: task %d, req-to-send epoch: %d, current epoch: %d",
@@ -138,13 +142,13 @@ func (f *framework) run() {
f.taskID, resp.Epoch, f.epoch)
break
}
go f.handleDataResp(resp)
go f.handleDataResp(f.createContext(), resp)
}
}
}

func (f *framework) setEpochStarted() {
f.task.SetEpoch(f.epoch)
f.task.SetEpoch(f.createContext(), f.epoch)

// setup etcd watches
// - create self's parent and child meta flag
@@ -247,11 +251,12 @@ func (f *framework) watchAll(who taskRole, taskIDs []uint64) {
}
f.metaStops = append(f.metaStops, stops...)
}
func (f *framework) handleMetaChange(who taskRole, taskID uint64, meta string) {

func (f *framework) handleMetaChange(ctx meritop.Context, who taskRole, taskID uint64, meta string) {
switch who {
case roleParent:
f.task.ParentMetaReady(taskID, meta)
f.task.ParentMetaReady(ctx, taskID, meta)
case roleChild:
f.task.ChildMetaReady(taskID, meta)
f.task.ChildMetaReady(ctx, taskID, meta)
}
}
29 changes: 29 additions & 0 deletions framework/context.go
@@ -0,0 +1,29 @@
package framework

type context struct {
epoch uint64
f *framework
}

func (f *framework) createContext() *context {
return &context{
epoch: f.epoch,
f: f,
}
}

func (c *context) FlagMetaToParent(meta string) {
c.f.flagMetaToParent(meta, c.epoch)
}

func (c *context) FlagMetaToChild(meta string) {
c.f.flagMetaToChild(meta, c.epoch)
}

func (c *context) IncEpoch() {
c.f.incEpoch(c.epoch)
}

func (c *context) DataRequest(toID uint64, req string) {
c.f.dataRequest(toID, req, c.epoch)
}
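The new context type above exists so that an event handler keeps acting on the epoch at which its event fired, even if the global epoch moves on while the handler runs. A self-contained sketch of that guard, with a plain struct field standing in for the etcd-backed framework state (the stale-epoch check here is illustrative; the real framework performs it where the request is processed):

```go
package main

import "fmt"

type framework struct{ epoch uint64 }

// context captures the epoch current at event-dispatch time,
// mirroring framework/context.go.
type context struct {
	epoch uint64
	f     *framework
}

func (f *framework) createContext() *context {
	return &context{epoch: f.epoch, f: f}
}

// dataRequest refuses work stamped with a stale epoch.
func (f *framework) dataRequest(toID uint64, req string, epoch uint64) string {
	if epoch != f.epoch {
		return fmt.Sprintf("dropped %q: epoch %d != current %d", req, epoch, f.epoch)
	}
	return fmt.Sprintf("sent %q to task %d at epoch %d", req, toID, epoch)
}

func (c *context) DataRequest(toID uint64, req string) string {
	return c.f.dataRequest(toID, req, c.epoch)
}

func main() {
	f := &framework{epoch: 3}
	ctx := f.createContext() // handed to a user event handler
	fmt.Println(ctx.DataRequest(1, "grad"))
	f.epoch++ // epoch advances while the handler is still running
	fmt.Println(ctx.DataRequest(1, "grad"))
}
```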
11 changes: 6 additions & 5 deletions framework/data_request.go
@@ -3,6 +3,7 @@ package framework
import (
"net/http"

"github.com/go-distributed/meritop"
"github.com/go-distributed/meritop/framework/frameworkhttp"
"github.com/go-distributed/meritop/pkg/etcdutil"
"github.com/go-distributed/meritop/pkg/topoutil"
@@ -18,10 +19,10 @@ func (f *framework) sendRequest(dr *dataRequest) {
d, err := frameworkhttp.RequestData(addr, dr.req, f.taskID, dr.taskID, dr.epoch, f.log)
if err != nil {
if err == frameworkhttp.ErrReqEpochMismatch {
f.log.Printf("Epoch mismatch error from server")
f.log.Printf("task %d got epoch mismatch error from server", f.taskID)
return
}
f.log.Printf("RequestData failed: %v", err)
f.log.Printf("task %d RequestData failed: %v", f.taskID, err)
return
}
f.dataRespChan <- d
@@ -105,12 +106,12 @@ func (f *framework) handleDataReq(dr *dataRequest) {
}
}

func (f *framework) handleDataResp(resp *frameworkhttp.DataResponse) {
func (f *framework) handleDataResp(ctx meritop.Context, resp *frameworkhttp.DataResponse) {
switch {
case topoutil.IsParent(f.topology, resp.Epoch, resp.TaskID):
f.task.ParentDataReady(resp.TaskID, resp.Req, resp.Data)
f.task.ParentDataReady(ctx, resp.TaskID, resp.Req, resp.Data)
case topoutil.IsChild(f.topology, resp.Epoch, resp.TaskID):
f.task.ChildDataReady(resp.TaskID, resp.Req, resp.Data)
f.task.ChildDataReady(ctx, resp.TaskID, resp.Req, resp.Data)
default:
f.log.Panic("unexpected")
}
18 changes: 9 additions & 9 deletions framework/framework.go
@@ -46,17 +46,17 @@ type framework struct {
dataRespChan chan *frameworkhttp.DataResponse
}

func (f *framework) FlagMetaToParent(meta string) {
value := fmt.Sprintf("%d-%s", f.epoch, meta)
func (f *framework) flagMetaToParent(meta string, epoch uint64) {
value := fmt.Sprintf("%d-%s", epoch, meta)
_, err := f.etcdClient.Set(etcdutil.ParentMetaPath(f.name, f.GetTaskID()), value, 0)
if err != nil {
f.log.Fatalf("etcdClient.Set failed; key: %s, value: %s, error: %v",
etcdutil.ParentMetaPath(f.name, f.GetTaskID()), value, err)
}
}

func (f *framework) FlagMetaToChild(meta string) {
value := fmt.Sprintf("%d-%s", f.epoch, meta)
func (f *framework) flagMetaToChild(meta string, epoch uint64) {
value := fmt.Sprintf("%d-%s", epoch, meta)
_, err := f.etcdClient.Set(etcdutil.ChildMetaPath(f.name, f.GetTaskID()), value, 0)
if err != nil {
f.log.Fatalf("etcdClient.Set failed; key: %s, value: %s, error: %v",
@@ -67,22 +67,22 @@ func (f *framework) FlagMetaToChild(meta string) {
// When app code invoke this method on framework, we simply
// update the etcd epoch to next uint64. All nodes should watch
// for epoch and update their local epoch correspondingly.
func (f *framework) IncEpoch() {
err := etcdutil.CASEpoch(f.etcdClient, f.name, f.epoch, f.epoch+1)
func (f *framework) incEpoch(epoch uint64) {
err := etcdutil.CASEpoch(f.etcdClient, f.name, epoch, epoch+1)
if err != nil {
f.log.Fatalf("task %d Epoch CompareAndSwap(%d, %d) failed: %v",
f.taskID, f.epoch+1, f.epoch, err)
f.taskID, f.epoch+1, epoch, err)
}
}

func (f *framework) DataRequest(toID uint64, req string) {
func (f *framework) dataRequest(toID uint64, req string, epoch uint64) {
// assumption here:
// Event driven task will call this in a synchronous way so that
// the epoch won't change at the time task sending this request.
// Epoch may change, however, before the request is actually being sent.
f.dataReqtoSendChan <- &dataRequest{
taskID: toID,
epoch: f.epoch,
epoch: epoch,
req: req,
}
}
24 changes: 12 additions & 12 deletions framework/framework_test.go
@@ -122,7 +122,7 @@ func TestFrameworkFlagMetaReady(t *testing.T) {

for i, tt := range tests {
// 0: F#FlagChildMetaReady -> 1: T#ParentMetaReady
f0.FlagMetaToChild(tt.cMeta)
f0.flagMetaToChild(tt.cMeta, 0)
// from child(1)'s view
data := <-pDataChan
expected := &tDataBundle{0, tt.cMeta, "", nil}
@@ -131,7 +131,7 @@
}

// 1: F#FlagParentMetaReady -> 0: T#ChildMetaReady
f1.FlagMetaToParent(tt.pMeta)
f1.flagMetaToParent(tt.pMeta, 0)
// from parent(0)'s view
data = <-cDataChan
expected = &tDataBundle{1, tt.pMeta, "", nil}
@@ -209,7 +209,7 @@ func TestFrameworkDataRequest(t *testing.T) {

for i, tt := range tests {
// 0: F#DataRequest -> 1: T#ServeAsChild -> 0: T#ChildDataReady
f0.DataRequest(1, tt.req)
f0.dataRequest(1, tt.req, 0)
// from child(1)'s view at 1: T#ServeAsChild
data := <-pDataChan
expected := &tDataBundle{0, "", data.req, nil}
@@ -224,7 +224,7 @@
}

// 1: F#DataRequest -> 0: T#ServeAsParent -> 1: T#ParentDataReady
f1.DataRequest(0, tt.req)
f1.dataRequest(0, tt.req, 0)
// from parent(0)'s view at 0: T#ServeAsParent
data = <-cDataChan
expected = &tDataBundle{1, "", data.req, nil}
@@ -289,17 +289,17 @@ func (t *testableTask) Init(taskID uint64, framework meritop.Framework) {
t.setupLatch.Done()
}
}
func (t *testableTask) Exit() {}
func (t *testableTask) SetEpoch(epoch uint64) {}
func (t *testableTask) Exit() {}
func (t *testableTask) SetEpoch(ctx meritop.Context, epoch uint64) {}

func (t *testableTask) ParentMetaReady(fromID uint64, meta string) {
func (t *testableTask) ParentMetaReady(ctx meritop.Context, fromID uint64, meta string) {
if t.dataChan != nil {
t.dataChan <- &tDataBundle{fromID, meta, "", nil}
}
}

func (t *testableTask) ChildMetaReady(fromID uint64, meta string) {
t.ParentMetaReady(fromID, meta)
func (t *testableTask) ChildMetaReady(ctx meritop.Context, fromID uint64, meta string) {
t.ParentMetaReady(ctx, fromID, meta)
}

func (t *testableTask) ServeAsParent(fromID uint64, req string) []byte {
@@ -311,14 +311,14 @@ func (t *testableTask) ServeAsParent(fromID uint64, req string) []byte {
func (t *testableTask) ServeAsChild(fromID uint64, req string) []byte {
return t.ServeAsParent(fromID, req)
}
func (t *testableTask) ParentDataReady(fromID uint64, req string, resp []byte) {
func (t *testableTask) ParentDataReady(ctx meritop.Context, fromID uint64, req string, resp []byte) {
if t.dataChan != nil {
t.dataChan <- &tDataBundle{fromID, "", req, resp}
}
}

func (t *testableTask) ChildDataReady(fromID uint64, req string, resp []byte) {
t.ParentDataReady(fromID, req, resp)
func (t *testableTask) ChildDataReady(ctx meritop.Context, fromID uint64, req string, resp []byte) {
t.ParentDataReady(ctx, fromID, req, resp)
}

func createListener(t *testing.T) net.Listener {
36 changes: 18 additions & 18 deletions framework/regression_framework.go
@@ -63,15 +63,15 @@ func (t *dummyMaster) Init(taskID uint64, framework meritop.Framework) {
func (t *dummyMaster) Exit() {}

// Ideally, we should also have the following:
func (t *dummyMaster) ParentMetaReady(parentID uint64, meta string) {}
func (t *dummyMaster) ChildMetaReady(childID uint64, meta string) {
func (t *dummyMaster) ParentMetaReady(ctx meritop.Context, parentID uint64, meta string) {}
func (t *dummyMaster) ChildMetaReady(ctx meritop.Context, childID uint64, meta string) {
t.logger.Printf("master ChildMetaReady, task: %d, epoch: %d, child: %d\n", t.taskID, t.epoch, childID)
// Get data from child. When all the data is back, starts the next epoch.
t.framework.DataRequest(childID, meta)
ctx.DataRequest(childID, meta)
}

// This give the task an opportunity to cleanup and regroup.
func (t *dummyMaster) SetEpoch(epoch uint64) {
func (t *dummyMaster) SetEpoch(ctx meritop.Context, epoch uint64) {
t.logger.Printf("master SetEpoch, task: %d, epoch: %d\n", t.taskID, epoch)
if t.testablyFail("SetEpoch", strconv.FormatUint(epoch, 10)) {
return
@@ -85,7 +85,7 @@ func (t *dummyMaster) SetEpoch(epoch uint64) {

// Make sure we have a clean slate.
t.fromChildren = make(map[uint64]*dummyData)
t.framework.FlagMetaToChild("ParamReady")
ctx.FlagMetaToChild("ParamReady")
}

// These are payload rpc for application purpose.
@@ -101,8 +101,8 @@ func (t *dummyMaster) ServeAsChild(fromID uint64, req string) []byte {
return nil
}

func (t *dummyMaster) ParentDataReady(parentID uint64, req string, resp []byte) {}
func (t *dummyMaster) ChildDataReady(childID uint64, req string, resp []byte) {
func (t *dummyMaster) ParentDataReady(ctx meritop.Context, parentID uint64, req string, resp []byte) {}
func (t *dummyMaster) ChildDataReady(ctx meritop.Context, childID uint64, req string, resp []byte) {
t.logger.Printf("master ChildDataReady, task: %d, epoch: %d, child: %d, ready: %d\n",
t.taskID, t.epoch, childID, len(t.fromChildren))
d := new(dummyData)
@@ -130,7 +130,7 @@ func (t *dummyMaster) ChildDataReady(childID uint64, req string, resp []byte) {
close(t.finishChan)
} else {
t.logger.Printf("master finished current epoch, task: %d, epoch: %d", t.taskID, t.epoch)
t.framework.IncEpoch()
ctx.IncEpoch()
}
}
}
@@ -184,18 +184,18 @@ func (t *dummySlave) Init(taskID uint64, framework meritop.Framework) {
func (t *dummySlave) Exit() {}

// Ideally, we should also have the following:
func (t *dummySlave) ParentMetaReady(parentID uint64, meta string) {
func (t *dummySlave) ParentMetaReady(ctx meritop.Context, parentID uint64, meta string) {
t.logger.Printf("slave ParentMetaReady, task: %d, epoch: %d\n", t.taskID, t.epoch)
t.framework.DataRequest(parentID, meta)
ctx.DataRequest(parentID, meta)
}

func (t *dummySlave) ChildMetaReady(childID uint64, meta string) {
func (t *dummySlave) ChildMetaReady(ctx meritop.Context, childID uint64, meta string) {
t.logger.Printf("slave ChildMetaReady, task: %d, epoch: %d\n", t.taskID, t.epoch)
t.framework.DataRequest(childID, meta)
ctx.DataRequest(childID, meta)
}

// This give the task an opportunity to cleanup and regroup.
func (t *dummySlave) SetEpoch(epoch uint64) {
func (t *dummySlave) SetEpoch(ctx meritop.Context, epoch uint64) {
t.logger.Printf("slave SetEpoch, task: %d, epoch: %d\n", t.taskID, epoch)
t.param = &dummyData{}
t.gradient = &dummyData{}
@@ -223,7 +223,7 @@ func (t *dummySlave) ServeAsChild(fromID uint64, req string) []byte {
return b
}

func (t *dummySlave) ParentDataReady(parentID uint64, req string, resp []byte) {
func (t *dummySlave) ParentDataReady(ctx meritop.Context, parentID uint64, req string, resp []byte) {
t.logger.Printf("slave ParentDataReady, task: %d, epoch: %d, parent: %d\n", t.taskID, t.epoch, parentID)
if t.testablyFail("ParentDataReady") {
return
@@ -241,15 +241,15 @@ func (t *dummySlave) ParentDataReady(parentID uint64, req string, resp []byte) {
// parameter.
children := t.framework.GetTopology().GetChildren(t.epoch)
if len(children) != 0 {
t.framework.FlagMetaToChild("ParamReady")
ctx.FlagMetaToChild("ParamReady")
} else {
// On leaf node, we can immediately return by and flag parent
// that this node is ready.
t.framework.FlagMetaToParent("GradientReady")
ctx.FlagMetaToParent("GradientReady")
}
}

func (t *dummySlave) ChildDataReady(childID uint64, req string, resp []byte) {
func (t *dummySlave) ChildDataReady(ctx meritop.Context, childID uint64, req string, resp []byte) {
t.logger.Printf("slave ChildDataReady, task: %d, epoch: %d, child: %d\n", t.taskID, t.epoch, childID)
d := new(dummyData)
json.Unmarshal(resp, d)
@@ -274,7 +274,7 @@ func (t *dummySlave) ChildDataReady(childID uint64, req string, resp []byte) {
return
}

t.framework.FlagMetaToParent("GradientReady")
ctx.FlagMetaToParent("GradientReady")

// if this failure happens, the parent could
// 1. not have the data yet. In such case, the parent could
