-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfuncs.py
executable file
·226 lines (174 loc) · 6.5 KB
/
funcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import numpy as np
import networkx as nx
def compute_and_save_all_shortest_paths(G):
    """
    Precompute all shortest paths between every ordered pair of nodes.

    Node labels are used directly as indices into the returned table, so
    the nodes of G must be labeled 0, 1, ..., n-1.

    Input: nx.graph, G
    Output: list, paths, where paths[start][end] = [path1, path2, ...]
            and each path = [start, ..., end] is one of the (possibly more
            than one) shortest paths from start to end

    Any exception raised by nx.all_shortest_paths (e.g. when end cannot be
    reached from start) is propagated unchanged.
    """
    nodes = list(G.nodes())
    n = len(nodes)
    # n x n table of placeholders; every cell is overwritten below.
    paths = [[None] * n for _ in range(n)]
    for start in nodes:
        for end in nodes:
            # all_shortest_paths is a generator; materialize it so the
            # result can be indexed and reused across many simulations.
            paths[start][end] = list(nx.all_shortest_paths(G, start, end))
    return paths
def random_walk_covertime(G, m=1, num_trials=10):
    """
    Does a random walk until all nodes have been covered
    at least m times. Does this for num_trials times.

    Each trial starts on a uniformly chosen node (which counts as one
    visit) and then hops to a uniformly chosen neighbour at every step.
    Node labels index the visit-count array, so nodes must be labeled
    0, 1, ..., n-1. A trial never finishes if the walk cannot reach some
    node (e.g. G is disconnected).

    Input: nx.graph, G
           int, m, required number of visits per node
           int, num_trials, number of independent walks
    Output: list, Ts, covertimes (number of hops), one entry per trial
    Raises: ValueError if the walk reaches a node with no neighbours
    """
    nodes = list(G.nodes())
    Ts = []
    for _ in range(num_trials):
        counts = np.zeros(len(nodes))  # counts[i] = visits to node i so far
        time = 0
        current_position = np.random.choice(nodes)
        counts[current_position] += 1
        # Keep walking while at least one node is short of m visits.
        while np.any(counts < m):
            neighbours = list(G.neighbors(current_position))
            if not neighbours:
                # np.random.choice would fail here with an opaque message.
                raise ValueError(
                    "node {} has no neighbours; the random walk cannot "
                    "continue".format(current_position)
                )
            current_position = np.random.choice(neighbours)
            counts[current_position] += 1
            time += 1
        Ts.append(time)
    return Ts
def taxi_drive_covertime(G, m=1, num_trials=10):
    """
    Does a taxi-drive until all nodes have been covered
    at least m times. Does this for num_trials times.

    A taxi-drive repeatedly picks a uniformly random destination other
    than the current node and travels there along one of the shortest
    paths (chosen uniformly), touching every node on the way. The start
    node of a trial counts as one visit. The clock stops on the step at
    which the last node reaches m visits, even if that happens partway
    through a trip.

    Node labels index both the visit-count array and the shortest-path
    table, so nodes must be labeled 0, 1, ..., n-1.

    Input: nx.graph, G
           int, m, required number of visits per node
           int, num_trials, number of independent drives
    Output: list, Ts, covertimes (number of steps), one entry per trial
    """
    nodes = list(G.nodes())
    Ts = []
    # Shortest paths do not depend on the trial, so compute them once.
    paths = compute_and_save_all_shortest_paths(G)
    for _ in range(num_trials):
        time = 0
        counts = np.zeros(len(nodes))  # counts[i] = visits to node i so far
        current_position = np.random.choice(nodes)
        counts[current_position] += 1
        covered = not np.any(counts < m)
        while not covered:
            # Exclude the origin by value rather than by list position.
            candidates = [node for node in nodes if node != current_position]
            destination = np.random.choice(candidates)

            # Pick one of possibly many shortest paths
            all_shortest_paths = paths[current_position][destination]
            pick = np.random.choice(len(all_shortest_paths))
            path = all_shortest_paths[pick][1:]  # drop the origin

            # Traverse path
            for node in path:
                counts[node] += 1
                time += 1
                if not np.any(counts < m):
                    # Cover completed: do not charge the rest of the trip.
                    covered = True
                    break
            current_position = destination
        Ts.append(time)
    return Ts
def taxi_drive_stationary_densities(G, counts, T):
    """
    Does a taxi-drive (urban explorer process) on the nx.graph G for about
    T timesteps and tallies how often each node is touched.

    counts[i] = number of times node i has been touched. It is updated in
    place and also returned, so passing the result of one call into the
    next runs simulations back-to-back (each call starts from a freshly
    drawn random node, which is counted as a touch). Start with an array
    of zeros.

    The trip in progress when the clock reaches T is always completed, so
    the number of recorded steps can exceed T by up to one path length
    minus one.

    Node labels index both counts and the shortest-path table, so nodes
    must be labeled 0, 1, ..., n-1.

    Input: nx.graph, G
           array, counts, see above
           int, T, num time steps
    Output: array, counts (the same object that was passed in)
    """
    nodes = list(G.nodes())
    paths = compute_and_save_all_shortest_paths(G)
    time = 0
    current_position = np.random.choice(nodes)
    counts[current_position] += 1
    while time < T:
        # Exclude the origin by value rather than by list position.
        candidates = [node for node in nodes if node != current_position]
        destination = np.random.choice(candidates)

        # Pick one of possibly many shortest paths
        all_shortest_paths = paths[current_position][destination]
        pick = np.random.choice(len(all_shortest_paths))
        path = all_shortest_paths[pick][1:]  # drop the origin

        # Traverse path
        for node in path:
            counts[node] += 1
            time += 1
        current_position = destination
    return counts
def taxi_drive_returntime(G, start_node, num_trials=10):
    """
    Finds the return time for a taxi-drive for given node.

    Starting from start_node, the taxi repeatedly picks a uniformly random
    destination other than its current node and travels there along one
    of the shortest paths (chosen uniformly). A trial ends on the first
    step that lands on start_node again, even partway through a trip.

    Node labels index the shortest-path table, so nodes must be labeled
    0, 1, ..., n-1.

    Input: nx.graph, G
           int, start_node, node to start the process on (nodes are
           labeled serially from 0)
           int, num_trials, number of independent drives
    Output: list, Ts, return times (number of steps), one entry per trial
    """
    nodes = list(G.nodes())
    Ts = []
    # Call the sibling helper directly; the former `f.` prefix referred to
    # a name that is not defined in this module.
    paths = compute_and_save_all_shortest_paths(G)
    for _ in range(num_trials):
        time = 0
        current_position = start_node
        returned = False
        while not returned:
            # Exclude the origin by value rather than by list position.
            candidates = [node for node in nodes if node != current_position]
            destination = np.random.choice(candidates)

            # Pick one of possibly many shortest paths
            all_shortest_paths = paths[current_position][destination]
            pick = np.random.choice(len(all_shortest_paths))
            path = all_shortest_paths[pick][1:]  # drop the origin

            # Traverse path; if it never hits start_node the taxi ends up
            # at the destination and a new trip is drawn.
            for node in path:
                current_position = node
                time += 1
                if current_position == start_node:
                    returned = True
                    break
        Ts.append(time)
    return Ts
def random_walk_returntime(G, start_node, num_trials=10):
    """
    Finds the return time for a random walk for given node.

    Each trial starts on start_node and hops to a uniformly chosen
    neighbour at every step until it lands on start_node again. A trial
    never finishes if the walk cannot get back to start_node.

    Input: nx.graph, G
           int, start_node, node to start the process on (nodes are
           labeled serially from 0)
           int, num_trials, number of independent walks
    Output: list, Ts, return times (number of hops), one entry per trial
    Raises: ValueError if the walk reaches a node with no neighbours
    """
    Ts = []
    for _ in range(num_trials):
        time = 0
        current_position = start_node
        while True:
            neighbours = list(G.neighbors(current_position))
            if not neighbours:
                # np.random.choice would fail here with an opaque message.
                raise ValueError(
                    "node {} has no neighbours; the random walk cannot "
                    "continue".format(current_position)
                )
            current_position = np.random.choice(neighbours)
            time += 1
            if current_position == start_node:
                break
        Ts.append(time)
    return Ts