Skip to content

Commit

Permalink
废掉 HTTP 文件上传协议,改成 GRPC 以方便同步
Browse files Browse the repository at this point in the history
  • Loading branch information
movsb committed Apr 15, 2024
1 parent cb3154f commit ffeec25
Show file tree
Hide file tree
Showing 11 changed files with 1,668 additions and 327 deletions.
36 changes: 0 additions & 36 deletions cmd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,16 @@ package client
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"

"github.com/movsb/taoblog/protocols"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

var (
// ErrStatusCode ...
ErrStatusCode = errors.New("http.statusCode != 200")
)

const (
contentTypeBinary = "application/octet-stream"
)

// Client ...
// TODO: close client connection.
type Client struct {
Expand Down Expand Up @@ -102,27 +90,3 @@ func NewClient(config HostConfig) *Client {
func (c *Client) token() context.Context {
return metadata.NewOutgoingContext(context.TODO(), metadata.Pairs("token", c.config.Token))
}

func (c *Client) post(path string, body io.Reader, ty string) *http.Response {
req, err := http.NewRequest("POST", c.config.API+path, body)
if err != nil {
panic(err)
}
req.Header.Set("Authorization", c.config.Token)
req.Header.Set("Content-Type", ty)
resp, err := c.client.Do(req)
if err != nil {
panic(err)
}
return resp
}

func (c *Client) mustPost(path string, body io.Reader, ty string) *http.Response {
resp := c.post(path, body, ty)
if resp.StatusCode != 200 {
io.Copy(os.Stderr, resp.Body)
resp.Body.Close()
panic(resp.Status)
}
return resp
}
177 changes: 169 additions & 8 deletions cmd/client/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package client

import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"

"github.com/movsb/taoblog/protocols"
Expand Down Expand Up @@ -225,6 +225,7 @@ func (c *Client) DeletePost(id int64) error {
// UploadPostFiles 上传文章附件。
// TODO 目前为了简单起见,使用的是 HTTP POST 方式上传;
// TODO 应该像 Backup 那样改成带进度的 protocol buffer 方式上传。
// files 路径列表,相对于工作目录,相对路径。
func (c *Client) UploadPostFiles(files []string) {
config := c.readPostConfig()
if config.ID <= 0 {
Expand All @@ -233,17 +234,177 @@ func (c *Client) UploadPostFiles(files []string) {
if len(files) <= 0 {
return
}

client, err := c.management.FileSystem(c.token())
if err != nil {
panic(err)
}
defer client.CloseSend()

if err := client.Send(&protocols.FileSystemRequest{
Init: &protocols.FileSystemRequest_InitRequest{
For: &protocols.FileSystemRequest_InitRequest_Post_{
Post: &protocols.FileSystemRequest_InitRequest_Post{
Id: config.ID,
},
},
},
}); err != nil {
panic(err)
}
rsp, err := client.Recv()
if err != nil {
panic(err)
}
if rsp.GetInit() == nil {
panic("expect init")
}

// log.Println("获取远程文件列表...")
if err := client.Send(&protocols.FileSystemRequest{
Request: &protocols.FileSystemRequest_ListFiles{
ListFiles: &protocols.FileSystemRequest_ListFilesRequest{},
},
}); err != nil {
panic(err)
}
rsp, err = client.Recv()
if err != nil {
panic(err)
}
remoteList := rsp.GetListFiles()
if remoteList == nil {
panic("list is nil")
}
remoteFiles := remoteList.GetFiles()

// log.Println("获取本地文件列表...")
var localFiles []*protocols.FileSpec
for _, file := range files {
fmt.Println(" +", file)
var err error
fp, err := os.Open(file)
stat, err := os.Stat(file)
if err != nil {
log.Fatalln(err)
}
defer fp.Close()
path := fmt.Sprintf("/posts/%d/files/%s", config.ID, file)
resp := c.mustPost(path, fp, contentTypeBinary)
_ = resp
f := protocols.FileSpec{
Path: file,
Mode: uint32(stat.Mode()),
Size: uint32(stat.Size()),
Time: uint32(stat.ModTime().Unix()),
}
localFiles = append(localFiles, &f)
}

sort.Slice(remoteFiles, func(i, j int) bool {
return strings.Compare(remoteFiles[i].Path, remoteFiles[j].Path) < 0
})
sort.Slice(localFiles, func(i, j int) bool {
return strings.Compare(localFiles[i].Path, localFiles[j].Path) < 0
})

rl, rr := localFiles, remoteFiles
i, j := len(rl)-1, len(rr)-1

for {
if i == -1 && j == -1 {
// log.Println("没有更多需要比较的文件。")
break
}

deleteRemote := func(r *protocols.FileSpec) {
// delete remote
if err := client.Send(&protocols.FileSystemRequest{
Request: &protocols.FileSystemRequest_DeleteFile{
DeleteFile: &protocols.FileSystemRequest_DeleteFileRequest{
Path: r.Path,
},
},
}); err != nil {
panic(err)
}
rsp, err := client.Recv()
if err != nil {
panic(err)
}
if rsp.GetDeleteFile() == nil {
panic("expect get delete")
}
log.Println("删除远程:", r.Path)
}

if i == -1 {
deleteRemote(rr[j])
j--
continue
}
copyToRemote := func(l *protocols.FileSpec, data []byte) {
// log.Println("准备复制到远程:", l.Path)
if err := client.Send(&protocols.FileSystemRequest{
Request: &protocols.FileSystemRequest_WriteFile{
WriteFile: &protocols.FileSystemRequest_WriteFileRequest{
Spec: l,
Data: data,
},
},
}); err != nil {
panic(err)
}
rsp, err := client.Recv()
if err != nil {
panic(err)
}
if rsp.GetWriteFile() == nil {
panic("expect write file")
}
log.Println("复制到远程:", l.Path)
}
if j == -1 {
data, err := os.ReadFile(localFiles[i].Path)
if err != nil {
// TODO 不正确的判断方式
if !strings.Contains(err.Error(), "is a dir") {
panic(err)
}
i--
continue
}
l := localFiles[i]
copyToRemote(l, data)
i--
continue
}
switch n := strings.Compare(rl[i].Path, rr[j].Path); {
case n < 0:
data, err := os.ReadFile(rl[i].Path)
if err != nil {
panic(err)
}
copyToRemote(rl[i], data)
case n > 0:
deleteRemote(rr[j])
case n == 0:
lm, rm := os.FileMode(rl[i].Mode), os.FileMode(rr[j].Mode)
if lm.IsDir() != rm.IsDir() {
panic(("file != dir"))
}
shouldSync := false
if rl[i].Size != rr[j].Size {
shouldSync = true
}
if rl[i].Time != rr[j].Time {
shouldSync = true
}
if shouldSync {
if rm.IsRegular() {
data, err := os.ReadFile(rl[i].Path)
if err != nil {
panic(err)
}
copyToRemote(rl[i], data)
}
}
i--
j--
}
}
}

Expand Down
59 changes: 0 additions & 59 deletions gateway/file.go

This file was deleted.

5 changes: 0 additions & 5 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ func (g *Gateway) runHTTPService(ctx context.Context, mux *http.ServeMux, mux2 *

handle(`GET`, `/v3/avatar/{id}`, g.GetAvatar)

handle(`GET`, `/v3/posts/{post_id}/files`, g.ListFiles)
handle(`GET`, `/v3/posts/{post_id}/files/{file=**}`, g.GetFile)
handle(`POST`, `/v3/posts/{post_id}/files/{file=**}`, g.CreateFile)
// handle(`DELETE`, `/v3/posts/{post_id}/files/{file=**}`, g.DeleteFile)

handle(`GET`, `/v3/redirect-to-grafana`, redirectToGrafana)

return nil
Expand Down
Loading

0 comments on commit ffeec25

Please sign in to comment.