-
Notifications
You must be signed in to change notification settings - Fork 3
/
locate-from-pickle
executable file
·146 lines (122 loc) · 4.17 KB
/
locate-from-pickle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#! /usr/bin/python3
# usage: locate-from-db output-dir calibration basemap pickle
import argparse
import collections
import datetime
import gzip
import multiprocessing
import os
import pickle
import sys
import time
import zlib
from math import inf as Inf
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'lib')))
import ageo
_time_0 = time.monotonic()
def progress(message, *args):
global _time_0
sys.stderr.write(
("{}: " + message + "\n").format(
datetime.timedelta(seconds = time.monotonic() - _time_0),
*args))
def warning(message, *args):
sys.stderr.write(
("\t*** " + message + "\n").format(*args))
def load_calibration(cfname):
try:
with gzip.open(cfname, "rb") as fp:
# FIXME: duplicates code from 'calibrate'
cal_cbg, cal_oct, cal_spo = pickle.load(fp)
minmax = ageo.ranging.MinMax
gaussn = ageo.ranging.Gaussian
return {
"cbg-m-1": (cal_cbg, minmax, False),
"oct-m-1": (cal_oct, minmax, False),
"spo-m-a": (cal_spo, minmax, True),
"spo-g-a": (cal_spo, gaussn, True)
}
except (OSError, zlib.error, pickle.UnpicklingError) as e:
sys.stderr.write("unable to load calibration: {}: {}\n"
.format(cfname, e))
sys.exit(1)
Position = collections.namedtuple("Position",
("ipv4", "label", "ilabel", "lon", "lat"))
# these are filled in in main()
positions = None
basemap = None
calibrations = None
def process_batch(args):
global positions, basemap
ofname, mode, metadata, measurements = args
cals, ranging, use_all = calibrations[mode]
bnd = basemap.bounds
obsv = []
for landmark, rtts in measurements.items():
if landmark not in positions:
continue
lpos = positions[landmark]
if use_all:
calibration = cals[0]
elif landmark in cals:
calibration = cals[landmark]
elif lpos.label in cals:
calibration = cals[lpos.label]
elif lpos.ilabel in cals:
calibration = cals[lpos.ilabel]
else:
continue
obs = ageo.Observation(
basemap=basemap,
ref_lat=lpos.lat,
ref_lon=lpos.lon,
range_fn=ranging,
calibration=calibration,
rtts=rtts)
obsv.append(obs)
bnd = bnd.intersection(obs.bounds)
if bnd.is_empty:
return mode + " (empty intersection region)", str(metadata['id'])
if not obsv:
return mode + " (no observations)", str(metadata['id'])
loc = obsv[0]
for obs in obsv[1:]:
loc = loc.intersection(obs, bnd)
#loc = loc.intersection(basemap, bnd)
loc.annotations.update(metadata)
loc.save(ofname)
return mode, metadata['id']
def marshal_batches(args, rdr, modes):
try:
while True:
(_, (metadata, measurements)) = rdr.load()
for mode in modes:
ofname = os.path.join(args.output_dir,
mode + "-" + str(metadata['id']) + ".h5")
if not os.path.exists(ofname):
yield (ofname, mode, metadata, measurements)
except EOFError:
return
def main():
global positions, calibrations, basemap
ap = argparse.ArgumentParser()
ap.add_argument("output_dir")
ap.add_argument("calibration")
ap.add_argument("basemap")
ap.add_argument("pickle")
args = ap.parse_args()
progress("preparing...")
# FIXME: duplicates code from 'calibrate'
calibrations = load_calibration(args.calibration)
basemap = ageo.Map(args.basemap)
with gzip.open(args.pickle, "rb") as pf:
rdr = pickle.Unpickler(pf)
# The first item in the file is the batch list, which we don't need.
rdr.load()
positions = rdr.load()
with multiprocessing.Pool() as pool:
for tag, id in pool.imap_unordered(
process_batch,
marshal_batches(args, rdr, calibrations.keys())):
progress("{}: {}", id, tag)
main()