diff --git a/bessctl/commands.py b/bessctl/commands.py index 60afa1571..a2d1939d8 100644 --- a/bessctl/commands.py +++ b/bessctl/commands.py @@ -47,7 +47,7 @@ import traceback import tempfile import signal -import collections +from collections import * import contextlib import sugar @@ -808,7 +808,7 @@ class Foo(Module): # using the same name twice as-module class or port-driver. # But the C++ code does not have the restriction on using # Foo() as *both* module *and* port-driver. - counts = collections.Counter(rsvd.keys()) + counts = Counter(rsvd.keys()) counts.update(class_names) counts.update(driver_names) dups = [k for k in counts if counts[k] > 1] @@ -1445,21 +1445,31 @@ def _show_port(cli, port): port_config = cli.bess.get_port_config(port.name) - cli.fout.write(' %-12s Driver %-10s HWaddr %-18s MTU %-6d\n' % - (port.name, port.driver, port.mac_addr, port_config.conf.mtu)) + cli.fout.write(' %-12s Driver %-10s HWaddr %-18s MTU %-5d %s\n' % + (port.name, port.driver, + port_config.conf.mac_addr, port_config.conf.mtu, + 'Enabled' if port_config.conf.admin_up else 'Disabled')) cli.fout.write(' %-12s Speed %-11s Link %-5s Duplex %-5s Autoneg %-5s\n' % ('', speed, link, duplex, autoneg)) stats = cli.bess.get_port_stats(port.name) - cli.fout.write(' Inc/RX ') - cli.fout.write('packets: {:<20,}'.format(stats.inc.packets)) - cli.fout.write('bytes: {:<20,}\n'.format(stats.inc.bytes)) - cli.fout.write('{:<14} dropped: {:<20,}\n'.format('', stats.inc.dropped)) + fmt_hdr = '{:>16} {:>18}{:>24}{:>18}\n' + fmt_body = '{:>16} {:>18,}{:>24,}{:>18,}\n' + cli.fout.write(fmt_hdr.format('', 'Packets', 'Bytes', 'Dropped')) - cli.fout.write(' Out/TX ') - cli.fout.write('packets: {:<20,}'.format(stats.out.packets)) - cli.fout.write('bytes: {:<20,}\n'.format(stats.out.bytes)) - cli.fout.write('{:<14} dropped: {:<20,}\n'.format('', stats.out.dropped)) + cli.fout.write(fmt_body.format('Inc/RX total', stats.inc.packets, + stats.inc.bytes, stats.inc.dropped)) + for qid in sorted(stats.inc_queues.keys()): + inc = stats.inc_queues[qid] + cli.fout.write(fmt_body.format('?' if qid == -1 else qid, + inc.packets, inc.bytes, inc.dropped)) + + cli.fout.write(fmt_body.format('Out/TX total', stats.out.packets, + stats.out.bytes, stats.out.dropped)) + for qid in sorted(stats.out_queues.keys()): + out = stats.out_queues[qid] + cli.fout.write(fmt_body.format('?' if qid == -1 else qid, + out.packets, out.bytes, out.dropped)) @cmd('show port', 'Show the status of all ports') @@ -1738,27 +1748,46 @@ def monitor_pipeline_bit(cli, opts): _monitor_pipeline(cli, 'bytes', 'Mbps', graph_args=opts) -PortRate = collections.namedtuple('PortRate', - ['inc_packets', 'inc_dropped', 'inc_bytes', - 'out_packets', 'out_dropped', 'out_bytes']) - +def _monitor_ports(cli, per_queue, port_names=[]): -def _monitor_ports(cli, *ports): + class Stats: + def __init__(self, timestamp, + inc_packets=0, inc_dropped=0, inc_bytes=0, + out_packets=0, out_dropped=0, out_bytes=0): + self.timestamp = timestamp + self.inc_packets = inc_packets + self.inc_dropped = inc_dropped + self.inc_bytes = inc_bytes + self.out_packets = out_packets + self.out_dropped = out_dropped + self.out_bytes = out_bytes def get_delta(old, new): sec_diff = new.timestamp - old.timestamp - delta = PortRate( - inc_packets=(new.inc.packets - old.inc.packets) / sec_diff, - inc_dropped=(new.inc.dropped - old.inc.dropped) / sec_diff, - inc_bytes=(new.inc.bytes - old.inc.bytes) / sec_diff, - out_packets=(new.out.packets - old.out.packets) / sec_diff, - out_dropped=(new.out.dropped - old.out.dropped) / sec_diff, - out_bytes=(new.out.bytes - old.out.bytes) / sec_diff) + delta = Stats( + timestamp=new.timestamp, + inc_packets=(new.inc_packets - old.inc_packets) / sec_diff, + inc_dropped=(new.inc_dropped - old.inc_dropped) / sec_diff, + inc_bytes=(new.inc_bytes - old.inc_bytes) / sec_diff, + out_packets=(new.out_packets - old.out_packets) / sec_diff, + out_dropped=(new.out_dropped - old.out_dropped) / sec_diff, + out_bytes=(new.out_bytes - old.out_bytes) / sec_diff) return delta + def get_total(arr): + total = copy.deepcopy(arr[0]) + for stat in arr[1:]: + total.inc_packets += stat.inc_packets + total.inc_dropped += stat.inc_dropped + total.inc_bytes += stat.inc_bytes + total.out_packets += stat.out_packets + total.out_dropped += stat.out_dropped + total.out_bytes += stat.out_bytes + return total + def print_header(timestamp): cli.fout.write('\n') - cli.fout.write('{:<20}{:>14}{:>10}{:>10} {:>14}{:>10}{:>10}\n'.format( + cli.fout.write('{:<22}{:>14}{:>10}{:>10} {:>14}{:>10}{:>10}\n'.format( time.strftime('%X') + str(timestamp % 1)[1:8], 'INC Mbps', 'Mpps', 'dropped', 'OUT Mbps', 'Mpps', 'Dropped')) @@ -1767,7 +1796,7 @@ def print_header(timestamp): def print_footer(): cli.fout.write('{}\n'.format('-' * 96)) - def print_delta(timestamp, port, delta, csv_f=None): + def print_delta(delta, port=None, qid=None): # If inc/out_bytes == 0 and inc_packets != 0, it means the # driver does not account packet bytes. # Use 0 rather than inaccurate numbers from Ethernet overheads. @@ -1781,87 +1810,145 @@ def print_delta(timestamp, port, delta, csv_f=None): else: out_mbps = 0. - data = (inc_mbps, delta.inc_packets / 1e6, long(delta.inc_dropped), out_mbps, delta.out_packets / 1e6, - long(delta.out_dropped)) - cli.fout.write('{:<20}{:>14.1f}{:>10.3f}{:>10d} {:>14.1f}{:>10.3f}{:>10d}\n'.format(port, *data)) + inc = '{:>14.1f}{:>10.3f}{:>10.0f}'.format(inc_mbps, delta.inc_packets / 1e6, delta.inc_dropped) + out = '{:>14.1f}{:>10.3f}{:>10.0f}'.format(out_mbps, delta.out_packets / 1e6, delta.out_dropped) + + if port is None: + name = 'Total' + else: + if per_queue: + first = (qid == -1 and port.name in has_unknown) or \ + (qid == 0 and port.name not in has_unknown) + name = '{}{}'.format((port.name + ':') if first else ' ' * (len(port.name) + 1), + '?' if qid == -1 else qid) + # NOTE: # of RX queues may not be different from # of TX queues + if qid >= port.num_inc_q: + inc = ' ' * 34 + if qid >= port.num_out_q: + out = ' ' * 34 + else: + name = '{}/{}'.format(port.name, port.driver) + + if (qid != -1) or (port.name in has_unknown): + cli.fout.write('{:<22}{} {}\n'.format(name, inc, out)) + #cli.fout.write('{:<22}{:>14.1f}{:>10.3f}{:>10.0f} {:>14.1f}{:>10.3f}{:>10.0f}\n'.format(name, *data)) if csv_f is not None: - csv_f.write('{},{},{}\n'.format(time.strftime('%X'), port, ','.join(map(lambda x: '{:.3f}'.format(x), data)))) + data = (inc_mbps, delta.inc_packets / 1e6, delta.inc_dropped, + out_mbps, delta.out_packets / 1e6, delta.out_dropped) + if per_queue: + name = '{}:{}'.format(port.name, '?' if qid == -1 else qid) + csv_f.write('{},{},{}\n'.format(time.strftime('%X'), name, ','.join(map(lambda x: '{:.3f}'.format(x), data)))) - def get_total(arr): - total = copy.deepcopy(arr[0]) - for stat in arr[1:]: - total.inc.packets += stat.inc.packets - total.inc.dropped += stat.inc.dropped - total.inc.bytes += stat.inc.bytes - total.out.packets += stat.out.packets - total.out.dropped += stat.out.dropped - total.out.bytes += stat.out.bytes - return total + def print_loop(): + last = {} + now = {} - def print_loop(csv_f=None): while True: - time.sleep(1) - + last = copy.deepcopy(now) for port in ports: - now[port] = cli.bess.get_port_stats(port) - - print_header(now[port].timestamp) + port_stats = cli.bess.get_port_stats(port.name) + if per_queue: + for i in range(-1, max(port.num_inc_q, port.num_out_q)): + inc = port_stats.inc_queues[i] + out = port_stats.out_queues[i] + + if i == -1: + if inc.packets != 0 or out.packets != 0: + has_unknown.add(port.name) + + s = Stats(timestamp=port_stats.timestamp) + if i < port.num_inc_q: + s.inc_packets = inc.packets + s.inc_dropped = inc.dropped + s.inc_bytes = inc.bytes + if i < port.num_out_q: + s.out_packets = out.packets + s.out_dropped = out.dropped + s.out_bytes = out.bytes + now[port.name, i] = s + else: + now[port.name] = Stats( + timestamp = port_stats.timestamp, + inc_packets = port_stats.inc.packets, + inc_dropped = port_stats.inc.dropped, + inc_bytes = port_stats.inc.bytes, + out_packets = port_stats.out.packets, + out_dropped = port_stats.out.dropped, + out_bytes = port_stats.out.bytes) + + if last != {}: + print_header(port_stats.timestamp) + + for port in ports: + if per_queue: + for i in range(-1, max(port.num_inc_q, port.num_out_q)): + print_delta(get_delta(last[port.name, i], now[port.name, i]), port, i) + else: + print_delta(get_delta(last[port.name], now[port.name]), port) - for port in ports: - print_delta(now[port].timestamp, '{}{}'.format(port, drivers[port]), - get_delta(last[port], now[port]), csv_f) + print_footer() - print_footer() + if per_queue or len(ports) > 1: + last_total = get_total(list(last.values())) + now_total = get_total(list(now.values())) + print_delta(get_delta(last_total, now_total)) - if len(ports) > 1: - print_delta(now[port].timestamp, 'Total', get_delta( - get_total(list(last.values())), - get_total(list(now.values()))), csv_f) + time.sleep(1) - for port in ports: - last[port] = now[port] - all_ports = sorted(cli.bess.list_ports().ports, key=lambda x: x.name) - drivers = {} - for port in all_ports: - drivers[port.name] = port.driver + all_ports = cli.bess.list_ports().ports - if not ports: - ports = [port.name for port in all_ports] + if not port_names: + ports = all_ports if not ports: raise cli.CommandError('No port to monitor') - - cli.fout.write('Monitoring ports: {}\n'.format(', '.join(ports))) - - last = {} - now = {} - - for port in ports: - last[port] = cli.bess.get_port_stats(port) + else: + ports = [] + port_map = {} + for port in all_ports: + port_map[port.name] = port + for port_name in port_names: + if port_name not in port_map: + raise cli.CommandError('Port %s does not exist' % port_name) + if port_name not in ports: + ports.append(port_map[port_name]) + + has_unknown = set() + ports = sorted(ports, key=lambda x: x.name) + cli.fout.write('Monitoring ports: {}\n'.format(', '.join([p.name for p in ports]))) try: csv_path = os.getenv('CSV', None) - with open(csv_path, 'w') if csv_path is not None else noop() as csv_f: - if csv_f is not None: + if csv_path: + with open(csv_path, 'w') as csv_f: csv_f.write('{}\n'.format(','.join( ('Timestamp', 'Port', 'Mbps In', 'Mpps In', 'Dropped In', 'Mbps Out', 'Mpps Out', 'Dropped Out')))) - print_loop(csv_f) + print_loop() + else: + csv_f = None + print_loop() except KeyboardInterrupt: pass @cmd('monitor port', 'Monitor the current traffic of all ports') def monitor_port_all(cli): - _monitor_ports(cli) + _monitor_ports(cli, False) @cmd('monitor port PORT...', 'Monitor the current traffic of specified ports') def monitor_port_all(cli, ports): - _monitor_ports(cli, *ports) + _monitor_ports(cli, False, ports) +@cmd('monitor queue', 'Monitor the per-queue stats of all ports') +def monitor_queue(cli): + _monitor_ports(cli, True) + +@cmd('monitor queue PORT...', 'Monitor the per-queue stats of specified ports') +def monitor_port_all(cli, ports): + _monitor_ports(cli, True, ports) -TcCounterRate = collections.namedtuple('TcCounterRate', - ['count', 'cycles', 'bits', 'packets']) +TcCounterRate = namedtuple('TcCounterRate', ['count', 'cycles', 'bits', 'packets']) def _monitor_tcs(cli, *tcs): @@ -1886,7 +1973,7 @@ def print_header(timestamp, name_len): def print_footer(name_len): cli.fout.write('{}\n'.format('-' * (72 + name_len))) - def print_delta(timestamp, tc, delta, name_len, csv_f=None): + def print_delta(tc, delta, name_len, csv_f=None): if delta.count >= 1: ppb = delta.packets / delta.count else: @@ -1913,7 +2000,7 @@ def print_loop(csv=None): print_header(now[tc].timestamp, max_len) for tc in tcs: - print_delta(now[tc].timestamp, 'W{} {}'.format(wids[tc], tc), + print_delta('W{} {}'.format(wids[tc], tc), get_delta(last[tc], now[tc]), max_len, csv) print_footer(max_len) diff --git a/core/bessctl.cc b/core/bessctl.cc index ee39e41aa..da395b992 100644 --- a/core/bessctl.cc +++ b/core/bessctl.cc @@ -515,6 +515,7 @@ class BESSControlImpl final : public BESSControl::Service { LOG(INFO) << "*** Resuming ***"; resume_all_workers(); + VLOG(1) << "*** Resumed ***"; return Status::OK; } @@ -1130,25 +1131,31 @@ class BESSControlImpl final : public BESSControl::Service { ::Port::PortStats stats = it->second->GetPortStats(); - response->mutable_inc()->set_packets(stats.inc.packets); - response->mutable_inc()->set_dropped(stats.inc.dropped); - response->mutable_inc()->set_bytes(stats.inc.bytes); - *response->mutable_inc()->mutable_requested_hist() = { - stats.inc.requested_hist.begin(), stats.inc.requested_hist.end()}; - *response->mutable_inc()->mutable_actual_hist() = { - stats.inc.actual_hist.begin(), stats.inc.actual_hist.end()}; - *response->mutable_inc()->mutable_diff_hist() = { - stats.inc.diff_hist.begin(), stats.inc.diff_hist.end()}; - - response->mutable_out()->set_packets(stats.out.packets); - response->mutable_out()->set_dropped(stats.out.dropped); - response->mutable_out()->set_bytes(stats.out.bytes); - *response->mutable_out()->mutable_requested_hist() = { - stats.out.requested_hist.begin(), stats.out.requested_hist.end()}; - *response->mutable_out()->mutable_actual_hist() = { - stats.out.actual_hist.begin(), stats.out.actual_hist.end()}; - *response->mutable_out()->mutable_diff_hist() = { - stats.out.diff_hist.begin(), stats.out.diff_hist.end()}; + auto *inc = response->mutable_inc(); + inc->set_packets(stats.inc.packets); + inc->set_dropped(stats.inc.dropped); + inc->set_bytes(stats.inc.bytes); + + for (const auto &[qid, v] : stats.inc_queues) { + bess::pb::GetPortStatsResponse::Stat inc_queue; + inc_queue.set_packets(v.packets); + inc_queue.set_dropped(v.dropped); + inc_queue.set_bytes(v.bytes); + response->mutable_inc_queues()->insert({qid, inc_queue}); + } + + auto *out = response->mutable_out(); + out->set_packets(stats.out.packets); + out->set_dropped(stats.out.dropped); + out->set_bytes(stats.out.bytes); + + for (const auto &[qid, v] : stats.out_queues) { + bess::pb::GetPortStatsResponse::Stat out_queue; + out_queue.set_packets(v.packets); + out_queue.set_dropped(v.dropped); + out_queue.set_bytes(v.bytes); + response->mutable_out_queues()->insert({qid, out_queue}); + } response->set_timestamp(get_epoch_time()); diff --git a/core/drivers/pmd.cc b/core/drivers/pmd.cc index a516a82d8..db3429fb5 100644 --- a/core/drivers/pmd.cc +++ b/core/drivers/pmd.cc @@ -189,7 +189,7 @@ static CommandResponse find_dpdk_vdev(const std::string &vdev, rte_dev_iterator iterator; RTE_ETH_FOREACH_MATCHING_DEV(port_id, vdev.c_str(), &iterator) { - LOG(INFO) << "port id: " << port_id << "matches vdev: " << vdev; + LOG(INFO) << "port id " << port_id << " matches vdev '" << vdev << "'"; rte_eth_iterator_cleanup(&iterator); break; } @@ -418,9 +418,6 @@ void PMDPort::DeInit() { } void PMDPort::CollectStats(bool reset) { - packet_dir_t dir; - queue_t qid; - if (reset) { rte_eth_stats_reset(dpdk_port_id_); return; @@ -434,7 +431,7 @@ void PMDPort::CollectStats(bool reset) { return; } - VLOG(1) << bess::utils::Format( + VLOG(2) << bess::utils::Format( "PMD port %d: ipackets %" PRIu64 " opackets %" PRIu64 " ibytes %" PRIu64 " obytes %" PRIu64 " imissed %" PRIu64 " ierrors %" PRIu64 " oerrors %" PRIu64 " rx_nombuf %" PRIu64, @@ -443,31 +440,35 @@ void PMDPort::CollectStats(bool reset) { port_stats_.inc.dropped = stats.imissed; - // i40e/net_e1000_igb PMD drivers, ixgbevf and net_bonding vdevs don't support - // per-queue stats - if (driver_ == "net_i40e" || driver_ == "net_i40e_vf" || - driver_ == "net_ixgbe_vf" || driver_ == "net_bonding" || - driver_ == "net_e1000_igb") { - // NOTE: - // - if link is down, tx bytes won't increase - // - if destination MAC address is incorrect, rx pkts won't increase + // NOTE: depending on DPDK PMD drivers, + // - if link is down, tx bytes may not increase + // - if destination is zero MAC address, rx pkts may not increase + + uint64_t any = 0; + + for (queue_t qid = 0; qid < num_queues[PACKET_DIR_INC]; qid++) { + auto &qstats = queue_stats_[PACKET_DIR_INC][qid]; + any |= (qstats.packets = stats.q_ipackets[qid]); + any |= (qstats.bytes = stats.q_ibytes[qid]); + any |= (qstats.dropped = stats.q_errors[qid]); + } + + for (queue_t qid = 0; qid < num_queues[PACKET_DIR_OUT]; qid++) { + auto &qstats = queue_stats_[PACKET_DIR_OUT][qid]; + any |= (qstats.packets = stats.q_opackets[qid]); + any |= (qstats.bytes = stats.q_obytes[qid]); + } + + // Some PMD drivers, such as i40e, ixgbevf and net_bonding, do not support + // per-queue stats. There doesn't seem to be a reliable way to detect whether + // the driver really supports per-queue stats or not. Instead, if all queue + // stats counters are 0, we assume that it only supports per-port stats + // Note that this heuristic is safe from potential double counting. + if (!any) { port_stats_.inc.packets = stats.ipackets; port_stats_.inc.bytes = stats.ibytes; port_stats_.out.packets = stats.opackets; port_stats_.out.bytes = stats.obytes; - } else { - dir = PACKET_DIR_INC; - for (qid = 0; qid < num_queues[dir]; qid++) { - queue_stats[dir][qid].packets = stats.q_ipackets[qid]; - queue_stats[dir][qid].bytes = stats.q_ibytes[qid]; - queue_stats[dir][qid].dropped = stats.q_errors[qid]; - } - - dir = PACKET_DIR_OUT; - for (qid = 0; qid < num_queues[dir]; qid++) { - queue_stats[dir][qid].packets = stats.q_opackets[qid]; - queue_stats[dir][qid].bytes = stats.q_obytes[qid]; - } } } @@ -479,12 +480,7 @@ int PMDPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { int PMDPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) { int sent = rte_eth_tx_burst(dpdk_port_id_, qid, reinterpret_cast(pkts), cnt); - auto &stats = queue_stats[PACKET_DIR_OUT][qid]; - int dropped = cnt - sent; - stats.dropped += dropped; - stats.requested_hist[cnt]++; - stats.actual_hist[sent]++; - stats.diff_hist[dropped]++; + queue_stats_[PACKET_DIR_OUT][qid].dropped += cnt - sent; return sent; } diff --git a/core/drivers/pmd.h b/core/drivers/pmd.h index 40ed6662c..769e0c368 100644 --- a/core/drivers/pmd.h +++ b/core/drivers/pmd.h @@ -79,7 +79,7 @@ class PMDPort final : public Port { void DeInit() override; /*! - * Copies rte port statistics into queue_stats datastructure (see port.h). + * Copies rte port statistics into queue_stats_ datastructure (see port.h). * * PARAMETERS: * * bool reset : if true, reset DPDK local statistics and return (do not @@ -121,8 +121,8 @@ class PMDPort final : public Port { */ int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override; - uint64_t GetFlags() const override { - return DRIVER_FLAG_SELF_INC_STATS | DRIVER_FLAG_SELF_OUT_STATS; + DriverFeatures GetFeatures() const override { + return {.offloadIncStats = true, .offloadOutStats = true}; } LinkStatus GetLinkStatus() override; diff --git a/core/modules/port_inc.cc b/core/modules/port_inc.cc index acdb6b1b6..52d80d515 100644 --- a/core/modules/port_inc.cc +++ b/core/modules/port_inc.cc @@ -110,33 +110,23 @@ std::string PortInc::GetDesc() const { struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch, void *arg) { - if (children_overload_ > 0) { - return {.block = true, .packets = 0, .bits = 0}; - } - - Port *p = port_; - - if (!p->conf().admin_up) { + if (!port_->conf().admin_up || children_overload_ > 0) { return {.block = true, .packets = 0, .bits = 0}; } const queue_t qid = (queue_t)(uintptr_t)arg; - - uint64_t received_bytes = 0; - const int burst = ACCESS_ONCE(burst_); const int pkt_overhead = 24; - batch->set_cnt(p->RecvPackets(qid, batch->pkts(), burst)); - uint32_t cnt = batch->cnt(); - p->queue_stats[PACKET_DIR_INC][qid].requested_hist[burst]++; - p->queue_stats[PACKET_DIR_INC][qid].actual_hist[cnt]++; - p->queue_stats[PACKET_DIR_INC][qid].diff_hist[burst - cnt]++; + uint32_t cnt = port_->RecvPackets(qid, batch->pkts(), burst); if (cnt == 0) { return {.block = true, .packets = 0, .bits = 0}; } + batch->set_cnt(cnt); + // NOTE: we cannot skip this step since it might be used by scheduler. + uint64_t received_bytes = 0; if (prefetch_) { for (uint32_t i = 0; i < cnt; i++) { received_bytes += batch->pkts()[i]->total_len(); @@ -148,9 +138,8 @@ struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch, } } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) { - p->queue_stats[PACKET_DIR_INC][qid].packets += cnt; - p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes; + if (!(port_->GetFeatures().offloadIncStats)) { + port_->IncreaseIncQueueCounters(qid, cnt, 0, received_bytes); } RunNextModule(ctx, batch); diff --git a/core/modules/port_out.cc b/core/modules/port_out.cc index d798a71a3..b2d3c40ac 100644 --- a/core/modules/port_out.cc +++ b/core/modules/port_out.cc @@ -91,41 +91,36 @@ std::string PortOut::GetDesc() const { } static inline int SendBatch(bess::PacketBatch *batch, Port *p, queue_t qid) { - uint64_t sent_bytes = 0; - int sent_pkts = 0; - - if (p->conf().admin_up) { - sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt()); + if (!p->conf().admin_up) { + return 0; } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { - const packet_dir_t dir = PACKET_DIR_OUT; + int sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt()); + if (!(p->GetFeatures().offloadOutStats)) { + uint64_t sent_bytes = 0; for (int i = 0; i < sent_pkts; i++) { sent_bytes += batch->pkts()[i]->total_len(); } - p->queue_stats[dir][qid].packets += sent_pkts; - p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts); - p->queue_stats[dir][qid].bytes += sent_bytes; + p->IncreaseOutQueueCounters(qid, sent_pkts, batch->cnt() - sent_pkts, + sent_bytes); } return sent_pkts; } void PortOut::ProcessBatch(Context *ctx, bess::PacketBatch *batch) { - Port *p = port_; - CHECK(worker_queues_[ctx->wid] >= 0); queue_t qid = worker_queues_[ctx->wid]; int sent_pkts = 0; if (queue_users_[qid] == 1) { - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, port_, qid); } else { mcslock_node_t me; mcs_lock(&queue_locks_[qid], &me); - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, port_, qid); mcs_unlock(&queue_locks_[qid], &me); } diff --git a/core/modules/queue_inc.cc b/core/modules/queue_inc.cc index f857bb5df..221f2c07c 100644 --- a/core/modules/queue_inc.cc +++ b/core/modules/queue_inc.cc @@ -86,29 +86,23 @@ std::string QueueInc::GetDesc() const { struct task_result QueueInc::RunTask(Context *ctx, bess::PacketBatch *batch, void *arg) { - Port *p = port_; - - if (!p->conf().admin_up) { + if (!port_->conf().admin_up) { return {.block = true, .packets = 0, .bits = 0}; } const queue_t qid = (queue_t)(uintptr_t)arg; - - uint64_t received_bytes = 0; - const int burst = ACCESS_ONCE(burst_); const int pkt_overhead = 24; - batch->set_cnt(p->RecvPackets(qid, batch->pkts(), burst)); - uint32_t cnt = batch->cnt(); - p->queue_stats[PACKET_DIR_INC][qid].requested_hist[burst]++; - p->queue_stats[PACKET_DIR_INC][qid].actual_hist[cnt]++; - p->queue_stats[PACKET_DIR_INC][qid].diff_hist[burst - cnt]++; + uint32_t cnt = port_->RecvPackets(qid, batch->pkts(), burst); if (cnt == 0) { return {.block = true, .packets = 0, .bits = 0}; } + batch->set_cnt(cnt); + // NOTE: we cannot skip this step since it might be used by scheduler. + uint64_t received_bytes = 0; if (prefetch_) { for (uint32_t i = 0; i < cnt; i++) { received_bytes += batch->pkts()[i]->total_len(); @@ -120,9 +114,8 @@ struct task_result QueueInc::RunTask(Context *ctx, bess::PacketBatch *batch, } } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) { - p->queue_stats[PACKET_DIR_INC][qid].packets += cnt; - p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes; + if (!(port_->GetFeatures().offloadIncStats)) { + port_->IncreaseIncQueueCounters(qid, cnt, 0, received_bytes); } RunNextModule(ctx, batch); diff --git a/core/modules/queue_out.cc b/core/modules/queue_out.cc index 476ee7b68..da2b641da 100644 --- a/core/modules/queue_out.cc +++ b/core/modules/queue_out.cc @@ -74,27 +74,20 @@ std::string QueueOut::GetDesc() const { } void QueueOut::ProcessBatch(Context *, bess::PacketBatch *batch) { - Port *p = port_; - - const queue_t qid = qid_; - - uint64_t sent_bytes = 0; int sent_pkts = 0; - if (p->conf().admin_up) { - sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt()); + if (port_->conf().admin_up) { + sent_pkts = port_->SendPackets(qid_, batch->pkts(), batch->cnt()); } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { - const packet_dir_t dir = PACKET_DIR_OUT; - + if (!(port_->GetFeatures().offloadOutStats)) { + uint64_t sent_bytes = 0; for (int i = 0; i < sent_pkts; i++) { sent_bytes += batch->pkts()[i]->total_len(); } - p->queue_stats[dir][qid].packets += sent_pkts; - p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts); - p->queue_stats[dir][qid].bytes += sent_bytes; + port_->IncreaseOutQueueCounters(qid_, sent_pkts, batch->cnt() - sent_pkts, + sent_bytes); } if (sent_pkts < batch->cnt()) { diff --git a/core/port.cc b/core/port.cc index 347ebf81f..f2b40b48f 100644 --- a/core/port.cc +++ b/core/port.cc @@ -172,25 +172,22 @@ Port::PortStats Port::GetPortStats() { PortStats ret = port_stats_; + ret.inc_queues[-1] = port_stats_.inc; for (queue_t qid = 0; qid < num_queues[PACKET_DIR_INC]; qid++) { - const QueueStats &inc = queue_stats[PACKET_DIR_INC][qid]; - + const QueueStats &inc = queue_stats_[PACKET_DIR_INC][qid]; ret.inc.packets += inc.packets; ret.inc.dropped += inc.dropped; ret.inc.bytes += inc.bytes; - ret.inc.requested_hist += inc.requested_hist; - ret.inc.actual_hist += inc.actual_hist; - ret.inc.diff_hist += inc.diff_hist; + ret.inc_queues[qid] = inc; } + ret.out_queues[-1] = port_stats_.out; for (queue_t qid = 0; qid < num_queues[PACKET_DIR_OUT]; qid++) { - const QueueStats &out = queue_stats[PACKET_DIR_OUT][qid]; + const QueueStats &out = queue_stats_[PACKET_DIR_OUT][qid]; ret.out.packets += out.packets; ret.out.dropped += out.dropped; ret.out.bytes += out.bytes; - ret.out.requested_hist += out.requested_hist; - ret.out.actual_hist += out.actual_hist; - ret.out.diff_hist += out.diff_hist; + ret.out_queues[qid] = out; } return ret; diff --git a/core/port.h b/core/port.h index 193d13105..4f5407707 100644 --- a/core/port.h +++ b/core/port.h @@ -49,21 +49,22 @@ #include "utils/common.h" #include "utils/ether.h" -typedef uint8_t queue_t; +using queue_t = uint8_t; -#define MAX_QUEUES_PER_DIR 128 /* [0, 31] (for each RX/TX) */ - -#define DRIVER_FLAG_SELF_INC_STATS 0x0001 -#define DRIVER_FLAG_SELF_OUT_STATS 0x0002 +#define MAX_QUEUES_PER_DIR 128 /* [0, 127] (for each RX/TX) */ #define MAX_QUEUE_SIZE 4096 #define ETH_ALEN 6 -/* The term RX/TX could be very confusing for a virtual switch. - * Instead, we use the "incoming/outgoing" convention: - * - incoming: outside -> BESS - * - outgoing: BESS -> outside */ +// The term RX/TX could be very confusing for a virtual switch. +// Instead, we use the "incoming/outgoing" convention: +// - incoming: outside -> BESS +// - outgoing: BESS -> outside +// +// NOTE: for new code, avoid using arrays like `Foo bar[PACKET_DIR]`. +// The use of array doesn't really make the code more readable. +// Instead, follow the `Foo bar_inc; Foo bar_out` style. typedef enum { PACKET_DIR_INC = 0, PACKET_DIR_OUT = 1, @@ -176,25 +177,6 @@ class PortBuilder { // InitPortClass()? }; -struct BatchHistogram - : public std::array { - BatchHistogram &operator+=(const BatchHistogram &rhs) { - for (size_t i = 0; i < size(); i++) { - (*this)[i] += rhs[i]; - } - return *this; - } -}; - -struct QueueStats { - uint64_t packets; - uint64_t dropped; // Not all drivers support this for INC direction - uint64_t bytes; // It doesn't include Ethernet overhead - BatchHistogram requested_hist; - BatchHistogram actual_hist; - BatchHistogram diff_hist; -}; - class Port { public: struct LinkStatus { @@ -210,22 +192,38 @@ class Port { bool admin_up; }; + struct QueueStats { + uint64_t packets; + uint64_t dropped; // Not all drivers support this for INC direction + uint64_t bytes; // It doesn't include Ethernet overhead + }; + struct PortStats { QueueStats inc; QueueStats out; + + // Per-queue stat counters. The sum of all queues should match the above `inc`. + // Key -1 is used when exact queues are unknown. + std::map inc_queues; + std::map out_queues; + }; + + struct DriverFeatures { + bool offloadIncStats; + bool offloadOutStats; }; // overide this section to create a new driver ----------------------------- Port() : port_stats_(), + queue_stats_(), conf_(), name_(), driver_arg_(), port_builder_(), num_queues(), queue_size(), - users(), - queue_stats() { + users() { conf_.mac_addr.Randomize(); conf_.mtu = kDefaultMtu; conf_.admin_up = true; @@ -247,7 +245,7 @@ class Port { virtual size_t DefaultIncQueueSize() const { return kDefaultIncQueueSize; } virtual size_t DefaultOutQueueSize() const { return kDefaultOutQueueSize; } - virtual uint64_t GetFlags() const { return 0; } + virtual DriverFeatures GetFeatures() const { return {}; } /*! * Get any placement constraints that need to be met when receiving from this @@ -273,6 +271,7 @@ class Port { CommandResponse InitWithGenericArg(const google::protobuf::Any &arg); PortStats GetPortStats(); + std::map GetQueueStats(); /* queues == nullptr if _all_ queues are being acquired/released */ int AcquireQueues(const struct module *m, packet_dir_t dir, @@ -293,12 +292,31 @@ class Port { const PortBuilder *port_builder() const { return port_builder_; } + void IncreaseIncQueueCounters(queue_t qid, uint64_t received, + uint64_t dropped, uint64_t received_bytes) { + auto &qstats = queue_stats_[PACKET_DIR_INC][qid]; + qstats.packets += received; + qstats.dropped += dropped; + qstats.bytes += received_bytes; + } + + void IncreaseOutQueueCounters(queue_t qid, uint64_t sent, uint64_t dropped, + uint64_t sent_bytes) { + auto &qstats = queue_stats_[PACKET_DIR_OUT][qid]; + qstats.packets += sent; + qstats.dropped += dropped; + qstats.bytes += sent_bytes; + } + protected: friend class PortBuilder; - /* for stats that do NOT belong to any queues */ + // for stats that do NOT counted for any per-queue stats below PortStats port_stats_; + // per-queue stat counters + QueueStats queue_stats_[PACKET_DIRS][MAX_QUEUES_PER_DIR]; + // Current configuration Conf conf_; @@ -330,8 +348,6 @@ class Port { /* which modules are using this port? * TODO: more robust gate keeping */ const struct module *users[PACKET_DIRS][MAX_QUEUES_PER_DIR]; - - struct QueueStats queue_stats[PACKET_DIRS][MAX_QUEUES_PER_DIR]; }; #define ADD_DRIVER(_DRIVER, _NAME_TEMPLATE, _HELP) \ diff --git a/protobuf/bess_msg.proto b/protobuf/bess_msg.proto index 9e1b79a33..eafc158dc 100644 --- a/protobuf/bess_msg.proto +++ b/protobuf/bess_msg.proto @@ -321,20 +321,26 @@ message GetPortStatsResponse { // Histogram of how many times a given number of packets in a batch was // requested. - repeated uint64 requested_hist = 4; + reserved 4; // repeated uint64 requested_hist = 4; // Histogram of how many times a given number of packets in a batch were // actually processed. - repeated uint64 actual_hist = 5; + reserved 5; // repeated uint64 actual_hist = 5; // Histogram of the difference between the requested batch size and the // actual number of packets processed in that batch. - repeated uint64 diff_hist = 6; + reserved 6; //repeated uint64 diff_hist = 6; } Error error = 1; + Stat inc = 2; /// Port stats for incoming (Ext -> BESS) direction. Stat out = 3; /// Port stats for outgoing (BESS -> Ext) direction. + double timestamp = 4; /// Time that stat counters were read. + + /// qid -> Stat. qid -1 for "counters with unknown queue ID" + map inc_queues = 5; /// per-queue stats for RX direction + map out_queues = 6; /// per-queue stats for TX direction } message GetLinkStatusRequest { @@ -349,10 +355,6 @@ message GetLinkStatusResponse { bool link_up = 5; /// link up? } - - - - message ListMclassResponse { Error error = 1; repeated string names = 2; /// List of module types