Skip to content

Commit

Permalink
linting
Browse files Browse the repository at this point in the history
  • Loading branch information
mrosseel committed Feb 2, 2025
1 parent 160a1c7 commit 04f1d34
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 285 deletions.
1 change: 0 additions & 1 deletion python/PiFinder/camera_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,5 @@ def get_image_loop(
self.camera.start()
console_queue.put("CAM: Started camera")


except (BrokenPipeError, EOFError, FileNotFoundError):
logger.exception("Error in Camera Loop")
21 changes: 14 additions & 7 deletions python/PiFinder/catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ class PlanetCatalog(TimerCatalog):
# Shorter time delay when waiting for GPS lock
WAITING_FOR_GPS_DELAY = 10
short_delay = True

def __init__(self, dt: datetime.datetime, shared_state: SharedStateObj):
super().__init__("PL", "Planets")
self.shared_state = shared_state
Expand All @@ -644,12 +644,14 @@ def __init__(self, dt: datetime.datetime, shared_state: SharedStateObj):
@property
def time_delay_seconds(self) -> int:
# Return shorter delay if waiting for GPS lock
return self.DEFAULT_DELAY if not self.short_delay else self.WAITING_FOR_GPS_DELAY
return (
self.DEFAULT_DELAY if not self.short_delay else self.WAITING_FOR_GPS_DELAY
)

def init_planets(self, dt):
planet_dict = sf_utils.calc_planets(dt)
logger.debug(f"starting planet dict {planet_dict}")

if not planet_dict:
logger.debug("No GPS lock during initialization - will retry soon")
self.initialised = True # Still mark as initialized so timer starts
Expand Down Expand Up @@ -699,12 +701,16 @@ def do_timed_task(self):
with Timer("Planet Catalog periodic update"):
dt = self.shared_state.datetime()
logger.debug(f"Planet catalog update at {dt}")

if not dt or not sf_utils.observer_loc or not self.shared_state.location().lock:

if (
not dt
or not sf_utils.observer_loc
or not self.shared_state.location().lock
):
return

planet_dict = sf_utils.calc_planets(dt)

# If we just got GPS lock and previously had no planets, do a full reinit
if not self.get_objects():
logger.info("GPS lock acquired - reinitializing planet catalog")
Expand All @@ -723,7 +729,8 @@ def do_timed_task(self):
obj.mag_str = obj.mag.calc_two_mag_representation()
except (KeyError, ValueError) as e:
logger.error(f"Error updating planet {name}: {e}")



class CometCatalog(TimerCatalog):
"""Creates a catalog of comets"""

Expand Down
65 changes: 38 additions & 27 deletions python/PiFinder/gps_ubx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import logging

logger = logging.getLogger("GPS")
sats = [0,0]
sats = [0, 0]

MAX_GPS_ERROR = 50000 # 50 km


async def process_messages(parser, gps_queue, console_queue, error_info):
gps_locked = False
got_sat_update = False # Track if we got a NAV-SAT message this cycle
Expand All @@ -23,8 +24,8 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
logger.debug("GPS: %s: %s", msg_class, msg)

if msg_class == "NAV-DOP":
error_info['error_2d'] = msg["hdop"]
error_info['error_3d'] = msg["pdop"]
error_info["error_2d"] = msg["hdop"]
error_info["error_3d"] = msg["pdop"]

elif msg_class == "NAV-SVINFO" and not got_sat_update:
# Fallback satellite info if NAV-SAT not available
Expand All @@ -38,11 +39,15 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
elif msg_class == "NAV-SAT":
# Preferred satellite info source - not seen in the current pifinder gps versions
sats_seen = msg["nSat"]
sats_used = sum(1 for sat in msg.get("satellites", []) if sat.get("used", False))
sats_used = sum(
1 for sat in msg.get("satellites", []) if sat.get("used", False)
)
sats[0] = sats_seen
sats[1] = sats_used
gps_queue.put(("satellites", tuple(sats)))
logger.debug("Number of sats (NAV-SAT) seen: %i, used: %i", sats_seen, sats_used)
logger.debug(
"Number of sats (NAV-SAT) seen: %i, used: %i", sats_seen, sats_used
)

elif msg_class == "NAV-SOL":
# only source of truth for satellites used in a FIX
Expand All @@ -56,46 +61,52 @@ async def process_messages(parser, gps_queue, console_queue, error_info):
gps_locked = True
console_queue.put("GPS: Locked")
logger.debug("GPS locked")
gps_queue.put((
"fix",
{
"lat": msg["lat"],
"lon": msg["lon"],
"altitude": msg["altHAE"],
"source": "GPS",
"lock": gps_locked,
"lock_type": msg["mode"],
"error_in_m": msg["ecefpAcc"]
}
))
gps_queue.put(
(
"fix",
{
"lat": msg["lat"],
"lon": msg["lon"],
"altitude": msg["altHAE"],
"source": "GPS",
"lock": gps_locked,
"lock_type": msg["mode"],
"error_in_m": msg["ecefpAcc"],
},
)
)
logger.debug("GPS fix: %s", msg)

elif msg_class == "NAV-TIMEGPS":
if "time" in msg and "valid" in msg and msg["valid"]:
gps_queue.put((
"time",
{
"time": msg["time"],
"tAcc": msg["tAcc"] if "tAcc" in msg else -1,
"source": "GPS"
}
))
gps_queue.put(
(
"time",
{
"time": msg["time"],
"tAcc": msg["tAcc"] if "tAcc" in msg else -1,
"source": "GPS",
},
)
)
else:
logger.warning(f"TIMEGPS message does not qualify: {msg}")

await asyncio.sleep(0)


async def gps_main(gps_queue, console_queue, log_queue):
MultiprocLogging.configurer(log_queue)
error_info = {'error_2d': 999, 'error_3d': 999}
error_info = {"error_2d": 999, "error_3d": 999}

while True:
try:
parser = await UBXParser.connect(log_queue, host='127.0.0.1', port=2947)
parser = await UBXParser.connect(log_queue, host="127.0.0.1", port=2947)
await process_messages(parser, gps_queue, console_queue, error_info)
except Exception as e:
logger.error(f"Error in GPS monitor: {e}")
await asyncio.sleep(5)


def gps_monitor(gps_queue, console_queue, log_queue):
asyncio.run(gps_main(gps_queue, console_queue, log_queue))
Loading

0 comments on commit 04f1d34

Please sign in to comment.