Skip to content

Commit

Permalink
refactor(userspace/libsinsp)!: clear up ownership model of thread man…
Browse files Browse the repository at this point in the history
…ager

Signed-off-by: Jason Dellaluce <[email protected]>
  • Loading branch information
jasondellaluce committed Feb 21, 2024
1 parent ed0be53 commit b0e6dda
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 80 deletions.
2 changes: 1 addition & 1 deletion userspace/libsinsp/container_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ const sinsp_container_info::container_mount_info *sinsp_container_info::mount_by

std::shared_ptr<sinsp_threadinfo> sinsp_container_info::get_tinfo(sinsp* inspector) const
{
std::shared_ptr<sinsp_threadinfo> tinfo(inspector->build_threadinfo());
std::shared_ptr<sinsp_threadinfo> tinfo(inspector->build_threadinfo().release());
tinfo->m_tid = -1;
tinfo->m_pid = -1;
tinfo->m_vtid = -2;
Expand Down
73 changes: 35 additions & 38 deletions userspace/libsinsp/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,15 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
/*=============================== CHILD ALREADY THERE ===========================*/

/* See if the child is already there, if yes and it is valid we return immediately */
sinsp_threadinfo* child_tinfo = m_inspector->get_thread_ref(child_tid, false, true).get();
if(child_tinfo != nullptr)
sinsp_threadinfo* existing_child_tinfo = m_inspector->get_thread_ref(child_tid, false, true).get();
if(existing_child_tinfo != nullptr)
{
/* If this was an inverted clone, all is fine, we've already taken care
* of adding the thread table entry in the child.
* Otherwise, we assume that the entry is there because we missed the proc exit event
* for a previous thread and we replace the tinfo.
*/
if(child_tinfo->m_flags & PPM_CL_CLONE_INVERTED)
if(existing_child_tinfo->m_flags & PPM_CL_CLONE_INVERTED)
{
return;
}
Expand All @@ -1125,7 +1125,7 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
/* Allocate the new thread info and initialize it.
* We avoid `malloc` here and get the item from a preallocated list.
*/
child_tinfo = m_inspector->build_threadinfo();
auto child_tinfo = m_inspector->build_threadinfo();

/* Initialise last exec time to zero (can be overridden in the case of a
* thread clone)
Expand Down Expand Up @@ -1157,7 +1157,7 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
if(fd_table_ptr != NULL)
{
child_tinfo->get_fdtable().clear();
fd_table_ptr->const_loop([child_tinfo](int64_t fd, const sinsp_fdinfo& info) {
fd_table_ptr->const_loop([&child_tinfo](int64_t fd, const sinsp_fdinfo& info) {
/* Track down that those are cloned fds */
auto newinfo = info.clone();
newinfo->set_is_cloned();
Expand Down Expand Up @@ -1355,7 +1355,7 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
case PPME_SYSCALL_CLONE3_X:
parinfo = evt->get_param(14);
child_tinfo->set_cgroups(parinfo->m_val, parinfo->m_len);
m_inspector->m_container_manager.resolve_container(child_tinfo, m_inspector->is_live() || m_inspector->is_syscall_plugin());
m_inspector->m_container_manager.resolve_container(child_tinfo.get(), m_inspector->is_live() || m_inspector->is_syscall_plugin());
break;
}

Expand Down Expand Up @@ -1419,20 +1419,25 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
/*=============================== ADD THREAD TO THE TABLE ===========================*/

/* Until we use the shared pointer we need it here, after we can move it at the end */
bool thread_added = m_inspector->add_thread(child_tinfo);
auto new_child = m_inspector->add_thread(std::move(child_tinfo));
if (!new_child)
{
// note: we expect the thread manager to log a warning already
return;
}

/* Refresh user / loginuser / group */
if(child_tinfo->m_container_id.empty() == false)
if(new_child->m_container_id.empty() == false)
{
child_tinfo->set_user(child_tinfo->m_user.uid);
child_tinfo->set_loginuser(child_tinfo->m_loginuser.uid);
child_tinfo->set_group(child_tinfo->m_group.gid);
new_child->set_user(new_child->m_user.uid);
new_child->set_loginuser(new_child->m_loginuser.uid);
new_child->set_group(new_child->m_group.gid);
}

/* If there's a listener, invoke it */
if(m_inspector->get_observer())
{
m_inspector->get_observer()->on_clone(evt, child_tinfo, tid_collision);
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
}

/* If we had to erase a previous entry for this tid and rebalance the table,
Expand All @@ -1444,14 +1449,8 @@ void sinsp_parser::parse_clone_exit_caller(sinsp_evt *evt, int64_t child_tid)
reset(evt);
DBG_SINSP_INFO("tid collision for %" PRIu64 "(%s)",
tid_collision,
child_tinfo->m_comm.c_str());
}

if(!thread_added)
{
delete child_tinfo;
new_child->m_comm.c_str());
}

/*=============================== ADD THREAD TO THE TABLE ===========================*/

return;
Expand Down Expand Up @@ -1510,7 +1509,7 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
/* Allocate the new thread info and initialize it.
* We must avoid `malloc` here and get the item from a preallocated list.
*/
sinsp_threadinfo *child_tinfo = m_inspector->build_threadinfo();
auto child_tinfo = m_inspector->build_threadinfo();

/* Initialise last exec time to zero (can be overridden in the case of a
* thread clone)
Expand Down Expand Up @@ -1626,7 +1625,6 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
{
/* Invalidate the thread_info associated with this event */
evt->set_tinfo(nullptr);
delete child_tinfo;
return;
}

Expand Down Expand Up @@ -1751,7 +1749,7 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
if(fd_table_ptr != NULL)
{
child_tinfo->get_fdtable().clear();
fd_table_ptr->loop([child_tinfo](int64_t fd, const sinsp_fdinfo& info) {
fd_table_ptr->loop([&child_tinfo](int64_t fd, const sinsp_fdinfo& info) {
/* Track down that those are cloned fds.
* This flag `FLAGS_IS_CLONED` seems to be never used...
*/
Expand Down Expand Up @@ -1906,7 +1904,7 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
case PPME_SYSCALL_CLONE3_X:
parinfo = evt->get_param(14);
child_tinfo->set_cgroups(parinfo->m_val, parinfo->m_len);
m_inspector->m_container_manager.resolve_container(child_tinfo, m_inspector->is_live());
m_inspector->m_container_manager.resolve_container(child_tinfo.get(), m_inspector->is_live());
break;
}

Expand All @@ -1933,28 +1931,34 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
/*=============================== CREATE NEW THREAD-INFO ===========================*/

/* Add the new thread to the table */
bool thread_added = m_inspector->add_thread(child_tinfo);
auto new_child = m_inspector->add_thread(std::move(child_tinfo));
if (!new_child)
{
// note: we expect the thread manager to log a warning already
evt->set_tinfo(nullptr);
return;
}

/* Update the evt->get_tinfo() of the child.
* We update it here, in this way the `on_clone`
* callback will use updated info.
*/
evt->set_tinfo(child_tinfo);
evt->set_tinfo(new_child.get());

/* Refresh user / loginuser / group */
if(child_tinfo->m_container_id.empty() == false)
if(new_child->m_container_id.empty() == false)
{
child_tinfo->set_user(child_tinfo->m_user.uid);
child_tinfo->set_loginuser(child_tinfo->m_loginuser.uid);
child_tinfo->set_group(child_tinfo->m_group.gid);
new_child->set_user(new_child->m_user.uid);
new_child->set_loginuser(new_child->m_loginuser.uid);
new_child->set_group(new_child->m_group.gid);
}

//
// If there's a listener, invoke it
//
if(m_inspector->get_observer())
{
m_inspector->get_observer()->on_clone(evt, child_tinfo, tid_collision);
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
}

/* If we had to erase a previous entry for this tid and rebalance the table,
Expand All @@ -1966,17 +1970,10 @@ void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt)
{
reset(evt);
/* Right now we have collisions only on the clone() caller */
DBG_SINSP_INFO("tid collision for %" PRIu64 "(%s)", tid_collision, child_tinfo->m_comm.c_str());
}

if(!thread_added)
{
evt->set_tinfo(nullptr);
delete child_tinfo;
DBG_SINSP_INFO("tid collision for %" PRIu64 "(%s)", tid_collision, new_child->m_comm.c_str());
}

/*=============================== CREATE NEW THREAD-INFO ===========================*/

return;
}

Expand Down
40 changes: 14 additions & 26 deletions userspace/libsinsp/sinsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,29 +846,23 @@ void sinsp::on_new_entry_from_proc(void* context,
{
ASSERT(tinfo != NULL);

bool thread_added = false;
sinsp_threadinfo* newti = build_threadinfo();
threadinfo_map_t::ptr_t sinsp_tinfo;
auto newti = build_threadinfo();
newti->init(tinfo);
if(is_nodriver())
{
auto sinsp_tinfo = find_thread(tid, true);
if(sinsp_tinfo == nullptr || newti->m_clone_ts > sinsp_tinfo->m_clone_ts)
auto existing_tinfo = find_thread(tid, true);
if(existing_tinfo == nullptr || newti->m_clone_ts > existing_tinfo->m_clone_ts)
{
thread_added = m_thread_manager->add_thread(newti, true);
sinsp_tinfo = m_thread_manager->add_thread(std::move(newti), true);
}
}
else
{
thread_added = m_thread_manager->add_thread(newti, true);
sinsp_tinfo = m_thread_manager->add_thread(std::move(newti), true);
}
if (!thread_added)
if (sinsp_tinfo)
{
delete newti;
}
else
{
auto sinsp_tinfo = find_thread(tid, true);

// in case the inspector is configured with an internal filter,
// we filter out thread infos in case we determine them not passing
// the given filter. Filtered out thread infos will not be dumped
Expand Down Expand Up @@ -932,17 +926,11 @@ void sinsp::on_new_entry_from_proc(void* context,
{
ASSERT(tinfo != NULL);

sinsp_threadinfo* newti = build_threadinfo();
auto newti = build_threadinfo();
newti->init(tinfo);

if (!m_thread_manager->add_thread(newti, true)) {
ASSERT(false);
delete newti;
return;
}

sinsp_tinfo = find_thread(tid, true);
if (!sinsp_tinfo) {
sinsp_tinfo = m_thread_manager->add_thread(std::move(newti), true);
if (sinsp_tinfo == nullptr) {
ASSERT(false);
return;
}
Expand Down Expand Up @@ -1409,9 +1397,9 @@ threadinfo_map_t::ptr_t sinsp::get_thread_ref(int64_t tid, bool query_os_if_not_
return m_thread_manager->get_thread_ref(tid, query_os_if_not_found, lookup_only, main_thread);
}

bool sinsp::add_thread(const sinsp_threadinfo *ptinfo)
std::shared_ptr<sinsp_threadinfo> sinsp::add_thread(std::unique_ptr<sinsp_threadinfo> ptinfo)
{
return m_thread_manager->add_thread((sinsp_threadinfo*)ptinfo, false);
return m_thread_manager->add_thread(std::move(ptinfo), false);
}

void sinsp::remove_thread(int64_t tid)
Expand Down Expand Up @@ -2103,10 +2091,10 @@ bool sinsp_thread_manager::remove_inactive_threads()
return false;
}

sinsp_threadinfo*
std::unique_ptr<sinsp_threadinfo>
libsinsp::event_processor::build_threadinfo(sinsp* inspector)
{
return new sinsp_threadinfo(inspector);
return std::make_unique<sinsp_threadinfo>(inspector);
}

std::unique_ptr<sinsp_fdinfo>
Expand Down
6 changes: 3 additions & 3 deletions userspace/libsinsp/sinsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,10 @@ class SINSP_PUBLIC sinsp : public capture_stats_source

libsinsp::event_processor* m_external_event_processor;

inline sinsp_threadinfo* build_threadinfo()
inline std::unique_ptr<sinsp_threadinfo> build_threadinfo()
{
return m_external_event_processor ? m_external_event_processor->build_threadinfo(this)
: m_thread_manager->new_threadinfo().release();
: m_thread_manager->new_threadinfo();
}

inline std::unique_ptr<sinsp_fdinfo> build_fdinfo()
Expand Down Expand Up @@ -998,7 +998,7 @@ class SINSP_PUBLIC sinsp : public capture_stats_source

bool remove_inactive_threads();

bool add_thread(const sinsp_threadinfo *ptinfo);
std::shared_ptr<sinsp_threadinfo> add_thread(std::unique_ptr<sinsp_threadinfo> ptinfo);

void set_mode(sinsp_mode_t value)
{
Expand Down
2 changes: 1 addition & 1 deletion userspace/libsinsp/sinsp_external_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class event_processor
* If this is overridden by the event processor, the processor MUST be registered
* before the sinsp object is init-ed
*/
virtual sinsp_threadinfo* build_threadinfo(sinsp* inspector);
virtual std::unique_ptr<sinsp_threadinfo> build_threadinfo(sinsp* inspector);

/**
* Some event processors allocate different fd info types with extra data.
Expand Down
16 changes: 8 additions & 8 deletions userspace/libsinsp/threadinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ std::unique_ptr<sinsp_fdinfo> sinsp_thread_manager::new_fdinfo() const
* 2. We are doing a proc scan with a callback or without. (`from_scap_proctable==true`)
* 3. We are trying to obtain thread info from /proc through `get_thread_ref`
*/
bool sinsp_thread_manager::add_thread(sinsp_threadinfo *threadinfo, bool from_scap_proctable)
std::shared_ptr<sinsp_threadinfo> sinsp_thread_manager::add_thread(std::unique_ptr<sinsp_threadinfo> threadinfo, bool from_scap_proctable)
{

/* We have no more space */
Expand All @@ -1500,10 +1500,10 @@ bool sinsp_thread_manager::add_thread(sinsp_threadinfo *threadinfo, bool from_sc
}
m_inspector->get_sinsp_stats_v2()->m_n_drops_full_threadtable++;
}
return false;
return nullptr;
}

auto tinfo_shared_ptr = std::shared_ptr<sinsp_threadinfo>(threadinfo);
auto tinfo_shared_ptr = std::shared_ptr<sinsp_threadinfo>(std::move(threadinfo));

if(!from_scap_proctable)
{
Expand All @@ -1520,14 +1520,14 @@ bool sinsp_thread_manager::add_thread(sinsp_threadinfo *threadinfo, bool from_sc
}

tinfo_shared_ptr->compute_program_hash();
m_threadtable.put(std::move(tinfo_shared_ptr));
m_threadtable.put(tinfo_shared_ptr);

if (m_inspector != nullptr && m_inspector->get_sinsp_stats_v2())
{
m_inspector->get_sinsp_stats_v2()->m_n_added_threads++;
}

return true;
return tinfo_shared_ptr;
}

/* Taken from `find_new_reaper` kernel function:
Expand Down Expand Up @@ -2055,7 +2055,7 @@ threadinfo_map_t::ptr_t sinsp_thread_manager::get_thread_ref(int64_t tid, bool q
scap_proc.ptid = -1;

// unfortunately, sinsp owns the threade factory
sinsp_threadinfo* newti = m_inspector->build_threadinfo();
auto newti = m_inspector->build_threadinfo();

m_n_proc_lookups++;

Expand Down Expand Up @@ -2118,8 +2118,8 @@ threadinfo_map_t::ptr_t sinsp_thread_manager::get_thread_ref(int64_t tid, bool q
//
// Done. Add the new thread to the list.
//
add_thread(newti, false);
sinsp_proc = find_thread(tid, lookup_only);
add_thread(std::move(newti), false);
sinsp_proc = find_thread(tid, lookup_only);
}

return sinsp_proc;
Expand Down
5 changes: 2 additions & 3 deletions userspace/libsinsp/threadinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ class SINSP_PUBLIC sinsp_thread_manager: public libsinsp::state::table<int64_t>

std::unique_ptr<sinsp_fdinfo> new_fdinfo() const;

bool add_thread(sinsp_threadinfo *threadinfo, bool from_scap_proctable);
threadinfo_map_t::ptr_t add_thread(std::unique_ptr<sinsp_threadinfo> threadinfo, bool from_scap_proctable);
sinsp_threadinfo* find_new_reaper(sinsp_threadinfo*);
void remove_thread(int64_t tid);
// Returns true if the table is actually scanned
Expand Down Expand Up @@ -887,8 +887,7 @@ class SINSP_PUBLIC sinsp_thread_manager: public libsinsp::state::table<int64_t>
}
entry.release();
tinfo->m_tid = key;
add_thread(tinfo, false);
return get_entry(key);
return add_thread(std::unique_ptr<sinsp_threadinfo>(tinfo), false);
}

bool erase_entry(const int64_t& key) override
Expand Down

0 comments on commit b0e6dda

Please sign in to comment.