Skip to content

Commit

Permalink
Fix minikube service failed on python3.8+ (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 authored Dec 18, 2020
1 parent e7883db commit a570a31
Showing 1 changed file with 35 additions and 40 deletions.
75 changes: 35 additions & 40 deletions python/graphscope/deploy/kubernetes/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import atexit
import logging
import multiprocessing
import os
import random
import re
import subprocess
import sys
import threading
import time

from kubernetes import client as kube_client
Expand Down Expand Up @@ -484,50 +484,45 @@ def _waiting_for_services_ready(self):
logger.info("Coordinator service is ready.")

def _get_minikube_service(self, namespace, service_name):
def minikube_get_service_url(queue, namespace, service_name):
try:
cmd = ["minikube", "service", service_name, "-n", namespace, "--url"]
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in process.stdout:
queue.put(line.decode("utf-8"))
except Exception as e:
queue.put(str(e))

pqueue = multiprocessing.Queue()
p = multiprocessing.Process(
def minikube_get_service_url(process, rlt, messages):
for line in process.stdout:
line = line.decode("utf-8")
messages.append(line)
for match in re.finditer(self._url_pattern, line):
rlt.append(match.group())
return

cmd = ["minikube", "service", service_name, "-n", namespace, "--url"]
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
rlt = []
messages = []
t = threading.Thread(
target=minikube_get_service_url,
name="minikube_service",
args=(
pqueue,
namespace,
service_name,
process,
rlt,
messages,
),
)
p.start()
# 10 seconds is enough
p.join(10)
if p.is_alive():
p.terminate()
p.join()
minikube_service_endpoint_url = None
msgs = ""
while not pqueue.empty():
msg = pqueue.get()
msgs += msg
# check for url in string
for match in re.finditer(self._url_pattern, msg):
minikube_service_endpoint_url = match.group()
if minikube_service_endpoint_url is not None:
endpoint_match = re.search(
self._endpoint_pattern, minikube_service_endpoint_url
)
return "{}:{}".format(
endpoint_match.group("host"), endpoint_match.group("port")
t.start()
try:
# 10 seconds is enough
t.join(timeout=10)
except: # noqa: E722
pass
process.terminate()

if not rlt:
raise RuntimeError(
"Minikube get service error: {}".format("".join(messages))
)
else:
raise RuntimeError("Minikube get service error: {}".format(msgs))

endpoint_match = re.search(self._endpoint_pattern, rlt[0])
return "{}:{}".format(
endpoint_match.group("host"), endpoint_match.group("port")
)

def _get_coordinator_endpoint(self):
# Note that only support NodePort service type
Expand Down

0 comments on commit a570a31

Please sign in to comment.