Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cpu utilization is not up transcoding shortage is very slow #2291

Open
nocarethancare opened this issue Oct 17, 2024 · 0 comments
Open

cpu utilization is not up transcoding shortage is very slow #2291

nocarethancare opened this issue Oct 17, 2024 · 0 comments

Comments

@nocarethancare
Copy link

When I use FFmpegFrameGrabber and FFmpegFrameRecorder to transcode an RTMP live stream, the CPU utilization of the Java process stays low and transcoding is very slow. How can I increase the CPU utilization of JavaCV? I am using version 1.5.10.
My code:
package com.mango5g.media.cloud.service.listener;

import lombok.extern.slf4j.Slf4j;
import me.zhyd.oauth.utils.StringUtils;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
import org.bytedeco.opencv.global.opencv_imgcodecs;
import org.bytedeco.opencv.opencv_core.Mat;

import java.io.IOException;
import java.nio.ShortBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Relays a primary ("major") live stream to an output URL and falls back to a
 * standby source (a user-provided "slave" stream or a "ready" filler stream)
 * while the major stream is unavailable.
 */
@Slf4j
public class StreamEngine {
// Upper bound, in milliseconds, on how long restartMajorStream keeps retrying
// when no slave stream is configured.
private final long restartLength;
// URL the major grabber reads from (constructor argument inputUrl).
private String MAJOR_STREAM_URL;

// Base URL of the "ready" filler stream (constructor argument readyU).
private String READY_STREAM_URL;
// Source of the slave stream (constructor argument readyUrl); a non-empty
// value switches run() into slave mode.
private String SLAVE_STREAM_URL;
// URL the recorder publishes to (constructor argument outputUrl).
private String OUTPUT_STREAM_URL;
// Shared recorder; set to null by processStream when it cannot be restarted.
private FFmpegFrameRecorder recorder = null;
// Set by stop() (and on an unrecoverable recorder error) to end the relay loop.
private volatile Boolean stopFlag = false;
// Becomes true once processStream has grabbed at least one frame.
private volatile Boolean allreadyFlag = false;
// String mirror of stopFlag, checked by the restart task.
private String stopStr = "false";
// Written by processStream/restartMajorStream; not read anywhere in this class.
private Boolean flag = true;
private FFmpegFrameGrabber readyGrabber;
private FFmpegFrameGrabber slaveGrabber;
// True while a slave stream is configured and could be opened.
private Boolean slaveFlag = false;
// Path suffixes of the filler streams; presumably named after sample rate
// (44.1 kHz / 48 kHz) and frame rate (25 / 30 fps) — TODO confirm.
private static final String r1_441_25 = "/migu/nosignal/r1_441_25";
private static final String r2_441_30 = "/migu/nosignal/r2_441_30";
private static final String r3_480_25 = "/migu/nosignal/r3_480_25";
private static final String r4_480_30 = "/migu/nosignal/r4_480_30";

// External streamer started in slave mode and stopped by stop().
private FFmpegStreamer fFmpegStreamer;
private String liveName;

// RTMP URL the slave grabber reads from (slaveU + "/" + liveName).
private String slaveStreamRtmpUrl;

// Passed to FFmpegStreamer.startStreaming; presumably the ffmpeg binary path — TODO confirm.
private String ffmpegbin;

/**
 * Reports whether the engine has grabbed at least one frame.
 *
 * @return the current value of {@code allreadyFlag}, which processStream sets
 *         to {@code true} after a successful grab
 */
public boolean getAllreadyFlag() {
    final Boolean ready = this.allreadyFlag;
    return ready;
}

public StreamEngine(String inputUrl, String outputUrl, String readyUrl, String readyU, String slaveU, String liveName, String ffmpegbin, Long restartLength) {
    this.MAJOR_STREAM_URL = inputUrl;
    this.OUTPUT_STREAM_URL = outputUrl;
    this.SLAVE_STREAM_URL = readyUrl;
    this.READY_STREAM_URL = readyU;
    this.liveName = liveName;
    this.ffmpegbin = ffmpegbin;
    this.restartLength = restartLength;
    if (StringUtils.isNotEmpty(this.liveName)) {
        this.slaveStreamRtmpUrl = slaveU + "/" + liveName;
    }
    log.info("=======inputUrl:{}===outputUrl:{}==readyUrl:{}===readyU:{}=========", inputUrl, outputUrl, readyUrl, readyU);
}


/**
 * Opens the major, optional slave and ready grabbers, configures and starts the
 * recorder from the major stream's parameters, then relays frames until
 * {@code stopFlag} is set or an unrecoverable error occurs.
 *
 * <p>All resources that were opened are released on exit, each independently
 * of the others; the restart executor is shut down as well.
 *
 * @throws RuntimeException if releasing a recorder or grabber fails (the first
 *                          failure is the cause, later ones are suppressed)
 */
public void run() {
    log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());

    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean majorStreamActive = new AtomicBoolean(true);
    FFmpegFrameGrabber majorGrabber = null;
    try {
        majorGrabber = new FFmpegFrameGrabber(MAJOR_STREAM_URL);
        slaveFlag = StringUtils.isNotEmpty(SLAVE_STREAM_URL);
        // NOTE(review): "stimeout" is documented by FFmpeg as an RTSP option; for RTMP
        // inputs a different timeout option may be required — confirm.
        majorGrabber.setOption("stimeout", "5000000");
        // NOTE(review): setOption() appears to feed the demuxer/format context; if the
        // goal is more decoder threads, setVideoOption("threads", ...) may be the
        // effective call — confirm against the JavaCV API documentation.
        majorGrabber.setOption("threads", "8");
        majorGrabber.start();
        log.info("========majorStream==准备就绪=====");
        if (slaveFlag) {
            fFmpegStreamer = new FFmpegStreamer();
            fFmpegStreamer.startStreaming(ffmpegbin, SLAVE_STREAM_URL, slaveStreamRtmpUrl, majorGrabber);
            // Give the external streamer time to start publishing before connecting.
            Thread.sleep(2000);
            slaveGrabber = new FFmpegFrameGrabber(slaveStreamRtmpUrl);
            try {
                slaveGrabber.setOption("stimeout", "3000000");
                slaveGrabber.start();
                log.info("============用户垫片备用流连接成功========");
            } catch (FrameGrabber.Exception e) {
                // No usable slave stream: fall back to the ready stream.
                slaveFlag = false;
                log.info("============用户垫片备用流不存在========");
            }
        }
        int sampleRate = majorGrabber.getSampleRate();
        double frameRate = majorGrabber.getFrameRate();
        log.info("========main:sampleRate->{}===frameRate:{}=========", sampleRate, frameRate);
        // TODO: to be enabled — the profile-specific filler selected here is currently
        // replaced by the plain READY_STREAM_URL grabber created right after it.
        if (sampleRate == 48000) {
            if (frameRate == 25) {
                readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r3_480_25);
            } else {
                readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r4_480_30);
            }
        } else {
            if (frameRate == 25) {
                readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r1_441_25);
            } else {
                readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r2_441_30);
            }
        }
        readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL);
        try {
            readyGrabber.setOption("stimeout", "3000000");
            readyGrabber.start();
            log.info("========readyStream连接成功==");
        } catch (Exception e) {
            // A missing ready stream is tolerated; the relay still starts.
            log.error("========readyStream连接失败==:{}", e.getMessage());
        }
        int audioChannels = majorGrabber.getAudioChannels();
        log.info("====majorGrabber.getAudioChannels:{}", audioChannels);
        recorder = new FFmpegFrameRecorder(OUTPUT_STREAM_URL, majorGrabber.getImageWidth(), majorGrabber.getImageHeight(), audioChannels);
        // The preset must be a bare preset name. The previous value
        // "ultrafast - tune fastdecode" was a pasted command-line fragment, which
        // encoders that validate presets reject. The tune is set separately below.
        recorder.setVideoOption("preset", "ultrafast");
        // NOTE(review): "vsync" is an ffmpeg command-line option and "threads" a codec
        // option; passed through setOption() they may not reach the encoder — confirm.
        recorder.setOption("vsync", "1");
        recorder.setVideoOption("allow_sw", "1");
        // Key-frame interval; may affect start-up latency and video quality.
        recorder.setGopSize(15);
        recorder.setVideoOption("tune", "zerolatency");
        // Rate-control buffer size.
        recorder.setVideoOption("bufsize", "10485760");
        recorder.setVideoOption("crf", "30");
        recorder.setSampleRate(majorGrabber.getSampleRate());
        recorder.setFrameRate(majorGrabber.getFrameRate());
        recorder.setVideoBitrate(majorGrabber.getVideoBitrate());
        recorder.setFormat("flv");
        recorder.setOption("threads", "8");
        int videoCodec = majorGrabber.getVideoCodec();
        String videoCodecName = majorGrabber.getVideoCodecName();
        log.info("=====源头 编码:{}=={}==", videoCodecName, videoCodec);
        // Null-safe comparison: the codec name may be absent when there is no video stream.
        if ("hevc".equals(videoCodecName) || videoCodec == avcodec.AV_CODEC_ID_HEVC) {
            recorder.setVideoCodecName("libx265"); // use the software encoder
            log.info("====h265 软编====");
        } else {
            recorder.setVideoCodec(videoCodec);
        }
        int audioBitrate = majorGrabber.getAudioBitrate();
        log.info("===1majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
        recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
        if (audioChannels != 0) {
            recorder.setAudioChannels(audioChannels);
            if (audioBitrate == 0) {
                // The source did not report a bitrate; use 128 kbit/s.
                audioBitrate = 128000;
            }
        }
        recorder.setAudioBitrate(audioBitrate);
        recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
        log.info("===2majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
        recorder.start();
        String tbn = recorder.getVideoOption("preset");
        log.info("======推流编码器已经启动==preset:{}===", tbn);
        this.Execute(majorGrabber, recorder, "major", majorStreamActive, executor);
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("========StreamEngine run interrupted, liveName:{}", liveName, e);
    } catch (Exception e) {
        log.error("========StreamEngine run failed, liveName:{}", liveName, e);
    } finally {
        log.info("===============释放资源中=============");
        // The restart worker is a non-daemon thread; without this it outlives run().
        executor.shutdownNow();
        Exception failure = releaseRecorderQuietly(null);
        failure = stopGrabberQuietly(majorGrabber, "majorGrabber", failure);
        failure = stopGrabberQuietly(slaveGrabber, "slaveGrabber", failure);
        failure = stopGrabberQuietly(readyGrabber, "readyGrabber", failure);
        if (failure != null) {
            throw new RuntimeException(failure);
        }
        log.info("===============java成功释放cv资源=============");
    }
}

/** Stops and releases the shared recorder if present; returns the first failure seen so far. */
private Exception releaseRecorderQuietly(Exception firstFailure) {
    FFmpegFrameRecorder current = recorder;
    if (current == null) {
        return firstFailure;
    }
    try {
        log.info("===============释放资源中= recorder.stop============");
        current.stop();
        log.info("===============释放资源中= recorder.release============");
        current.release();
    } catch (Exception e) {
        log.error("========release recorder failed:{}", e.getMessage(), e);
        return chainFailure(firstFailure, e);
    }
    return firstFailure;
}

/** Stops one grabber if it was created; returns the first failure seen so far. */
private Exception stopGrabberQuietly(FrameGrabber grabber, String label, Exception firstFailure) {
    if (grabber == null) {
        return firstFailure;
    }
    log.info("===============释放资源中= {}.stop============", label);
    try {
        grabber.stop();
    } catch (Exception e) {
        log.error("========stop {} failed:{}", label, e.getMessage(), e);
        return chainFailure(firstFailure, e);
    }
    return firstFailure;
}

/** Keeps the earliest failure as primary and attaches later ones as suppressed. */
private static Exception chainFailure(Exception firstFailure, Exception next) {
    if (firstFailure == null) {
        return next;
    }
    firstFailure.addSuppressed(next);
    return firstFailure;
}

/**
 * Relay loop: forwards frames from the major stream while it is active and from
 * the standby stream (slave if configured, otherwise ready) while it is not,
 * until {@code stopFlag} is set.
 *
 * <p>Implemented as a loop, not by re-invoking itself on every switch-over,
 * so a long-running task with many fail-overs does not grow the call stack.
 *
 * @param majorGrabber      grabber of the major stream
 * @param recorder          unused; kept for signature compatibility
 * @param major             unused; kept for signature compatibility
 * @param majorStreamActive true while the major stream should be relayed
 * @param executor          executor handed to processStream for restart tasks
 */
private void Execute(FFmpegFrameGrabber majorGrabber, FFmpegFrameRecorder recorder, String major, AtomicBoolean majorStreamActive, ExecutorService executor) throws InterruptedException, IOException {
    while (!stopFlag) {
        if (majorStreamActive.get()) {
            processStream(majorGrabber, "major", majorStreamActive, executor);
        } else if (slaveFlag) {
            processStream(slaveGrabber, "slave", majorStreamActive, executor);
        } else {
            processStream(readyGrabber, "ready", majorStreamActive, executor);
        }
    }
}

/**
 * Grabs one frame from {@code grabber} and records it. On any failure the
 * standby grabber is restarted, and if the failing stream is the major one it
 * is marked inactive and an asynchronous restart is scheduled.
 *
 * @param grabber           source to read one frame from
 * @param streamType        "major", "slave" or "ready"
 * @param majorStreamActive flag cleared when the major stream fails
 * @param executor          executor used for the major-stream restart task
 */
private void processStream(FFmpegFrameGrabber grabber, String streamType, AtomicBoolean majorStreamActive, ExecutorService executor) {
    try {
        Frame frame = grabber.grabFrame();
        if (frame == null) {
            throw new Exception("========Failed to grab frame from " + streamType + " stream");
        }
        this.allreadyFlag = true;
        if (streamType.equals("slave")) {
            log.info("==slave===:{}", frame);
        }
        recorder.record(frame);
    } catch (Exception e) {
        // getMessage() may be null (e.g. a NullPointerException); guard before inspecting it
        // so the recovery logic below always runs.
        final String message = e.getMessage();
        log.error("========Error recording frame from " + streamType + " stream:{}", message);
        flag = true;
        // Presumably FFmpeg error -32 (broken pipe) on the output side — TODO confirm.
        if (message != null && message.contains("-32")) {
            try {
                recorder.stop();
                recorder.start();
            } catch (Exception ex) {
                log.info("==restart recorder stop error===:{}", ex.getMessage());
                // The output cannot be recovered: end the whole task.
                recorder = null;
                stopFlag = true;
                stopStr = "true";
            }
        }
        final boolean majorFailed = "major".equals(streamType);
        if (majorFailed) {
            log.info("========Major stream failed, attempting to restart...");
        }
        restartStandbyGrabber();
        if (majorFailed) {
            majorStreamActive.set(false);
            restartMajorStream(grabber, majorStreamActive, executor, slaveFlag);
        }
    }
}

/**
 * Restarts the standby source: the slave grabber synchronously when slave mode
 * is on, otherwise the ready grabber on a new background thread.
 */
private void restartStandbyGrabber() {
    if (slaveFlag) {
        log.info("======slaveGrabber.restart start===");
        try {
            slaveGrabber.restart();
        } catch (Exception ex) {
            log.info("==restart slave stream error===:{}", ex.getMessage());
        }
        log.info("======slaveGrabber.restart end===");
        return;
    }
    // NOTE(review): one thread is spawned per failure; repeated failures of the
    // ready stream can start overlapping restarts — consider serializing them.
    new Thread(() -> {
        try {
            log.info("======readyGrabber.restart start===");
            readyGrabber.restart();
            log.info("======readyGrabber.restart end===");
        } catch (Exception ex) {
            log.info("==restart ready stream error===:{}", ex.getMessage());
        }
    }).start();
}

// Wall-clock time (ms) of the first failed restart attempt in the current
// outage; 0 means no outage is being tracked.
private long restartTime_start = 0;

/**
 * Schedules a restart of the major grabber on {@code executor}. On success the
 * major stream is marked active again; on failure the task re-submits itself
 * unless the engine was stopped or, without a slave stream, the retry window
 * {@code restartLength} has elapsed.
 *
 * @param grabber   major grabber to restart
 * @param active    set to {@code true} once the restart succeeds
 * @param executor  executor the restart (and any retry) runs on
 * @param slaveFlag whether a slave stream is available; when it is, retries are
 *                  not bounded by {@code restartLength}
 */
private void restartMajorStream(FFmpegFrameGrabber grabber, AtomicBoolean active, ExecutorService executor, boolean slaveFlag) {
    executor.submit(() -> {
        try {
            if (restartTime_start == 0) {
                restartTime_start = System.currentTimeMillis();
            }
            grabber.restart();
        } catch (Exception e) {
            log.info("========Failed to restart major stream, will try again...");
            log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());
            if ((!slaveFlag) && (System.currentTimeMillis() - restartTime_start) > restartLength) {
                log.info("======major stream restart for whole day ====");
                log.info("======now to stop ====");
                return;
            }
            if (this.stopFlag || stopStr.equals("true")) {
                return;
            }
            restartMajorStream(grabber, active, executor, slaveFlag);
            return;
        }
        active.set(true);
        flag = true;
        restartTime_start = 0;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {
            // The restart itself succeeded; an interrupted pause must not be treated
            // as a failed restart (which would restart a healthy grabber again).
            Thread.currentThread().interrupt();
        }
        log.info("========Major stream restarted successfully.");
    });
}


/**
 * Starts two background threads that feed the recorder with a static image
 * (video) and an audio buffer while the major stream is inactive.
 *
 * <p>NOTE(review): no caller is visible in this class; the only reference is a
 * commented-out call in Execute. The {@code grabber} parameter is not used.
 *
 * @param recorder          recorder the frames and samples are written to
 * @param grabber           unused
 * @param majorStreamActive both threads run until this becomes true (or stopFlag is set)
 */
private void processImageAndSilentAudioStream(FFmpegFrameRecorder recorder, FFmpegFrameGrabber grabber, AtomicBoolean majorStreamActive) throws InterruptedException, IOException {
    // Load the static image
    // NOTE(review): hard-coded developer-machine path.
    String imageFile = "/Users/nocare/Desktop/imgtestpro/J48HSW25_511_1709715180016.jpeg";
    // Create the converter for video frames
    OpenCVFrameConverter.ToMat converter = new OpenCVFrameConverter.ToMat();

    // Read the image and convert it to a video frame
    Mat image = opencv_imgcodecs.imread(imageFile);
    Frame frame = converter.convert(image);

    // Video: record the same image frame roughly every 40 ms
    new Thread(() -> {
        while (!majorStreamActive.get()) {
            if (stopFlag) break;
            try {
                long s = System.currentTimeMillis();
                log.info("=========video=======");
                log.info("=========video ok=======");
                long e = System.currentTimeMillis();
                Thread.sleep(40); // wait according to the frame rate
                log.info("Using frame from ready stream (static image) initTime:" + (e - s));
                recorder.record(frame);
                log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }).start();
    // Audio: record a zero-filled sample buffer roughly every 23 ms
    new Thread(() -> {
        ShortBuffer sBuf = ShortBuffer.allocate(1024);
        sBuf.clear();
        while (!majorStreamActive.get()) {
            if (stopFlag) break;
            try {
                long s = System.currentTimeMillis();
                log.info("=========audio=======");
                log.info("=========audio  ok=======");
                long e = System.currentTimeMillis();
                Thread.sleep(23); // wait according to the frame rate
                log.info("Using frame from ready stream (silent audio) initTime:" + (e - s));
                // NOTE(review): the buffer is at position 0 here (cleared, nothing put),
                // so flip() sets the limit to 0 and no samples remain to record —
                // confirm whether rewind()/clear() was intended.
                sBuf.flip();
                recorder.recordSamples(44100, 2, sBuf);
                log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
                sBuf.clear();

                // recorder.record(silentAudioFrame);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }).start();

}

/**
 * Builds an audio {@link Frame} whose single sample buffer holds
 * {@code frameSize} zero-valued shorts, i.e. silence.
 *
 * @param sampleRate  sample rate stored on the frame
 * @param numChannels channel count stored on the frame
 * @param frameSize   number of shorts in the sample buffer
 * @return a new frame carrying the zero-filled buffer
 */
public Frame createSilentAudioFrame(int sampleRate, int numChannels, int frameSize) {
    // A freshly allocated short[] is zero-initialized, which is silence.
    final short[] zeros = new short[frameSize];

    final Frame quiet = new Frame();
    quiet.audioChannels = numChannels;
    quiet.sampleRate = sampleRate;
    quiet.samples = new ShortBuffer[]{ShortBuffer.wrap(zeros)};
    return quiet;
}

/**
 * Requests the relay to stop: raises both stop markers and, when an external
 * streamer was started, stops it as well.
 */
public void stop() {
    stopFlag = true;
    this.stopStr = "true";
    log.info("============停止 落地任务 stopFlag:{}============", stopFlag);

    final FFmpegStreamer streamer = fFmpegStreamer;
    if (streamer == null) {
        return;
    }
    streamer.stopStreaming();
}

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants