Skip to content

Commit

Permalink
Merge pull request #70 from cevi/dev
Browse files Browse the repository at this point in the history
- fix wrong elevation data
- fix waypoint misaligment
  • Loading branch information
wp99cp authored Apr 26, 2022
2 parents d3a0710 + cb891f9 commit 3602547
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 125 deletions.
3 changes: 2 additions & 1 deletion python_program/automatic_walk_time_tables/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from automatic_walk_time_tables.geo_processing.find_walk_table_points import select_waypoints
from automatic_walk_time_tables.geo_processing.map_numbers import find_map_numbers
from automatic_walk_time_tables.map_downloader.create_map import MapCreator
from automatic_walk_time_tables.utils.path import Path_LV03
from automatic_walk_time_tables.walk_time_table.walk_table import plot_elevation_profile, create_walk_table
from automatic_walk_time_tables.utils.file_parser import parse_gpx_file, parse_kml_file
from automatic_walk_time_tables.utils import path
Expand All @@ -28,7 +29,7 @@ def __init__(self, args: argparse.Namespace, uuid: str = ''):
# get the extension of the file
extension = pathlib.Path(self.args.file_name).suffix

self.path_ = path.Path_LV03()
self.path_: Path_LV03 = path.Path_LV03()
self.path_.clear()

if extension == '.gpx':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def WGS84toLV03(self, latitude, longitude, ellHeight):
d = []
d.append(self.WGStoCHy(latitude, longitude))
d.append(self.WGStoCHx(latitude, longitude))
d.append(self.WGStoCHh(latitude, longitude, ellHeight))
d.append(ellHeight)
return d


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
from math import sqrt

from automatic_walk_time_tables.utils.point import Point_LV95

logger = logging.getLogger(__name__)


Expand All @@ -24,7 +26,7 @@ def add_to_database(file, db, typeIndex, name, x, y):
logger.debug("Database has " + str(len(db)) + " entries.")


def find_name(coord, dist):
def find_name(point: Point_LV95, dist):
"""
See also https://api3.geo.admin.ch/api/faq/index.html#which-layers-have-a-tooltip
fair use limit 20 Request per minute
Expand All @@ -36,8 +38,8 @@ def find_name(coord, dist):

flurname_list = [swiss_name
for swiss_name in database if
abs(swiss_name.x - coord[0]) < dist * 4 and
abs(swiss_name.y - coord[1]) < dist * 4]
abs(swiss_name.x - point.lat) < dist * 4 and
abs(swiss_name.y - point.lon) < dist * 4]

# print(list(map(lambda x: x.name, flurname_list)))

Expand All @@ -50,7 +52,7 @@ def find_name(coord, dist):
# Flurname swisstopo: 0.9

flurname_list.sort(key=lambda swiss_name:
sqrt((abs(swiss_name.x - coord[0]) ** 2 + abs(swiss_name.y - coord[1]) ** 2)) / (
sqrt((abs(swiss_name.x - point.lat) ** 2 + abs(swiss_name.y - point.lon) ** 2)) / (
2 if swiss_name.object_type in ['Haltestelle Bahn', 'Huegel', 'Pass', 'Strassenpass', 'Alpiner Gipfel',
'Gipfel',
] else
Expand All @@ -67,7 +69,7 @@ def find_name(coord, dist):
return ''

return ('bei/m ' if sqrt(
abs(flurname_list[0].x - coord[0]) ** 2 + abs(flurname_list[0].y - coord[1]) ** 2) > dist else '') + \
abs(flurname_list[0].x - point.lat) ** 2 + abs(flurname_list[0].y - point.lon) ** 2) > dist else '') + \
flurname_list[0].name


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from copy import copy
from typing import List, Tuple

import geopy.distance
from automatic_walk_time_tables.utils import path, point
from automatic_walk_time_tables.utils.point import Point_LV03
from automatic_walk_time_tables.utils.way_point import WayPoint

logger = logging.getLogger(__name__)


def select_waypoints(path_ : path.Path, walk_point_limit=21):
def select_waypoints(path_: path.Path_LV03, walk_point_limit=21) -> Tuple[float, List[WayPoint], List[WayPoint]]:
"""
Algorithm that selects suitable points for the Marschzeittabelle.
Some parts are inspired by the Ramer–Douglas–Peucker algorithm. Especially
Expand Down Expand Up @@ -37,7 +38,7 @@ def select_waypoints(path_ : path.Path, walk_point_limit=21):
return total_distance, pts_step_2, pts_step_3


def reduce_number_of_points(pts_step_2: List[Tuple[float, point.Point]], walk_point_limit: int):
def reduce_number_of_points(pts_step_2: List[WayPoint], walk_point_limit: int):
"""
Final selection: Iteratively reduce the number of points to the maximum specified in walk_point_limit. To
achieve this we iteratively increase a maximum derivation (drv_limit) value until we have dropped enough points.
Expand All @@ -53,10 +54,10 @@ def reduce_number_of_points(pts_step_2: List[Tuple[float, point.Point]], walk_po
through the selected points, we increase drv_limit by 2 meters and try again.
"""

pts_step_3: List[Tuple[float, point.Point]] = copy(pts_step_2)
pts_step_3: List[WayPoint] = copy(pts_step_2)

pt_A: Tuple[float, point.Point] = None
pt_B: Tuple[float, point.Point] = None
pt_A: WayPoint = None
pt_B: WayPoint = None

drv_limit = 0

Expand All @@ -81,19 +82,22 @@ def reduce_number_of_points(pts_step_2: List[Tuple[float, point.Point]], walk_po
m, b = calc_secant_line(pt_A, pt_C)
secant_elev = calc_secant_elevation(m, b, pt_B)

if abs(secant_elev - pt_B[1].h) < drv_limit:
if abs(secant_elev - pt_B.point.h) < drv_limit:

# Check if B must be replaced by a previously dropped point D
pt_D = None
for pt in list(filter(lambda p: pt_A[0] < p[0] < pt_C[0], pts_dropped)):
for pt in list(
filter(lambda p: pt_A.accumulated_distance < p[0] < pt_C.accumulated_distance,
pts_dropped)):
secant_elev = calc_secant_elevation(m, b, pt)
if abs(secant_elev - pt[1].h) >= drv_limit:
if abs(secant_elev - pt.point.h) >= drv_limit:
pt_D = pt
break

if pt_D is not None: # Replace B with point D
pts_step_3.remove(pt_B)
index = next(x for x, val in enumerate(pts_step_3) if val[0] > pt_D[0])
index = next(x for x, val in enumerate(pts_step_3) if
val.accumulated_distance > pt_D.accumulated_distance)
pts_step_3.insert(index, pt_D)
pts_dropped.append(pt_B)

Expand All @@ -113,7 +117,7 @@ def reduce_number_of_points(pts_step_2: List[Tuple[float, point.Point]], walk_po
return pts_step_3


def remove_unnecessary_points(pts_step_1: List[Tuple[int, point.Point]]):
def remove_unnecessary_points(pts_step_1: List[WayPoint]):
"""
Now we loop through the preselected points and tighten the selection criteria.
Into the list pts_step_2 we include points according to the following rules:
Expand All @@ -132,26 +136,27 @@ def remove_unnecessary_points(pts_step_1: List[Tuple[int, point.Point]]):

drv_limit = 20

last_pt: Tuple[float, point.Point] = pts_step_1[0]
pts_step_2: List[Tuple[float, point.Point]] = [last_pt]
last_pt: WayPoint = pts_step_1[0]
pts_step_2: List[WayPoint] = [last_pt]

for pt in pts_step_1[1:]:

if last_pt is not None:

last_coord = get_coordinates(last_pt[1].to_WGS84())
coord = get_coordinates(pt[1].to_WGS84())
last_coord = get_coordinates(last_pt.point)
coord = get_coordinates(pt.point)

if (abs(last_pt[1].h - pt[1].h) > 20
and geopy.distance.distance(last_coord, coord).m > 250) \
or geopy.distance.distance(last_coord, coord).m > 1500: # TODO: this line seems obsole, bug?
if abs(last_pt.point.h - pt.point.h) > 20 and coord[0] - last_coord[0] > 250:

m, b = calc_secant_line(last_pt, pt)

# add point with max derivation
for intermediate_point in list(filter(lambda p: last_pt[0] < p[0] < pt[0], pts_step_1)):
for intermediate_point in \
list(filter(
lambda p: last_pt.accumulated_distance < p.accumulated_distance < pt.accumulated_distance,
pts_step_1)):

derivation = abs(calc_secant_elevation(m, b, intermediate_point) - intermediate_point[1].h)
derivation = abs(calc_secant_elevation(m, b, intermediate_point) - intermediate_point.point.h)
if drv_limit <= derivation:
pts_step_2.append(intermediate_point)
drv_limit = derivation
Expand All @@ -173,7 +178,7 @@ def get_coordinates(pt: point.Point):
return pt.lat, pt.lon


def preselection_step(path_ : path.Path):
def preselection_step(path_: path.Path):
"""
Preselection: Select all points from the tracking file which satisfy one of the following criteria.
This guarantees that all important points are considered. We call the set of selected points pts_step_1.
Expand All @@ -184,68 +189,66 @@ def preselection_step(path_ : path.Path):
- distance to last selected point bigger than 250 meters
"""

cumulated_distance = 0.0
pts_step_1: List[Tuple[float, point.Point]] = []
accumulated_distance = 0.0
way_points: List[WayPoint] = []

lastSlope = None
lastCoord = None
last_slope = 0
last_point = path_.points[0]

lastPoint = None
# Insert first point of path
way_points.append(WayPoint(0, last_point))

# (1) Preselection
for i,pt in enumerate(path_.points):
newCoord = get_coordinates(pt.to_WGS84())

if i == 0: # First point
pts_step_1.append((0, pt))
lastCoord = get_coordinates(pt.to_WGS84())
lastSlope = 0.
elif i == len(path_.points) - 1: # Last point
distDelta = geopy.distance.distance(lastCoord, newCoord).km
cumulated_distance += distDelta
pts_step_1.append((cumulated_distance, pt))
else: # Any other point
distDelta = geopy.distance.distance(lastCoord, newCoord).km
cumulated_distance += distDelta

if distDelta > 0.:
newSlope = (pt.h - pts_step_1[-1][1].h) / distDelta
if abs(newSlope) > 0.0:
if abs(newSlope - lastSlope) > 0.5: # Slope change
pts_step_1.append((cumulated_distance, pt))
lastSlope = newSlope
lastCoord = newCoord
continue

if geopy.distance.distance(lastCoord, newCoord).m > 250:
pts_step_1.append((cumulated_distance, pt))
lastCoord = newCoord
lastSlope = newSlope
continue

return cumulated_distance, pts_step_1


def calc_secant_elevation(m: float, b: float, pt_B: Tuple[float, point.Point]):
for i in range(len(path_.points) - 2):

last_coord: Point_LV03 = path_.points[i].to_LV03()
coord: Point_LV03 = path_.points[i + 1].to_LV03()

delta_dist = last_coord.distance(coord)
accumulated_distance += delta_dist

# skip duplicated points in GPX file, less than 1cm in distance
if delta_dist <= 0.01:
continue

slope = (coord.h - way_points[-1].point.h) / delta_dist
if abs(slope) > 0. and abs(slope - last_slope) > 0.5: # significant slope change
way_points.append(WayPoint(accumulated_distance, coord))

elif accumulated_distance - way_points[-1].accumulated_distance > 250:
way_points.append(WayPoint(accumulated_distance, coord))

# TODO: add significant local extremum in track elevation

# Insert last point of path
way_points.append(WayPoint(accumulated_distance, path_.points[-1]))

logger.info(""
"Total route length = {}m".format(accumulated_distance))

return accumulated_distance, way_points


def calc_secant_elevation(m: float, b: float, pt_B: WayPoint):
"""
Calculates the elevation of a point on the secant line defined by m and b. The point on the
secant line is chosen such that location matches the location of pkr_B.
elevation = m * loc_of_pt_B + b
"""

# pt_B[0] returns the location of point B in km
return m * (pt_B[0] * 1000) + b
# pt_B[0] returns the location of point B in meter
return m * pt_B.accumulated_distance + b


def calc_secant_line(pt_A: Tuple[float, point.Point], pt_C: Tuple[float, point.Point]):
def calc_secant_line(pt_A: WayPoint, pt_C: WayPoint):
"""
Constructs a secant line through points A and C, i.g. a linear function passing through point A and C.
Returns the slope and the intersect of a linear function through A and C.
"""

# the locations of pt_A and pt_C given in km.
x1, y1 = pt_A[0] * 1000, pt_A[1].h
x2, y2 = pt_C[0] * 1000, pt_C[1].h
# the locations of pt_A and pt_C given is given in m.
x1, y1 = pt_A.accumulated_distance, pt_A.point.h
x2, y2 = pt_C.accumulated_distance, pt_C.point.h

# if the location of A and C is identical, the slope m is defined as 0
m: float = (y1 - y2) / (x1 - x2) if (x1 - x2) != 0 else 0.0
Expand All @@ -256,30 +259,27 @@ def calc_secant_line(pt_A: Tuple[float, point.Point], pt_C: Tuple[float, point.P
return m, b


def prepare_for_plot(path_ : path.Path):
def prepare_for_plot(path_: path.Path_LV03):
"""
Prepares a gpx file for plotting.
Returns two list, one with the elevation of all points in the gpx file and one with the associated,
accumulated distance form the starting point.
"""

coord: Tuple[float, float] = None
coord: Point_LV03 = None
accumulated_distance: float = 0.0

distances: List[float] = []
heights: List[float] = []

for pt in path_.points:

newCoord = get_coordinates(pt.to_WGS84())

if coord is not None:
distDelta = geopy.distance.distance(coord, newCoord).km
accumulated_distance += distDelta
accumulated_distance += coord.distance(pt)

distances.append(accumulated_distance)
heights.append(pt.h)

coord = newCoord
coord = pt

return distances, heights
Loading

0 comments on commit 3602547

Please sign in to comment.