From d42b62cc74311ea6fc008300a2eaefb714f1163b Mon Sep 17 00:00:00 2001 From: wuhuizuo Date: Fri, 1 Nov 2024 11:32:59 +0000 Subject: [PATCH] feat(publisher): add fileserver worker upload to KS3 bucket. Signed-off-by: wuhuizuo --- publisher/cmd/worker/init.go | 6 +- publisher/cmd/worker/main.go | 2 +- publisher/example/config/service-example.yaml | 4 +- .../config/worker-example.fileserver.yaml | 18 +++++ ...-example.yaml => worker-example.tiup.yaml} | 8 ++- publisher/pkg/impl/fileserver_worker.go | 67 ++++++++++++++----- publisher/pkg/impl/funcs_fs.go | 18 ++++- 7 files changed, 96 insertions(+), 27 deletions(-) create mode 100644 publisher/example/config/worker-example.fileserver.yaml rename publisher/example/config/{worker-example.yaml => worker-example.tiup.yaml} (71%) diff --git a/publisher/cmd/worker/init.go b/publisher/cmd/worker/init.go index da48f890..7b1fc1b1 100644 --- a/publisher/cmd/worker/init.go +++ b/publisher/cmd/worker/init.go @@ -29,10 +29,10 @@ func loadConfig(configFile string) (config.Worker, error) { return config, nil } -func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, error) { +func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) { config, err := loadConfig(configFile) if err != nil { - return nil, nil, err + log.Fatal().Err(err).Msg("load config failed") } // Configure Redis client. 
@@ -58,7 +58,7 @@ func initTiupWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker, er Logger: kafka.LoggerFunc(log.Printf), }) - return kafkaReader, worker, nil + return kafkaReader, worker } func initFsWorkerFromConfig(configFile string) (*kafka.Reader, impl.Worker) { diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index 5c47fce5..116ca48a 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -35,7 +35,7 @@ func main() { zerolog.SetGlobalLevel(zerolog.InfoLevel) } - tiupPublishRequestKafkaReader, tiupWorker := initFsWorkerFromConfig(*tiupConfigFile) + tiupPublishRequestKafkaReader, tiupWorker := initTiupWorkerFromConfig(*tiupConfigFile) fsPublishRequestKafkaReader, fsWorker := initFsWorkerFromConfig(*fsConfigFile) // Create channel used by both the signal handler and server goroutines diff --git a/publisher/example/config/service-example.yaml b/publisher/example/config/service-example.yaml index 405196ed..67464933 100644 --- a/publisher/example/config/service-example.yaml +++ b/publisher/example/config/service-example.yaml @@ -1,4 +1,4 @@ - +event_source: http://publisher.site/ kafka: brokers: - "kafka-broker-1:9092" @@ -12,7 +12,5 @@ kafka: redis: addr: "redis-server:6379" db: 0 - username: "redis_user" password: "redis_password" -event_source: http://publisher.site/ diff --git a/publisher/example/config/worker-example.fileserver.yaml b/publisher/example/config/worker-example.fileserver.yaml new file mode 100644 index 00000000..cf40db88 --- /dev/null +++ b/publisher/example/config/worker-example.fileserver.yaml @@ -0,0 +1,18 @@ +kafka: + brokers: + - example-bootstrap.kafka:9092 + topic: example-topic + consumer_group: example-group-fs + +redis: + addr: "redis-server:6379" + db: 0 + password: "redis_password" + +options: + lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here. 
+ s3.endpoint: + s3.region: BEIJING + s3.bucket_name: + s3.access_key: + s3.secret_key: \ No newline at end of file diff --git a/publisher/example/config/worker-example.yaml b/publisher/example/config/worker-example.tiup.yaml similarity index 71% rename from publisher/example/config/worker-example.yaml rename to publisher/example/config/worker-example.tiup.yaml index bb74e36d..4ec25521 100644 --- a/publisher/example/config/worker-example.yaml +++ b/publisher/example/config/worker-example.tiup.yaml @@ -3,7 +3,13 @@ kafka: - example-bootstrap.kafka:9092 topic: example-topic consumer_group: example-group + +redis: + addr: "redis-server:6379" + db: 0 + password: "redis_password" + options: mirror_url: http://tiup.mirror.site lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here. - nightly_interval: 1h \ No newline at end of file + nightly_interval: 1h diff --git a/publisher/pkg/impl/fileserver_worker.go b/publisher/pkg/impl/fileserver_worker.go index f26bf457..105f0dad 100644 --- a/publisher/pkg/impl/fileserver_worker.go +++ b/publisher/pkg/impl/fileserver_worker.go @@ -13,12 +13,13 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-redis/redis/v8" "github.com/ks3sdklib/aws-sdk-go/aws" - "github.com/ks3sdklib/aws-sdk-go/aws/awsutil" "github.com/ks3sdklib/aws-sdk-go/aws/credentials" "github.com/ks3sdklib/aws-sdk-go/service/s3" "github.com/rs/zerolog" ) +const defaultMaxKS3Retries = 3 + type fsWorker struct { logger zerolog.Logger redisClient redis.Cmdable @@ -41,14 +42,21 @@ func NewFsWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[ handler.options.LarkWebhookURL = options["lark_webhook_url"] handler.options.S3.BucketName = options["s3.bucket_name"] - cre := credentials.NewStaticCredentials(options["s3_access_key_id"], - options["s3_secret_access_key"], - options["s3_session_token"]) + cre := credentials.NewStaticCredentials( + options["s3.access_key"], + options["s3.secret_key"], + 
options["s3.session_token"]) handler.s3Client = s3.New(&aws.Config{ Credentials: cre, - Region: options["s3.region"], - Endpoint: options["s3.endpoint"], + Region: options["s3.region"], // Ref: https://docs.ksyun.com/documents/6761 + Endpoint: options["s3.endpoint"], // Ref: https://docs.ksyun.com/documents/6761 + MaxRetries: defaultMaxKS3Retries, + Logger: logger, }) + // Enable KS3 log when debug mode is enabled. + if logger.GetLevel() <= zerolog.DebugLevel { + handler.s3Client.Config.LogLevel = aws.LogOn + } return &handler, nil } @@ -135,20 +143,45 @@ func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) { } func (p *fsWorker) publish(content io.ReadSeeker, info *PublishInfo) error { - targetPath := targetFsFullPaths(info) - // upload file to the KingSoft cloud object bucket with the target path as key. - - key := targetPath[0] - resp, err := p.s3Client.PutObject(&s3.PutObjectInput{ - Bucket: aws.String(p.options.S3.BucketName), // 存储空间名称,必填 - Key: aws.String(key), // 对象的key,必填 - Body: content, // 要上传的文件,必填 - ACL: aws.String("public-read"), // 对象的访问权限,非必填 + keys := targetFsFullPaths(info) + if len(keys) == 0 { + return nil + } + + bucketName := p.options.S3.BucketName + + // upload the artifact files to KS3 bucket. + for _, key := range keys { + _, err := p.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: content, + }) + if err != nil { + p.logger. + Err(err). + Str("bucket", bucketName). + Str("key", key). + Msg("failed to upload file to KS3 bucket.") + return err + } + } + + // update git ref sha: download/refs/pingcap///sha1 + refKV := targetFsRefKeyValue(info) + _, err := p.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(refKV[0]), + Body: bytes.NewReader([]byte(refKV[1])), }) if err != nil { + p.logger. + Err(err). + Str("bucket", bucketName). + Str("key", refKV[0]). 
+ Msg("failed to upload content in KS3 bucket.") return err } - fmt.Println(awsutil.StringValue(resp)) - return nil + return nil } diff --git a/publisher/pkg/impl/funcs_fs.go b/publisher/pkg/impl/funcs_fs.go index d2f04827..df839716 100644 --- a/publisher/pkg/impl/funcs_fs.go +++ b/publisher/pkg/impl/funcs_fs.go @@ -207,9 +207,23 @@ func targetFsFullPaths(p *PublishInfo) []string { var ret []string // the / path: pingcap//// - ret = append(ret, filepath.Join("pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint)) + ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, strings.ReplaceAll(p.Version, "#", "/"), p.EntryPoint)) // the / path: pingcap/// - ret = append(ret, filepath.Join("pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint)) + ret = append(ret, filepath.Join("download/builds/pingcap", p.Name, filepath.Base(strings.ReplaceAll(p.Version, "#", "/")), p.EntryPoint)) + + return ret +} + +func targetFsRefKeyValue(p *PublishInfo) [2]string { + var ret [2]string + verParts := strings.Split(p.Version, "#") + if len(verParts) > 1 { + ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, verParts[0]) + ret[1] = verParts[1] + } else { + ret[0] = fmt.Sprintf("download/refs/pingcap/%s/%s/sha1", p.Name, "master") + ret[1] = verParts[0] + } return ret }