From 87f1ea81ccd1ce3e3b65fe9ca90029f37af24cd4 Mon Sep 17 00:00:00 2001 From: Zhiguo Wu Date: Mon, 8 Jul 2024 11:30:01 +0800 Subject: [PATCH] BIGTOP-4154: Support streaming shell output into task log --- .../manager/common/shell/ShellExecutor.java | 29 ++++++------ .../common/thread/TaskLogThreadDecorator.java | 44 +++++++++++++++++++ 2 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/thread/TaskLogThreadDecorator.java diff --git a/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/shell/ShellExecutor.java b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/shell/ShellExecutor.java index 0da7938f3..64bd6489d 100644 --- a/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/shell/ShellExecutor.java +++ b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/shell/ShellExecutor.java @@ -18,6 +18,8 @@ */ package org.apache.bigtop.manager.common.shell; +import org.apache.bigtop.manager.common.thread.TaskLogThreadDecorator; + import lombok.extern.slf4j.Slf4j; import java.io.BufferedReader; @@ -305,23 +307,20 @@ private BufferedReader createBufferedReader(InputStream inputStream) { } private Thread createReaderThread(BufferedReader reader, StringBuilder msg) { - return new Thread() { - - @Override - public void run() { - try { - String line = reader.readLine(); - while ((line != null) && !isInterrupted()) { - consumer.accept(line); - msg.append(line); - msg.append(System.lineSeparator()); - line = reader.readLine(); - } - } catch (IOException ioe) { - log.warn("Error reading the stream", ioe); + TaskLogThreadDecorator decorator = new TaskLogThreadDecorator(); + return decorator.decorate(() -> { + try { + String line = reader.readLine(); + while ((line != null)) { + consumer.accept(line); + msg.append(line); + msg.append(System.lineSeparator()); + line = reader.readLine(); } + } catch (IOException ioe) { + log.warn("Error reading the stream", ioe); } - }; + }); } private void scheduleTimeoutTimer() { diff --git a/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/thread/TaskLogThreadDecorator.java b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/thread/TaskLogThreadDecorator.java new file mode 100644 index 000000000..88b335667 --- /dev/null +++ b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/thread/TaskLogThreadDecorator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.common.thread; + +import org.apache.commons.lang3.StringUtils; + +import org.slf4j.MDC; + +import jakarta.annotation.Nonnull; + +public class TaskLogThreadDecorator { + + public Thread decorate(@Nonnull Runnable runnable) { + String taskId = MDC.get("taskId"); + if (StringUtils.isNotBlank(taskId)) { + return new Thread(() -> { + try { + MDC.put("taskId", taskId); + runnable.run(); + } finally { + MDC.clear(); + } + }); + } else { + return new Thread(runnable); + } + } +}