Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#7157 dyn inject: Improve ord tracking with synthetic records #7299

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
9 changes: 6 additions & 3 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
// boundaries so we live with those being before the switch.
// XXX: Once we insert kernel traces, we may have to try harder
// to stop before the post-syscall records.
if (this->record_type_is_instr_boundary(record, outputs_[output].last_record) &&
if (this->record_type_is_instr_boundary(record,
outputs_[output].last_record.record) &&
// We want to delay the context switch until after the injected syscall trace.
!outputs_[output].in_syscall_code) {
if (input->switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) {
Expand Down Expand Up @@ -507,7 +508,8 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
this->process_marker(*input, output, marker_type, marker_value);
}
if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS &&
this->record_type_is_instr_boundary(record, outputs_[output].last_record) &&
this->record_type_is_instr_boundary(record,
outputs_[output].last_record.record) &&
!outputs_[output].in_context_switch_code) {
++input->instrs_in_quantum;
if (input->instrs_in_quantum > options_.quantum_duration_instrs) {
Expand Down Expand Up @@ -546,7 +548,8 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
// We only switch on instruction boundaries. We could possibly switch
// in between (e.g., scatter/gather long sequence of reads/writes) by
// setting input->switching_pre_instruction.
this->record_type_is_instr_boundary(record, outputs_[output].last_record)) {
this->record_type_is_instr_boundary(record,
outputs_[output].last_record.record)) {
if (outputs_[output].in_syscall_code) {
// XXX: Maybe this should be printed only once per-syscall-instance to
// reduce log spam.
Expand Down
105 changes: 72 additions & 33 deletions clients/drcachesim/scheduler/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ typedef dynamorio::drmemtrace::record_file_reader_t<std::ifstream>
default_record_file_reader_t;
#endif

static constexpr bool IS_REAL = true;
static constexpr bool IS_SYNTHETIC = false;

std::string
replay_file_checker_t::check(archive_istream_t *infile)
{
Expand Down Expand Up @@ -564,8 +567,8 @@ scheduler_impl_tmpl_t<trace_entry_t, record_reader_t>::insert_switch_tid_pid(
tid.size = 0;
tid.addr = static_cast<addr_t>(input.tid);

input.queue.push_front(pid);
input.queue.push_front(tid);
input.queue.emplace_front(pid, IS_SYNTHETIC);
input.queue.emplace_front(tid, IS_SYNTHETIC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm so this doesn't need to update real_records_in_queue b/c these are synthetic.

Maybe a helper is needed so adding goes through central places so no addition overlooks updating real_records_in_queue? I'm afraid someone will see this code and add a real record to the queue here and not update the counter b/c the existing code doesn't update it. Or just adding a comment here saying we don't need to increment is enough?

}

/***************************************************************************
Expand Down Expand Up @@ -656,14 +659,18 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::~scheduler_impl_tmpl_t()
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]);
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue rebalances",
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]);
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Ouput limits hit",
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Output limits hit",
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_HIT_OUTPUT_LIMIT]);
#ifndef NDEBUG
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue lock acquired",
outputs_[i].ready_queue.lock->get_count_acquired());
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue lock contended",
outputs_[i].ready_queue.lock->get_count_contended());
#endif
VPRINT(
this, 1, " %-35s: %9" PRId64 "\n", "Kernel switch sequence injections",
outputs_[i]
.stats[memtrace_stream_t::SCHED_STAT_KERNEL_SWITCH_SEQUENCE_INJECTIONS]);
}
}

Expand Down Expand Up @@ -850,7 +857,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::init(
? spec_type_t::USE_NOPS
// TODO i#5843: Add more flags for other options.
: spec_type_t::LAST_FROM_TRACE,
static_cast<int>(get_time_micros()), create_invalid_record(), verbosity_);
static_cast<int>(get_time_micros()),
cached_record_t(create_invalid_record(), IS_SYNTHETIC), verbosity_);
if (options_.single_lockstep_output)
outputs_.back().stream = global_stream_.get();
if (options_.schedule_record_ostream != nullptr) {
Expand Down Expand Up @@ -1523,8 +1531,9 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
// Maybe we should disallow it so we don't need checks like this?
options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY) {
RecordType record = create_invalid_record();
bool is_record_real = false;
stream_status_t res =
advance_region_of_interest(/*output=*/-1, record, input);
advance_region_of_interest(/*output=*/-1, record, input, is_record_real);
if (res == sched_type_t::STATUS_SKIPPED) {
input.next_timestamp =
static_cast<uintptr_t>(input.reader->get_last_timestamp());
Expand All @@ -1548,7 +1557,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
// the non-consuming queue loop vs the consuming and queue-pushback
// reader loop.
for (const auto &record : input.queue) {
if (!process_next_initial_record(input, record, found_filetype,
if (!process_next_initial_record(input, record.record, found_filetype,
found_timestamp))
break;
}
Expand Down Expand Up @@ -1592,7 +1601,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
// we skip (see skip_instructions()). Thus, we abort with an error.
if (record_type_is_instr(record))
break;
input.queue.push_back(record);
input.queue.emplace_back(record, IS_REAL);
++input.real_records_in_queue;
++(*input.reader);
}
}
Expand Down Expand Up @@ -1632,7 +1642,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::open_reader(
RecordType record = **reader;
if (record_type_has_tid(record, tid))
break;
input.queue.push_back(record);
input.queue.emplace_back(record, IS_REAL);
++input.real_records_in_queue;
++(*reader);
}
if (tid == INVALID_THREAD_ID) {
Expand Down Expand Up @@ -1821,10 +1832,14 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_input_record_ordinal(
return 0;
uint64_t ord = inputs_[index].reader->get_record_ordinal();
if (get_instr_ordinal(inputs_[index]) == 0) {
uint64_t adjust =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this should go after the comment on lines 1839-1841.

inputs_[index].cur_from_queue && inputs_[index].is_cur_record_real ? 1 : 0;
adjust += inputs_[index].real_records_in_queue;
assert(ord >= adjust);
// Account for get_initial_input_content() readahead for filetype/timestamp.
// If this gets any more complex, the scheduler stream should track its
// own counts for every input and just ignore the input stream's tracking.
ord -= inputs_[index].queue.size() + (inputs_[index].cur_from_queue ? 1 : 0);
ord -= adjust;
}
return ord;
}
Expand Down Expand Up @@ -1852,7 +1867,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_input_first_timestamp(
return 0;
uint64_t res = inputs_[index].reader->get_first_timestamp();
if (get_instr_ordinal(inputs_[index]) == 0 &&
(!inputs_[index].queue.empty() || inputs_[index].cur_from_queue)) {
(inputs_[index].real_records_in_queue > 0 ||
(inputs_[index].cur_from_queue && inputs_[index].is_cur_record_real))) {
// Account for get_initial_input_content() readahead for filetype/timestamp.
res = 0;
}
Expand All @@ -1871,7 +1887,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_input_last_timestamp(
return 0;
uint64_t res = inputs_[index].reader->get_last_timestamp();
if (get_instr_ordinal(inputs_[index]) == 0 &&
(!inputs_[index].queue.empty() || inputs_[index].cur_from_queue)) {
(inputs_[index].real_records_in_queue > 0 ||
(inputs_[index].cur_from_queue && inputs_[index].is_cur_record_real))) {
// Account for get_initial_input_content() readahead for filetype/timestamp.
res = 0;
}
Expand All @@ -1881,7 +1898,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::get_input_last_timestamp(
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_impl_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
output_ordinal_t output, RecordType &record, input_info_t &input)
output_ordinal_t output, RecordType &record, input_info_t &input,
bool &is_record_real)
{
assert(input.lock->owned_by_cur_thread());
uint64_t cur_instr = get_instr_ordinal(input);
Expand Down Expand Up @@ -1913,7 +1931,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
if (status != sched_type_t::STATUS_OK)
return status;
}
input.queue.push_back(create_thread_exit(input.tid));
input.queue.push_back({ create_thread_exit(input.tid), IS_SYNTHETIC });
stream_status_t status = mark_input_eof(input);
// For early EOF we still need our synthetic exit so do not return it yet.
if (status != sched_type_t::STATUS_OK &&
Expand All @@ -1932,8 +1950,9 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
if (input.cur_region > 0) {
VPRINT(this, 3, "skip_instructions input=%d: inserting separator marker\n",
input.index);
input.queue.push_back(record);
input.queue.emplace_back(record, is_record_real);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need to update the count if is_record_real? That helper is looking better now.

record = create_region_separator_marker(input.tid, input.cur_region);
is_record_real = false;
}
return sched_type_t::STATUS_OK;
}
Expand Down Expand Up @@ -2011,11 +2030,12 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::clear_input_queue(input_info_t &i
int i = 0;
while (!input.queue.empty()) {
assert(i == 0 ||
(!record_type_is_instr(input.queue.front()) &&
!record_type_is_encoding(input.queue.front())));
(!record_type_is_instr(input.queue.front().record) &&
!record_type_is_encoding(input.queue.front().record)));
++i;
input.queue.pop_front();
}
input.real_records_in_queue = 0;
}

template <typename RecordType, typename ReaderType>
Expand All @@ -2032,8 +2052,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::skip_instructions(input_info_t &i
// For a skip of 0 we still need to clear non-instrs from the queue, but
// should not have an instr in there.
assert(skip_amount > 0 || input.queue.empty() ||
(!record_type_is_instr(input.queue.front()) &&
!record_type_is_encoding(input.queue.front())));
(!record_type_is_instr(input.queue.front().record) &&
!record_type_is_encoding(input.queue.front().record)));
clear_input_queue(input);
input.reader->skip_instructions(skip_amount);
VPRINT(this, 3, "skip_instructions: input=%d amount=%" PRIu64 "\n", input.index,
Expand Down Expand Up @@ -2072,7 +2092,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::skip_instructions(input_info_t &i
VPRINT(this, 3, "skip_instructions input=%d: inserting separator marker\n",
input.index);
input.queue.push_back(
create_region_separator_marker(input.tid, input.cur_region));
{ create_region_separator_marker(input.tid, input.cur_region),
IS_SYNTHETIC });
}
return sched_type_t::STATUS_SKIPPED;
}
Expand Down Expand Up @@ -2446,7 +2467,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::on_context_switch(
--i) {
RecordType record = switch_sequence_[switch_type][i];
record_type_set_tid(record, inputs_[new_input].tid);
inputs_[new_input].queue.emplace_front(record);
inputs_[new_input].queue.emplace_front(record, IS_SYNTHETIC);
}
VPRINT(this, 3, "Inserted %zu switch for type %d from %d.%d to %d.%d\n",
switch_sequence_[switch_type].size(), switch_type,
Expand Down Expand Up @@ -2536,6 +2557,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
}
while (true) {
input->cur_from_queue = false;
input->is_cur_record_real = false;
if (input->needs_init) {
// We pay the cost of this conditional to support ipc_reader_t::init() which
// blocks and must be called right before reading its first record.
Expand All @@ -2547,9 +2569,12 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
input->needs_init = false;
}
if (!input->queue.empty()) {
record = input->queue.front();
input->queue.pop_front();
record = input->queue.front().record;
input->cur_from_queue = true;
input->is_cur_record_real = input->queue.front().is_real;
input->queue.pop_front();
if (input->is_cur_record_real)
--input->real_records_in_queue;
} else {
// We again have a flag check because reader_t::init() does an initial ++
// and so we want to skip that on the first record but perform a ++ prior
Expand Down Expand Up @@ -2582,12 +2607,17 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
continue;
} else {
record = **input->reader;
input->is_cur_record_real = true;
}
}
VPRINT(this, 5,
"next_record[%d]: candidate record from %d (@%" PRId64 "): ", output,
input->index, get_instr_ordinal(*input));
if (input->instrs_pre_read > 0 && record_type_is_instr(record))
// FIXME: This is likely too premature; we should either move it to later,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an issue #?

// or undo the decrement for cases we don't actually end up returning the
// pre-read instruction to the caller.
if (input->instrs_pre_read > 0 && input->is_cur_record_real &&
record_type_is_instr(record))
--input->instrs_pre_read;
VDO(this, 5, print_record(record););
bool need_new_input = false;
Expand All @@ -2605,7 +2635,9 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
// We have to put the candidate record in the queue before we release
// the lock since another output may grab this input.
VPRINT(this, 5, "next_record[%d]: queuing candidate record\n", output);
input->queue.push_back(record);
input->queue.emplace_back(record, input->is_cur_record_real);
if (input->is_cur_record_real)
++input->real_records_in_queue;
lock.unlock();
res = pick_next_input(output, blocked_time);
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_WAIT &&
Expand All @@ -2629,8 +2661,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
// we've already reset to 0.
if (!preempt && options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) {
if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS &&
record_type_is_instr_boundary(record,
outputs_[output].last_record)) {
record_type_is_instr_boundary(
record, outputs_[output].last_record.record)) {
assert(inputs_[prev_input].instrs_in_quantum > 0);
--inputs_[prev_input].instrs_in_quantum;
} else if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) {
Expand All @@ -2649,7 +2681,10 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
lock.lock();
if (res != sched_type_t::STATUS_SKIPPED) {
// Get our candidate record back.
record = input->queue.back();
record = input->queue.back().record;
input->is_cur_record_real = input->queue.back().is_real;
if (input->is_cur_record_real)
--input->real_records_in_queue;
input->queue.pop_back();
}
}
Expand All @@ -2662,7 +2697,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
if (input->needs_roi && options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY &&
!input->regions_of_interest.empty()) {
input_ordinal_t prev_input = input->index;
res = advance_region_of_interest(output, record, *input);
res = advance_region_of_interest(output, record, *input,
input->is_cur_record_real);
if (res == sched_type_t::STATUS_SKIPPED) {
// We need either the queue or to re-de-ref the reader so we loop,
// but we do not want to come back here.
Expand All @@ -2689,7 +2725,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t outp
update_next_record(output, record);
VDO(this, 4, print_record(record););

outputs_[output].last_record = record;
outputs_[output].last_record = { record, input->is_cur_record_real };
record_type_has_tid(record, input->last_record_tid);
record_type_has_pid(record, input->pid);
return sched_type_t::STATUS_OK;
Expand Down Expand Up @@ -2727,6 +2763,9 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::update_next_record(output_ordinal
VPRINT(this, 2, "output %d base timestamp = %zu\n", output,
outputs_[output].base_timestamp);
}
// FIXME: When USE_INPUT_ORDINALS is enabled, this returns the input-local
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my head USE_INPUT_ORDINALS is never set for a dynamic schedule: but I actually don't see the docs or code disallowing that. I think we should disallow it. Please list that as the preferred solution; maybe that downgrades this to XXX as no known use case would get here.

// instruction ordinal (which not only is not global, but also counts the
// read-ahead instructions).
uint64_t instr_ord = outputs_[output].stream->get_instruction_ordinal();
uint64_t idle_count = outputs_[output].idle_count;
uintptr_t new_time = static_cast<uintptr_t>(
Expand Down Expand Up @@ -2756,11 +2795,11 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::unread_last_record(output_ordinal
input_info_t *&input)
{
auto &outinfo = outputs_[output];
if (record_type_is_invalid(outinfo.last_record))
if (record_type_is_invalid(outinfo.last_record.record))
return sched_type_t::STATUS_INVALID;
if (!outinfo.speculation_stack.empty())
return sched_type_t::STATUS_INVALID;
record = outinfo.last_record;
record = outinfo.last_record.record;
input = &inputs_[outinfo.cur_input];
std::lock_guard<mutex_dbg_owned> lock(*input->lock);
VPRINT(this, 4, "next_record[%d]: unreading last record, from %d\n", output,
Expand All @@ -2772,7 +2811,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::unread_last_record(output_ordinal
if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS &&
record_type_is_instr(record))
--input->instrs_in_quantum;
outinfo.last_record = create_invalid_record();
outinfo.last_record = { create_invalid_record(), IS_SYNTHETIC };
return sched_type_t::STATUS_OK;
}

Expand All @@ -2784,7 +2823,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::start_speculation(
auto &outinfo = outputs_[output];
if (outinfo.speculation_stack.empty()) {
if (queue_current_record) {
if (record_type_is_invalid(outinfo.last_record))
if (record_type_is_invalid(outinfo.last_record.record))
return sched_type_t::STATUS_INVALID;
inputs_[outinfo.cur_input].queue.push_back(outinfo.last_record);
}
Expand Down
Loading
Loading