Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/milk-org/milk into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
oguyon committed Mar 3, 2024
2 parents 12ca1d6 + ac35dd2 commit 0b0efbd
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 71 deletions.
1 change: 1 addition & 0 deletions src/COREMOD_memory/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ set(INCLUDEFILES

# list scripts that should be installed on system
set(SCRIPTFILES
scripts/milk-nettransmit
scripts/milk-rmshmim
scripts/milk-shmimave
scripts/milk-shmimpurge
Expand Down
98 changes: 98 additions & 0 deletions src/COREMOD_memory/scripts/milk-nettransmit
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env bash

MSdescr="Transmit SHM over network (TCP or UDP)"

source milk-script-std-config

TRANSMIT=0
TARGET_IP=""
STREAM_NAME=""

USE_UDP=0
PORT="1"

CPUSET="system"
RTPRIO=0

datestring="$(date -u +'%Y-%m-%dT%H:%M:%S.%NZ')"
daystring="$(date -u +'%Y%m%d')"

MSextdescr="
Transfer a SHM over the network.
Sender and receiver ends. Use TCP or UDP.
"

RequiredCommands=( milk )
RequiredFiles=( )
RequiredDirs=( )

# Options
MSarg+=( "port:int:Port number" )

MSopt+=( "s:stream:set_stream_name:stream_name[str]:Set stream name (for transmit)." )
function set_stream_name() {
STREAM_NAME="$1"
}
MSopt+=( "T:transmit:set_transmit:target_ip[str]:Set in transmit mode (default is receive)." )
function set_transmit() {
TRANSMIT=1
TARGET_IP="$1"
}
MSopt+=( "U:udp:set_udp::Use UDP (default is TCP)." )
function set_udp() {
USE_UDP=1
}
MSopt+=( "c:cset:set_cset:cset[string]:set CPUset (default is system)." )
function set_cset() {
CPUSET="$1"
}
MSopt+=( "p:prio:set_prio:prio[int]:Set RT priority [default 30]." )
function set_prio() {
RTPRIO="$1"
}

source milk-argparse
PORT="${inputMSargARRAY[0]}"

echo "STREAM_NAME : ${STREAM_NAME}"
echo "TRANSMIT : ${TRANSMIT}"
echo "UDP : ${USE_UDP}"
echo "CPUSET : ${CPUSET}"
echo "RTPRIO : ${RTPRIO}"
echo "PORT : ${PORT}"
echo "IP : ${TARGET_IP}"

if [ ${TRANSMIT} -eq 1 ]; then
if [[ "${STREAM_NAME}" == "" ]]; then
echo "ERROR: must provide -s <stream_name> for transmit [-T]."
exit 12
fi
fi

if [ ${USE_UDP} -eq 1 ]; then
milk_send_function="imudptransmit"
milk_receive_function="imudpreceive"
else
milk_send_function="imnetwtransmit"
milk_receive_function="imnetwreceive"
fi

if [ ${TRANSMIT} -eq 1 ]; then # TRANSMIT
pname="netw-transmit-${STREAM_NAME}-${PORT}"
MILK_QUIET=1 OMP_NUM_THREADS=1 milk -n ${pname} << EOF
csetpmove ${CPUSET}
rtprio ${RTPRIO}
readshmim ${STREAM_NAME}
${milk_send_function} ${STREAM_NAME} ${TARGET_IP} ${PORT} 0 ${RTPRIO}
exitCLI
EOF
else # RECEIVE
pname="netw-receive-${PORT}"
MILK_QUIET=1 OMP_NUM_THREADS=1 milk -n ${pname} << EOF
csetpmove ${CPUSET}
rtprio ${RTPRIO}
${milk_receive_function} ${PORT} 0 ${RTPRIO}
exitCLI
EOF
fi
5 changes: 4 additions & 1 deletion src/COREMOD_memory/scripts/milk-streamFITSlog
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env bash

# This script uses milk-argparse
# See template milk-scriptexample in module milk_module_example for template and instructions

MSdescr="log stream to disk as FITS files"

source milk-script-std-config
Expand Down Expand Up @@ -30,7 +33,7 @@ cd ${milkFITSloggerdir}
MSextdescr="
Log stream to disk as FITS cubes.
Logging processes are running in tmux session ${TMUXNAME}.
Logging processes are running in tmux session ${TMUXNAME}.
User can connect to tmux session to control logging processes.
List of actions:
Expand Down
97 changes: 64 additions & 33 deletions src/COREMOD_memory/stream_UDP.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,26 +318,26 @@ imageID COREMOD_MEMORY_image_NETUDPtransmit(const char *IDname,
}
ts.tv_sec += 2;

semr = ImageStreamIO_semtimedwait(data.image+ID, semtrig, &ts);
semr = ImageStreamIO_semtimedwait(data.image + ID, semtrig, &ts);

if(iter == 0)
{
processinfo_WriteMessage(processinfo, "Driving sem to 0");
printf("Driving semaphore to zero ... ");
fflush(stdout);
semval = ImageStreamIO_semvalue(data.image+ID, semtrig);
semval = ImageStreamIO_semvalue(data.image + ID, semtrig);
int semvalcnt = semval;
for(scnt = 0; scnt < semvalcnt; scnt++)
{
semval = ImageStreamIO_semvalue(data.image+ID, semtrig);
semval = ImageStreamIO_semvalue(data.image + ID, semtrig);
printf("sem = %d\n", semval);
fflush(stdout);
ImageStreamIO_semtrywait(data.image+ID, semtrig);
ImageStreamIO_semtrywait(data.image + ID, semtrig);
}
printf("done\n");
fflush(stdout);

semval = ImageStreamIO_semvalue(data.image+ID, semtrig);
semval = ImageStreamIO_semvalue(data.image + ID, semtrig);
printf("-> sem = %d\n", semval);
fflush(stdout);

Expand Down Expand Up @@ -568,7 +568,7 @@ imageID COREMOD_MEMORY_image_NETUDPreceive(
}

// create UDP socket
if((fds_server = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
if((fds_server = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
{
printf("ERROR creating socket\n");
if(data.processinfo == 1)
Expand Down Expand Up @@ -815,7 +815,7 @@ imageID COREMOD_MEMORY_image_NETUDPreceive(
long first_dgram_bytes = n_udp_dgrams == 1 ? last_dgram_chunk + 2 :
DGRAM_CHUNK_SIZE + 2;
long this_dgram_bytes;
int abort_frame;
int abort_frame = 1; // Initial sync


while(loopOK == 1)
Expand All @@ -838,41 +838,72 @@ imageID COREMOD_MEMORY_image_NETUDPreceive(
}
}

// Resync to a 0-zth datagram if necessary
abort_frame = 0;
for(int n_dgram_wait = 0; n_dgram_wait < MAX_DATAGRAM_WAIT; ++n_dgram_wait)
if((data.processinfo == 1) && (processinfo->MeasureTiming == 1))
{
recvsize = recvfrom(fds_server, buff_udp, first_dgram_bytes, 0,
(struct sockaddr *)&sock_client, &slen_client);
if(recvsize < 0 || n_dgram_wait == MAX_DATAGRAM_WAIT - 1)
{
printf("ERROR recvfrom`()\n");
socketOpen = 0;
break;
}
processinfo_exec_start(processinfo);
}

if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0)
// recvfrom should return a zero-th datagram.

if(abort_frame)
{
// Purge buffer and resync to a 0-th datagram if necessary
// This while will terminate when MSG_DONTWAIT causes a
// errno = EAGAIN / EWOULDBLOCK, meaning the queue is empty.
while(recvfrom(fds_server, buff_udp, first_dgram_bytes, MSG_DONTWAIT,
(struct sockaddr *)&sock_client, &slen_client) >= 0) {}
// Now give ourselves a chance to grab a clean 0-th datagram.
for(int n_dgram_wait = 0; n_dgram_wait < MAX_DATAGRAM_WAIT; ++n_dgram_wait)
{
memcpy(buff, buff_udp + 2, first_dgram_bytes - 2);
break;
}
}
recvsize = recvfrom(fds_server, buff_udp, first_dgram_bytes, 0,
(struct sockaddr *)&sock_client, &slen_client);
if(recvsize < 0 || n_dgram_wait == MAX_DATAGRAM_WAIT - 1)
{
printf("ERROR recvfrom() @ A [%d - %s]\n", errno, strerror(errno));
loopOK = 0;
socketOpen = 0;
break; // This should be a double break...
}


if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0)
{
printf("-- Resync achieved after %d datagrams.\n", n_dgram_wait);
abort_frame = 0;
break;
}
}

if((data.processinfo == 1) && (processinfo->MeasureTiming == 1))
{
processinfo_exec_start(processinfo);
}

if(recvsize == 0)
else
{
socketOpen = 0;
// Normal operation
if(recvfrom(fds_server, buff_udp, first_dgram_bytes, 0,
(struct sockaddr *)&sock_client, &slen_client) < 0)
{
printf("ERROR recvfrom() @ B [%d - %s]\n", errno, strerror(errno));
loopOK = 0;
socketOpen = 0;
break;
}
}

if(socketOpen == 1)
if(socketOpen == 1 && loopOK == 1)
{
// We already have the first datagram.
if(buff_udp[0] == MULTIGRAM_MAGIC && buff_udp[1] == 0)
{
// 0-th datagram is legit, memcpy it.
memcpy(buff, buff_udp + 2, first_dgram_bytes - 2);
}
else
{
// abort frame and go again
abort_frame = 1;
printf("Aborting frame at datagram 0.\n");
continue;
}

// Now we have the first datagram.

// Weak copy although we now have all the metadata in buff
imgmd_remote = (IMAGE_METADATA *)(ptr_buff_metadata);
Expand Down Expand Up @@ -963,10 +994,10 @@ imageID COREMOD_MEMORY_image_NETUDPreceive(
data.image[ID].md[0].cnt0++;
for(semnb = 0; semnb < data.image[ID].md[0].sem; semnb++)
{
semval = ImageStreamIO_semvalue(data.image+ID, semnb);
semval = ImageStreamIO_semvalue(data.image + ID, semnb);
if(semval < SEMAPHORE_MAXVAL)
{
ImageStreamIO_sempost(data.image+ID, semnb);
ImageStreamIO_sempost(data.image + ID, semnb);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/COREMOD_tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ set_property (TEST "${TESTNAME}" PROPERTY PASS_REGULAR_EXPRESSION "COREMOD_tools

# test that commands are registered

list(APPEND commandlist "tsetpmove" "tsetpmoveext" "csetpmove" "csetandprioext" "writef2file" "dispim3d" "ctsmstats")
list(APPEND commandlist "rtprio" "tsetpmove" "tsetpmoveext" "csetpmove" "csetandprioext" "writef2file" "dispim3d" "ctsmstats")

foreach(CLIcmdname IN LISTS commandlist)

Expand Down
6 changes: 2 additions & 4 deletions src/COREMOD_tools/COREMOD_tools.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ INIT_MODULE_LIB(COREMOD_tools)

static errno_t init_module_CLI()
{
mvprocTset_addCLIcmd();
mvprocTsetExt_addCLIcmd();
mvprocCPUset_addCLIcmd();
mvprocCPUsetExt_addCLIcmd();
cpuset_utils_addCLIcmd();

fileutils_addCLIcmd();
imdisplay3d_addCLIcmd();
statusstat_addCLIcmd();
Expand Down
Loading

0 comments on commit 0b0efbd

Please sign in to comment.