Skip to content

Commit

Permalink
1.消息协议重构
Browse files Browse the repository at this point in the history
2.支持k8s部署
  • Loading branch information
alber committed Feb 7, 2023
1 parent 6b27d56 commit 0f7f724
Show file tree
Hide file tree
Showing 104 changed files with 6,126 additions and 7,012 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ gim是一个即时通讯服务器,代码全部使用golang完成。主要特
4.单聊,群聊,以及房间聊天场景
5.支持服务水平扩展
6.使用领域驱动设计
7.支持裸机部署和k8s部署
gim可以作为以业务服务器的一个组件,为现有业务服务器提供im的能力,业务服务器
只需要实现business.int.proto协议中定义的GRPC接口,为gim服务提供基本的用户功能即可
### 使用技术:
Expand Down
13 changes: 10 additions & 3 deletions build_proto.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
cd pkg/proto
protoc --go_out=plugins=grpc:../../../ *.proto
cd ../../
#!/usr/bin/env bash

set -e

root_path=$(pwd)
rm -rf pkg/protocol/pb/*
cd pkg/protocol/proto
pb_root_path=$root_path/../
protoc --proto_path=$root_path/pkg/protocol/proto --go_out=$pb_root_path --go-grpc_out=$pb_root_path *.proto
cd $root_path
8 changes: 2 additions & 6 deletions cmd/business/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package main
import (
"gim/config"
"gim/internal/business/api"
"gim/pkg/db"
"gim/pkg/interceptor"
"gim/pkg/logger"
"gim/pkg/pb"
"gim/pkg/protocol/pb"
"gim/pkg/urlwhitelist"
"net"
"os"
Expand All @@ -18,9 +17,6 @@ import (
)

func main() {
config.Init()
db.Init()

server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.NewInterceptor("business_interceptor", urlwhitelist.Business)))

// 监听服务关闭信号,服务平滑重启
Expand All @@ -34,7 +30,7 @@ func main() {

pb.RegisterBusinessIntServer(server, &api.BusinessIntServer{})
pb.RegisterBusinessExtServer(server, &api.BusinessExtServer{})
listen, err := net.Listen("tcp", config.RPCListenAddr)
listen, err := net.Listen("tcp", config.Config.BusinessRPCListenAddr)
if err != nil {
panic(err)
}
Expand Down
14 changes: 5 additions & 9 deletions cmd/connect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"context"
"gim/config"
"gim/internal/connect"
"gim/pkg/db"
"gim/pkg/interceptor"
"gim/pkg/logger"
"gim/pkg/pb"
"gim/pkg/protocol/pb"
"gim/pkg/rpc"
"net"
"os"
Expand All @@ -20,17 +19,14 @@ import (
)

func main() {
config.Init()
db.Init()

// 启动TCP长链接服务器
go func() {
connect.StartTCPServer(config.TCPListenAddr)
connect.StartTCPServer(config.Config.ConnectTCPListenAddr)
}()

// 启动WebSocket长链接服务器
go func() {
connect.StartWSServer(config.WSListenAddr)
connect.StartWSServer(config.Config.ConnectWSListenAddr)
}()

// 启动服务订阅
Expand All @@ -44,14 +40,14 @@ func main() {
signal.Notify(c, syscall.SIGTERM)
s := <-c
logger.Logger.Info("server stop start", zap.Any("signal", s))
_, _ = rpc.GetLogicIntClient().ServerStop(context.TODO(), &pb.ServerStopReq{ConnAddr: config.LocalAddr})
_, _ = rpc.GetLogicIntClient().ServerStop(context.TODO(), &pb.ServerStopReq{ConnAddr: config.Config.ConnectLocalAddr})
logger.Logger.Info("server stop end")

server.GracefulStop()
}()

pb.RegisterConnectIntServer(server, &connect.ConnIntServer{})
listener, err := net.Listen("tcp", config.RPCListenAddr)
listener, err := net.Listen("tcp", config.Config.ConnectRPCListenAddr)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/connect/run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build main.go
echo "打包完成"
docker run -v $(pwd)/:/app -p 8080:8080 -p 8081:8081 -p 50100:50100 alpine .//app/main
docker run -v $(pwd)/:/app -p 8000:8000 -p 8002:8002 -p 8003:8003 alpine .//app/main
15 changes: 6 additions & 9 deletions cmd/logic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package main
import (
"gim/config"
"gim/internal/logic/api"
"gim/internal/logic/app"
"gim/internal/logic/domain/device"
"gim/internal/logic/domain/message"
"gim/internal/logic/proxy"
"gim/pkg/db"
"gim/pkg/interceptor"
"gim/pkg/logger"
"gim/pkg/pb"
"gim/pkg/protocol/pb"
"gim/pkg/urlwhitelist"
"net"
"os"
Expand All @@ -20,14 +20,11 @@ import (
)

func init() {
proxy.MessageProxy = app.MessageApp
proxy.DeviceProxy = app.DeviceApp
proxy.MessageProxy = message.App
proxy.DeviceProxy = device.App
}

func main() {
config.Init()
db.Init()

server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.NewInterceptor("logic_interceptor", urlwhitelist.Logic)))

// 监听服务关闭信号,服务平滑重启
Expand All @@ -41,7 +38,7 @@ func main() {

pb.RegisterLogicIntServer(server, &api.LogicIntServer{})
pb.RegisterLogicExtServer(server, &api.LogicExtServer{})
listen, err := net.Listen("tcp", config.RPCListenAddr)
listen, err := net.Listen("tcp", config.Config.LogicRPCListenAddr)
if err != nil {
panic(err)
}
Expand Down
79 changes: 39 additions & 40 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,56 @@ package config

import (
"context"
"gim/pkg/k8sutil"
"gim/pkg/logger"
"gim/pkg/gerrors"
"gim/pkg/protocol/pb"
"os"
"strconv"

"go.uber.org/zap"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"google.golang.org/grpc"
)

const (
RPCListenAddr = ":8000"
TCPListenAddr = ":8080"
WSListenAddr = ":8001"
)
var builders = map[string]Builder{
"default": &defaultBuilder{},
"k8s": &k8sBuilder{},
}

var (
Namespace = "gimns"
MySQL string
RedisIP string
RedisPassword string
var Config Configuration

LocalAddr string
type Builder interface {
Build() Configuration
}

type Configuration struct {
MySQL string
RedisHost string
RedisPassword string
PushRoomSubscribeNum int
PushAllSubscribeNum int
)

func Init() {
k8sClient, err := k8sutil.GetK8sClient()
if err != nil {
panic(err)
}
configmap, err := k8sClient.CoreV1().ConfigMaps(Namespace).Get(context.TODO(), "config", metav1.GetOptions{})
if err != nil {
panic(err)
}
ConnectLocalAddr string
ConnectWSListenAddr string
ConnectTCPListenAddr string
ConnectRPCListenAddr string

MySQL = configmap.Data["mysql"]
RedisIP = configmap.Data["redisIP"]
RedisPassword = configmap.Data["redisPassword"]
PushRoomSubscribeNum, _ = strconv.Atoi(configmap.Data["pushRoomSubscribeNum"])
if PushRoomSubscribeNum == 0 {
panic("PushRoomSubscribeNum == 0")
}
PushAllSubscribeNum, _ = strconv.Atoi(configmap.Data["pushAllSubscribeNum"])
if PushRoomSubscribeNum == 0 {
panic("PushAllSubscribeNum == 0")
LogicRPCListenAddr string
BusinessRPCListenAddr string
FileHTTPListenAddr string

ConnectIntClientBuilder func() pb.ConnectIntClient
LogicIntClientBuilder func() pb.LogicIntClient
BusinessIntClientBuilder func() pb.BusinessIntClient
}

func init() {
env := os.Getenv("GIM_ENV")
builder, ok := builders[env]
if !ok {
builder = new(defaultBuilder)
}
Config = builder.Build()

LocalAddr = os.Getenv("POD_IP") + RPCListenAddr
}

logger.Level = zap.DebugLevel
logger.Target = logger.Console
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, method, req, reply, cc, opts...)
return gerrors.WrapRPCError(err)
}
65 changes: 65 additions & 0 deletions config/default_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package config

import (
"context"
"fmt"
"gim/pkg/grpclib/picker"
"gim/pkg/logger"
"gim/pkg/protocol/pb"

"go.uber.org/zap"

_ "gim/pkg/grpclib/resolver/addrs"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)

type defaultBuilder struct{}

func (*defaultBuilder) Build() Configuration {
logger.Level = zap.DebugLevel
logger.Target = logger.Console

return Configuration{
MySQL: "root:gim123456@tcp(111.229.238.28:3306)/gim?charset=utf8&parseTime=true",
RedisHost: "111.229.238.28:6379",
RedisPassword: "alber123456",
PushRoomSubscribeNum: 100,
PushAllSubscribeNum: 100,

ConnectLocalAddr: "127.0.0.1:8000",
ConnectRPCListenAddr: ":8000",
ConnectWSListenAddr: ":8001",
ConnectTCPListenAddr: ":8002",

LogicRPCListenAddr: ":8010",
BusinessRPCListenAddr: ":8020",
FileHTTPListenAddr: "8030",

ConnectIntClientBuilder: func() pb.ConnectIntClient {
conn, err := grpc.DialContext(context.TODO(), "addrs:///127.0.0.1:8000", grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, picker.AddrPickerName)))
if err != nil {
panic(err)
}
return pb.NewConnectIntClient(conn)
},
LogicIntClientBuilder: func() pb.LogicIntClient {
conn, err := grpc.DialContext(context.TODO(), "addrs:///docker.for.mac.host.internal:8010", grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)))
if err != nil {
panic(err)
}
return pb.NewLogicIntClient(conn)
},
BusinessIntClientBuilder: func() pb.BusinessIntClient {
conn, err := grpc.DialContext(context.TODO(), "addrs:///127.0.0.1:8020", grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)))
if err != nil {
panic(err)
}
return pb.NewBusinessIntClient(conn)
},
}
}
Loading

0 comments on commit 0f7f724

Please sign in to comment.