Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
fix: fix PR suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
dovixman committed Nov 6, 2023
1 parent 2c95d7a commit 0a5b9c2
Show file tree
Hide file tree
Showing 23 changed files with 168 additions and 112 deletions.
28 changes: 28 additions & 0 deletions go-sdk/internal/common/kai_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,31 @@ const (
LoggerWorkflowID = "workflow_id"
LoggerProcessID = "process_id"
)

const (
ConfigAppConfigPathKey = "APP_CONFIG_PATH"
ConfigRunnerLoggerLevelKey = "runner.logger.level"
ConfigRunnerLoggerOutputPathsKey = "runner.logger.output_paths"
ConfigRunnerLoggerErrorOutputPathsKey = "runner.logger.error_output_paths"
ConfigRunnerLoggerEncodingKey = "runner.logger.encoding"
ConfigRunnerSubscriberAckWaitTimeKey = "runner.subscriber.ack_wait_time"
ConfigMetadataProductIDKey = "metadata.product_id"
ConfigMetadataWorkflowIDKey = "metadata.workflow_name"
ConfigMetadataProcessIDKey = "metadata.process_name"
ConfigMetadataVersionIDKey = "metadata.version_tag"
ConfigMetadataBasePathKey = "metadata.base_path"
ConfigNatsURLKey = "nats.url"
ConfigNatsStreamKey = "nats.stream"
ConfigNatsOutputKey = "nats.output"
ConfigNatsInputsKey = "nats.inputs"
ConfigNatsEphemeralStorage = "nats.object_store"
ConfigCcGlobalBucketKey = "centralized_configuration.global.bucket"
ConfigCcProductBucketKey = "centralized_configuration.product.bucket"
ConfigCcWorkflowBucketKey = "centralized_configuration.workflow.bucket"
ConfigCcProcessBucketKey = "centralized_configuration.process.bucket"
ConfigMinioEndpointKey = "minio.endpoint"
ConfigMinioClientUserKey = "minio.access_key_id"
ConfigMinioClientPasswordKey = "minio.access_key_secret"
ConfigMinioUseSslKey = "minio.use_ssl"
ConfigMinioBucketKey = "minio.bucket"
)
10 changes: 5 additions & 5 deletions go-sdk/runner/exit/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (er *Runner) getLoggerWithName() logr.Logger {
}

func (er *Runner) startSubscriber() {
inputSubjects := viper.GetStringSlice("nats.inputs")
inputSubjects := viper.GetStringSlice(common.ConfigNatsInputsKey)

if len(inputSubjects) == 0 {
er.getLoggerWithName().Info("Undefined input subjects")
Expand All @@ -49,7 +49,7 @@ func (er *Runner) startSubscriber() {
nats.DeliverNew(),
nats.Durable(consumerName),
nats.ManualAck(),
nats.AckWait(viper.GetDuration("runner.subscriber.ack_wait_time")),
nats.AckWait(viper.GetDuration(common.ConfigRunnerSubscriberAckWaitTimeKey)),
)
if err != nil {
er.getLoggerWithName().Error(err, "Error subscribing to NATS subject",
Expand Down Expand Up @@ -174,7 +174,7 @@ func (er *Runner) publishError(requestID, errMsg string) {
responseMsg := &kai.KaiNatsMessage{
RequestId: requestID,
Error: errMsg,
FromNode: viper.GetString("metadata.process_name"),
FromNode: viper.GetString(common.ConfigMetadataProcessIDKey),
MessageType: kai.MessageType_ERROR,
}
er.publishResponse(responseMsg, "")
Expand Down Expand Up @@ -206,7 +206,7 @@ func (er *Runner) publishResponse(responseMsg *kai.KaiNatsMessage, channel strin
}

func (er *Runner) getOutputSubject(channel string) string {
outputSubject := viper.GetString("nats.output")
outputSubject := viper.GetString(common.ConfigNatsOutputKey)
if channel != "" {
return fmt.Sprintf("%s.%s", outputSubject, channel)
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (er *Runner) getResponseHandler(subject string) Handler {
}

func (er *Runner) getMaxMessageSize() (int64, error) {
streamInfo, err := er.jetstream.StreamInfo(viper.GetString("nats.stream"))
streamInfo, err := er.jetstream.StreamInfo(viper.GetString(common.ConfigNatsStreamKey))
if err != nil {
return 0, fmt.Errorf("error getting stream's max message size: %w", err)
}
Expand Down
66 changes: 34 additions & 32 deletions go-sdk/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/konstellation-io/kai-sdk/go-sdk/internal/common"

"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/konstellation-io/kai-sdk/go-sdk/runner/exit"
Expand Down Expand Up @@ -45,23 +47,23 @@ func NewRunner() *Runner {

func validateConfig(keys []string) {
var mandatoryConfigKeys = []string{
"metadata.product_id",
"metadata.workflow_name",
"metadata.process_name",
"metadata.version_tag",
"metadata.base_path",
"nats.url",
"nats.stream",
"nats.output",
"centralized_configuration.global.bucket",
"centralized_configuration.product.bucket",
"centralized_configuration.workflow.bucket",
"centralized_configuration.process.bucket",
"minio.endpoint",
"minio.access_key_id",
"minio.access_key_secret",
"minio.use_ssl",
"minio.bucket",
common.ConfigMetadataProductIDKey,
common.ConfigMetadataWorkflowIDKey,
common.ConfigMetadataProcessIDKey,
common.ConfigMetadataVersionIDKey,
common.ConfigMetadataBasePathKey,
common.ConfigNatsURLKey,
common.ConfigNatsStreamKey,
common.ConfigNatsOutputKey,
common.ConfigCcGlobalBucketKey,
common.ConfigCcProductBucketKey,
common.ConfigCcWorkflowBucketKey,
common.ConfigCcProcessBucketKey,
common.ConfigMinioEndpointKey,
common.ConfigMinioClientUserKey,
common.ConfigMinioClientPasswordKey,
common.ConfigMinioUseSslKey,
common.ConfigMinioBucketKey,
}

for _, key := range mandatoryConfigKeys {
Expand All @@ -76,8 +78,8 @@ func initializeConfiguration() {
viper.SetEnvPrefix("KAI")
viper.AutomaticEnv()

if viper.IsSet("APP_CONFIG_PATH") {
viper.AddConfigPath(viper.GetString("APP_CONFIG_PATH"))
if viper.IsSet(common.ConfigAppConfigPathKey) {
viper.AddConfigPath(viper.GetString(common.ConfigAppConfigPathKey))
}

viper.SetConfigName("config")
Expand All @@ -90,8 +92,8 @@ func initializeConfiguration() {
viper.SetConfigType("yaml")
viper.AddConfigPath(".")

if viper.IsSet("APP_CONFIG_PATH") {
viper.AddConfigPath(viper.GetString("APP_CONFIG_PATH"))
if viper.IsSet(common.ConfigAppConfigPathKey) {
viper.AddConfigPath(viper.GetString(common.ConfigAppConfigPathKey))
}

err = viper.MergeInConfig()
Expand All @@ -104,16 +106,16 @@ func initializeConfiguration() {
validateConfig(keys)

// Set viper default values
viper.SetDefault("metadata.base_path", "/")
viper.SetDefault("runner.subscriber.ack_wait_time", 22*time.Hour)
viper.SetDefault("runner.logger.level", "InfoLevel")
viper.SetDefault("runner.logger.encoding", "console")
viper.SetDefault("runner.logger.output_paths", []string{"stdout"})
viper.SetDefault("runner.logger.error_output_paths", []string{"stderr"})
viper.SetDefault(common.ConfigMetadataBasePathKey, "/")
viper.SetDefault(common.ConfigRunnerSubscriberAckWaitTimeKey, 22*time.Hour)
viper.SetDefault(common.ConfigRunnerLoggerLevelKey, "InfoLevel")
viper.SetDefault(common.ConfigRunnerLoggerEncodingKey, "console")
viper.SetDefault(common.ConfigRunnerLoggerOutputPathsKey, []string{"stdout"})
viper.SetDefault(common.ConfigRunnerLoggerErrorOutputPathsKey, []string{"stderr"})
}

func getNatsConnection(logger logr.Logger) (*nats.Conn, error) {
nc, err := nats.Connect(viper.GetString("nats.url"))
nc, err := nats.Connect(viper.GetString(common.ConfigNatsURLKey))
if err != nil {
logger.Error(err, "Error connecting to NATS")
return nil, err
Expand All @@ -137,15 +139,15 @@ func getLogger() logr.Logger {

config := zap.NewDevelopmentConfig()

logLevel, err := zap.ParseAtomicLevel(viper.GetString("runner.logger.level"))
logLevel, err := zap.ParseAtomicLevel(viper.GetString(common.ConfigRunnerLoggerLevelKey))
if err != nil {
logLevel = zap.NewAtomicLevelAt(zap.InfoLevel)
}

config.Level = zap.NewAtomicLevelAt(logLevel.Level())
config.OutputPaths = viper.GetStringSlice("runner.logger.output_paths")
config.ErrorOutputPaths = viper.GetStringSlice("runner.logger.error_output_paths")
config.Encoding = viper.GetString("runner.logger.encoding")
config.OutputPaths = viper.GetStringSlice(common.ConfigRunnerLoggerOutputPathsKey)
config.ErrorOutputPaths = viper.GetStringSlice(common.ConfigRunnerLoggerErrorOutputPathsKey)
config.Encoding = viper.GetString(common.ConfigRunnerLoggerEncodingKey)

logger, err := config.Build()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions go-sdk/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"testing"

"github.com/konstellation-io/kai-sdk/go-sdk/internal/common"

"github.com/spf13/viper"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -109,15 +111,15 @@ func (s *SdkRunnerTestSuite) TestNewExitRunner_WithoutDefaultHandler_ExpectPanic

func (s *SdkRunnerTestSuite) TestNewRunner_MissingMandatoryKey() {
// Given
natsURL := viper.GetString("nats.url")
natsURL := viper.GetString(common.ConfigNatsURLKey)
viper.Set("nats", nil)

// Then
s.Panicsf(func() {
// When
runner.NewTestRunner(nil, &s.js)
}, "Missing mandatory key 'nats'")
viper.Set("nats.url", natsURL)
viper.Set(common.ConfigNatsURLKey, natsURL)
}

func TestRunnerTestSuite(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions go-sdk/runner/task/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (tr *Runner) getLoggerWithName() logr.Logger {
}

func (tr *Runner) startSubscriber() {
inputSubjects := viper.GetStringSlice("nats.inputs")
inputSubjects := viper.GetStringSlice(common.ConfigNatsInputsKey)

if len(inputSubjects) == 0 {
tr.getLoggerWithName().Info("Undefined input subjects")
Expand All @@ -49,7 +49,7 @@ func (tr *Runner) startSubscriber() {
nats.DeliverNew(),
nats.Durable(consumerName),
nats.ManualAck(),
nats.AckWait(viper.GetDuration("runner.subscriber.ack_wait_time")),
nats.AckWait(viper.GetDuration(common.ConfigRunnerSubscriberAckWaitTimeKey)),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error subscribing to NATS subject",
Expand Down Expand Up @@ -182,7 +182,7 @@ func (tr *Runner) publishError(requestID, errMsg string) {
responseMsg := &kai.KaiNatsMessage{
RequestId: requestID,
Error: errMsg,
FromNode: viper.GetString("metadata.process_name"),
FromNode: viper.GetString(common.ConfigMetadataProcessIDKey),
MessageType: kai.MessageType_ERROR,
}
tr.publishResponse(responseMsg, "")
Expand Down Expand Up @@ -215,7 +215,7 @@ func (tr *Runner) publishResponse(responseMsg *kai.KaiNatsMessage, channel strin
}

func (tr *Runner) getOutputSubject(channel string) string {
outputSubject := viper.GetString("nats.output")
outputSubject := viper.GetString(common.ConfigNatsOutputKey)
if channel != "" {
return fmt.Sprintf("%s.%s", outputSubject, channel)
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (tr *Runner) getResponseHandler(subject string) Handler {
}

func (tr *Runner) getMaxMessageSize() (int64, error) {
streamInfo, err := tr.jetstream.StreamInfo(viper.GetString("nats.stream"))
streamInfo, err := tr.jetstream.StreamInfo(viper.GetString(common.ConfigNatsStreamKey))
if err != nil {
return 0, fmt.Errorf("error getting stream's max message size: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions go-sdk/runner/trigger/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (tr *Runner) getLoggerWithName() logr.Logger {
}

func (tr *Runner) startSubscriber() {
inputSubjects := viper.GetStringSlice("nats.inputs")
inputSubjects := viper.GetStringSlice(common.ConfigNatsInputsKey)
subscriptions := make([]*nats.Subscription, 0, len(inputSubjects))

for _, subject := range inputSubjects {
Expand All @@ -44,7 +44,7 @@ func (tr *Runner) startSubscriber() {
nats.DeliverNew(),
nats.Durable(fmt.Sprintf("%s-%s", consumerName, uuid.New().String())),
nats.ManualAck(),
nats.AckWait(viper.GetDuration("runner.subscriber.ack_wait_time")),
nats.AckWait(viper.GetDuration(common.ConfigRunnerSubscriberAckWaitTimeKey)),
)
if err != nil {
tr.getLoggerWithName().Error(err, "Error subscribing to NATS subject",
Expand Down Expand Up @@ -155,7 +155,7 @@ func (tr *Runner) publishError(requestID, errMsg string) {
responseMsg := &kai.KaiNatsMessage{
RequestId: requestID,
Error: errMsg,
FromNode: viper.GetString("metadata.process_name"),
FromNode: viper.GetString(common.ConfigMetadataProcessIDKey),
MessageType: kai.MessageType_ERROR,
}
tr.publishResponse(responseMsg, "")
Expand Down Expand Up @@ -187,7 +187,7 @@ func (tr *Runner) publishResponse(responseMsg *kai.KaiNatsMessage, channel strin
}

func (tr *Runner) getOutputSubject(channel string) string {
outputSubject := viper.GetString("nats.output")
outputSubject := viper.GetString(common.ConfigNatsOutputKey)
if channel != "" {
return fmt.Sprintf("%s.%s", outputSubject, channel)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (tr *Runner) prepareOutputMessage(msg []byte) ([]byte, error) {
}

func (tr *Runner) getMaxMessageSize() (int64, error) {
streamInfo, err := tr.jetstream.StreamInfo(viper.GetString("nats.stream"))
streamInfo, err := tr.jetstream.StreamInfo(viper.GetString(common.ConfigNatsStreamKey))
if err != nil {
return 0, fmt.Errorf("error getting stream's max message size: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"

"github.com/konstellation-io/kai-sdk/go-sdk/internal/common"

"github.com/go-logr/logr"
"github.com/nats-io/nats.go"
"github.com/spf13/viper"
Expand Down Expand Up @@ -49,7 +51,7 @@ func initKVStores(logger logr.Logger, js nats.JetStreamContext) (
) {
wrapErr := utilErrors.Wrapper("configuration init: %w")

name := viper.GetString("centralized_configuration.global.bucket")
name := viper.GetString(common.ConfigCcGlobalBucketKey)
logger.V(1).Info("Initializing global key-value store",
"name", name)

Expand All @@ -59,7 +61,7 @@ func initKVStores(logger logr.Logger, js nats.JetStreamContext) (
return nil, nil, nil, nil, wrapErr(err)
}

name = viper.GetString("centralized_configuration.product.bucket")
name = viper.GetString(common.ConfigCcProductBucketKey)
logger.V(1).Info("Initializing product key-value store",
"name", name)

Expand All @@ -71,7 +73,7 @@ func initKVStores(logger logr.Logger, js nats.JetStreamContext) (

logger.V(1).Info("Product key-value store initialized")

name = viper.GetString("centralized_configuration.workflow.bucket")
name = viper.GetString(common.ConfigCcWorkflowBucketKey)
logger.V(1).Info("Initializing workflow key-value store",
"name", name)

Expand All @@ -83,7 +85,7 @@ func initKVStores(logger logr.Logger, js nats.JetStreamContext) (

logger.V(1).Info("Workflow key-value store initialized")

name = viper.GetString("centralized_configuration.process.bucket")
name = viper.GetString(common.ConfigCcProcessBucketKey)
logger.V(1).Info("Initializing process key-value store",
"name", name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"testing"

"github.com/konstellation-io/kai-sdk/go-sdk/internal/common"

centralizedConfiguration "github.com/konstellation-io/kai-sdk/go-sdk/sdk/centralized-configuration"

"github.com/go-logr/logr"
Expand All @@ -16,16 +18,16 @@ import (
)

const (
globalBucketProp = "centralized_configuration.global.bucket"
globalBucketProp = common.ConfigCcGlobalBucketKey
globalBucketVal = "global-bucket"
wrongGlobalBucketVal = "some-global-bucket"
productBucketProp = "centralized_configuration.product.bucket"
productBucketProp = common.ConfigCcProductBucketKey
productBucketVal = "product-bucket"
wrongProductBucketVal = "some-product-bucket"
workflowBucketProp = "centralized_configuration.workflow.bucket"
workflowBucketProp = common.ConfigCcWorkflowBucketKey
workflowBucketVal = "workflow-bucket"
wrongWorkflowBucketVal = "some-workflow-bucket"
processBucketProp = "centralized_configuration.process.bucket"
processBucketProp = common.ConfigCcProcessBucketKey
processBucketVal = "process-bucket"
wrongProcessBucketVal = "some-process-bucket"
keyValue = "KeyValue"
Expand Down
Loading

0 comments on commit 0a5b9c2

Please sign in to comment.