diff --git a/bwscanner/aggregate.py b/bwscanner/aggregate.py index 3a5d197..0706fd7 100644 --- a/bwscanner/aggregate.py +++ b/bwscanner/aggregate.py @@ -55,18 +55,22 @@ def write_aggregate_data(tor, scan_dirs, file_name="aggregate_measurements"): for relay_fp in measurements.keys(): log.debug("Aggregating measurements for {relay}", relay=relay_fp) - mean_bw = int(sum(measurements[relay_fp]) // len(measurements[relay_fp])) + mean_bw = int(sum(measurements[relay_fp]) // + len(measurements[relay_fp])) # Calculated the "filtered bandwidth" filtered_bws = [bw for bw in measurements[relay_fp] if bw >= mean_bw] if filtered_bws: mean_filtered_bw = int(sum(filtered_bws) // len(filtered_bws)) if not filtered_bws or mean_filtered_bw <= 0: - log.debug("Could not calculate a valid filtered bandwidth, skipping relay.") + log.debug("Could not calculate a valid filtered bandwidth, " + "skipping relay.") continue - routerstatus_info = yield tor.protocol.get_info_raw('ns/id/' + relay_fp.lstrip("$")) - descriptor_info = yield tor.protocol.get_info_raw('desc/id/' + relay_fp.lstrip("$")) + routerstatus_info = yield tor.protocol.get_info_raw( + 'ns/id/' + relay_fp.lstrip("$")) + descriptor_info = yield tor.protocol.get_info_raw( + 'desc/id/' + relay_fp.lstrip("$")) relay_routerstatus = RouterStatusEntryV3(routerstatus_info) relay_descriptor = RelayDescriptor(descriptor_info) @@ -79,15 +83,17 @@ def write_aggregate_data(tor, scan_dirs, file_name="aggregate_measurements"): num_measurements = len(measurements[relay_fp]) circ_fail_rate = num_failures / (num_measurements + num_failures) else: - log.debug("Not enough measurements to calculate the circuit fail rate.") + log.debug( + "Not enough measurements to calculate the circuit fail rate.") circ_fail_rate = 0.0 desc_bw = relay_descriptor.average_bandwidth - line_format = ("node_id={} nick={} strm_bw={} filt_bw={} circ_fail_rate={} " - "desc_bw={} ns_bw={}\n") + line_format = ("node_id={} nick={} strm_bw={} filt_bw={} " + "circ_fail_rate={} desc_bw={} ns_bw={}\n") - aggregate_file.write(line_format.format(relay_fp, nickname, mean_bw, mean_filtered_bw, - circ_fail_rate, desc_bw, ns_bw)) + aggregate_file.write(line_format.format( + relay_fp, nickname, mean_bw, mean_filtered_bw, + circ_fail_rate, desc_bw, ns_bw)) aggregate_file.close() log.info("Finished outputting the aggregated measurements to {file}.", diff --git a/bwscanner/attacher.py b/bwscanner/attacher.py index 4c87d6f..abb90ad 100644 --- a/bwscanner/attacher.py +++ b/bwscanner/attacher.py @@ -1,6 +1,7 @@ import txtorcon from twisted.internet import defer, reactor, endpoints -from txtorcon.interface import CircuitListenerMixin, IStreamAttacher, StreamListenerMixin +from txtorcon.interface import (CircuitListenerMixin, IStreamAttacher, + StreamListenerMixin) from zope.interface import implementer from bwscanner.logger import log @@ -101,6 +102,7 @@ class StreamClosedListener(StreamListenerMixin): immediately after a stream completes rather than wait for the circuit to time out. """ + def __init__(self, circ): self.circ = circ @@ -114,7 +116,8 @@ def options_need_new_consensus(tor_config, new_options): the Tor config with the new options. """ if "UseMicroDescriptors" in new_options: - if tor_config.UseMicroDescriptors != new_options["UseMicroDescriptors"]: + if tor_config.UseMicroDescriptors != \ + new_options["UseMicroDescriptors"]: log.debug("Changing UseMicroDescriptors from {current} to {new}.", current=tor_config.UseMicroDescriptors, new=new_options["UseMicroDescriptors"]) @@ -128,7 +131,8 @@ def wait_for_newconsensus(tor_state): def got_newconsensus(event): log.debug("Got NEWCONSENSUS event: {event}", event=event) got_consensus.callback(event) - tor_state.protocol.remove_event_listener('NEWCONSENSUS', got_newconsensus) + tor_state.protocol.remove_event_listener( + 'NEWCONSENSUS', got_newconsensus) tor_state.protocol.add_event_listener('NEWCONSENSUS', got_newconsensus) return got_consensus @@ -146,7 +150,8 @@ def connect_to_tor(launch_tor, circuit_build_timeout, control_port=None, tor_options = { 'LearnCircuitBuildTimeout': 0, # Disable adaptive circuit timeouts. 'CircuitBuildTimeout': circuit_build_timeout, - 'UseEntryGuards': 0, # Disable UseEntryGuards to avoid PathBias warnings. + # Disable UseEntryGuards to avoid PathBias warnings. + 'UseEntryGuards': 0, 'UseMicroDescriptors': 0, 'FetchUselessDescriptors': 1, 'FetchDirInfoEarly': 1, @@ -163,7 +168,8 @@ def connect_to_tor(launch_tor, circuit_build_timeout, control_port=None, else: log.info("Trying to connect to a running Tor instance.") if control_port: - endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", control_port) + endpoint = endpoints.TCP4ClientEndpoint( + reactor, "localhost", control_port) else: endpoint = None tor = yield txtorcon.connect(reactor, endpoint) diff --git a/bwscanner/circuit.py b/bwscanner/circuit.py index 2c972da..2c85ec1 100644 --- a/bwscanner/circuit.py +++ b/bwscanner/circuit.py @@ -49,6 +49,7 @@ class ExitScan(CircuitGenerator): of the consensus in the exit position once. The first and second hops are selected randomly. """ + def __init__(self, state): super(ExitScan, self).__init__(state) random.shuffle(self.exits) @@ -72,6 +73,7 @@ class TwoHop(CircuitGenerator): Select two hop circuits with the relay to be measured and a random exit relay of similar bandwidth. """ + def __init__(self, state, partitions=1, this_partition=1, slice_width=50): """ TwoHop can be called multiple times with different partition @@ -88,10 +90,13 @@ def circuit_generator(): choose an exit relay of similar bandwidth for the circuit """ num_relays = len(self.relays) - relay_subset = range(this_partition-1, num_relays, partitions) - log.info("Performing a measurement scan with {count} relays.", count=len(relay_subset)) + relay_subset = range(this_partition - 1, num_relays, partitions) + log.info( + "Performing a measurement scan with {count} relays.", + count=len(relay_subset)) - # Choose relays in a random order fromm the relays in this partition set. + # Choose relays in a random order fromm the relays in this + # partition set. for i in random.sample(relay_subset, len(relay_subset)): relay = self.relays[i] yield relay, self.exit_by_bw(relay) @@ -116,12 +121,12 @@ def exit_by_bw(self, relay): if (exit.bandwidth < relay.bandwidth) and (i != num_exits): continue - exit_slice = self.exits[i:i+self._slice_width] + exit_slice = self.exits[i:i + self._slice_width] exits_needed = self._slice_width - len(exit_slice) # There isn't enough exits, pick some slower exits for this slice. if exits_needed: - slice_start = max(0, i-exits_needed) + slice_start = max(0, i - exits_needed) exit_slice = self.exits[slice_start:i] + exit_slice if relay in exit_slice: diff --git a/bwscanner/fetcher.py b/bwscanner/fetcher.py index 93a71f4..a44ba08 100644 --- a/bwscanner/fetcher.py +++ b/bwscanner/fetcher.py @@ -3,8 +3,9 @@ from twisted.internet import interfaces, reactor, defer, protocol from twisted.internet.endpoints import TCP4ClientEndpoint -from twisted.web.client import (SchemeNotSupported, Agent, BrowserLikePolicyForHTTPS, - ResponseDone, PotentialDataLoss, PartialDownloadError) +from twisted.web.client import (SchemeNotSupported, Agent, + BrowserLikePolicyForHTTPS, ResponseDone, + PotentialDataLoss, PartialDownloadError) from txsocksx.client import SOCKS5ClientFactory from txsocksx.tls import TLSWrapClientEndpoint from zope.interface import implementer @@ -16,9 +17,10 @@ def get_tor_socks_endpoint(tor_state): proxy_endpoint = tor_state.protocol.get_conf("SocksPort") def extract_port_value(result): - # Get the first SOCKS port number if any. SocksPort can be a single string or a list. - # Tor now also has support for unix domain SOCKS sockets so we need to be careful to just - # pick a SOCKS port. + # Get the first SOCKS port number if any. SocksPort can be a + # single string or a list. + # Tor now also has support for unix domain SOCKS sockets so + # we need to be careful to just pick a SOCKS port. if isinstance(result['SocksPort'], list): port = next(port for port in result['SocksPort'] if port.isdigit()) else: @@ -61,12 +63,15 @@ def connect(self, protocol_factory): Implements L{IStreamClientEndpoint.connect} to connect via TCP, after SOCKS5 negotiation and Tor circuit construction is done. """ - proxy_factory = SOCKS5ClientFactory(self.host, self.port, protocol_factory) - self.tor_socks_endpoint.addCallback(lambda end: end.connect(proxy_factory)) + proxy_factory = SOCKS5ClientFactory( + self.host, self.port, protocol_factory) + self.tor_socks_endpoint.addCallback( + lambda end: end.connect(proxy_factory)) def _create_circ(proto): hp = proto.transport.getHost() - d = self.state._attacher.create_circuit(hp.host, hp.port, self.path) + d = self.state._attacher.create_circuit( + hp.host, hp.port, self.path) d.addErrback(proxy_factory.deferred.errback) return proxy_factory.deferred @@ -96,9 +101,11 @@ def _getEndpoint(self, parsedURI, host=None, port=None): if hasattr(self, '_wrapContextFactory'): tls_policy = self._wrapContextFactory(host, port) elif hasattr(self, '_policyForHTTPS'): - tls_policy = self._policyForHTTPS().creatorForNetloc(host, port) + tls_policy = self._policyForHTTPS().\ + creatorForNetloc(host, port) else: - raise NotImplementedError("Cannot create a TLS validation policy.") + raise NotImplementedError( + "Cannot create a TLS validation policy.") endpoint = self._tlsWrapper(tls_policy, endpoint) return endpoint @@ -142,7 +149,9 @@ def connectionLost(self, reason): else: self.deferred.errback(reason) else: - log.debug("Deferred already called before connectionLost on hashingReadBodyProtocol.") + log.debug( + "Deferred already called before connectionLost on " + "hashingReadBodyProtocol.") def hashingReadBody(response): diff --git a/bwscanner/listener.py b/bwscanner/listener.py index b165489..bff20c4 100644 --- a/bwscanner/listener.py +++ b/bwscanner/listener.py @@ -131,18 +131,18 @@ def circ_avg_bw(self, circuit): for r, w, d in self.bw_samples(circuit): # r and w are in units of bytes/second # d is units of second - r_avg += (r**2)/d - w_avg += (w**2)/d + r_avg += (r**2) / d + w_avg += (w**2) / d n_samples += 1 bytes_r_total += r bytes_w_total += w duration += d if n_samples > 1: - wf = n_samples*bytes_r_total + wf = n_samples * bytes_r_total return {'path': circuit.path, - 'r_bw': int(r_avg/wf), - 'w_bw': int(w_avg/wf), + 'r_bw': int(r_avg / wf), + 'w_bw': int(w_avg / wf), 'duration': duration, 'samples': n_samples, 'bytes_r': bytes_r_total, diff --git a/bwscanner/logger.py b/bwscanner/logger.py index 49f36a4..37771fe 100644 --- a/bwscanner/logger.py +++ b/bwscanner/logger.py @@ -21,7 +21,8 @@ def setup_logging(log_level, log_name, log_directory=""): Configure the logger to use the specified log file and log level """ log_filter = LogLevelFilterPredicate() - log_filter.setLogLevelForNamespace("bwscanner", LogLevel.levelWithName(log_level.lower())) + log_filter.setLogLevelForNamespace( + "bwscanner", LogLevel.levelWithName(log_level.lower())) # Set up logging log_file = DailyLogFile(log_name, log_directory) @@ -29,7 +30,8 @@ def setup_logging(log_level, log_name, log_directory=""): console_observer = FileLogObserver(sys.stdout, log_event_format) file_filter_observer = FilteringLogObserver(file_observer, (log_filter,)) - console_filter_observer = FilteringLogObserver(console_observer, (log_filter,)) + console_filter_observer = FilteringLogObserver( + console_observer, (log_filter,)) globalLogPublisher.addObserver(file_filter_observer) globalLogPublisher.addObserver(console_filter_observer) diff --git a/bwscanner/measurement.py b/bwscanner/measurement.py index 514991d..ca54284 100644 --- a/bwscanner/measurement.py +++ b/bwscanner/measurement.py @@ -46,12 +46,12 @@ def __init__(self, state, clock, measurement_dir, **kwargs): self.circuits = None self.baseurl = 'https://bwauth.torproject.org/bwauth.torproject.org' self.bw_files = { - 64*1024: ("64M", "913b3c5df256d62235f955fa936e7a4e2d5e0cb6"), - 32*1024: ("32M", "a536076ef51c2cfff607fec2d362671e031d6b48"), - 16*1024: ("16M", "e91690ed2abf05e347b61aafaa23abf2a2b3292f"), - 8*1024: ("8M", "c690229b300945ec4ba872b80e8c443e2e1750f0"), - 4*1024: ("4M", "94f7bc6679a4419b080debd70166c2e43e80533d"), - 2*1024: ("2M", "9793cc92932598898d22497acdd5d732037b1a13"), + 64 * 1024: ("64M", "913b3c5df256d62235f955fa936e7a4e2d5e0cb6"), + 32 * 1024: ("32M", "a536076ef51c2cfff607fec2d362671e031d6b48"), + 16 * 1024: ("16M", "e91690ed2abf05e347b61aafaa23abf2a2b3292f"), + 8 * 1024: ("8M", "c690229b300945ec4ba872b80e8c443e2e1750f0"), + 4 * 1024: ("4M", "94f7bc6679a4419b080debd70166c2e43e80533d"), + 2 * 1024: ("2M", "9793cc92932598898d22497acdd5d732037b1a13"), } self.result_sink = ResultSink(self.measurement_dir, chunk_size=10) @@ -69,9 +69,9 @@ def choose_file_size(self, path): XXX: Should we just use the bandwidth of the measured relay instead? """ - avg_bw = sum([r.bandwidth for r in path])/len(path) + avg_bw = sum([r.bandwidth for r in path]) / len(path) for size in sorted(self.bw_files.keys()): - if avg_bw*5 < size: + if avg_bw * 5 < size: return size return max(self.bw_files.keys()) @@ -136,7 +136,8 @@ def get_circuit_bw(result): report['path_desc_bws'].append((yield self.get_r_desc_bw(relay))) report['path_ns_bws'].append((yield self.get_r_ns_bw(relay))) report['path_bws'] = [r.bandwidth for r in path] - log.info("Download successful for router {fingerprint}.", fingerprint=path[0].id_hex) + log.info( + "Download successful for router {fingerprint}.", fingerprint=path[0].id_hex) defer.returnValue(report) def circ_failure(failure): @@ -154,7 +155,8 @@ def timeoutDeferred(deferred, timeout): def cancelDeferred(deferred): deferred.cancel() - delayedCall = self.clock.callLater(timeout, cancelDeferred, deferred) + delayedCall = self.clock.callLater( + timeout, cancelDeferred, deferred) def gotResult(result): if delayedCall.active(): @@ -181,7 +183,8 @@ def get_r_ns_bw(self, router): raw_descriptor = yield self.state.protocol.get_info_raw('ns/id/{}'.format(router.id_hex)) router_ns_entry = RouterStatusEntryV3(raw_descriptor) - defer.returnValue((router_ns_entry.bandwidth, router_ns_entry.is_unmeasured)) + defer.returnValue((router_ns_entry.bandwidth, + router_ns_entry.is_unmeasured)) @defer.inlineCallbacks def get_r_desc_bw(self, router): diff --git a/bwscanner/partition_scan.py b/bwscanner/partition_scan.py index cdf7042..023e651 100644 --- a/bwscanner/partition_scan.py +++ b/bwscanner/partition_scan.py @@ -74,8 +74,10 @@ def __init__(self, state, clock, log_dir, stopped, relays, shared_secret, partit consensus += relay + "," consensus_hash = hashlib.sha256(consensus).digest() shared_secret_hash = hashlib.sha256(shared_secret).digest() - prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash, shared_secret_hash, iterations=1) - self.circuits = lazy2HopCircuitGenerator(relays, this_partition, partitions, prng_seed) + prng_seed = hashlib.pbkdf2_hmac( + 'sha256', consensus_hash, shared_secret_hash, iterations=1) + self.circuits = lazy2HopCircuitGenerator( + relays, this_partition, partitions, prng_seed) # XXX adjust me self.result_sink = ResultSink(log_dir, chunk_size=1000) @@ -119,23 +121,28 @@ def circuit_build_failure(f): return None time_start = self.now() - d = build_timeout_circuit(self.state, self.clock, route, self.circuit_life_duration) + d = build_timeout_circuit( + self.state, self.clock, route, self.circuit_life_duration) d.addCallback(circuit_build_success) d.addErrback(circuit_build_timeout) d.addErrback(circuit_build_failure) self.tasks.append(d) def start_prometheus_exportor(self): - self.count_success = Counter('circuit_build_success_counter', 'successful circuit builds') - self.count_failure = Counter('circuit_build_failure_counter', 'failed circuit builds') - self.count_timeout = Counter('circuit_build_timeout_counter', 'timed out circuit builds') + self.count_success = Counter( + 'circuit_build_success_counter', 'successful circuit builds') + self.count_failure = Counter( + 'circuit_build_failure_counter', 'failed circuit builds') + self.count_timeout = Counter( + 'circuit_build_timeout_counter', 'timed out circuit builds') if not has_prometheus_client: return root = Resource() root.putChild(b'metrics', MetricsResource()) factory = Site(root) - reactor.listenTCP(self.prometheus_port, factory, interface=self.prometheus_interface) + reactor.listenTCP(self.prometheus_port, factory, + interface=self.prometheus_interface) def start(self): self.start_prometheus_exportor() @@ -147,7 +154,8 @@ def pop(): except StopIteration: self.stop() else: - self.call_id = self.clock.callLater(self.circuit_build_duration, pop) + self.call_id = self.clock.callLater( + self.circuit_build_duration, pop) self.clock.callLater(0, pop) def stop(self): diff --git a/bwscanner/partition_shuffle.py b/bwscanner/partition_shuffle.py index cdd14b9..ff2df18 100644 --- a/bwscanner/partition_shuffle.py +++ b/bwscanner/partition_shuffle.py @@ -42,7 +42,7 @@ def next_bounded(self, maximum): # interpret them as an unsigned integer: word = int(prng_bytes.encode('hex'), 0x10) # adjust for modulo bias by discarding word if larger than our set: - if word <= maximum or 0 == (1+maximum) % (256**prng_bytes_to_read): + if word <= maximum or 0 == (1 + maximum) % (256**prng_bytes_to_read): return word @@ -108,7 +108,8 @@ def shuffle_sets(relays, prng_seed): for hop_number in xrange(2): # 2 == circuit length # we shuffle the set of relays twice, to get two independent lists # (to spread the coordinates over more nodes) - shuffled_sets[hop_number] = tuple(fisher_yates_shuffle(relays, shared_prng)) + shuffled_sets[hop_number] = tuple( + fisher_yates_shuffle(relays, shared_prng)) return shuffled_sets diff --git a/bwscanner/scanner.py b/bwscanner/scanner.py index a720fa6..725b200 100755 --- a/bwscanner/scanner.py +++ b/bwscanner/scanner.py @@ -18,6 +18,7 @@ class ScanInstance(object): """ Store the configuration and state for the CLI tool. """ + def __init__(self, data_dir): self.data_dir = data_dir self.measurement_dir = os.path.join(data_dir, 'measurements') @@ -31,12 +32,17 @@ def __repr__(self): @click.group() @click.option('--data-dir', type=click.Path(), - default=os.environ.get("BWSCANNER_DATADIR", click.get_app_dir('bwscanner')), + default=os.environ.get( + "BWSCANNER_DATADIR", click.get_app_dir('bwscanner')), help='Directory where bwscan should stores its measurements and ' 'other data.') -@click.option('-l', '--loglevel', help='The logging level the scanner will use (default: info)', - default='info', type=click.Choice(['debug', 'info', 'warn', 'error', 'critical'])) -@click.option('-f', '--logfile', type=click.Path(), help='The file the log will be written to', +@click.option('-l', '--loglevel', + help='The logging level the scanner will use (default: info)', + default='info', + type=click.Choice( + ['debug', 'info', 'warn', 'error', 'critical'])) +@click.option('-f', '--logfile', type=click.Path(), + help='The file the log will be written to', default=os.environ.get("BWSCANNER_LOGFILE", 'bwscanner.log')) @click.option('--launch-tor/--no-launch-tor', default=False, help='Launch Tor or try to connect to an existing Tor instance.') @@ -48,7 +54,8 @@ def cli(ctx, data_dir, loglevel, logfile, launch_tor, circuit_build_timeout): """ The bwscan tool measures Tor relays and calculates their bandwidth. These bandwidth measurements can then be aggregate to create the bandwidth - values used by the Tor bandwidth authorities when creating the Tor consensus. + values used by the Tor bandwidth authorities when creating the Tor + consensus. """ # Create the data directory if it doesn't exist data_dir = os.path.abspath(data_dir) @@ -70,7 +77,8 @@ def cli(ctx, data_dir, loglevel, logfile, launch_tor, circuit_build_timeout): @click.option('--current-partition', '-c', default=1, help='Scan a particular subset / partition of the relays.') @click.option('--timeout', default=120, - help='Timeout for measurement HTTP requests (default: %ds).' % 120) + help='Timeout for measurement HTTP requests (default: %ds).' + % 120) @click.option('--request-limit', default=10, help='Limit the number of simultaneous bandwidth measurements ' '(default: %d).' % 10) @@ -83,7 +91,8 @@ def scan(scan, partitions, current_partition, timeout, request_limit): # XXX: check that each run is producing the same input set! scan_time = str(int(time.time())) - scan_data_dir = os.path.join(scan.measurement_dir, '{}.running'.format(scan_time)) + scan_data_dir = os.path.join( + scan.measurement_dir, '{}.running'.format(scan_time)) if not os.path.isdir(scan_data_dir): os.makedirs(scan_data_dir) @@ -104,8 +113,8 @@ def rename_finished_scan(deferred): def get_recent_scans(measurement_dir): - return sorted([name for name in os.listdir(measurement_dir) if name.isdigit()], - reverse=True) + return sorted([name for name in os.listdir(measurement_dir) + if name.isdigit()], reverse=True) @cli.command(short_help="List available bandwidth measurement directories.") @@ -130,31 +139,40 @@ def list(scan): @pass_scan def aggregate(scan, scan_name, previous): """ - Command to aggregate BW measurements and create the bandwidth file for the BWAuths + Command to aggregate BW measurements and create the bandwidth file + for the BWAuths """ # Aggregate the specified scan if scan_name: # Confirm that the specified scan directory exists scan_dir_path = os.path.join(scan.measurement_dir, scan_name) if not os.path.isdir(scan_dir_path): - log.warn("Could not find scan data directory {scan_dir}.", scan_dir=scan_dir_path) + log.warn( + "Could not find scan data directory {scan_dir}.", + scan_dir=scan_dir_path) sys.exit(-1) scan_data_dirs = [scan_dir_path] - log.info("Aggregating bandwidth measurements for scan {scan_name}.", scan_name=scan_name) + log.info( + "Aggregating bandwidth measurements for scan {scan_name}.", + scan_name=scan_name) else: # Aggregate the n previous scan runs try: # Use the most recent completed scan by default - recent_scan_names = get_recent_scans(scan.measurement_dir)[:previous] + recent_scan_names = get_recent_scans( + scan.measurement_dir)[:previous] except IndexError: log.warn("Could not find any completed scan data.") sys.exit(-1) - scan_data_dirs = [os.path.join(scan.measurement_dir, name) for name in recent_scan_names] - log.info("Aggregating data from past {count} scans.", count=len(scan_data_dirs)) + scan_data_dirs = [os.path.join(scan.measurement_dir, name) + for name in recent_scan_names] + log.info("Aggregating data from past {count} scans.", count=len( + scan_data_dirs)) - scan.tor_state.addCallback(lambda tor_state: write_aggregate_data(tor_state, scan_data_dirs)) + scan.tor_state.addCallback(lambda tor_state: + write_aggregate_data(tor_state, scan_data_dirs)) scan.tor_state.addErrback(lambda failure: log.failure("Unexpected error")) scan.tor_state.addCallback(lambda _: reactor.stop()) reactor.run() diff --git a/bwscanner/writer.py b/bwscanner/writer.py index 8858520..5bd0f6a 100644 --- a/bwscanner/writer.py +++ b/bwscanner/writer.py @@ -47,7 +47,8 @@ def write(): log_path = os.path.join(self.out_dir, "%s-scan.json" % (datetime.datetime.utcnow().isoformat())) - self.current_task.addCallback(lambda ign: threads.deferToThread(write)) + self.current_task.addCallback( + lambda ign: threads.deferToThread(write)) # buffer is not full, return deferred for current batch return self.current_task @@ -65,7 +66,8 @@ def flush(): json.dump(self.buffer, wf, sort_keys=True) finally: wf.close() - log.info("Finished writing measurement values to {log_path}.", log_path=log_path) + log.info( + "Finished writing measurement values to {log_path}.", log_path=log_path) def maybe_do_work(result): if len(self.buffer) != 0: diff --git a/scripts/aggregate.py b/scripts/aggregate.py index 2f0d48d..dd89d0b 100644 --- a/scripts/aggregate.py +++ b/scripts/aggregate.py @@ -32,7 +32,7 @@ # The guard measurement period is based on the client turnover # rate for guard nodes -GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks +GUARD_SAMPLE_RATE = 2 * 7 * 24 * 60 * 60 # 2wks # PID constant defaults. May be overridden by consensus # https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form @@ -62,10 +62,10 @@ # all the CPU once these things run for a year or so. # Note that the Guard measurement interval of 2 weeks means that this # value can't get much below that. -MAX_AGE = 2*GUARD_SAMPLE_RATE +MAX_AGE = 2 * GUARD_SAMPLE_RATE # If the resultant scan file is older than 1.5 days, something is wrong -MAX_SCAN_AGE = 60*60*24*1.5 +MAX_SCAN_AGE = 60 * 60 * 24 * 1.5 # path to git repos (.git) PATH_TO_TORFLOW_REPO = '../../.git/' @@ -80,7 +80,8 @@ def base10_round(bw_val): logger.info("Zero input bandwidth.. Upping to 1") return 1 else: - ret = int(max((1000, round(round(bw_val, -(int(math.log10(bw_val))-2)), -3)))/1000) + ret = int( + max((1000, round(round(bw_val, -(int(math.log10(bw_val)) - 2)), -3))) / 1000) if ret == 0: logger.info("Zero output bandwidth.. Upping to 1") return 1 @@ -121,7 +122,7 @@ def revert_to_vote(self, vote): self.measured_at = vote.measured_at # Set def copy_vote(self, vote): - self.new_bw = vote.bw*1000 # Not set yet + self.new_bw = vote.bw * 1000 # Not set yet self.pid_bw = vote.pid_bw # Not set yet self.pid_error_sum = vote.pid_error_sum # Not set yet self.pid_delta = vote.pid_delta # Not set yet @@ -129,20 +130,20 @@ def copy_vote(self, vote): def get_pid_bw(self, prev_vote, kp, ki, kd, kidecay, update=True): if not update: return self.use_bw \ - + kp*self.use_bw*self.pid_error \ - + ki*self.use_bw*self.pid_error_sum \ - + kd*self.use_bw*self.pid_delta + + kp * self.use_bw * self.pid_error \ + + ki * self.use_bw * self.pid_error_sum \ + + kd * self.use_bw * self.pid_delta self.prev_error = prev_vote.pid_error self.pid_bw = self.use_bw \ - + kp*self.use_bw*self.pid_error \ - + ki*self.use_bw*self.integral_error() \ - + kd*self.use_bw*self.d_error_dt() + + kp * self.use_bw * self.pid_error \ + + ki * self.use_bw * self.integral_error() \ + + kd * self.use_bw * self.d_error_dt() # We decay the interval each round to keep it bounded. # This decay is non-standard. We do it to avoid overflow - self.pid_error_sum = prev_vote.pid_error_sum*kidecay + self.pid_error + self.pid_error_sum = prev_vote.pid_error_sum * kidecay + self.pid_error return self.pid_bw @@ -187,20 +188,26 @@ def add_line(self, line): class Line: def __init__(self, line, slice_file, timestamp): - self.fingerprint = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1) + self.fingerprint = re.search( + "[\s]*node_id=([\S]+)[\s]*", line).group(1) self.nickname = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1) - self.strm_bw = int(re.search("[\s]*strm_bw=([\S]+)[\s]*", line).group(1)) - self.filt_bw = int(re.search("[\s]*filt_bw=([\S]+)[\s]*", line).group(1)) + self.strm_bw = int( + re.search("[\s]*strm_bw=([\S]+)[\s]*", line).group(1)) + self.filt_bw = int( + re.search("[\s]*filt_bw=([\S]+)[\s]*", line).group(1)) self.ns_bw = int(re.search("[\s]*ns_bw=([\S]+)[\s]*", line).group(1)) - self.desc_bw = int(re.search("[\s]*desc_bw=([\S]+)[\s]*", line).group(1)) + self.desc_bw = int( + re.search("[\s]*desc_bw=([\S]+)[\s]*", line).group(1)) self.slice_file = slice_file self.measured_at = timestamp try: - self.circ_fail_rate = float(re.search("[\s]*circ_fail_rate=([\S]+)[\s]*", line).group(1)) + self.circ_fail_rate = float( + re.search("[\s]*circ_fail_rate=([\S]+)[\s]*", line).group(1)) except: self.circ_fail_rate = 0 try: - self.strm_fail_rate = float(re.search("[\s]*strm_fail_rate=([\S]+)[\s]*", line).group(1)) + self.strm_fail_rate = float( + re.search("[\s]*strm_fail_rate=([\S]+)[\s]*", line).group(1)) except: self.strm_fail_rate = 0 @@ -208,15 +215,21 @@ def __init__(self, line, slice_file, timestamp): class Vote: def __init__(self, line): # node_id=$DB8C6D8E0D51A42BDDA81A9B8A735B41B2CF95D1 bw=231000 diff=209281 nick=rainbowwarrior measured_at=1319822504 - self.fingerprint = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1) + self.fingerprint = re.search( + "[\s]*node_id=([\S]+)[\s]*", line).group(1) self.nickname = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1) self.bw = int(re.search("[\s]+bw=([\S]+)[\s]*", line).group(1)) - self.measured_at = int(re.search("[\s]*measured_at=([\S]+)[\s]*", line).group(1)) + self.measured_at = int( + re.search("[\s]*measured_at=([\S]+)[\s]*", line).group(1)) try: - self.pid_error = float(re.search("[\s]*pid_error=([\S]+)[\s]*", line).group(1)) - self.pid_error_sum = float(re.search("[\s]*pid_error_sum=([\S]+)[\s]*", line).group(1)) - self.pid_delta = float(re.search("[\s]*pid_delta=([\S]+)[\s]*", line).group(1)) - self.pid_bw = float(re.search("[\s]*pid_bw=([\S]+)[\s]*", line).group(1)) + self.pid_error = float( + re.search("[\s]*pid_error=([\S]+)[\s]*", line).group(1)) + self.pid_error_sum = float( + re.search("[\s]*pid_error_sum=([\S]+)[\s]*", line).group(1)) + self.pid_delta = float( + re.search("[\s]*pid_delta=([\S]+)[\s]*", line).group(1)) + self.pid_bw = float( + re.search("[\s]*pid_bw=([\S]+)[\s]*", line).group(1)) except: logger.info("No previous PID data.") self.pid_bw = self.bw @@ -224,9 +237,11 @@ def __init__(self, line): self.pid_delta = 0 self.pid_error_sum = 0 try: - self.updated_at = int(re.search("[\s]*updated_at=([\S]+)[\s]*", line).group(1)) + self.updated_at = int( + re.search("[\s]*updated_at=([\S]+)[\s]*", line).group(1)) except: - logger.info("No updated_at field for "+self.nickname+"="+self.fingerprint) + logger.info("No updated_at field for " + + self.nickname + "=" + self.fingerprint) self.updated_at = self.measured_at @@ -294,19 +309,20 @@ def __init__(self, c): self.use_mercy = True logger.info("Showing mercy on gimpy nodes") if c_params.get('bwauthkp'): - self.K_p = int(c_params.get('bwauthkp'))/10000.0 + self.K_p = int(c_params.get('bwauthkp')) / 10000.0 logger.info("Got K_p=%f from consensus." % self.K_p) if c_params.get('bwauthti'): - self.T_i = (int(c_params.get('bwauthti'))/10000.0) + self.T_i = (int(c_params.get('bwauthti')) / 10000.0) logger.info("Got T_i=%f from consensus." % self.T_i) if c_params.get('bwauthtd'): - self.T_d = (int(c_params.get('bwauthtd'))/10000.0) + self.T_d = (int(c_params.get('bwauthtd')) / 10000.0) logger.info("Got T_d=%f from consensus." % self.T_d) if c_params.get('bwauthtidecay'): - self.T_i_decay = (int(c_params.get('bwauthtidecay'))/10000.0) - logger.info("Got T_i_decay=%f from consensus." % self.T_i_decay) + self.T_i_decay = (int(c_params.get('bwauthtidecay')) / 10000.0) + logger.info("Got T_i_decay=%f from consensus." % + self.T_i_decay) if c_params.get('bwauthpidmax'): - self.pid_max = (int(c_params.get('bwauthpidmax'))/10000.0) + self.pid_max = (int(c_params.get('bwauthpidmax')) / 10000.0) logger.info("Got pid_max=%f from consensus." % self.pid_max) if c_params.get('bwauthguardrate'): self.guard_sample_rate = int(c_params.get('bwauthguardrate')) @@ -317,10 +333,10 @@ def __init__(self, c): self.K_i = 0 self.K_i_decay = 0 else: - self.K_i = self.K_p/self.T_i - self.K_i_decay = (1.0-self.T_i_decay/self.T_i) + self.K_i = self.K_p / self.T_i + self.K_i_decay = (1.0 - self.T_i_decay / self.T_i) - self.K_d = self.K_p*self.T_d + self.K_d = self.K_p * self.T_d logger.info("Got K_p=%f K_i=%f K_d=%f K_i_decay=%f" % (self.K_p, self.K_i, self.K_d, self.K_i_decay)) @@ -328,7 +344,7 @@ def __init__(self, c): self.bw_weights = {} if bw_weights: for weight, value in bw_weights.items(): - self.bw_weights[weight] = int(value)/10000.0 + self.bw_weights[weight] = int(value) / 10000.0 else: logger.warning("No bandwidth weights in consensus!") self.bw_weights["Wgd"] = 0 @@ -337,12 +353,12 @@ def __init__(self, c): def write_file_list(datadir): files = { - 64*1024: "64M", - 32*1024: "32M", - 16*1024: "16M", - 8*1024: "8M", - 4*1024: "4M", - 2*1024: "2M", + 64 * 1024: "64M", + 32 * 1024: "32M", + 16 * 1024: "16M", + 8 * 1024: "8M", + 4 * 1024: "4M", + 2 * 1024: "2M", 1024: "1M", 512: "512k", 256: "256k", @@ -353,7 +369,7 @@ def write_file_list(datadir): 0: "16k", } file_sizes = files.keys() - node_fbws = map(lambda x: 5*x.filt_bw, nodes.itervalues()) + node_fbws = map(lambda x: 5 * x.filt_bw, nodes.itervalues()) file_pairs = [] file_sizes.sort(reverse=True) node_fbws.sort() @@ -365,14 +381,14 @@ def write_file_list(datadir): # that 5*bw < file, and do this for each file size. for bw in node_fbws: i += 1 - pct = 100-(100*i)/len(node_fbws) + pct = 100 - (100 * i) / len(node_fbws) # If two different file sizes serve one percentile, go with the # smaller file size (ie skip this one) if pct == prev_pct: continue for f in xrange(len(file_sizes)): - if bw > file_sizes[f]*1024 and file_sizes[f] > prev_size: - next_f = max(f-1, 0) + if bw > file_sizes[f] * 1024 and file_sizes[f] > prev_size: + next_f = max(f - 1, 0) file_pairs.append((pct, files[file_sizes[next_f]])) prev_size = file_sizes[f] prev_pct = pct @@ -380,13 +396,13 @@ def write_file_list(datadir): file_pairs.reverse() - outfile = file(datadir+"/bwfiles.new", "w") + outfile = file(datadir + "/bwfiles.new", "w") for f in file_pairs: - outfile.write(str(f[0])+" "+f[1]+"\n") + outfile.write(str(f[0]) + " " + f[1] + "\n") outfile.write(".\n") outfile.close() # atomic on POSIX - os.rename(datadir+"/bwfiles.new", datadir+"/bwfiles") + os.rename(datadir + "/bwfiles.new", datadir + "/bwfiles") def main(scan_dirs): @@ -397,7 +413,8 @@ def main(scan_dirs): for n in ns_list: if n.bandwidth is None: n.bandwidth = -1 - ns_list.sort(lambda x, y: int(y.bandwidth/10000.0 - x.bandwidth/10000.0)) + ns_list.sort(lambda x, y: int( + y.bandwidth / 10000.0 - x.bandwidth / 10000.0)) for n in ns_list: if n.bandwidth == -1: n.bandwidth = None @@ -412,11 +429,12 @@ def main(scan_dirs): n = ns_list[i] n.list_rank = i if n.bandwidth is None: - logger.info("Your Tor is not providing NS w bandwidths for "+n.fingerprint) + logger.info( + "Your Tor is not providing NS w bandwidths for " + n.fingerprint) else: got_ns_bw = True n.measured = False - prev_consensus["$"+n.fingerprint] = n + prev_consensus["$" + n.fingerprint] = n if not got_ns_bw: # Sometimes the consensus lacks a descriptor. In that case, @@ -479,11 +497,12 @@ def main(scan_dirs): n = nodes[line.fingerprint] n.add_line(line) except ValueError, e: - logger.info("Conversion error "+str(e)+" at "+l) + logger.info("Conversion error " + str(e) + " at " + l) except AttributeError, e: - logger.info("Slice file format error "+str(e)+" at "+l) + logger.info("Slice file format error " + str(e) + " at " + l) except Exception, e: - logger.warning("Unknown slice parse error "+str(e)+" at "+l) + logger.warning("Unknown slice parse error " + + str(e) + " at " + l) traceback.print_exc() fp.close() @@ -511,44 +530,53 @@ def main(scan_dirs): # the group_by_class setting, and just reset the values if it is not set. for cl in ["Guard+Exit", "Guard", "Exit", "Middle"]: - c_nodes = filter(lambda n: n.node_class() == cl, nodes.itervalues()) - true_filt_avg[cl] = sum(map(lambda n: n.filt_bw, c_nodes))/float(len(c_nodes)) - true_strm_avg[cl] = sum(map(lambda n: n.strm_bw, c_nodes))/float(len(c_nodes)) - true_circ_avg[cl] = sum(map(lambda n: (1.0-n.circ_fail_rate), - c_nodes))/float(len(c_nodes)) + c_nodes = filter(lambda n: n.node_class() + == cl, nodes.itervalues()) + true_filt_avg[cl] = sum( + map(lambda n: n.filt_bw, c_nodes)) / float(len(c_nodes)) + true_strm_avg[cl] = sum( + map(lambda n: n.strm_bw, c_nodes)) / float(len(c_nodes)) + true_circ_avg[cl] = sum(map(lambda n: (1.0 - n.circ_fail_rate), + c_nodes)) / float(len(c_nodes)) # FIXME: This may be expensive pid_tgt_avg[cl] = true_filt_avg[cl] - prev_pid_avg = 2*pid_tgt_avg[cl] + prev_pid_avg = 2 * pid_tgt_avg[cl] while prev_pid_avg > pid_tgt_avg[cl]: - f_nodes = filter(lambda n: n.desc_bw >= pid_tgt_avg[cl], c_nodes) + f_nodes = filter(lambda n: n.desc_bw >= + pid_tgt_avg[cl], c_nodes) prev_pid_avg = pid_tgt_avg[cl] - pid_tgt_avg[cl] = sum(map(lambda n: n.filt_bw, f_nodes))/float(len(f_nodes)) + pid_tgt_avg[cl] = sum( + map(lambda n: n.filt_bw, f_nodes)) / float(len(f_nodes)) - logger.info("Network true_filt_avg["+cl+"]: "+str(true_filt_avg[cl])) - logger.info("Network pid_tgt_avg["+cl+"]: "+str(pid_tgt_avg[cl])) - logger.info("Network true_circ_avg["+cl+"]: "+str(true_circ_avg[cl])) + logger.info( + "Network true_filt_avg[" + cl + "]: " + str(true_filt_avg[cl])) + logger.info( + "Network pid_tgt_avg[" + cl + "]: " + str(pid_tgt_avg[cl])) + logger.info( + "Network true_circ_avg[" + cl + "]: " + str(true_circ_avg[cl])) filt_avg = sum(map(lambda n: n.filt_bw, - nodes.itervalues()))/float(len(nodes)) + nodes.itervalues())) / float(len(nodes)) strm_avg = sum(map(lambda n: n.strm_bw, - nodes.itervalues()))/float(len(nodes)) - circ_avg = sum(map(lambda n: (1.0-n.circ_fail_rate), - nodes.itervalues()))/float(len(nodes)) - logger.info("Network filt_avg: "+str(filt_avg)) - logger.info("Network circ_avg: "+str(circ_avg)) + nodes.itervalues())) / float(len(nodes)) + circ_avg = sum(map(lambda n: (1.0 - n.circ_fail_rate), + nodes.itervalues())) / float(len(nodes)) + logger.info("Network filt_avg: " + str(filt_avg)) + logger.info("Network circ_avg: " + str(circ_avg)) if not cs_junk.group_by_class: # FIXME: This may be expensive pid_avg = filt_avg - prev_pid_avg = 2*pid_avg + prev_pid_avg = 2 * pid_avg f_nodes = nodes.values() while prev_pid_avg > pid_avg: f_nodes = filter(lambda n: n.desc_bw >= pid_avg, f_nodes) prev_pid_avg = pid_avg - pid_avg = sum(map(lambda n: n.filt_bw, f_nodes))/float(len(f_nodes)) + pid_avg = sum(map(lambda n: n.filt_bw, f_nodes) + ) / float(len(f_nodes)) for cl in ["Guard+Exit", "Guard", "Exit", "Middle"]: true_filt_avg[cl] = filt_avg @@ -556,12 +584,14 @@ def main(scan_dirs): true_circ_avg[cl] = circ_avg pid_tgt_avg[cl] = pid_avg - logger.info("Network pid_avg: "+str(pid_avg)) + logger.info("Network pid_avg: " + str(pid_avg)) else: logger.info("PID control disabled") - filt_avg = sum(map(lambda n: n.filt_bw, nodes.itervalues()))/float(len(nodes)) - strm_avg = sum(map(lambda n: n.strm_bw, nodes.itervalues()))/float(len(nodes)) + filt_avg = sum( + map(lambda n: n.filt_bw, nodes.itervalues())) / float(len(nodes)) + strm_avg = sum( + map(lambda n: n.strm_bw, nodes.itervalues())) / float(len(nodes)) for cl in ["Guard+Exit", "Guard", "Exit", "Middle"]: true_filt_avg[cl] = filt_avg true_strm_avg[cl] = strm_avg @@ -577,7 +607,7 @@ def main(scan_dirs): for n in nodes.itervalues(): if n.fingerprint in prev_votes.vote_map and n.fingerprint in prev_consensus: if "Guard" in prev_consensus[n.fingerprint].flags and \ - "Exit" not in prev_consensus[n.fingerprint].flags: + "Exit" not in prev_consensus[n.fingerprint].flags: if n.measured_at != prev_votes.vote_map[n.fingerprint].measured_at: guard_cnt += 1 guard_measure_time += (n.measured_at - @@ -585,20 +615,23 @@ def main(scan_dirs): else: if n.updated_at != prev_votes.vote_map[n.fingerprint].updated_at: node_cnt += 1 - node_measure_time += (n.updated_at - prev_votes.vote_map[n.fingerprint].updated_at) + node_measure_time += (n.updated_at - + prev_votes.vote_map[n.fingerprint].updated_at) # TODO: We may want to try to use this info to autocompute T_d and # maybe T_i? if node_cnt > 0: - logger.info("Avg of "+str(node_cnt)+" node update intervals: "+str((node_measure_time/node_cnt)/3600.0)) + logger.info("Avg of " + str(node_cnt) + " node update intervals: " + + str((node_measure_time / node_cnt) / 3600.0)) if guard_cnt > 0: - logger.info("Avg of "+str(guard_cnt)+" guard measurement interval: "+str((guard_measure_time/guard_cnt)/3600.0)) + logger.info("Avg of " + str(guard_cnt) + " guard measurement interval: " + + str((guard_measure_time / guard_cnt) / 3600.0)) tot_net_bw = 0 for n in nodes.itervalues(): - n.fbw_ratio = n.filt_bw/true_filt_avg[n.node_class()] - n.sbw_ratio = n.strm_bw/true_strm_avg[n.node_class()] + n.fbw_ratio = n.filt_bw / true_filt_avg[n.node_class()] + n.sbw_ratio = n.strm_bw / true_strm_avg[n.node_class()] if cs_junk.bwauth_pid_control: if cs_junk.use_desc_bw: @@ -607,25 +640,25 @@ def main(scan_dirs): n.use_bw = n.ns_bw if cs_junk.use_pid_tgt: - n.pid_error = (n.strm_bw - pid_tgt_avg[n.node_class()]) \ - / pid_tgt_avg[n.node_class()] - # use filt_bw for pid_error < 0 - if cs_junk.use_mercy: - if cs_junk.use_desc_bw: - if n.pid_error_sum < 0 and n.pid_error < 0: - n.pid_error = (n.filt_bw - pid_tgt_avg[n.node_class()]) \ - / pid_tgt_avg[n.node_class()] - else: - if n.desc_bw > n.ns_bw and n.pid_error < 0: - n.pid_error = (n.filt_bw - pid_tgt_avg[n.node_class()]) \ - / pid_tgt_avg[n.node_class()] + n.pid_error = (n.strm_bw - pid_tgt_avg[n.node_class()]) \ + / pid_tgt_avg[n.node_class()] + # use filt_bw for pid_error < 0 + if cs_junk.use_mercy: + if cs_junk.use_desc_bw: + if n.pid_error_sum < 0 and n.pid_error < 0: + n.pid_error = (n.filt_bw - pid_tgt_avg[n.node_class()]) \ + / pid_tgt_avg[n.node_class()] + else: + if n.desc_bw > n.ns_bw and n.pid_error < 0: + n.pid_error = (n.filt_bw - pid_tgt_avg[n.node_class()]) \ + / pid_tgt_avg[n.node_class()] else: if cs_junk.use_best_ratio and n.sbw_ratio > n.fbw_ratio: n.pid_error = (n.strm_bw - true_strm_avg[n.node_class()]) \ - / true_strm_avg[n.node_class()] + / true_strm_avg[n.node_class()] else: n.pid_error = (n.filt_bw - true_filt_avg[n.node_class()]) \ - / true_filt_avg[n.node_class()] + / true_filt_avg[n.node_class()] # XXX: Refactor the following 3 clauses out into it's own function, so we can log # only in the event of update? @@ -634,13 +667,13 @@ def main(scan_dirs): # Compute circ_error relative to 1.0 (full success), but only # apply it if it is both below the network avg and worse than # the pid_error - if (1.0-n.circ_fail_rate) < true_circ_avg[n.node_class()]: + if (1.0 - n.circ_fail_rate) < true_circ_avg[n.node_class()]: circ_error = -n.circ_fail_rate # ((1.0-fail) - 1.0)/1.0 if circ_error < 0 and circ_error < n.pid_error: logger.info( - "CPU overload for %s node %s=%s desc=%d ns=%d pid_error=%f circ_error=%f circ_fail=%f" % - (n.node_class(), n.nickname, n.fingerprint, n.desc_bw, n.ns_bw, - n.pid_error, circ_error, n.circ_fail_rate)) + "CPU overload for %s node %s=%s desc=%d ns=%d pid_error=%f circ_error=%f circ_fail=%f" % + (n.node_class(), n.nickname, n.fingerprint, n.desc_bw, n.ns_bw, + n.pid_error, circ_error, n.circ_fail_rate)) n.pid_error = min(circ_error, n.pid_error) # Don't accumulate too much amplification for fast nodes @@ -650,7 +683,7 @@ def main(scan_dirs): (n.node_class(), n.nickname, n.fingerprint, n.desc_bw, n.ns_bw, n.pid_error_sum)) n.pid_error_sum = cs_junk.pid_max else: - if float(n.ns_bw)/n.desc_bw > cs_junk.pid_max and n.pid_error > 0: + if float(n.ns_bw) / n.desc_bw > cs_junk.pid_max and n.pid_error > 0: logger.info("Capping feedback for %s node %s=%s desc=%d ns=%d pid_error=%f" % (n.node_class(), n.nickname, n.fingerprint, n.desc_bw, n.ns_bw, n.pid_error)) n.pid_error = 0 @@ -732,11 +765,11 @@ def main(scan_dirs): if n.use_bw == n.desc_bw: weight = 1.0 else: - weight = (1.0-cs_junk.bw_weights["Wgd"]) + weight = (1.0 - cs_junk.bw_weights["Wgd"]) n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.fingerprint], - cs_junk.K_p*weight, - cs_junk.K_i*weight, - cs_junk.K_d*weight, + cs_junk.K_p * weight, + cs_junk.K_i * weight, + cs_junk.K_d * weight, cs_junk.K_i_decay) else: n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.fingerprint], @@ -748,10 +781,11 @@ def main(scan_dirs): # Reset values. Don't vote/sample this measurement round. n.revert_to_vote(prev_votes.vote_map[n.fingerprint]) else: # No prev vote, pure consensus feedback this round - n.new_bw = n.use_bw + cs_junk.K_p*n.use_bw*n.pid_error + n.new_bw = n.use_bw + cs_junk.K_p * n.use_bw * n.pid_error n.pid_error_sum = n.pid_error n.pid_bw = n.new_bw - logger.debug("No prev vote for node "+n.nickname+": Consensus feedback") + logger.debug("No prev vote for node " + + n.nickname + ": Consensus feedback") else: # No PID feedback # Choose the larger between sbw and fbw if n.sbw_ratio > n.fbw_ratio: @@ -761,7 +795,7 @@ def main(scan_dirs): n.pid_error = 0 n.pid_error_sum = 0 - n.new_bw = n.desc_bw*n.ratio + n.new_bw = n.desc_bw * n.ratio n.pid_bw = n.new_bw # for transition between pid/no-pid n.change = n.new_bw - n.desc_bw @@ -773,46 +807,49 @@ def main(scan_dirs): if (IGNORE_GUARDS and ("Guard" in prev_consensus[n.fingerprint].flags and "Exit" not in prev_consensus[n.fingerprint].flags)): - logger.info("Skipping voting for guard "+n.nickname) + logger.info("Skipping voting for guard " + n.nickname) n.ignore = True elif "Authority" in prev_consensus[n.fingerprint].flags: - logger.debug("Skipping voting for authority "+n.nickname) + logger.debug("Skipping voting for authority " + n.nickname) n.ignore = True # Go through the list and cap them to NODE_CAP for n in nodes.itervalues(): if n.new_bw >= 0x7fffffff: - logger.warning("Bandwidth of "+n.node_class()+" node "+n.nickname+"="+n.fingerprint+" exceeded maxint32: "+str(n.new_bw)) + logger.warning("Bandwidth of " + n.node_class() + " node " + n.nickname + + "=" + n.fingerprint + " exceeded maxint32: " + str(n.new_bw)) n.new_bw = 0x7fffffff if cs_junk.T_i > 0 and cs_junk.T_i_decay > 0 \ and math.fabs(n.pid_error_sum) > \ - math.fabs(2*cs_junk.T_i*n.pid_error/cs_junk.T_i_decay): - logger.info("Large pid_error_sum for node " + n.fingerprint + "=" + n.nickname+": " + + math.fabs(2 * cs_junk.T_i * n.pid_error / cs_junk.T_i_decay): + logger.info("Large pid_error_sum for node " + n.fingerprint + "=" + n.nickname + ": " + str(n.pid_error_sum) + " vs " + str(n.pid_error)) - if n.new_bw > tot_net_bw*NODE_CAP: + if n.new_bw > tot_net_bw * NODE_CAP: logger.info("Clipping extremely fast " + n.node_class() + " node " + n.fingerprint + "=" + n.nickname + - " at " + str(100*NODE_CAP) + "% of network capacity (" + - str(n.new_bw) + "->" + str(int(tot_net_bw*NODE_CAP))+") " + + " at " + str(100 * NODE_CAP) + "% of network capacity (" + + str(n.new_bw) + "->" + str(int(tot_net_bw * NODE_CAP)) + ") " + " pid_error=" + str(n.pid_error) + " pid_error_sum=" + str(n.pid_error_sum)) - n.new_bw = int(tot_net_bw*NODE_CAP) + n.new_bw = int(tot_net_bw * NODE_CAP) n.pid_error_sum = 0 # Don't let unused error accumulate... if n.new_bw <= 0: if n.fingerprint in prev_consensus: - logger.info(n.node_class() + " node " + n.fingerprint + "=" + n.nickname + " has bandwidth <= 0: " + str(n.new_bw)) + logger.info(n.node_class() + " node " + n.fingerprint + + "=" + n.nickname + " has bandwidth <= 0: " + str(n.new_bw)) else: - logger.info("New node " + n.fingerprint + "=" + n.nickname + " has bandwidth < 0: " + str(n.new_bw)) + logger.info("New node " + n.fingerprint + "=" + + n.nickname + " has bandwidth < 0: " + str(n.new_bw)) n.new_bw = 1 oldest_measured = min(map(lambda n: n.measured_at, - filter(lambda n: n.fingerprint in prev_consensus, - nodes.itervalues()))) - logger.info("Oldest measured node: "+time.ctime(oldest_measured)) + filter(lambda n: n.fingerprint in prev_consensus, + nodes.itervalues()))) + logger.info("Oldest measured node: " + time.ctime(oldest_measured)) oldest_updated = min(map(lambda n: n.updated_at, - filter(lambda n: n.fingerprint in prev_consensus, - nodes.itervalues()))) - logger.info("Oldest updated node: "+time.ctime(oldest_updated)) + filter(lambda n: n.fingerprint in prev_consensus, + nodes.itervalues()))) + logger.info("Oldest updated node: " + time.ctime(oldest_updated)) missed_nodes = 0.0 missed_bw = 0 @@ -838,45 +875,56 @@ def main(scan_dirs): missed_bw += n.bandwidth # We still tend to miss about 80 nodes even with these # checks.. Possibly going in and out of hibernation? - logger.debug("Didn't measure " + n.fingerprint + "=" + n.nickname + " at " + str(round((100.0*n.list_rank)/max_rank, 1)) + " " + str(n.bandwidth)) + logger.debug("Didn't measure " + n.fingerprint + "=" + n.nickname + " at " + str( + round((100.0 * n.list_rank) / max_rank, 1)) + " " + str(n.bandwidth)) - measured_pct = round(100.0*len(nodes)/(len(nodes)+missed_nodes), 1) - measured_bw_pct = 100.0 - round((100.0*missed_bw)/tot_bw, 1) + measured_pct = round(100.0 * len(nodes) / (len(nodes) + missed_nodes), 1) + measured_bw_pct = 100.0 - round((100.0 * missed_bw) / tot_bw, 1) if measured_pct < MIN_REPORT: - logger.info("Did not measure " + str(MIN_REPORT) + "% of nodes yet (" + str(measured_pct) + "%)") + logger.info("Did not measure " + str(MIN_REPORT) + + "% of nodes yet (" + str(measured_pct) + "%)") sys.exit(1) # Notification hack because #2286/#4359 is annoying arma if measured_bw_pct < 75: logger.warning( - "Only measured %f of the previous consensus bandwidth despite measuring %f of the nodes" % - (measured_bw_pct, measured_pct)) + "Only measured %f of the previous consensus bandwidth despite measuring %f of the nodes" % + (measured_bw_pct, measured_pct)) elif measured_bw_pct < 95: logger.info( - "Only measured %f of the previous consensus bandwidth despite measuring %f of the nodes" % - (measured_bw_pct, measured_pct)) + "Only measured %f of the previous consensus bandwidth despite measuring %f of the nodes" % + (measured_bw_pct, measured_pct)) for cl in ["Guard+Exit", "Guard", "Exit", "Middle"]: c_nodes = filter(lambda n: n.node_class() == cl, nodes.itervalues()) nc_nodes = filter(lambda n: n.pid_error < 0, c_nodes) pc_nodes = filter(lambda n: n.pid_error > 0, c_nodes) - logger.info("Avg " + cl + " pid_error=" + str(sum(map(lambda n: n.pid_error, c_nodes))/len(c_nodes))) - logger.info("Avg " + cl + " |pid_error|=" + str(sum(map(lambda n: abs(n.pid_error), c_nodes))/len(c_nodes))) - logger.info("Avg " + cl + " +pid_error=+" + str(sum(map(lambda n: n.pid_error, pc_nodes))/len(pc_nodes))) - logger.info("Avg " + cl + " -pid_error=" + str(sum(map(lambda n: n.pid_error, nc_nodes))/len(nc_nodes))) + logger.info("Avg " + cl + " pid_error=" + + str(sum(map(lambda n: n.pid_error, c_nodes)) / len(c_nodes))) + logger.info("Avg " + cl + " |pid_error|=" + + str(sum(map(lambda n: abs(n.pid_error), c_nodes)) / len(c_nodes))) + logger.info("Avg " + cl + " +pid_error=+" + + str(sum(map(lambda n: n.pid_error, pc_nodes)) / len(pc_nodes))) + logger.info("Avg " + cl + " -pid_error=" + + str(sum(map(lambda n: n.pid_error, nc_nodes)) / len(nc_nodes))) n_nodes = filter(lambda n: n.pid_error < 0, nodes.itervalues()) p_nodes = filter(lambda n: n.pid_error > 0, nodes.itervalues()) - logger.info("Avg network pid_error=" + str(sum(map(lambda n: n.pid_error, nodes.itervalues()))/len(nodes))) - logger.info("Avg network |pid_error|=" + str(sum(map(lambda n: abs(n.pid_error), nodes.itervalues()))/len(nodes))) - logger.info("Avg network +pid_error=+" + str(sum(map(lambda n: n.pid_error, p_nodes))/len(p_nodes))) - logger.info("Avg network -pid_error=" + str(sum(map(lambda n: n.pid_error, n_nodes))/len(n_nodes))) + logger.info("Avg network pid_error=" + + str(sum(map(lambda n: n.pid_error, nodes.itervalues())) / len(nodes))) + logger.info("Avg network |pid_error|=" + + str(sum(map(lambda n: abs(n.pid_error), nodes.itervalues())) / len(nodes))) + logger.info("Avg network +pid_error=+" + + str(sum(map(lambda n: n.pid_error, p_nodes)) / len(p_nodes))) + logger.info("Avg network -pid_error=" + + str(sum(map(lambda n: n.pid_error, n_nodes)) / len(n_nodes))) logger.info("Measured " + str(measured_pct) + "% of all tor nodes (" + str(measured_bw_pct) + "% of previous consensus bw).") n_print = nodes.values() - n_print.sort(lambda x, y: int(y.pid_error*1000) - int(x.pid_error*1000)) + n_print.sort(lambda x, y: int(y.pid_error * 1000) - + int(x.pid_error * 1000)) """ for scanner in scanner_timestamps.iterkeys(): @@ -892,11 +940,13 @@ def main(scan_dirs): for n in n_print: if not n.ignore: # Turns out str() is more accurate than %lf - out.write("node_id=" + n.fingerprint + " bw=" + str(base10_round(n.new_bw)) + " nick=" + n.nickname + " measured_at=" + str(int(n.measured_at))+" updated_at="+str(int(n.updated_at))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(int(n.pid_bw))+" pid_delta="+str(n.pid_delta)+" circ_fail="+str(n.circ_fail_rate)+"\n") + out.write("node_id=" + n.fingerprint + " bw=" + str(base10_round(n.new_bw)) + " nick=" + n.nickname + " measured_at=" + str(int(n.measured_at)) + " updated_at=" + str(int(n.updated_at)) + + " pid_error=" + str(n.pid_error) + " pid_error_sum=" + str(n.pid_error_sum) + " pid_bw=" + str(int(n.pid_bw)) + " pid_delta=" + str(n.pid_delta) + " circ_fail=" + str(n.circ_fail_rate) + "\n") out.close() write_file_list(scan_dirs[0]) + if __name__ == "__main__": try: main(sys.argv[1:]) @@ -905,7 +955,7 @@ def main(scan_dirs): logger.warning("Socket error. Are the scanning Tors running?") sys.exit(1) except Exception, e: - logger.error("Exception during aggregate: "+str(e)) + logger.error("Exception during aggregate: " + str(e)) traceback.print_exc() sys.exit(1) sys.exit(0) diff --git a/scripts/detect_partitions.py b/scripts/detect_partitions.py index 8cb75e7..56ac5ef 100755 --- a/scripts/detect_partitions.py +++ b/scripts/detect_partitions.py @@ -21,6 +21,7 @@ from bwscanner.partition_scan import ProbeAll2HopCircuits + def get_router_list_from_consensus(tor_state, consensus): """ arguments @@ -40,6 +41,7 @@ def get_router_list_from_consensus(tor_state, consensus): sys.exit(1) return routers + def get_router_list_from_file(tor_state, relay_list_file): """ arguments @@ -73,7 +75,8 @@ def get_router_list_from_file(tor_state, relay_list_file): def main(tor_control, tor_data, log_dir, relay_list, consensus, secret, partitions, this_partition, build_duration, circuit_timeout, prometheus_port, prometheus_interface): - log.startLogging( sys.stdout ) + log.startLogging(sys.stdout) + def start_tor(): config = txtorcon.TorConfig() config.DataDirectory = tor_data @@ -82,18 +85,21 @@ def get_random_tor_ports(): d2 = txtorcon.util.available_tcp_port(reactor) d2.addCallback(lambda port: config.__setattr__('SocksPort', port)) d2.addCallback(lambda _: txtorcon.util.available_tcp_port(reactor)) - d2.addCallback(lambda port: config.__setattr__('ControlPort', port)) + d2.addCallback( + lambda port: config.__setattr__('ControlPort', port)) return d2 def launch_and_get_protocol(ignore): d2 = txtorcon.launch_tor(config, reactor, stdout=sys.stdout) - d2.addCallback(lambda tpp: txtorcon.TorState(tpp.tor_protocol).post_bootstrap) + d2.addCallback(lambda tpp: txtorcon.TorState( + tpp.tor_protocol).post_bootstrap) d2.addCallback(lambda state: state.protocol) return d2 d = get_random_tor_ports().addCallback(launch_and_get_protocol) + def change_torrc(result): - config.UseEntryGuards=0 + config.UseEntryGuards = 0 d2 = config.save() d2.addCallback(lambda ign: result) return d2 @@ -110,6 +116,7 @@ def change_torrc(result): d = txtorcon.build_tor_connection(endpoint, build_state=True) secret_hash = hashlib.sha256(secret).digest() + def start_probe(tor_state): if consensus is not None: routers = get_router_list_from_consensus(tor_state, consensus) @@ -122,6 +129,7 @@ def start_probe(tor_state): partitions, this_partition, build_duration, circuit_timeout, prometheus_port, prometheus_interface) print "starting scan" probe.start() + def signal_handler(signal, frame): print "signal caught, stopping probe" d = probe.stop() @@ -131,5 +139,6 @@ def signal_handler(signal, frame): d.addCallback(start_probe) reactor.run() + if __name__ == '__main__': main() diff --git a/scripts/exitip.py b/scripts/exitip.py index db0fa8a..316ea63 100644 --- a/scripts/exitip.py +++ b/scripts/exitip.py @@ -12,6 +12,7 @@ from txsocksx.errors import HostUnreachable, TTLExpired from txtorcon import TorConfig + def fetch(path, url, state): agent = OnionRoutedAgent(reactor, path=path, state=state) request = agent.request("GET", url) @@ -27,6 +28,7 @@ def parse_ip(body): return exit_ip, None request.addCallback(parse_ip) + def err(failure): failure.trap(defer.CancelledError, ResponseNeverReceived, ResponseFailed, HostUnreachable, TTLExpired) @@ -34,12 +36,15 @@ def err(failure): request.addErrback(err) return request + def run_scan(state): circuits = ExitScan(state) url = 'https://check.torproject.org' - outfile = open("exit-addresses.%s.json" % datetime.datetime.utcnow().isoformat(), 'w+') + outfile = open("exit-addresses.%s.json" % + datetime.datetime.utcnow().isoformat(), 'w+') all_tasks_done = defer.Deferred() tasks = [] + def pop(circuits): try: tasks.append(task.deferLater( @@ -54,19 +59,24 @@ def pop(circuits): reactor.callLater(0, pop, circuits) return all_tasks_done + def shutdown(ignore): reactor.stop() + def add_attacher(state): state.set_attacher(SOCKSClientStreamAttacher(state), reactor) return state + def setup_failed(failure): log.err(failure) + def save_results(result, outfile): outfile.write(json.dumps(dict([r[1] for r in result if r[1] != None]))) + def main(): log.startLogging(sys.stdout) tor = start_tor(TorConfig()) @@ -76,5 +86,6 @@ def main(): tor.addBoth(shutdown) reactor.run() + if __name__ == '__main__': main() diff --git a/scripts/twist.py b/scripts/twist.py index ad2a127..a043dca 100644 --- a/scripts/twist.py +++ b/scripts/twist.py @@ -43,34 +43,35 @@ logging.basicConfig(level=logging.DEBUG) + def __install(): log = logging.getLogger('tpython') log.info('setting up twisted reactor in ipython loop') - + from twisted.internet import _threadedselect _threadedselect.install() - + from twisted.internet import reactor from collections import deque from IPython.lib import inputhook from IPython import InteractiveShell - + q = deque() - + def reactor_wake(twisted_loop_next, q=q): q.append(twisted_loop_next) - + def reactor_work(*_args): if q: while len(q): q.popleft()() return 0 - + def reactor_start(*_args): log.info('starting twisted reactor in ipython') reactor.interleave(reactor_wake) # @UndefinedVariable inputhook.set_inputhook(reactor_work) - + def reactor_stop(): if reactor.threadpool: # @UndefinedVariable log.info('stopping twisted threads') @@ -81,6 +82,7 @@ def reactor_stop(): ip = InteractiveShell.instance() ask_exit = ip.ask_exit + def ipython_exit(): reactor_stop() return ask_exit() @@ -91,7 +93,9 @@ def ipython_exit(): return reactor + reactor = __install() from txtorcon import build_local_tor_connection -build_local_tor_connection(reactor).addCallback(lambda tor: globals().update(tor=tor)) +build_local_tor_connection(reactor).addCallback( + lambda tor: globals().update(tor=tor)) diff --git a/test/template.py b/test/template.py index 0e495ae..71e0db7 100644 --- a/test/template.py +++ b/test/template.py @@ -13,9 +13,9 @@ class TorTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): self.tor_state = yield connect_to_tor( - launch_tor=False, - control_port=int(os.environ.get('CHUTNEY_CONTROL_PORT')), - circuit_build_timeout=30, + launch_tor=False, + control_port=int(os.environ.get('CHUTNEY_CONTROL_PORT')), + circuit_build_timeout=30, ) self.attacher = SOCKSClientStreamAttacher(self.tor_state) diff --git a/test/test_detect_partitions.py b/test/test_detect_partitions.py index 2a8ec55..02cae21 100644 --- a/test/test_detect_partitions.py +++ b/test/test_detect_partitions.py @@ -31,13 +31,15 @@ def test_shuffle_generator1(self): partitions = 3 consensus_hash = hashlib.sha256('REPLACEME consensus hash').digest() shared_secret = hashlib.sha256('REPLACEME shared secret').digest() - prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash, shared_secret, iterations=1) + prng_seed = hashlib.pbkdf2_hmac( + 'sha256', consensus_hash, shared_secret, iterations=1) all_partitions = [] for partition_id in range(partitions): partition = [circuit for circuit in - lazy2HopCircuitGenerator(relays, partition_id, partitions, prng_seed)] + lazy2HopCircuitGenerator(relays, partition_id, + partitions, prng_seed)] all_partitions += partition - self.assertEqual(len(all_partitions), (total_relays**2)-total_relays) + self.assertEqual(len(all_partitions), (total_relays**2) - total_relays) def test_shuffle_generator2(self): total_relays = 80 @@ -45,13 +47,15 @@ def test_shuffle_generator2(self): partitions = 4 consensus_hash = hashlib.sha256('REPLACEME consensus hash').digest() shared_secret = hashlib.sha256('REPLACEME shared secret').digest() - prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash, shared_secret, iterations=1) + prng_seed = hashlib.pbkdf2_hmac( + 'sha256', consensus_hash, shared_secret, iterations=1) all_partitions = [] for partition_id in range(partitions): partition = [circuit for circuit in - lazy2HopCircuitGenerator(relays, partition_id, partitions, prng_seed)] + lazy2HopCircuitGenerator(relays, partition_id, + partitions, prng_seed)] all_partitions += partition - self.assertEqual(len(all_partitions), (total_relays**2)-total_relays) + self.assertEqual(len(all_partitions), (total_relays**2) - total_relays) def test_permutations(self): total_relays = 40 @@ -63,14 +67,17 @@ def test_permutations(self): consensus_hash = hashlib.sha256('REPLACEME consensus hash').digest() shared_secret = hashlib.sha256('REPLACEME shared secret').digest() - prng_seed = hashlib.pbkdf2_hmac('sha256', consensus_hash, shared_secret, iterations=1) + prng_seed = hashlib.pbkdf2_hmac( + 'sha256', consensus_hash, shared_secret, iterations=1) circuit_generator = lazy2HopCircuitGenerator(routers, this_partition, partitions, prng_seed) - circuits = map(lambda x: (str(x[0]), str(x[1])), [circuit for circuit in circuit_generator]) - expected = [('relay17', 'relay25'), ('relay10', 'relay26'), ('relay8', 'relay3'), - ('relay20', 'relay37'), ('relay7', 'relay26'), ('relay29', 'relay28'), - ('relay12', 'relay38'), ('relay7', 'relay14'), ('relay2', 'relay4'), - ('relay16', 'relay3')] + circuits = map(lambda x: (str(x[0]), str(x[1])), [ + circuit for circuit in circuit_generator]) + expected = [('relay17', 'relay25'), ('relay10', 'relay26'), + ('relay8', 'relay3'), ('relay20', 'relay37'), + ('relay7', 'relay26'), ('relay29', 'relay28'), + ('relay12', 'relay38'), ('relay7', 'relay14'), + ('relay2', 'relay4'), ('relay16', 'relay3')] self.failUnlessEqual(circuits[:10], expected) @@ -95,7 +102,7 @@ def build_circuit(self, routers=None, using_guards=True): else: cmd += ',' if isinstance(router, basestring) and len(router) == 40 \ - and hashFromHexId(router): + and hashFromHexId(router): cmd += router else: cmd += router.id_hex[1:] @@ -129,8 +136,9 @@ def stopped(): build_duration = .2 circuit_timeout = 10 probe = ProbeAll2HopCircuits(tor_state, clock, log_dir, stopped, - relays, secret, partitions, this_partition, - build_duration, circuit_timeout) + relays, secret, partitions, + this_partition, build_duration, + circuit_timeout) probe.start() for _ in range(len(relays)**2 - len(relays)): try: diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 7bf44e3..d0332f1 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -32,7 +32,8 @@ class TestOnionRoutedTCPClientEndpoint(TorTestCase): def test_connect_tcp(self): endpoint = random.choice(self.routers) # Connect to a routers OR port as a general TCP connection test - ore = OnionRoutedTCPClientEndpoint(str(endpoint.ip), int(endpoint.or_port), + ore = OnionRoutedTCPClientEndpoint(str(endpoint.ip), + int(endpoint.or_port), self.tor_state, self.random_path()) proto = yield ore.connect(MockProtocol()) self.failUnlessIsInstance(proto, MockProtocol) diff --git a/test/test_listener.py b/test/test_listener.py index eeed7c7..38495f8 100644 --- a/test/test_listener.py +++ b/test/test_listener.py @@ -58,14 +58,15 @@ class TestStreamBandwidthListener(TorTestCase): @defer.inlineCallbacks def setUp(self): yield super(TestStreamBandwidthListener, self).setUp() - self.fetch_size = 8*2**20 # 8MB - self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor_state) + self.fetch_size = 8 * 2**20 # 8MB + self.stream_bandwidth_listener = \ + yield StreamBandwidthListener(self.tor_state) class DummyResource(Resource): isLeaf = True def render_GET(self, request): - return 'a'*8*2**20 + return 'a' * 8 * 2**20 self.port = yield available_tcp_port(reactor) self.site = Site(DummyResource()) @@ -77,11 +78,13 @@ def render_GET(self, request): @defer.inlineCallbacks def test_circ_bw(self): r = yield self.do_fetch() - bw_events = self.stream_bandwidth_listener.circ_bw_events.get(r['circ']) + bw_events = self.stream_bandwidth_listener.circ_bw_events.get( + r['circ']) assert bw_events # XXX: why are the counters reversed!? -> See StreamBandwidthListener # docstring. - # assert self.fetch_size/2 <= sum([x[1] for x in bw_events]) <= self.fetch_size + # assert self.fetch_size/2 <= sum([x[1] for x in bw_events]) + # <= self.fetch_size assert sum([x[1] for x in bw_events]) <= self.fetch_size # either this is backward, or we wrote more bytes than read?! assert sum([x[2] for x in bw_events]) >= sum([x[1] for x in bw_events]) @@ -89,44 +92,54 @@ def test_circ_bw(self): @defer.inlineCallbacks def test_stream_bw(self): r = yield self.do_fetch() - bw_events = self.stream_bandwidth_listener.stream_bw_events.get(r['circ']) + bw_events = self.stream_bandwidth_listener.stream_bw_events.get( + r['circ']) assert bw_events - assert self.fetch_size/2 <= sum([x[1] for x in bw_events]) <= self.fetch_size + assert self.fetch_size / \ + 2 <= sum([x[1] for x in bw_events]) <= self.fetch_size @defer.inlineCallbacks def test_bw_samples(self): r = yield self.do_fetch() - bw_events = self.stream_bandwidth_listener.stream_bw_events.get(r['circ']) + bw_events = self.stream_bandwidth_listener.stream_bw_events.get( + r['circ']) assert bw_events # XXX: Where are these self.fetch_size/n magic values coming from? - assert self.fetch_size/4 <= sum([x[1] for x in bw_events]) <= self.fetch_size + assert self.fetch_size / \ + 4 <= sum([x[1] for x in bw_events]) <= self.fetch_size # XXX: If the measurement happens in under 1 second, we will have one # STREAM_BW, and will not be able to calculate BW samples. if len(bw_events) == 1: raise self.not_enough_measurements - bw_samples = [x for x in self.stream_bandwidth_listener.bw_samples(r['circ'])] + bw_samples = [ + x for x in self.stream_bandwidth_listener.bw_samples(r['circ'])] assert bw_samples - assert self.fetch_size/2 <= sum([x[0] for x in bw_samples]) <= self.fetch_size - assert r['duration'] * .5 < sum([x[2] for x in bw_samples]) < r['duration'] * 2 + assert self.fetch_size / \ + 2 <= sum([x[0] for x in bw_samples]) <= self.fetch_size + assert r['duration'] * .5 < sum([x[2] for x in bw_samples]) < \ + r['duration'] * 2 @defer.inlineCallbacks def test_circ_avg_bw(self): r = yield self.do_fetch() - bw_events = self.stream_bandwidth_listener.stream_bw_events.get(r['circ']) + bw_events = self.stream_bandwidth_listener.stream_bw_events.get( + r['circ']) # XXX: these complete too quickly to sample sufficient bytes... assert bw_events - assert self.fetch_size/4 <= sum([x[1] for x in bw_events]) <= self.fetch_size + assert self.fetch_size / \ + 4 <= sum([x[1] for x in bw_events]) <= self.fetch_size if len(bw_events) == 1: raise self.not_enough_measurements circ_avg_bw = self.stream_bandwidth_listener.circ_avg_bw(r['circ']) assert circ_avg_bw is not None assert circ_avg_bw['path'] == r['circ'].path - assert self.fetch_size/4 <= circ_avg_bw['bytes_r'] <= self.fetch_size + assert self.fetch_size / 4 <= circ_avg_bw['bytes_r'] <= self.fetch_size assert 0 < circ_avg_bw['duration'] <= r['duration'] - assert (circ_avg_bw['bytes_r']/4 < (circ_avg_bw['samples'] * circ_avg_bw['r_bw']) < - circ_avg_bw['bytes_r']*2) + assert (circ_avg_bw['bytes_r'] / 4 < + (circ_avg_bw['samples'] * circ_avg_bw['r_bw']) < + circ_avg_bw['bytes_r'] * 2) @defer.inlineCallbacks def do_fetch(self): @@ -137,7 +150,8 @@ def do_fetch(self): request = yield agent.request("GET", url) body = yield readBody(request) assert len(body) == self.fetch_size - circ = [c for c in self.tor_state.circuits.values() if c.path == path][0] + circ = [c for c in self.tor_state.circuits.values() + if c.path == path][0] assert isinstance(circ, Circuit) # XXX: Wait for circuit to close, then I think we can be sure that diff --git a/test/test_measurement.py b/test/test_measurement.py index 4cc9fce..aededc8 100644 --- a/test/test_measurement.py +++ b/test/test_measurement.py @@ -25,10 +25,10 @@ class DummyResource(Resource): def render_GET(self, request): size = request.uri.split('/')[-1] if 'k' in size: - size = int(size[:-1])*(2**10) + size = int(size[:-1]) * (2**10) elif 'M' in size: - size = int(size[:-1])*(2**20) - return 'a'*size + size = int(size[:-1]) * (2**20) + return 'a' * size self.port = yield available_tcp_port(reactor) self.test_service = yield reactor.listenTCP( @@ -55,7 +55,8 @@ def check_all_routers_measured(measurement_dir): measurements.extend(json.load(result_file)) for measurement in measurements: - measured_relays.update({str(router) for router in measurement['path']}) + measured_relays.update({str(router) + for router in measurement['path']}) failed_measurements = [measurement for measurement in measurements if 'failure' in measurement] diff --git a/test/test_writer.py b/test/test_writer.py index 6f89696..512ba4a 100644 --- a/test/test_writer.py +++ b/test/test_writer.py @@ -19,7 +19,7 @@ def test_send_multiple_chunk_size(self): test_data = {'test_method': 'test_send_chunk_size'} num_chunks = randint(121, 212) deferreds = [] - for _ in xrange(chunk_size*num_chunks): + for _ in xrange(chunk_size * num_chunks): deferreds += [self.result_sink.send(test_data)] def validate(_, dirname, fnames):