Commit

Fixes flame invocation and handles invocation results
Adriano Santos committed Oct 11, 2024
1 parent 37711e1 commit 70c9295
Showing 8 changed files with 89 additions and 103 deletions.
Makefile (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
 #version=1.4.3
-version=1.4.4-rc.22
+version=1.4.4-rc.35
 registry=eigr
 
 CLUSTER_NAME=spawn-k8s
config/config.exs (5 changes: 3 additions & 2 deletions)
@@ -5,8 +5,9 @@ config :do_it, DoIt.Commfig,
   filename: "spawn_cli.json"
 
 config :flame, :terminator,
-  shutdown_timeout: :timer.minutes(2)
-  failsafe_timeout: :timer.minutes(1)
+  shutdown_timeout: :timer.minutes(3),
+  failsafe_timeout: :timer.minutes(1),
+  log: :debug
 
 # config :spawn_statestores, Statestores.Vault,
 #   json_library: Jason,
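For reference, the terminator options set above can be read back at runtime through Elixir's standard application config API; a minimal sketch, assuming an IEx shell attached to a node compiled with this config:

    # Returns the keyword list set via `config :flame, :terminator, ...`,
    # e.g. [shutdown_timeout: 180_000, failsafe_timeout: 60_000, log: :debug]
    Application.get_env(:flame, :terminator)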
examples/helloworld-ts/host.yaml (4 changes: 2 additions & 2 deletions)
@@ -7,8 +7,8 @@ metadata:
   annotations:
     spawn-eigr.io/actor-system: spawn-system
     spawn-eigr.io/sidecar-logger-level: debug
-    spawn-eigr.io/sidecar-image-tag: "docker.io/eigr/spawn-proxy:1.4.4-rc.22"
-    spawn-eigr.io/sidecar-init-container-image-tag: "docker.io/eigr/spawn-initializer:1.4.4-rc.22"
+    spawn-eigr.io/sidecar-image-tag: "docker.io/eigr/spawn-proxy:1.4.4-rc.35"
+    spawn-eigr.io/sidecar-init-container-image-tag: "docker.io/eigr/spawn-initializer:1.4.4-rc.35"
 spec:
   replicas: 2
   host:
lib/actors/actor/entity/entity.ex (24 changes: 16 additions & 8 deletions)
@@ -76,6 +76,8 @@ defmodule Actors.Actor.Entity do
   alias Eigr.Functions.Protocol.Actors.Healthcheck.HealthCheckReply
   alias Eigr.Functions.Protocol.Actors.Healthcheck.Status, as: HealthcheckStatus
 
+  alias Eigr.Functions.Protocol.ActorInvocationResponse
+
   alias Eigr.Functions.Protocol.State.Checkpoint
   alias Eigr.Functions.Protocol.State.Revision
 
@@ -154,15 +156,21 @@
           state: state
         }
 
-        try do
-          resp = FlameScheduler.schedule_and_invoke(task, &Invocation.invoke/2)
-
-          IO.inspect(resp,
-            label: "FlameScheduler invoke response ---------------------------"
-          )
-        catch
-          error ->
-            IO.inspect(error, label: "Error during Flame Scheduler invocation ---------")
-        end
+        FlameScheduler.schedule_and_invoke(task, &Invocation.invoke/2)
+        |> IO.inspect(label: "Remoting Scheduler raw response")
+        |> case do
+          {:reply, {:ok, %ActorInvocationResponse{}} = resp, %EntityState{} = _state, _signal} =
+              payload ->
+            Logger.debug("Remoting Scheduler response for invocation: #{inspect(resp)}")
+
+            payload
+
+          unexpect ->
+            Logger.error(
+              "Error during Remoting Scheduler invocation. Details: #{inspect(unexpect)}"
+            )
+
+            unexpect
+        end
 
       _ ->
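For context, the success clause above matches a four-element reply tuple, which is the shape a GenServer callback uses when it returns a reply, a new state, and an extra signal such as a timeout, :hibernate, or {:continue, term}. A minimal, self-contained sketch of that shape (module and values hypothetical):

    defmodule ReplyShapeExample do
      use GenServer

      @impl true
      def init(state), do: {:ok, state}

      @impl true
      def handle_call(:ping, _from, state) do
        # reply value, unchanged state, and an extra signal (here a 5-second timeout)
        {:reply, {:ok, :pong}, state, 5_000}
      end
    end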
lib/spawn/cluster/provisioner_pool_supervisor.ex (151 changes: 65 additions & 86 deletions)
@@ -20,57 +20,33 @@ defmodule Spawn.Cluster.ProvisionerPoolSupervisor do
     children =
       Enum.map(actor_configs, fn cfg ->
         Logger.info("Setup Task Actor with config: #{inspect(cfg)}")
 
         build_flame_pool(cfg, env)
       end)
 
     Supervisor.init(children, strategy: :one_for_one)
   end
 
-  defp build_flame_pool(%{"actorName" => name} = cfg, :prod) do
+  defp build_flame_pool(%{"actorName" => name} = cfg, env) do
     pool_name = build_worker_pool_name(__MODULE__, name)
-    Logger.info("Create pool for Actor #{name}. Pool Name #{inspect(pool_name)}")
+    Logger.info("Creating pool for Actor #{name}. Pool Name: #{inspect(pool_name)}")
 
     opts =
       [
         name: pool_name,
-        backend:
-          {FLAMEK8sBackend,
-           app_container_name: "sidecar",
-           runner_pod_tpl: fn current_manifest -> build_pod_template(cfg, current_manifest) end},
+        backend: pool_backend(cfg, env),
         log: :debug
       ] ++ get_worker_pool_config(cfg)
 
     {FLAME.Pool, opts}
   end
 
-  defp build_flame_pool(%{"actorName" => name} = cfg, _env) do
-    pool_name = build_worker_pool_name(__MODULE__, name)
-    Logger.info("Creating default pool with name #{inspect(pool_name)}")
-
-    opts =
-      [
-        name: pool_name,
-        backend: FLAME.LocalBackend,
-        log: :debug
-      ] ++ get_worker_pool_config(cfg)
-
-    {FLAME.Pool, opts}
-  end
-
-  defp build_flame_pool(cfg, _env) do
-    pool_name = Module.concat(__MODULE__, "Default")
-    Logger.info("Creating default pool with name #{inspect(pool_name)}")
-
-    opts =
-      [
-        name: pool_name,
-        backend: FLAME.LocalBackend,
-        log: :debug
-      ] ++ get_worker_pool_config(cfg)
-
-    {FLAME.Pool, opts}
-  end
+  defp pool_backend(cfg, :prod) do
+    {FLAMEK8sBackend,
+     app_container_name: "sidecar",
+     runner_pod_tpl: fn current_manifest -> build_pod_template(cfg, current_manifest) end}
+  end
+
+  defp pool_backend(_, _env), do: FLAME.LocalBackend
 
   defp get_worker_pool_config(cfg) do
     worker_pool_config = Map.get(cfg, "workerPool", %{})
@@ -87,76 +63,79 @@ defmodule Spawn.Cluster.ProvisionerPoolSupervisor do
     ]
   end
 
-  defp build_pod_template(%{"topology" => topology} = _cfg, template) do
-    update_in(template["metadata"], &Map.drop(&1, ["resourceVersion"]))
-    |> maybe_put_node_selector(topology)
-    |> maybe_put_toleration(topology)
-
-    # |> update_in(["containers", Access.at(0)], fn container ->
-    #   container
-    #   |> Map.put_new("env", [])
-    #   |> Map.update!("env", fn env ->
-    #     [
-    #       %{
-    #         "name" => "POD_NAME",
-    #         "valueFrom" => %{"fieldRef" => %{"fieldPath" => "metadata.name"}}
-    #       },
-    #       %{
-    #         "name" => "POD_IP",
-    #         "valueFrom" => %{"fieldRef" => %{"fieldPath" => "status.podIP"}}
-    #       },
-    #       %{
-    #         "name" => "POD_NAMESPACE",
-    #         "valueFrom" => %{"fieldRef" => %{"fieldPath" => "metadata.namespace"}}
-    #       },
-    #       %{"name" => "FLAME_PARENT", "value" => encoded_parent}
-    #       | Enum.reject(
-    #           env,
-    #           &(&1["name"] in ["FLAME_PARENT", "POD_NAME", "POD_NAMESPACE", "POD_IP"])
-    #         )
-    #     ]
-    #     |> put_new_env("NODE_COOKIE", Node.get_cookie())
-    #   end)
-  end
-
-  defp put_new_env(env, _name, :nocookie), do: env
-
-  defp put_new_env(env, name, value) do
-    case get_in(env, [Access.filter(&(&1["name"] == name))]) do
-      [] -> [%{"name" => name, "value" => value} | env]
-      _ -> env
-    end
-  end
-
-  defp build_pod_template(_cfg, template),
-    do: update_in(template["metadata"], &Map.drop(&1, ["resourceVersion"]))
-
-  defp maybe_put_node_selector(template, %{"nodeSelector" => selector}) do
-    new_label_map =
-      get_in(template, ["metadata", "labels"])
-      |> Kernel.||(%{})
-      |> Map.merge(%{"io.eigr.spawn/worker" => "true"})
-
-    template
-    |> put_in(["metadata", "labels"], new_label_map)
-    |> put_in(["spec", "nodeSelector"], selector)
-  end
-
-  defp maybe_put_node_selector(template, _topology), do: template
-
-  defp maybe_put_toleration(template, %{"tolerations" => toleration}) do
-    new_label_map =
-      get_in(template, ["metadata", "labels"])
-      |> Kernel.||(%{})
-      |> Map.merge(%{"io.eigr.spawn/worker" => "true"})
-
-    template
-    |> put_in(["metadata", "labels"], new_label_map)
-    |> put_in(["spec", "tolerations"], toleration)
-  end
-
-  defp maybe_put_toleration(template, _topology), do: template
+  defp build_pod_template(cfg, template) do
+    Logger.debug("Building pod template...")
+
+    template
+    |> update_pod_metadata()
+    |> update_pod_spec()
+    |> remove_probes_from_containers()
+    |> maybe_put_node_selector(cfg)
+    |> maybe_put_toleration(cfg)
+  end
+
+  defp update_pod_metadata(template) do
+    target_metadata = %{
+      "name" => "target-pod",
+      "namespace" => Access.get(template, "metadata")["namespace"],
+      "annotations" => %{
+        "prometheus.io/path" => "/metrics",
+        "prometheus.io/port" => "9001",
+        "prometheus.io/scrape" => "true"
+      }
+    }
+
+    put_in(template["metadata"], target_metadata)
+  end
+
+  defp update_pod_spec(template) do
+    spec = template["spec"]
+
+    target_spec = %{
+      "initContainers" => spec["initContainers"],
+      "containers" => spec["containers"],
+      "volumes" => spec["volumes"],
+      "serviceAccount" => spec["serviceAccount"],
+      "serviceAccountName" => spec["serviceAccountName"],
+      "terminationGracePeriodSeconds" => spec["terminationGracePeriodSeconds"],
+      "restartPolicy" => "Never"
+    }
+
+    put_in(template["spec"], target_spec)
+  end
+
+  defp remove_probes_from_containers(template) do
+    updated_containers =
+      template["spec"]["containers"]
+      |> Enum.map(&Map.drop(&1, ["readinessProbe", "livenessProbe"]))
+
+    put_in(template["spec"]["containers"], updated_containers)
+  end
+
+  defp maybe_put_node_selector(template, %{"topology" => topology}) do
+    update_metadata_with_labels(template)
+    |> put_in(["spec", "nodeSelector"], topology["nodeSelector"])
+  end
+
+  defp maybe_put_node_selector(template, _cfg), do: template
+
+  defp maybe_put_toleration(template, %{"topology" => topology}) do
+    update_metadata_with_labels(template)
+    |> put_in(["spec", "tolerations"], topology["tolerations"])
+  end
+
+  defp maybe_put_toleration(template, _cfg), do: template
+
+  defp update_metadata_with_labels(template) do
+    new_label_map =
+      template
+      |> get_in(["metadata", "labels"])
+      |> Kernel.||(%{})
+      |> Map.merge(%{"io.eigr.spawn/worker" => "true"})
+
+    put_in(template["metadata"]["labels"], new_label_map)
+  end
 
   defp parse_config(""), do: []
 
   defp parse_config(encoded_cfg) do
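As context for the refactor above, pools built by build_flame_pool/2 are started under this supervisor and are typically consumed through FLAME's public API, with pool_backend/2 selecting the Kubernetes backend only in :prod. A minimal usage sketch, with the pool name purely illustrative (real names come from build_worker_pool_name/2):

    # Run a function on a runner provided by the pool's backend
    # (FLAMEK8sBackend in :prod, FLAME.LocalBackend otherwise).
    pool = Module.concat(Spawn.Cluster.ProvisionerPoolSupervisor, "MyActorWorkerPool")
    result = FLAME.call(pool, fn -> :ok end)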
@@ -295,7 +295,7 @@ defmodule SpawnOperator.K8s.Proxy.Deployment

     actor_host_function_envs =
       Map.get(host_params, "env", []) ++
-        updated_default_envs
+        updated_default_envs
 
     actor_host_function_resources =
       Map.get(host_params, "resources", @default_actor_host_resources)
spawn_operator/spawn_operator/manifest.yaml (2 changes: 1 addition & 1 deletion)
@@ -38,7 +38,7 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: spec.serviceAccountName
-        image: docker.io/eigr/spawn-operator:1.4.4-rc.22
+        image: docker.io/eigr/spawn-operator:1.4.4-rc.35
         livenessProbe:
           failureThreshold: 3
           httpGet:
spawn_proxy/proxy/lib/proxy/application.ex (2 changes: 0 additions & 2 deletions)
@@ -17,8 +17,6 @@ defmodule Proxy.Application do

     OpentelemetryEcto.setup([:spawn_statestores, :repo])
     Config.load()
-    System.get_env("RELEASE_COOKIE")
-    |> IO.inspect(label: "Application Cookie ------------------------------")
 
     Logger.configure(level: Config.get(:logger_level))
 
