Add sanity tests for parallel instance loading #6126

Merged
13 commits merged on Aug 10, 2023
190 changes: 187 additions & 3 deletions qa/L0_lifecycle/lifecycle_test.py
@@ -31,6 +31,7 @@
sys.path.append("../common")

import concurrent.futures
import json
import os
import shutil
import signal
@@ -2789,7 +2790,7 @@ def test_load_gpu_limit(self):
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))

def test_concurrent_load_speedup(self):
def test_concurrent_model_load_speedup(self):
# Initialize client
try:
triton_client = grpcclient.InferenceServerClient(
@@ -2835,7 +2836,7 @@ def test_concurrent_load_speedup(self):
for model_name in model_pair:
self.assertTrue(triton_client.is_model_ready(model_name))

def test_concurrent_load(self):
def test_concurrent_model_load(self):
# Initialize client
try:
triton_client = grpcclient.InferenceServerClient(
@@ -2864,7 +2865,7 @@ def test_concurrent_load(self):
model_metadata = triton_client.get_model_metadata("identity_model")
self.assertEqual(model_metadata.platform, "python")

def test_concurrent_load_unload(self):
def test_concurrent_model_load_unload(self):
# Initialize client
try:
triton_client = grpcclient.InferenceServerClient(
@@ -2916,6 +2917,189 @@ def test_concurrent_load_unload(self):
for model_name in model_names:
self.assertEqual(is_load, triton_client.is_model_ready(model_name))

def test_concurrent_model_instance_load_speedup(self):
# Initialize client
try:
triton_client = httpclient.InferenceServerClient(
"localhost:8000", verbose=True
)
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))
models = ["identity_fp32"]
# Create 2 instances which each have a delay time of 10 seconds.
num_instances = 2
instance_group = [{"kind": "KIND_CPU", "count": num_instances}]
config = {"instance_group": instance_group}
for model in models:
# Instances should be loaded concurrently for supported backends
start_time = time.time()
try:
triton_client.load_model(model, config=json.dumps(config))
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))
end_time = time.time()
loading_time = end_time - start_time
print(f"Time to load {num_instances} instances: {loading_time}")

# Each of the two instances has a minimum loading delay of 10 seconds
# Speedup is observed when the concurrent loading time < 20 seconds
# but use a tighter bound of 15 seconds
self.assertLess(
loading_time, 15.0, "Concurrent loading speedup not observed"
)
# Concurrent loading time cannot be < 10 seconds
self.assertGreaterEqual(
loading_time, 10.0, "Invalid concurrent loading time"
)
# Make sure the models are loaded
self.assertTrue(triton_client.is_server_live())
self.assertTrue(triton_client.is_server_ready())
self.assertTrue(triton_client.is_model_ready(model))

def _call_with_timeout(self, callable, timeout_secs):
# Setup handler for timing out call
def timeout_handler(sig, frame):
raise TimeoutError()

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout_secs)
result = callable()
return result

def _call_with_expected_timeout(self, callable, timeout_secs=3):
# Call callable with expectation that it will timeout
try:
self._call_with_timeout(callable, timeout_secs)
except TimeoutError:
print("Inference timed out as expected.")
return
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))
else:
self.assertTrue(False, "unexpected success, call should've timed out.")

def _get_fp32_io(self, client_type):
# Config
input_names = ["INPUT0", "INPUT1"]
output_names = ["OUTPUT0", "OUTPUT1"]
dtype, dims, shape = ("TYPE_FP32", [-1, 16], [1, 16])
input_config = [
{"name": name, "data_type": dtype, "dims": dims} for name in input_names
]
output_config = [
{"name": name, "data_type": dtype, "dims": dims} for name in output_names
]
# Inputs
inputs = []
for name in input_names:
inputs.append(
client_type.InferInput(name, shape, dtype.replace("TYPE_", ""))
)
inputs[-1].set_data_from_numpy(np.ones(shape, dtype=np.float32))
return input_config, output_config, inputs

def test_concurrent_model_instance_load_sanity(self):
Contributor:

Just for my understanding: is the goal of this test to check whether the included backends are capable of loading model instances concurrently?

Contributor Author:

The goal is to check that nothing crashes and burns internally when trying to load instances in parallel for the supported backends. All it does is load N instances of various kinds, ensure that exactly N instances are loaded (via sequence batch slot logic), and complete a successful inference (no exception is raised on the N inferences).
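Concretely, the core of the check looks roughly like this (a condensed sketch of the test body further down in this diff, reusing triton_client, model, inputs, and the _call_with_expected_timeout helper defined there):

num_instances = 5
# Each started sequence occupies one instance's slot until the sequence idles out,
# so N successful sequence starts imply at least N usable instances.
for seq_id in range(1, num_instances + 1):
    triton_client.infer(model, inputs, sequence_id=seq_id, sequence_start=True)

# With all N slots held, an (N + 1)th sequence cannot be scheduled, so this extra
# request should hit the client-side timeout, i.e. there are no more than N instances.
self._call_with_expected_timeout(
    partial(
        triton_client.infer,
        model,
        inputs,
        sequence_id=num_instances + 1,
        sequence_start=True,
    ),
    timeout_secs=3,
)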

Contributor Author:

Open to ideas if you think there's a better way to test this.

Contributor:

I think the test case does not belong here, because L0_lifecycle is for testing the correctness of the Triton core, while the correctness of the backend implementation belongs in each backend's own tests. Since the test is similar for all backends, maybe a new L0_backend_thread_safety (or something similar) can be added for this test?
@GuanLuo @tanmayv25 should correct me if you think otherwise

Contributor Author:

@kthui do you have any feedback on the test itself other than how it is named? Or do you think it is sufficient for now?

Also @Tabrizian @tanmayv25 any feedback?

Member:

I was wondering how this is testing the parallel instance feature. I see that it checks whether all the instances are ready or not, but I don't see anything in this specific case that exercises parallel instance loading. I believe this test case would have passed before this feature too. Please correct me if I'm missing anything.

Contributor Author:

@Tabrizian you are correct that this subtest doesn't explicitly test the parallelism in terms of speedup; I did that in the other subtest, as it is easy to do for a Python model, similar to the existing parallel model loading tests.

This subtest was written as more of a sanity check that, when loading a model with a significant instance count (N=5, though I can probably increase this), (1) nothing crashes and burns internally, since higher instance counts make it more likely that a bug would surface as a race condition, segfault, hang, etc., and (2) exactly N instances (not more, not fewer, due to any possible race condition) do actually get loaded and can handle inference.

Upon reflection though, I can probably modify this test to do a serial load (env var) and then a parallel load and just check that the parallel load is faster at all. Initially I was afraid of this check introducing flakiness, but ideally it should consistently be faster on the types of CI systems we have with higher core counts.

Let me know if that makes sense, or if you have other ideas.
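For reference, a rough sketch of that serial-vs-parallel comparison (not part of this PR; the mechanism for disabling parallel instance loading is shown only as a placeholder and would still need to be decided):

import json
import time

import tritonclient.http as httpclient


def time_instance_load(url="localhost:8000", model="identity_fp32", count=5):
    # Time a single load_model call for a model configured with `count` instances,
    # where each instance has an artificial delay in initialize().
    client = httpclient.InferenceServerClient(url)
    config = {"instance_group": [{"kind": "KIND_CPU", "count": count}]}
    start = time.time()
    client.load_model(model, config=json.dumps(config))
    return time.time() - start


# serial_secs would be measured against a server started with parallel instance
# loading disabled (placeholder switch, e.g. some DISABLE_PARALLEL_INSTANCE_LOADING
# env var), parallel_secs against a default server, and the assertion is simply:
#     self.assertLess(parallel_secs, serial_secs)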

cpu, gpu = "KIND_CPU", "KIND_GPU"
default_kinds = [cpu, gpu]
backend_kinds = {"plan": [gpu], "openvino": [cpu]}
try:
client_type = httpclient
triton_client = client_type.InferenceServerClient(
"localhost:8000", verbose=True
)
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))

backends = os.environ.get("PARALLEL_BACKENDS", "").split()
self.assertTrue(len(backends) > 0, "PARALLEL_BACKENDS wasn't set")

num_instances = 5
input_config, output_config, inputs = self._get_fp32_io(client_type)
for backend in backends:
model = tu.get_model_name(backend, np.float32, np.float32, np.float32)
kinds = backend_kinds.get(backend, default_kinds)
for kind in kinds:
with self.subTest(backend=backend, model=model, kind=kind):
# Setup model config
instance_group = {"kind": kind, "count": num_instances}
# Disable batching to guarantee 1 request per instance
# Configure sequence batching such that each instance cannot accept new requests
# while it is busy with an ongoing sequence. This way we can guarantee sending 1 request to each instance.
max_batch_size = 0
sequence_timeout_secs = 10
sequence_batching = {
"direct": {},
"max_sequence_idle_microseconds": sequence_timeout_secs
* 1000000,
}
config = {
"instance_group": instance_group,
"max_batch_size": max_batch_size,
"sequence_batching": sequence_batching,
"input": input_config,
"output": output_config,
}
print(
f"~~~ Backend: [{backend}], Model: [{model}], Config: [{config}] ~~~"
)
# Load the model
try:
triton_client.load_model(model, config=json.dumps(config))
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))

# Make sure the model is loaded
self.assertTrue(triton_client.is_server_live())
self.assertTrue(triton_client.is_model_ready(model))
print(
"Model Repository Index after load:",
triton_client.get_model_repository_index(),
)

# Test inference on each instance
for i in range(1, num_instances + 1):
try:
triton_client.infer(
model, inputs, sequence_id=i, sequence_start=True
)
except Exception as ex:
self.assertTrue(
False, "unexpected inference error {}".format(ex)
)

# Each instance should be busy until their sequence times out, so
# an additional infer call should time out. If it doesn't time out, something
# is wrong and the test should fail.
callable = partial(
triton_client.infer,
model,
inputs,
sequence_id=num_instances + 1,
sequence_start=True,
)
self._call_with_expected_timeout(callable, timeout_secs=3)

# Unload the model
try:
triton_client.unload_model(model)
except Exception as ex:
self.assertTrue(False, "unexpected error {}".format(ex))

# Allow server to fully unload model before next test iteration
num_tries = 10
for i in range(num_tries):
if triton_client.is_server_ready():
Member:

Does the server become not ready when a model is unloaded? I was wondering whether we should make unload_model blocking; I think it is really confusing that it is async. (cc @kthui @GuanLuo). Also, even if it stays async, we should probably update this example to better demonstrate the usage: https://github.com/triton-inference-server/client/blob/f7c45d382639f31a5162666034fae7852681f082/src/python/examples/simple_grpc_model_control.py#L98-L101

Contributor Author (@rmccorm4, Aug 4, 2023):

Does the server become not ready when a model is unloaded?

Server ready is true when all models are ready for inferencing, so while a model is in the unloading state, server is considered not ready.

I was wondering whether we should make unload_model blocking? I think it is really confusing that it is async.

I have been thinking about this recently when seeing Jacky's model update tests requiring polling, Kris's BLS test unload issues, and this test as well. I second that it's definitely confusing and unintuitive.

I do believe we document in most places that it returns without waiting for the model to finish unloading. However, I still think it is unintuitive and not symmetric with load_model, which is blocking.

Contributor Author (@rmccorm4, Aug 4, 2023):

NOTE: I understand there's good reason for an async unload to allow inflight requests to finish without a long blocking/wait time, but it would be nice to have an alternative or optional blocking call to support both.

Couldn't the client also perform the blocking call asynchronously to do other work while waiting if needed?
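For example, a small client-side wrapper could give callers a blocking unload today (a sketch only, not an existing API; it assumes the HTTP client's JSON repository index, a list of dicts with "name" and "state" fields):

import time


def unload_model_blocking(client, model_name, timeout_secs=60, poll_secs=1):
    # Issue the async unload, then poll the repository index until the model
    # stops reporting a READY/UNLOADING state (or the timeout expires).
    client.unload_model(model_name)
    deadline = time.time() + timeout_secs
    while time.time() < deadline:
        index = client.get_model_repository_index()
        entry = next((m for m in index if m.get("name") == model_name), None)
        if entry is None or entry.get("state") not in ("READY", "UNLOADING"):
            return
        time.sleep(poll_secs)
    raise TimeoutError(f"{model_name} did not finish unloading within {timeout_secs}s")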

Contributor:

My reasoning is that when unload returns, the model is no longer accessible from the client. Does it matter from the client's perspective whether the model is fully cleaned up on the server side when unload returns?

break
print(
f"[Attempt {i}] Server not ready yet, sleeping and retrying. Current repository index: {triton_client.get_model_repository_index()}"
)
time.sleep(6)
print(
"Model Repository Index after unload attempts:",
triton_client.get_model_repository_index(),
)
self.assertTrue(triton_client.is_server_ready())


if __name__ == "__main__":
unittest.main()
88 changes: 82 additions & 6 deletions qa/L0_lifecycle/test.sh
@@ -1713,7 +1713,7 @@ wait $SERVER_PID

LOG_IDX=$((LOG_IDX+1))

# LifeCycleTest.test_concurrent_load_speedup
# LifeCycleTest.test_concurrent_model_load_speedup
rm -rf models
mkdir models
MODEL_NAME="identity_zero_1_int32"
@@ -1743,7 +1743,7 @@ if [ "$SERVER_PID" == "0" ]; then
fi

set +e
python $LC_TEST LifeCycleTest.test_concurrent_load_speedup >>$CLIENT_LOG 2>&1
python $LC_TEST LifeCycleTest.test_concurrent_model_load_speedup >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Failed\n***"
@@ -1756,7 +1756,7 @@ wait $SERVER_PID

LOG_IDX=$((LOG_IDX+1))

# LifeCycleTest.test_concurrent_load
# LifeCycleTest.test_concurrent_model_load
rm -rf models models_v1 models_v2
mkdir models models_v2
cp -r identity_zero_1_int32 models/identity_model && \
@@ -1778,7 +1778,7 @@ if [ "$SERVER_PID" == "0" ]; then
fi

set +e
python $LC_TEST LifeCycleTest.test_concurrent_load >>$CLIENT_LOG 2>&1
python $LC_TEST LifeCycleTest.test_concurrent_model_load >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Failed\n***"
@@ -1791,7 +1791,7 @@ wait $SERVER_PID

LOG_IDX=$((LOG_IDX+1))

# LifeCycleTest.test_concurrent_load_unload
# LifeCycleTest.test_concurrent_model_load_unload
rm -rf models
mkdir models
cp -r identity_zero_1_int32 models && mkdir -p models/identity_zero_1_int32/1
@@ -1813,7 +1813,7 @@ if [ "$SERVER_PID" == "0" ]; then
fi

set +e
python $LC_TEST LifeCycleTest.test_concurrent_load_unload >>$CLIENT_LOG 2>&1
python $LC_TEST LifeCycleTest.test_concurrent_model_load_unload >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Failed\n***"
@@ -1824,6 +1824,82 @@ set -e
kill $SERVER_PID
wait $SERVER_PID

# LifeCycleTest.test_concurrent_model_instance_load_speedup
rm -rf models
mkdir models
MODEL_NAME="identity_fp32"
cp -r ../python_models/${MODEL_NAME} models/ && (cd models/${MODEL_NAME} && \
mkdir 1 && mv model.py 1 && \
echo "    def initialize(self, args):" >> 1/model.py && \
echo "        import time" >> 1/model.py && \
echo "        time.sleep(10)" >> 1/model.py)
rm models/${MODEL_NAME}/config.pbtxt

SERVER_ARGS="--model-repository=`pwd`/models --model-control-mode=explicit"
SERVER_LOG="./inference_server_$LOG_IDX.log"
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
cat $SERVER_LOG
exit 1
fi

set +e
python $LC_TEST LifeCycleTest.test_concurrent_model_instance_load_speedup >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Failed\n***"
RET=1
fi
set -e

kill $SERVER_PID
wait $SERVER_PID

LOG_IDX=$((LOG_IDX+1))

# LifeCycleTest.test_concurrent_model_instance_load_sanity
rm -rf models
mkdir models
# Sanity check loading multiple instances in parallel for each supported backend
PARALLEL_BACKENDS="python onnx"
for backend in ${PARALLEL_BACKENDS} ; do
model="${backend}_float32_float32_float32"
model_dir="models/${model}"
if [[ $backend == "python" ]]; then
cp -r ../python_models/identity_fp32 ${model_dir}
mkdir ${model_dir}/1 && mv ${model_dir}/model.py ${model_dir}/1
rm ${model_dir}/config.pbtxt
else
mkdir models/${model}
cp -r $DATADIR/qa_model_repository/${model}/1 models/${model}/1
fi
done

SERVER_ARGS="--model-repository=`pwd`/models --model-control-mode=explicit --log-verbose=2"
SERVER_LOG="./inference_server_$LOG_IDX.log"
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
cat $SERVER_LOG
exit 1
fi

set +e
PARALLEL_BACKENDS=${PARALLEL_BACKENDS} python $LC_TEST LifeCycleTest.test_concurrent_model_instance_load_sanity >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "\n***\n*** Test Failed\n***"
RET=1
fi
set -e

kill $SERVER_PID
wait $SERVER_PID

LOG_IDX=$((LOG_IDX+1))


if [ $RET -eq 0 ]; then
echo -e "\n***\n*** Test Passed\n***"
fi
5 changes: 4 additions & 1 deletion qa/L0_onnx_optimization/test.sh
@@ -61,8 +61,11 @@ for MODEL in \
models/${MODEL}_test && \
rm -fr models/${MODEL}_test/2 && \
rm -fr models/${MODEL}_test/3 && \
# Set instance count > 1 to test parallel instance loading across all EPs
INSTANCE_COUNT=5
(cd models/${MODEL}_test && \
sed -i 's/_float32_float32_float32/&_test/' config.pbtxt) && \
sed -i 's/_float32_float32_float32/&_test/' config.pbtxt && \
echo -e "\ninstance_group { count: ${INSTANCE_COUNT} }" >> config.pbtxt) && \
# CUDA EP optimization params
cp -r models/${MODEL}_test models/${MODEL}_cuda_config && \
(cd models/${MODEL}_cuda_config && \
5 changes: 5 additions & 0 deletions qa/python_models/bls_model_loading/model.py
@@ -24,6 +24,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
import unittest

import numpy as np
@@ -38,6 +39,10 @@ def tearDown(self):
# The unload call does not wait for the requested model to be fully
# unloaded before returning.
pb_utils.unload_model(self.model_name)
# TODO: Make this more robust to wait until fully unloaded
print("Sleep 30 seconds to make sure model finishes unloading...")
time.sleep(30)
print("Done sleeping.")
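# A more robust alternative (sketch only, not part of this change) would be to
# poll readiness instead of sleeping for a fixed time, e.g.:
#   for _ in range(30):
#       if not pb_utils.is_model_ready(model_name=self.model_name):
#           break
#       time.sleep(1)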

def test_load_unload_model(self):
self.assertFalse(pb_utils.is_model_ready(model_name=self.model_name))