clustergo是一个简单的网络游戏服务端框架。可以快速构建服务器集群内部以及服务器与客户端时间的通信。集群内部采用tcp通信。服务器客户端之间 支持tcp,websocket,kcp通信方式。
服务器集群间支持两种通信模式:PRC及普通的消息传递。服务器节点使用一个逻辑地址作为标识,只要知道对端逻辑地址就可以与对端通信。集群内部通信节点会建立tcp连接,连接在首次通信请求时建立。
各节点使用一个32位逻辑地址标识,逻辑地址被分为3段,高12位表示服务器组,中8位表示节点类型(255保留给harbor使用),低12位表示进程id。
cluster支持在节点之间传递protobuf消息和raw binary消息。如果只需要使用protobuf可以使用默认的SendPbMessage和RegisterPbMessageHandler。
如果需要支持不同的消息格式,可以使用SendBinMessage和RegisterBinMessageHandler自己处理消息的编码和解码。
clustergo默认采用pbrpc
将协议文件添加到pbrpc/proto目录中
例如对于echo服务,首先在proto目录添加echo.proto文件,之后填充文件内容如下:
syntax = "proto3";
option go_package = "../service/echo";
message request {
string msg = 1;
}
message response {
string msg = 1;
}
对于任务服务,请求参数必须命名为request,返回值必须命名为response,go_package永远设置为"../service/echo"。
在pbrpc目录执行./gen.sh,这个操作会在pbrpc/service目录下生成echo.pb.go和echo.go两个文件。
其中echo.go的内容如下:
package echo
import (
"context"
"time"
"github.com/sniperHW/rpcgo"
"github.com/sniperHW/clustergo"
"github.com/sniperHW/clustergo/addr"
)
type Replyer struct {
replyer *rpcgo.Replyer
}
func (this *Replyer) Reply(result *Response, err error) {
this.replyer.Reply(result, err)
}
type EchoService interface {
OnCall(context.Context, *Replyer, *Request)
}
func Register(o EchoService) {
clustergo.RegisterRPC("echo", func(ctx context.Context, r *rpcgo.Replyer, arg *Request) {
o.OnCall(ctx, &Replyer{replyer: r}, arg)
})
}
func Call(ctx context.Context, peer addr.LogicAddr, arg *Request) (*Response, error) {
var resp Response
err := clustergo.Call(ctx, peer, "echo", arg, &resp)
return &resp, err
}
func CallWithCallback(peer addr.LogicAddr, deadline time.Time, arg *Request, cb func(*Response, error)) func() bool {
var resp Response
var fn func(interface{}, error)
if cb != nil {
fn = func(ret interface{}, err error) {
if ret != nil {
cb(ret.(*Response), err)
} else {
cb(nil, err)
}
}
}
return clustergo.CallWithCallback(peer, deadline, "echo", arg, &resp, fn)
}
在服务端定义实现了EchoService的类型,通过Register注册服务:
package main
import (
"context"
"github.com/sniperHW/clustergo"
"github.com/sniperHW/clustergo/addr"
"github.com/sniperHW/clustergo/example/discovery"
"github.com/sniperHW/clustergo/log/zap"
"github.com/sniperHW/clustergo/pbrpc/service/echo"
)
//实现echo.EchoService
type echoService struct {
}
func (e *echoService) OnCall(ctx context.Context, replyer *echo.Replyer, request *echo.Request) {
clustergo.Logger().Debug("echo:", request.Msg)
replyer.Reply(&echo.Response{Msg: request.Msg}, nil)
}
func main() {
l := zap.NewZapLogger("1.1.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
clustergo.InitLogger(l.Sugar())
//注册服务
echo.Register(&echoService{})
localaddr, _ := addr.MakeLogicAddr("1.1.1")
clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)
clustergo.Wait()
}
package main
import (
"context"
"github.com/sniperHW/clustergo"
"github.com/sniperHW/clustergo/addr"
"github.com/sniperHW/clustergo/example/discovery"
"github.com/sniperHW/clustergo/log/zap"
"github.com/sniperHW/clustergo/pbrpc/service/echo"
)
func main() {
l := zap.NewZapLogger("1.2.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
clustergo.InitLogger(l.Sugar())
localaddr, _ := addr.MakeLogicAddr("1.2.1")
clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)
//假设echo服务全部由逻辑地址type=1的节点提供,这里任意获取一个type=1的节点
echoAddr, _ := clustergo.GetAddrByType(1)
//执行10次同步调用
for i := 0; i < 10; i++ {
resp, err := echo.Call(context.TODO(), echoAddr, &echo.Request{Msg: "hello"})
l.Sugar().Debug(resp, err)
}
clustergo.Stop()
clustergo.Wait()
}
逻辑地址最高字段位表示集群
集群内的节点可以建立通信连接直接通信(harbor不受这个限制),跨集群节点通信请看下一小节。
默认情况下,集群A的节点无法通过GetAddrByType获取到集群B中的节点地址(如果已经获得对端的逻辑地址,则不受此限制)。如果希望将节点暴露到集群外,可以将节点的Export字段设置为true。
例如对于一个分服,开房间的游戏。
每个服属于一个集群,房间服务器作为公共服务,属于另外的集群。此时需要将战斗节点的Export字段设置为true。游戏服中的节点便可以通过GetAddrByType获取到可用的战斗节点。
如果通信目标与自己不在同一集群内,则无法与目标建立直接通信连接,为了跨集群通信,需要启动harbor节点,消息路由如下:
本机 -> 本集群内harbor节点 -> 目标所在集群harbor节点 -> 目标节点
多集群部署示意图:
如上图所示,有A,B两个集群,集群各自的harbor节点分别连接本集群的center以及harbors center。 当节点B向节点A发消息时,发现A不在本集群内无法建立直接通信,因此将消息发往harbor B,harbor B接收到路由请求后,根据目标地址 2.1.1将请求转发给harbor A(harbor A和harbor B连接了同一个harbors center,因此他们之间可以建立直接通信连接)。harbor A 接收到之后发现目标A可以直达,于是将消息发送给A。
clustergo支持在同一cluster内的节点之间建立stream。典型的使用方式由gateway接受客户端连接,并且为每一个连接建立一个到gameserver的stream。在gameserver看来,每个stream代表一个客户端连接。
gameserver.go
package main
import (
"github.com/sniperHW/clustergo"
"github.com/sniperHW/clustergo/addr"
"github.com/sniperHW/clustergo/example/discovery"
"github.com/sniperHW/clustergo/logger/zap"
"github.com/xtaci/smux"
)
func main() {
l := zap.NewZapLogger("1.1.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
clustergo.InitLogger(l.Sugar())
localaddr, _ := addr.MakeLogicAddr("1.1.1")
clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)
clustergo.OnNewStream(func(s *smux.Stream) {
//处理stream
go func() {
buff := make([]byte, 64)
for {
n, err := s.Read(buff)
if err != nil {
break
}
n, err = s.Write(buff[:n])
if err != nil {
break
}
}
s.Close()
}()
})
clustergo.Wait()
}
gateserver.go
package main
import (
"io"
"net"
"sync"
"github.com/sniperHW/netgo"
"github.com/sniperHW/clustergo"
"github.com/sniperHW/clustergo/addr"
"github.com/sniperHW/clustergo/example/discovery"
"github.com/sniperHW/clustergo/logger/zap"
)
func main() {
l := zap.NewZapLogger("1.2.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
clustergo.InitLogger(l.Sugar())
localaddr, _ := addr.MakeLogicAddr("1.2.1")
clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)
gameAddr, _ := clustergo.GetAddrByType(1)
_, serve, _ := netgo.ListenTCP("tcp", "127.0.0.1:8113", func(conn *net.TCPConn) {
go func() {
//客户端连接到达,建立到1.1.1的stream
cliStream, err := clustergo.OpenStream(gameAddr)
if err != nil {
conn.Close()
return
}
defer func() {
conn.Close()
cliStream.Close()
}()
var wait sync.WaitGroup
wait.Add(2)
//将来自客户端的数据通过stream透传到1.1.1
go func() {
io.Copy(cliStream, conn)
wait.Done()
}()
//将来自1.1.1的stream数据透传回客户端
go func() {
io.Copy(conn, cliStream)
wait.Done()
}()
wait.Wait()
}()
})
go serve()
clustergo.Wait()
}
clustergo是为游戏服务端设计的分布式框架,游戏服务通常由一个个服务节点组成,每个节点提供一组内聚的功能服务。
Membership服务保存了所有成员的如下信息:
type Node struct {
Addr addr.Addr
Export bool //是否将节点暴露到cluster外部
Available bool //是否可用,
}
节点启动时,首先连接Membership获取配置信息,之后通过配置的逻辑地址从配置中查询自身节点信息。如果找到则使用配置中的网络地址启动服务,否则启动失败。
普通节点只会将配置中Cluster相同或Export为true的节点保存到本地。可以通过GetAddrByType
来获取某个类型的可用节点(Available=true),如果有多个节点可用GetAddrByType
将随机返回一个节点。