Skip to content
This repository has been archived by the owner on Jan 21, 2025. It is now read-only.

Commit

Permalink
Merge pull request #253 from konstellation-io/fix/kre-py-request-timeout
Browse files Browse the repository at this point in the history
add request_timeout as environment variable for kre-py
  • Loading branch information
Sergiodfdez authored Aug 24, 2020
2 parents fb6f871 + 87ebf15 commit 771c7ac
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 18 deletions.
2 changes: 2 additions & 0 deletions admin/k8s-manager/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
baseDomainName: "local"
sharedStorageClass: "standard"
sharedStorageSize: "1Gi"
entrypoint:
requestTimeout: "30"
server:
port: "50051"
kubernetes:
Expand Down
4 changes: 4 additions & 0 deletions admin/k8s-manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Config struct {

SharedStorageSize string `yaml:"sharedStorageSize" envconfig:"KRE_SHARED_STORAGE_SIZE"`

Entrypoint struct {
RequestTimeout string `yaml:"requestTimeout" envconfig:"KRE_ENTRYPOINT_REQUEST_TIMEOUT"`
} `yaml:"entrypoint"`

Server struct {
Port string `yaml:"port" envconfig:"KRE_PORT"`
} `yaml:"server"`
Expand Down
5 changes: 5 additions & 0 deletions admin/k8s-manager/kubernetes/version/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (m *Manager) createEntrypointDeployment(version *entity.Version) (*appsv1.D
Name: "KRT_NODE_ID",
Value: "entrypoint",
},
{
// NOTE: Time to wait for a message to do a round trip through a workflow.
Name: "KRT_REQUEST_TIMEOUT",
Value: m.config.Entrypoint.RequestTimeout,
},
}

return m.clientset.AppsV1().Deployments(ns).Create(&appsv1.Deployment{
Expand Down
2 changes: 2 additions & 0 deletions helm/kre/templates/config/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ data:
KRE_BASE_DOMAIN_NAME: {{ .Values.config.baseDomainName }}
KRE_SHARED_STORAGECLASS: {{ .Values.config.runtime.sharedStorageClass }}
KRE_SHARED_STORAGE_SIZE: {{ .Values.config.runtime.sharedStorageSize }}
KRE_ENTRYPOINT_REQUEST_TIMEOUT: "30"

# SMTP
KRE_SMTP_ENABLED: "{{ .Values.config.smtp.enabled }}"
{{- if .Values.config.smtp.enabled }}
Expand Down
7 changes: 3 additions & 4 deletions runners/kre-entrypoint/entrypoint-gen/entrypoint.py.tmpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from grpclib.server import Stream

from kre_grpc import EntrypointKRE
Expand All @@ -7,9 +6,9 @@ from public_input_grpc import EntrypointBase
import public_input_pb2

class Entrypoint(EntrypointBase, EntrypointKRE):
def __init__(self, logger, nc, subjects):
logger.info(f"Entrypoint for '{os.environ['KRT_VERSION']}' initialized. ")
EntrypointKRE.__init__(self, logger, nc, subjects)
def __init__(self, logger, nc, subjects, config):
logger.info(f"Entrypoint for '{config.krt_version}' initialized. ")
EntrypointKRE.__init__(self, logger, nc, subjects, config)

{{ range .Methods }}
async def {{ .Name }}(self, stream: Stream[public_input_pb2.{{ .RequestType }}, public_input_pb2.{{ .ReturnsType }}]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,17 @@ service Entrypoint {
actual, err := ioutil.ReadAll(out)
assert.NoError(t, err)

expected := `import os
from grpclib.server import Stream
expected := `from grpclib.server import Stream
from kre_grpc import EntrypointKRE
from public_input_grpc import EntrypointBase
import public_input_pb2
class Entrypoint(EntrypointBase, EntrypointKRE):
def __init__(self, logger, nc, subjects):
logger.info(f"Entrypoint for '{os.environ['KRT_VERSION']}' initialized. ")
EntrypointKRE.__init__(self, logger, nc, subjects)
def __init__(self, logger, nc, subjects, config):
logger.info(f"Entrypoint for '{config.krt_version}' initialized. ")
EntrypointKRE.__init__(self, logger, nc, subjects, config)
async def Test(self, stream: Stream[public_input_pb2.Request, public_input_pb2.Response]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion runners/kre-entrypoint/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ echo "generating '${ENTRYPOINT_FILE}' file."
/app/entrypoint-gen -input $PROTO_FILE -output $ENTRYPOINT_FILE

# GENERATE protoc code
echo "generating entrypoint protofub files."
echo "generating entrypoint protobuf files."
python3 -m grpc_tools.protoc \
--proto_path="$(dirname "$PROTO_FILE")" \
--python_out=./src/entrypoint \
Expand Down
13 changes: 6 additions & 7 deletions runners/kre-entrypoint/src/kre_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@

# NOTE: EntrypointKRE will be extended by Entrypoint class auto-generated
class EntrypointKRE:
def __init__(self, logger, nc, subjects):
def __init__(self, logger, nc, subjects, config):
self.logger = logger
self.subjects = subjects
self.nc = nc

# FIXME: make timeout a config variable
self.request_timeout = 30
self.config = config

@abc.abstractmethod
async def make_response_object(self, subject, response):
Expand All @@ -37,7 +35,8 @@ async def process_message(self, stream, subject) -> None:
nats_subject = self.subjects[subject]
self.logger.info(f"Starting request/reply on NATS subject: '{nats_subject}'")

nats_reply = await self.nc.request(nats_subject, request_msg.marshal(), timeout=self.request_timeout)
nats_reply = await self.nc.request(nats_subject, request_msg.marshal(),
timeout=self.config.request_timeout)

self.logger.info(f"creating a response from message reply")
response_msg = KreNatsMessage(msg=nats_reply)
Expand All @@ -52,8 +51,8 @@ async def process_message(self, stream, subject) -> None:
self.logger.info(f'gRPC successfully response')

end = time.time()
self.logger.info(f"version[{os.environ['KRT_VERSION']}] "
f"node[{os.environ['KRT_NODE_NAME']}] "
self.logger.info(f"version[{self.config.krt_version}] "
f"node[{self.config.krt_node_name}] "
f"reply[{nats_reply.subject}] "
f"start[{datetime.datetime.utcfromtimestamp(start).isoformat()}] "
f"end[{datetime.datetime.utcfromtimestamp(end).isoformat()}] "
Expand Down
4 changes: 3 additions & 1 deletion runners/kre-entrypoint/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

class Config:
def __init__(self):
self.request_timeout = int(os.getenv('KRT_REQUEST_TIMEOUT', 30))

# Mandatory variables
try:
self.krt_version_id = os.environ['KRT_VERSION_ID']
Expand All @@ -36,7 +38,7 @@ async def process_messages(self):
self.logger.info(f"Loaded NATS subject file: {subjects}")

self.logger.info(f"Creating entrypoint service")
entrypoint = Entrypoint(self.logger, self.nc, subjects)
entrypoint = Entrypoint(self.logger, self.nc, subjects, self.config)

services = ServerReflection.extend([entrypoint])

Expand Down

0 comments on commit 771c7ac

Please sign in to comment.