Skip to content

Commit

Permalink
feat(publisher): add fileserver worker (#189)
Browse files Browse the repository at this point in the history
upload to KS3 buckect.

Signed-off-by: wuhuizuo <[email protected]>

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Nov 1, 2024
1 parent 0de8b53 commit 7263851
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 27 deletions.
6 changes: 3 additions & 3 deletions publisher/cmd/worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func loadConfig(configFile string) (config.Worker, error) {
return config, nil
}

func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) {
func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) {
config, err := loadConfig(configFile)
if err != nil {
return nil, nil, err
log.Fatal().Err(err).Msg("load config failed")
}

// Configure Redis client.
Expand All @@ -58,7 +58,7 @@ func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, er
Logger: kafka.LoggerFunc(log.Printf),
})

return kafkaReader, worker, nil
return kafkaReader, worker
}

func initFsWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) {
Expand Down
2 changes: 1 addition & 1 deletion publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func main() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

tiupPublishRequestKafkaReader, tiupWorker := initFsWorkerFromConfig(*tiupConfigFile)
tiupPublishRequestKafkaReader, tiupWorker := initTiupWorkerFromConfig(*tiupConfigFile)
fsPublishRequestKafkaReader, fsWorker := initFsWorkerFromConfig(*fsConfigFile)

// Create channel used by both the signal handler and server goroutines
Expand Down
4 changes: 1 addition & 3 deletions publisher/example/config/service-example.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

event_source: http://publisher.site/
kafka:
brokers:
- "kafka-broker-1:9092"
Expand All @@ -12,7 +12,5 @@ kafka:
redis:
addr: "redis-server:6379"
db: 0
username: "redis_user"
password: "redis_password"

event_source: http://publisher.site/
18 changes: 18 additions & 0 deletions publisher/example/config/worker-example.fileserver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
kafka:
brokers:
- example-bootstrap.kafka:9092
topic: example-topic
consumer_group: example-group-fs

redis:
addr: "redis-server:6379"
db: 0
password: "redis_password"

options:
lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here.
s3.endpoint: <endpoint>
s3.region: BEIJING
s3.bucket_name: <bucket-name>
s3.access_key: <access-key>
s3.secret_key: <secret-key>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ kafka:
- example-bootstrap.kafka:9092
topic: example-topic
consumer_group: example-group

redis:
addr: "redis-server:6379"
db: 0
password: "redis_password"

options:
mirror_url: http://tiup.mirror.site
lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here.
nightly_interval: 1h
nightly_interval: 1h
67 changes: 50 additions & 17 deletions publisher/pkg/impl/fileserver_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/go-redis/redis/v8"
"github.com/ks3sdklib/aws-sdk-go/aws"
"github.com/ks3sdklib/aws-sdk-go/aws/awsutil"
"github.com/ks3sdklib/aws-sdk-go/aws/credentials"
"github.com/ks3sdklib/aws-sdk-go/service/s3"
"github.com/rs/zerolog"
)

const defaultMaxKS3Retries = 3

type fsWorker struct {
logger zerolog.Logger
redisClient redis.Cmdable
Expand All @@ -41,14 +42,21 @@ func NewFsWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[
handler.options.LarkWebhookURL = options["lark_webhook_url"]
handler.options.S3.BucketName = options["s3.bucket_name"]

cre := credentials.NewStaticCredentials(options["s3_access_key_id"],
options["s3_secret_access_key"],
options["s3_session_token"])
cre := credentials.NewStaticCredentials(
options["s3.access_key"],
options["s3.secret_key"],
options["s3.session_token"])
handler.s3Client = s3.New(&aws.Config{
Credentials: cre,
Region: options["s3.region"],
Endpoint: options["s3.endpoint"],
Region: options["s3.region"], // Ref: https://docs.ksyun.com/documents/6761
Endpoint: options["s3.endpoint"], // Ref: https://docs.ksyun.com/documents/6761
MaxRetries: defaultMaxKS3Retries,
Logger: logger,
})
// Enable KS3 log when debug mode is enabled.
if logger.GetLevel() <= zerolog.DebugLevel {
handler.s3Client.Config.LogLevel = aws.LogOn
}

return &handler, nil
}
Expand Down Expand Up @@ -135,20 +143,45 @@ func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) {
}

func (p *fsWorker) publish(content io.ReadSeeker, info *PublishInfo) error {
targetPath := targetFsFullPaths(info)
// upload file to the KingSoft cloud object bucket with the target path as key.

key := targetPath[0]
resp, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.options.S3.BucketName), // 存储空间名称,必填
Key: aws.String(key), // 对象的key,必填
Body: content, // 要上传的文件,必填
ACL: aws.String("public-read"), // 对象的访问权限,非必填
keys := targetFsFullPaths(info)
if len(keys) == 0 {
return nil
}

bucketName := p.options.S3.BucketName

// upload the artifact files to KS3 bucket.
for _, key := range keys {
_, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: content,
})
if err != nil {
p.logger.
Err(err).
Str("bucket", bucketName).
Str("key", key).
Msg("failed to upload file to KS3 bucket.")
return err
}
}

// update git ref sha: download/refs/pingcap/<comp>/<branch>/sha1
refKV := targetFsRefKeyValue(info)
_, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(refKV[0]),
Body: bytes.NewReader([]byte(refKV[1])),
})
if err != nil {
p.logger.
Err(err).
Str("bucket", bucketName).
Str("key", refKV[0]).
Msg("failed to upload content in KS3 bucket.")
return err
}
fmt.Println(awsutil.StringValue(resp))
return nil

return nil
}
18 changes: 16 additions & 2 deletions publisher/pkg/impl/funcs_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,23 @@ func targetFsFullPaths(p *PublishInfo) []string {
var ret []string

// the <branch>/<commit> path: pingcap/<comp>/<branch>/<commit>/<entrypoint>
ret = append(ret, filepath.Join("pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint))
ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint))
// the <branch>/<commit> path: pingcap/<comp>/<commit>/<entrypoint>
ret = append(ret, filepath.Join("pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint))
ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint))

return ret
}

func targetFsRefKeyValue(p *PublishInfo) [2]string {
var ret [2]string
verParts := strings.Split(p.Version, "#")
if len(verParts) > 1 {
ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, verParts[0])
ret[1] = verParts[1]
} else {
ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, "master")
ret[1] = verParts[0]
}

return ret
}

0 comments on commit 7263851

Please sign in to comment.