Skip to content

Commit

Permalink
Merge pull request robotology#3147 from traversaro/fixshmemunixstream…
Browse files Browse the repository at this point in the history
…carriersfornwc

Fix use of shmem and unix_stream carriers in remote_controlboard and multipleanalogsensorclient network wrapper clients
  • Loading branch information
randaz81 authored Nov 12, 2024
2 parents 1e4c5ee + da454ae commit 91decec
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 2 deletions.
7 changes: 6 additions & 1 deletion doc/release/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ New Features
* Removed h264 Carrier
* Added gstreamer carrier with extended functionalities.
* Added new gstreamers plugins: yarpvideosource, yarpvidepassthrough, yarpvideosink
* Fixed segfault on disconnection with shmem carrier.

### Devices

Expand Down Expand Up @@ -98,7 +99,11 @@ New Features

#### FakePythonSpeechTranscription

* Added new device `FakePythonSpeechTranscription`. The device is also an example which demonstrates the encapsulation of python code inside a c++ device implementing a Yarp interface.
* Added new device `FakePythonSpeechTranscription`. The device is also an example which demonstrates the encapsulation of python code inside a c++ device implementing a Yarp interface.

#### multipleanalogsensorsclient

* Always establish the `rpc` connection with the `tcp` carrier, instead of using the `carrier` option as done in YARP <= 3.9 .

### GUIs

Expand Down
10 changes: 10 additions & 0 deletions src/carriers/shmem_carrier/ShmemInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ bool ShmemInputStreamImpl::isOk() const

bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)
{
std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);

m_pSock = pSock;

m_pAccessMutex = m_pWaitDataMutex = nullptr;
Expand Down Expand Up @@ -102,6 +104,8 @@ bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)

bool ShmemInputStreamImpl::Resize()
{
std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);

++m_ResizeNum;

ACE_Shared_Memory* pNewMap;
Expand Down Expand Up @@ -155,6 +159,8 @@ bool ShmemInputStreamImpl::Resize()

int ShmemInputStreamImpl::read(char* data, int len)
{
std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);

m_pAccessMutex->acquire();

if (m_pHeader->close) {
Expand Down Expand Up @@ -193,6 +199,8 @@ int ShmemInputStreamImpl::read(char* data, int len)

yarp::conf::ssize_t ShmemInputStreamImpl::read(yarp::os::Bytes& b)
{
std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);

m_ReadSerializerMutex.lock();

if (!m_bOpen) {
Expand Down Expand Up @@ -230,6 +238,8 @@ yarp::conf::ssize_t ShmemInputStreamImpl::read(yarp::os::Bytes& b)

void ShmemInputStreamImpl::close()
{
std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);

if (!m_bOpen) {
return;
}
Expand Down
3 changes: 3 additions & 0 deletions src/carriers/shmem_carrier/ShmemInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class ShmemInputStreamImpl

std::mutex m_ReadSerializerMutex;

// Mutex for the whole class, to avoid concurrent close and read
std::recursive_mutex m_generalMutex;

ACE_Shared_Memory* m_pMap;
char* m_pData;
ShmemHeader_t* m_pHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ bool MultipleAnalogSensorsClient::open(yarp::os::Searchable& config)

// Connect ports
if (!m_externalConnection) {
ok = yarp::os::Network::connect(localRPCPortName, remoteRPCPortName, m_carrier);
// RPC port needs to be tcp, therefore no carrier option is added here
ok = yarp::os::Network::connect(localRPCPortName, remoteRPCPortName);
if (!ok) {
yCError(MULTIPLEANALOGSENSORSCLIENT,
"Failure connecting port %s to %s.",
Expand Down

0 comments on commit 91decec

Please sign in to comment.