Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New k8s #1532

Merged
merged 10 commits into from
Feb 26, 2024
Merged

New k8s #1532

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 125 additions & 52 deletions IM/connectors/Kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import requests
import time
import os
import re
from netaddr import IPNetwork, IPAddress
try:
Expand Down Expand Up @@ -155,22 +156,84 @@ def _create_volume_claim(self, claim_data, auth_data):
self.log_exception("Error connecting with Kubernetes API server")
return False

def _create_volumes(self, namespace, system, pod_name, auth_data, persistent=False):
def _delete_config_map(self, namespace, cm_name, auth_data):
try:
self.log_debug("Deleting CM: %s/%s" % (namespace, cm_name))
uri = "/api/v1/namespaces/%s/%s/%s" % (namespace, "configmaps", cm_name)
resp = self.create_request('DELETE', uri, auth_data)

if resp.status_code == 404:
self.log_warn("Trying to remove a non existing ConfigMap: " + cm_name)
return True
elif resp.status_code != 200:
self.log_error("Error deleting the ConfigMap: " + resp.txt)
return False
else:
return True
except Exception:
self.log_exception("Error connecting with Kubernetes API server")
return False

def _delete_config_maps(self, pod_data, auth_data):
if 'volumes' in pod_data['spec']:
for volume in pod_data['spec']['volumes']:
if 'configMap' in volume and 'name' in volume['configMap']:
cm_name = volume['configMap']['name']
success = self._delete_config_map(pod_data["metadata"]["namespace"], cm_name, auth_data)
if not success:
self.log_error("Error deleting ConfigMap:" + cm_name)

def _create_config_maps(self, namespace, system, pod_name, auth_data):
res = []
cont = 1
while ((system.getValue("disk." + str(cont) + ".size") or
system.getValue("disk." + str(cont) + ".image.url")) and
system.getValue("disk." + str(cont) + ".mount_path")):
volume_id = system.getValue("disk." + str(cont) + ".image.url")
disk_mount_path = system.getValue("disk." + str(cont) + ".mount_path")
disk_size = system.getFeature("disk." + str(cont) + ".size").getValue('B')
if not disk_mount_path.startswith('/'):
disk_mount_path = '/' + disk_mount_path
name = "%s-%d" % (pod_name, cont)

if persistent:
claim_data = {'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim'}
claim_data['metadata'] = {'name': name, 'namespace': namespace}
while system.getValue("disk." + str(cont) + ".mount_path"):

if (system.getValue("disk." + str(cont) + ".content") and
not system.getValue("disk." + str(cont) + ".size")):

mount_path = system.getValue("disk." + str(cont) + ".mount_path")
content = system.getValue("disk." + str(cont) + ".content")
if not mount_path.startswith('/'):
mount_path = '/' + mount_path
name = "%s-cm-%d" % (pod_name, cont)

cm_data = self._gen_basic_k8s_elem(namespace, name, 'ConfigMap')
cm_data['data'] = {os.path.basename(mount_path): content}

try:
self.log_debug("Creating ConfigMap: %s/%s" % (namespace, name))
headers = {'Content-Type': 'application/json'}
uri = "/api/v1/namespaces/%s/%s" % (namespace, "configmaps")
svc_resp = self.create_request('POST', uri, auth_data, headers, cm_data)
if svc_resp.status_code != 201:
self.error_messages += "Error creating configmap for pod %s: %s" % (name, svc_resp.text)
self.log_warn("Error creating configmap: %s" % svc_resp.text)
else:
res.append((name, mount_path))
except Exception:
self.error_messages += "Error creating configmap to access pod %s" % name
self.log_exception("Error creating configmap.")

cont += 1

return res

def _create_volumes(self, namespace, system, pod_name, auth_data):
res = []
cont = 1
while system.getValue("disk." + str(cont) + ".mount_path"):

if (system.getValue("disk." + str(cont) + ".size") or
system.getValue("disk." + str(cont) + ".image.url")):

volume_id = system.getValue("disk." + str(cont) + ".image.url")
disk_mount_path = system.getValue("disk." + str(cont) + ".mount_path")
disk_size = system.getFeature("disk." + str(cont) + ".size").getValue('B')
if not disk_mount_path.startswith('/'):
disk_mount_path = '/' + disk_mount_path
name = "%s-%d" % (pod_name, cont)

claim_data = self._gen_basic_k8s_elem(namespace, name, 'PersistentVolumeClaim')
claim_data['spec'] = {'accessModes': ['ReadWriteOnce'], 'resources': {
'requests': {'storage': disk_size}}}

Expand All @@ -181,11 +244,9 @@ def _create_volumes(self, namespace, system, pod_name, auth_data, persistent=Fal
self.log_debug("Creating PVC: %s/%s" % (namespace, name))
success = self._create_volume_claim(claim_data, auth_data)
if success:
res.append((name, disk_size, disk_mount_path, persistent))
res.append((name, disk_size, disk_mount_path))
else:
self.log_error("Error creating PersistentVolumeClaim:" + name)
else:
res.append((name, disk_size, disk_mount_path, persistent))

cont += 1

Expand Down Expand Up @@ -218,12 +279,7 @@ def create_service_data(self, namespace, name, outports, public, auth_data, vm):
self.log_exception("Error creating service.")

def _generate_service_data(self, namespace, name, outports, public):
service_data = {'apiVersion': 'v1', 'kind': 'Service'}
service_data['metadata'] = {
'name': name,
'namespace': namespace,
'labels': {'name': name}
}
service_data = self._gen_basic_k8s_elem(namespace, name, 'Service')

ports = []
if outports:
Expand Down Expand Up @@ -269,12 +325,7 @@ def create_ingress(self, namespace, name, dns, port, auth_data):
return False

def _generate_ingress_data(self, namespace, name, dns, port):
ingress_data = {'apiVersion': 'networking.k8s.io/v1', 'kind': 'Ingress'}
ingress_data['metadata'] = {
'name': name,
'namespace': namespace,
'labels': {'name': name}
}
ingress_data = self._gen_basic_k8s_elem(namespace, name, 'Ingress', 'networking.k8s.io/v1')

host = None
path = "/"
Expand Down Expand Up @@ -326,6 +377,16 @@ def _generate_ingress_data(self, namespace, name, dns, port):

return ingress_data

@staticmethod
def _gen_basic_k8s_elem(namespace, name, kind, version="v1"):
k8s_elem = {'apiVersion': version, 'kind': kind}
k8s_elem['metadata'] = {
'name': name,
'namespace': namespace,
'labels': {'name': name}
}
return k8s_elem

@staticmethod
def _get_env_variables(radl_system):
env_vars = []
Expand All @@ -338,7 +399,7 @@ def _get_env_variables(radl_system):
env_vars.append({'name': key, 'value': value})
return env_vars

def _generate_pod_data(self, namespace, name, outports, system, volumes, tags):
def _generate_pod_data(self, namespace, name, outports, system, volumes, configmaps, tags):
cpu = str(system.getValue('cpu.count'))
memory = "%s" % system.getFeature('memory.size').getValue('B')
image_url = urlparse(system.getValue("disk.0.image.url"))
Expand All @@ -353,13 +414,7 @@ def _generate_pod_data(self, namespace, name, outports, system, volumes, tags):
ports.append({'containerPort': outport.get_local_port(),
'protocol': outport.get_protocol().upper()})

pod_data = {'apiVersion': 'v1', 'kind': 'Pod'}
pod_data['metadata'] = {
'name': name,
'namespace': namespace,
'labels': {'name': name}
}

pod_data = self._gen_basic_k8s_elem(namespace, name, 'Pod')
# Add instance tags
if tags:
for k, v in tags.items():
Expand All @@ -382,20 +437,31 @@ def _generate_pod_data(self, namespace, name, outports, system, volumes, tags):
if system.getValue("docker.privileged") == 'yes':
containers[0]['securityContext'] = {'privileged': True}

pod_data['spec'] = {'restartPolicy': 'OnFailure'}

if volumes:
containers[0]['volumeMounts'] = []
for (v_name, _, v_mount_path, _) in volumes:
pod_data['spec']['volumes'] = []

for (v_name, _, v_mount_path) in volumes:
containers[0]['volumeMounts'].append(
{'name': v_name, 'mountPath': v_mount_path})
pod_data['spec']['volumes'].append(
{'name': v_name, 'persistentVolumeClaim': {'claimName': v_name}})

pod_data['spec'] = {'containers': containers, 'restartPolicy': 'OnFailure'}
if configmaps:
containers[0]['volumeMounts'] = containers[0].get('volumeMounts', [])
pod_data['spec']['volumes'] = pod_data['spec'].get('volumes', [])

if volumes:
pod_data['spec']['volumes'] = []
for (v_name, _, _, persistent) in volumes:
if persistent:
pod_data['spec']['volumes'].append(
{'name': v_name, 'persistentVolumeClaim': {'claimName': v_name}})
for (cm_name, cm_mount_path) in configmaps:
containers[0]['volumeMounts'].append(
{'name': cm_name, 'mountPath': cm_mount_path, "readOnly": True,
'subPath': os.path.basename(cm_mount_path)})
pod_data['spec']['volumes'].append(
{'name': cm_name,
'configMap': {'name': cm_name}})

pod_data['spec']['containers'] = containers

return pod_data

Expand Down Expand Up @@ -444,7 +510,9 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data):
inf.add_vm(vm)
pod_name = re.sub('[!"#$%&\'()*+,/:;<=>?@[\\]^`{|}~_]', '-', system.name)

volumes = self._create_volumes(namespace, system, pod_name, auth_data, True)
volumes = self._create_volumes(namespace, system, pod_name, auth_data)

configmaps = self._create_config_maps(namespace, system, pod_name, auth_data)

tags = self.get_instance_tags(system, auth_data, inf)

Expand All @@ -456,7 +524,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data):
elif priv_net:
outports = priv_net.getOutPorts()

pod_data = self._generate_pod_data(namespace, pod_name, outports, system, volumes, tags)
pod_data = self._generate_pod_data(namespace, pod_name, outports, system, volumes, configmaps, tags)

self.log_debug("Creating POD: %s/%s" % (namespace, pod_name))
uri = "/api/v1/namespaces/%s/%s" % (namespace, "pods")
Expand All @@ -469,23 +537,27 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data):
self._delete_volume_claims(pod_data, auth_data)
except Exception:
self.log_exception("Error deleting volumes.")
else:
dns_name = system.getValue("net_interface.0.dns_name")

self.create_service_data(namespace, pod_name, outports, pub_net, auth_data, vm)
try:
self._delete_config_maps(pod_data, auth_data)
except Exception:
self.log_exception("Error deleting configmaps.")

else:
output = json.loads(resp.text)
vm.id = output["metadata"]["name"]
vm.info.systems[0].setValue('instance_id', str(vm.id))
vm.info.systems[0].setValue('instance_name', str(vm.id))
vm.destroy = False

dns_name = system.getValue("net_interface.0.dns_name")
self.create_service_data(namespace, pod_name, outports, pub_net, auth_data, vm)

if dns_name and outports:
port = outports[0].get_local_port()
ingress_created = self.create_ingress(namespace, pod_name, dns_name, port, auth_data)
if not ingress_created:
vm.info.systems[0].delValue("net_interface.0.dns_name")

vm.destroy = False
res.append((True, vm))

except Exception as ex:
Expand Down Expand Up @@ -575,6 +647,7 @@ def finalize(self, vm, last, auth_data):
else:
pod_data = json.loads(output)
self._delete_volume_claims(pod_data, auth_data)
self._delete_config_maps(pod_data, auth_data)
success, msg = self._delete_pod(vm, auth_data)
if not success:
self.log_error("Error deleting Pod %s: %s" % (vm.id, msg))
Expand Down
47 changes: 35 additions & 12 deletions IM/tosca/Tosca.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Tosca:
"""

ARTIFACTS_PATH = os.path.dirname(os.path.realpath(__file__)) + "/tosca-types/artifacts"
ARTIFACTS_REMOTE_REPO = "https://raw.githubusercontent.com/indigo-dc/tosca-types/master/artifacts/"
ARTIFACTS_REMOTE_REPO = "https://raw.githubusercontent.com/grycap/tosca/main/artifacts/"
GET_TIMEOUT = 20

logger = logging.getLogger('InfrastructureManager')
Expand Down Expand Up @@ -2189,9 +2189,9 @@ def _get_oscar_service_json(self, node):

return res

def _gen_k8s_volumes(self, node, nodetemplates, value):
def _gen_k8s_volumes(self, node, nodetemplates, value, cont=1):
"""Get the volumes attached to an K8s container."""
volumes = []
cont = 1
# volume format should be "volume_name:mount_path"
for vol in value:
size = None
Expand All @@ -2217,21 +2217,44 @@ def _gen_k8s_volumes(self, node, nodetemplates, value):
cont += 1
return volumes

def _gen_k8s_configmaps(self, res, cms):
"""Get the configmaps attached to an K8s container."""
cont = 1
for cm in cms:
mount_path = cm.get("deploy_path")
cm_file = cm.get("file")
content = cm.get("properties", {}).get("content", "")
if content:
res.setValue('disk.%d.content' % cont, content)
# if content is not empty file is ignored
if cm_file and not content:
resp = self.cache_session.get(cm_file, timeout=self.GET_TIMEOUT)
if resp.status_code != 200:
raise Exception("Error downloading file %s: %s\n%s" % (cm_file, resp.reason, resp.text))
res.setValue('disk.%d.content' % cont, resp.text)
if content or cm_file:
res.setValue('disk.%d.mount_path' % cont, mount_path)
cont += 1

return cont

def _gen_k8s_system(self, node, nodetemplates):
"""Get the volumes attached to an K8s container."""
"""Generate the system for a K8s app."""
res = system(node.name)
nets = []
cms = []

artifacts = node.type_definition.get_value('artifacts', node.entity_tpl, True)
if len(artifacts) != 1:
raise Exception("Only one artifact is supported for K8s container.")
for artifact in list(self._get_node_artifacts(node).values()):
if artifact.get("type") == "tosca.artifacts.Deployment.Image.Container.Docker":
image = self._final_function_result(artifact.get("file"), node)
elif artifact.get("type") == "tosca.artifacts.File" and artifact.get("deploy_path"):
cms.append(artifact)

artifact = list(artifacts.values())[0]
image = self._final_function_result(artifact.get("file", None), node)
if not image:
raise Exception("No image specified for K8s container.")
if "tosca.artifacts.Deployment.Image.Container.Docker" != artifact.get("type", None):
raise Exception("Only Docker images are supported for K8s container.")

cont = self._gen_k8s_configmaps(res, cms)

repo = artifact.get("repository", None)
if repo:
repo_url = self._get_repository_url(repo)
Expand Down Expand Up @@ -2268,7 +2291,7 @@ def _gen_k8s_system(self, node, nodetemplates):
value = int(ScalarUnit_Size(value).get_num_from_scalar_unit('B'))
res.setValue("memory.size", value, 'B')
elif prop.name == 'volumes':
for num, size, mount_path, volume_id in self._gen_k8s_volumes(node, nodetemplates, value):
for num, size, mount_path, volume_id in self._gen_k8s_volumes(node, nodetemplates, value, cont):
if volume_id:
res.setValue('disk.%d.image.url' % num, volume_id)
if size:
Expand Down
2 changes: 1 addition & 1 deletion test/files/tosca_add.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ topology_template:
interfaces:
Standard:
configure:
implementation: mysql/mysql_db_import.yml
implementation: https://raw.githubusercontent.com/indigo-dc/tosca-types/master/artifacts/mysql/mysql_db_import.yml
inputs:
db_name: { get_property: [ SELF, name ] }
db_data: { get_artifact: [ SELF, db_content ] }
Expand Down
2 changes: 1 addition & 1 deletion test/files/tosca_create.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ topology_template:
interfaces:
Standard:
configure:
implementation: mysql/mysql_db_import.yml
implementation: https://raw.githubusercontent.com/indigo-dc/tosca-types/master/artifacts/mysql/mysql_db_import.yml
inputs:
db_name: { get_property: [ SELF, name ] }
db_data: { get_artifact: [ SELF, db_content ] }
Expand Down
Loading