diff --git a/src/COREMOD_memory/CMakeLists.txt b/src/COREMOD_memory/CMakeLists.txt index 8680a08d..2accb9ed 100644 --- a/src/COREMOD_memory/CMakeLists.txt +++ b/src/COREMOD_memory/CMakeLists.txt @@ -122,6 +122,7 @@ set(INCLUDEFILES # list scripts that should be installed on system set(SCRIPTFILES + scripts/milk-nettransmit scripts/milk-rmshmim scripts/milk-shmimave scripts/milk-shmimpurge diff --git a/src/COREMOD_memory/scripts/milk-nettransmit b/src/COREMOD_memory/scripts/milk-nettransmit new file mode 100755 index 00000000..01c307a1 --- /dev/null +++ b/src/COREMOD_memory/scripts/milk-nettransmit @@ -0,0 +1,98 @@ +#!/usr/bin/env bash + +MSdescr="Transmit SHM over network (TCP or UDP)" + +source milk-script-std-config + +TRANSMIT=0 +TARGET_IP="" +STREAM_NAME="" + +USE_UDP=0 +PORT="1" + +CPUSET="system" +RTPRIO=0 + +datestring="$(date -u +'%Y-%m-%dT%H:%M:%S.%NZ')" +daystring="$(date -u +'%Y%m%d')" + +MSextdescr=" +Transfer a SHM over the network. +Sender and receiver ends. Use TCP or UDP. + +" + +RequiredCommands=( milk ) +RequiredFiles=( ) +RequiredDirs=( ) + +# Options +MSarg+=( "port:int:Port number" ) + +MSopt+=( "s:stream:set_stream_name:stream_name[str]:Set stream name (for transmit)." ) +function set_stream_name() { + STREAM_NAME="$1" +} +MSopt+=( "T:transmit:set_transmit:target_ip[str]:Set in transmit mode (default is receive)." ) +function set_transmit() { + TRANSMIT=1 + TARGET_IP="$1" +} +MSopt+=( "U:udp:set_udp::Use UDP (default is TCP)." ) +function set_udp() { + USE_UDP=1 +} +MSopt+=( "c:cset:set_cset:cset[string]:set CPUset (default is system)." ) +function set_cset() { + CPUSET="$1" +} +MSopt+=( "p:prio:set_prio:prio[int]:Set RT priority [default 30]." ) +function set_prio() { + RTPRIO="$1" +} + +source milk-argparse +PORT="${inputMSargARRAY[0]}" + +echo "STREAM_NAME : ${STREAM_NAME}" +echo "TRANSMIT : ${TRANSMIT}" +echo "UDP : ${USE_UDP}" +echo "CPUSET : ${CPUSET}" +echo "RTPRIO : ${RTPRIO}" +echo "PORT : ${PORT}" +echo "IP : ${TARGET_IP}" + +if [ ${TRANSMIT} -eq 1 ]; then + if [[ "${STREAM_NAME}" == "" ]]; then + echo "ERROR: must provide -s for transmit [-T]." + exit 12 + fi +fi + +if [ ${USE_UDP} -eq 1 ]; then + milk_send_function="imudptransmit" + milk_receive_function="imudpreceive" +else + milk_send_function="imnetwtransmit" + milk_receive_function="imnetwreceive" +fi + +if [ ${TRANSMIT} -eq 1 ]; then # TRANSMIT + pname="netw-transmit-${STREAM_NAME}-${PORT}" + MILK_QUIET=1 OMP_NUM_THREADS=1 milk -n ${pname} << EOF +csetpmove ${CPUSET} +rtprio ${RTPRIO} +readshmim ${STREAM_NAME} +${milk_send_function} ${STREAM_NAME} ${TARGET_IP} ${PORT} 0 ${RTPRIO} +exitCLI +EOF +else # RECEIVE + pname="netw-receive-${PORT}" + MILK_QUIET=1 OMP_NUM_THREADS=1 milk -n ${pname} << EOF +csetpmove ${CPUSET} +rtprio ${RTPRIO} +${milk_receive_function} ${PORT} 0 ${RTPRIO} +exitCLI +EOF +fi diff --git a/src/COREMOD_memory/scripts/milk-streamFITSlog b/src/COREMOD_memory/scripts/milk-streamFITSlog index 06bf2735..1b50558b 100755 --- a/src/COREMOD_memory/scripts/milk-streamFITSlog +++ b/src/COREMOD_memory/scripts/milk-streamFITSlog @@ -1,5 +1,8 @@ #!/usr/bin/env bash +# This script uses milk-argparse +# See template milk-scriptexample in module milk_module_example for template and instructions + MSdescr="log stream to disk as FITS files" source milk-script-std-config @@ -30,7 +33,7 @@ cd ${milkFITSloggerdir} MSextdescr=" Log stream to disk as FITS cubes. -Logging processes are running in tmux session ${TMUXNAME}. +Logging processes are running in tmux session ${TMUXNAME}. User can connect to tmux session to control logging processes. List of actions: diff --git a/src/COREMOD_memory/stream_UDP.c b/src/COREMOD_memory/stream_UDP.c index 6fbfa600..7903f7f4 100644 --- a/src/COREMOD_memory/stream_UDP.c +++ b/src/COREMOD_memory/stream_UDP.c @@ -318,26 +318,26 @@ imageID COREMOD_MEMORY_image_NETUDPtransmit(const char *IDname, } ts.tv_sec += 2; - semr = ImageStreamIO_semtimedwait(data.image+ID, semtrig, &ts); + semr = ImageStreamIO_semtimedwait(data.image + ID, semtrig, &ts); if(iter == 0) { processinfo_WriteMessage(processinfo, "Driving sem to 0"); printf("Driving semaphore to zero ... "); fflush(stdout); - semval = ImageStreamIO_semvalue(data.image+ID, semtrig); + semval = ImageStreamIO_semvalue(data.image + ID, semtrig); int semvalcnt = semval; for(scnt = 0; scnt < semvalcnt; scnt++) { - semval = ImageStreamIO_semvalue(data.image+ID, semtrig); + semval = ImageStreamIO_semvalue(data.image + ID, semtrig); printf("sem = %d\n", semval); fflush(stdout); - ImageStreamIO_semtrywait(data.image+ID, semtrig); + ImageStreamIO_semtrywait(data.image + ID, semtrig); } printf("done\n"); fflush(stdout); - semval = ImageStreamIO_semvalue(data.image+ID, semtrig); + semval = ImageStreamIO_semvalue(data.image + ID, semtrig); printf("-> sem = %d\n", semval); fflush(stdout); @@ -568,7 +568,7 @@ imageID COREMOD_MEMORY_image_NETUDPreceive( } // create UDP socket - if((fds_server = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + if((fds_server = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { printf("ERROR creating socket\n"); if(data.processinfo == 1) @@ -815,7 +815,7 @@ imageID COREMOD_MEMORY_image_NETUDPreceive( long first_dgram_bytes = n_udp_dgrams == 1 ? last_dgram_chunk + 2 : DGRAM_CHUNK_SIZE + 2; long this_dgram_bytes; - int abort_frame; + int abort_frame = 1; // Initial sync while(loopOK == 1) @@ -838,41 +838,72 @@ imageID COREMOD_MEMORY_image_NETUDPreceive( } } - // Resync to a 0-zth datagram if necessary - abort_frame = 0; - for(int n_dgram_wait = 0; n_dgram_wait < MAX_DATAGRAM_WAIT; ++n_dgram_wait) + if((data.processinfo == 1) && (processinfo->MeasureTiming == 1)) { - recvsize = recvfrom(fds_server, buff_udp, first_dgram_bytes, 0, - (struct sockaddr *)&sock_client, &slen_client); - if(recvsize < 0 || n_dgram_wait == MAX_DATAGRAM_WAIT - 1) - { - printf("ERROR recvfrom`()\n"); - socketOpen = 0; - break; - } + processinfo_exec_start(processinfo); + } - if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0) + // recvfrom should return a zero-th datagram. + + if(abort_frame) + { + // Purge buffer and resync to a 0-th datagram if necessary + // This while will terminate when MSG_DONTWAIT causes a + // errno = EAGAIN / EWOULDBLOCK, meaning the queue is empty. + while(recvfrom(fds_server, buff_udp, first_dgram_bytes, MSG_DONTWAIT, + (struct sockaddr *)&sock_client, &slen_client) >= 0) {} + // Now give ourselves a chance to grab a clean 0-th datagram. + for(int n_dgram_wait = 0; n_dgram_wait < MAX_DATAGRAM_WAIT; ++n_dgram_wait) { - memcpy(buff, buff_udp + 2, first_dgram_bytes - 2); - break; - } - } + recvsize = recvfrom(fds_server, buff_udp, first_dgram_bytes, 0, + (struct sockaddr *)&sock_client, &slen_client); + if(recvsize < 0 || n_dgram_wait == MAX_DATAGRAM_WAIT - 1) + { + printf("ERROR recvfrom() @ A [%d - %s]\n", errno, strerror(errno)); + loopOK = 0; + socketOpen = 0; + break; // This should be a double break... + } + if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0) + { + printf("-- Resync achieved after %d datagrams.\n", n_dgram_wait); + abort_frame = 0; + break; + } + } - if((data.processinfo == 1) && (processinfo->MeasureTiming == 1)) - { - processinfo_exec_start(processinfo); } - - if(recvsize == 0) + else { - socketOpen = 0; + // Normal operation + if(recvfrom(fds_server, buff_udp, first_dgram_bytes, 0, + (struct sockaddr *)&sock_client, &slen_client) < 0) + { + printf("ERROR recvfrom() @ B [%d - %s]\n", errno, strerror(errno)); + loopOK = 0; + socketOpen = 0; + break; + } } - if(socketOpen == 1) + if(socketOpen == 1 && loopOK == 1) { - // We already have the first datagram. + if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0) + { + // 0-th datagram is legit, memcpy it. + memcpy(buff, buff_udp + 2, first_dgram_bytes - 2); + } + else + { + // abort frame and go again + abort_frame = 1; + printf("Aborting frame at datagram 0.\n"); + continue; + } + + // Now we have the first datagram. // Weak copy although we now have all the metadata in buff imgmd_remote = (IMAGE_METADATA *)(ptr_buff_metadata); @@ -963,10 +994,10 @@ imageID COREMOD_MEMORY_image_NETUDPreceive( data.image[ID].md[0].cnt0++; for(semnb = 0; semnb < data.image[ID].md[0].sem; semnb++) { - semval = ImageStreamIO_semvalue(data.image+ID, semnb); + semval = ImageStreamIO_semvalue(data.image + ID, semnb); if(semval < SEMAPHORE_MAXVAL) { - ImageStreamIO_sempost(data.image+ID, semnb); + ImageStreamIO_sempost(data.image + ID, semnb); } } diff --git a/src/COREMOD_tools/CMakeLists.txt b/src/COREMOD_tools/CMakeLists.txt index 41a88bb1..216627aa 100644 --- a/src/COREMOD_tools/CMakeLists.txt +++ b/src/COREMOD_tools/CMakeLists.txt @@ -46,7 +46,7 @@ set_property (TEST "${TESTNAME}" PROPERTY PASS_REGULAR_EXPRESSION "COREMOD_tools # test that commands are registered -list(APPEND commandlist "tsetpmove" "tsetpmoveext" "csetpmove" "csetandprioext" "writef2file" "dispim3d" "ctsmstats") +list(APPEND commandlist "rtprio" "tsetpmove" "tsetpmoveext" "csetpmove" "csetandprioext" "writef2file" "dispim3d" "ctsmstats") foreach(CLIcmdname IN LISTS commandlist) diff --git a/src/COREMOD_tools/COREMOD_tools.c b/src/COREMOD_tools/COREMOD_tools.c index ca2f47f4..9f729a05 100644 --- a/src/COREMOD_tools/COREMOD_tools.c +++ b/src/COREMOD_tools/COREMOD_tools.c @@ -23,10 +23,8 @@ INIT_MODULE_LIB(COREMOD_tools) static errno_t init_module_CLI() { - mvprocTset_addCLIcmd(); - mvprocTsetExt_addCLIcmd(); - mvprocCPUset_addCLIcmd(); - mvprocCPUsetExt_addCLIcmd(); + cpuset_utils_addCLIcmd(); + fileutils_addCLIcmd(); imdisplay3d_addCLIcmd(); statusstat_addCLIcmd(); diff --git a/src/COREMOD_tools/mvprocCPUset.c b/src/COREMOD_tools/mvprocCPUset.c index 52807c85..dcab5faf 100644 --- a/src/COREMOD_tools/mvprocCPUset.c +++ b/src/COREMOD_tools/mvprocCPUset.c @@ -8,12 +8,15 @@ // Forward declaration(s) // ========================================== +int COREMOD_TOOLS_mvProcRTPrio(const int rtprio); + int COREMOD_TOOLS_mvProcTset(const char *tsetspec); int COREMOD_TOOLS_mvProcTsetExt(const int pid, const char *tsetspec); int COREMOD_TOOLS_mvProcCPUset(const char *csetname); + int COREMOD_TOOLS_mvProcCPUsetExt(const int pid, const char *csetname, const int rtprio); @@ -22,6 +25,20 @@ int COREMOD_TOOLS_mvProcCPUsetExt(const int pid, // Command line interface wrapper function(s) // ========================================== +errno_t COREMOD_TOOLS_mvProcRTPrio_cli() +{ + if(0 + CLI_checkarg(1, CLIARG_LONG) == 0) + { + COREMOD_TOOLS_mvProcRTPrio(data.cmdargtoken[1].val.numl); + + return CLICMD_SUCCESS; + } + else + { + return CLICMD_INVALID_ARG; + } +} + errno_t COREMOD_TOOLS_mvProcTset_cli() { if(0 + CLI_checkarg(1, CLIARG_STR_NOT_IMG) == 0) @@ -87,8 +104,15 @@ errno_t COREMOD_TOOLS_mvProcCPUsetExt_cli() // Register CLI command(s) // ========================================== -errno_t mvprocTset_addCLIcmd() +errno_t cpuset_utils_addCLIcmd() { + RegisterCLIcommand("rtprio", + __FILE__, + COREMOD_TOOLS_mvProcRTPrio_cli, + "Set current process SCHED_FIFO priority", + "", + "rtprio ", + "int COREMOD_TOOLS_mvProcRTPrio(const int rtprio)"); RegisterCLIcommand("tsetpmove", __FILE__, COREMOD_TOOLS_mvProcTset_cli, @@ -96,26 +120,13 @@ errno_t mvprocTset_addCLIcmd() "", "tsetpmove realtime", "int COREMOD_TOOLS_mvProcTset(const char *tsetspec)"); - - return RETURN_SUCCESS; -} - -errno_t mvprocTsetExt_addCLIcmd() -{ - RegisterCLIcommand( - "tsetpmoveext", - __FILE__, - COREMOD_TOOLS_mvProcTsetExt_cli, - "Assign taskset for any process", - " ", - "tsetpmoveext 33659 1-5", - "int COREMOD_TOOLS_mvProcTsetExt(const int pid, const char *tsetspec)"); - - return RETURN_SUCCESS; -} - -errno_t mvprocCPUset_addCLIcmd() -{ + RegisterCLIcommand("tsetpmoveext", + __FILE__, + COREMOD_TOOLS_mvProcTsetExt_cli, + "Assign taskset for any process", + " ", + "tsetpmoveext 33659 1-5", + "int COREMOD_TOOLS_mvProcTsetExt(const int pid, const char *tsetspec)"); RegisterCLIcommand("csetpmove", __FILE__, COREMOD_TOOLS_mvProcCPUset_cli, @@ -123,12 +134,6 @@ errno_t mvprocCPUset_addCLIcmd() "", "csetpmove realtime", "int COREMOD_TOOLS_mvProcCPUset(const char *csetname)"); - - return RETURN_SUCCESS; -} - -errno_t mvprocCPUsetExt_addCLIcmd() -{ RegisterCLIcommand("csetandprioext", __FILE__, COREMOD_TOOLS_mvProcCPUsetExt_cli, @@ -142,6 +147,34 @@ errno_t mvprocCPUsetExt_addCLIcmd() return RETURN_SUCCESS; } +int COREMOD_TOOLS_mvProcRTPrio(const int rtprio) +{ + if(rtprio <= 0) + { + PRINT_WARNING("Invoking RT prio with rtprio %d <= 0; skipping.", rtprio); + return EXIT_SUCCESS; + } + + char command[200]; + + if(seteuid(data.euid) != 0 || + setuid(data.euid) != 0) // This goes up to maximum privileges + { + PRINT_ERROR("seteuid/setuid error"); + } + + sprintf(command, "chrt -f -p %d %d\n", rtprio, getpid()); + printf("Executing command: %s\n", command); + + EXECUTE_SYSTEM_COMMAND_ERRCHECK("%s", command); + + if(setresuid(data.ruid, data.ruid, data.euid) != + 0) // Go back to normal privileges + { + PRINT_ERROR("seteuid error"); + } +} + int COREMOD_TOOLS_mvProcTset(const char *tsetspec) { // Pass down to extended version and return retcode back up @@ -257,5 +290,5 @@ int COREMOD_TOOLS_mvProcCPUsetExt(const int pid, PRINT_ERROR("seteuid error"); } - return (0); + return 0; } diff --git a/src/COREMOD_tools/mvprocCPUset.h b/src/COREMOD_tools/mvprocCPUset.h index aca50217..c55fe1c1 100644 --- a/src/COREMOD_tools/mvprocCPUset.h +++ b/src/COREMOD_tools/mvprocCPUset.h @@ -2,11 +2,9 @@ * @file mvprocCPUset.h */ -errno_t mvprocTset_addCLIcmd(); -errno_t mvprocTsetExt_addCLIcmd(); -errno_t mvprocCPUset_addCLIcmd(); -errno_t mvprocCPUsetExt_addCLIcmd(); +errno_t cpuset_utils_addCLIcmd(); +int COREMOD_TOOLS_mvProcRTPrio(const int rtprio); int COREMOD_TOOLS_mvProcTset(const char *tsetspec); int COREMOD_TOOLS_mvProcTsetExt(const int pid, const char *tsetspec); int COREMOD_TOOLS_mvProcCPUset(const char *csetname);