forked from losuler/opnsense-update-notify
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·162 lines (155 loc) · 7.86 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
# vim: set ts=4 sts=4 et sw=4 ft=python:
# Python Modules
import json # Parse JSON from OPNsense
import sys # Arguments and exit calls
import argparse # Options
from configparser import ConfigParser, ExtendedInterpolation # Config Parsing
import requests # Communication
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
disable_warnings(InsecureRequestWarning)
# Pretty printing for debugging
# End of Python Modules
# Functions
## Function: parse_res
## Description: This will parse the API response from OPNsense
def parse_res(resp):
if resp['status'] != "none":
if args.verbose and args.config:
print(f"[INFO] Current Product Version: {resp['product']['product_version']}")
print(f"[INFO] New Product Version: {resp['product']['product_latest']}")
# If current version and new version don't match we must have an upgrade!
if resp['product']['product_version'] != resp['product']['product_latest']:
message = 'OPNsense upgrade available!\n'
message += "Current version: " + resp['product']['product_version'] + " New version: " + resp['product']['product_latest'] + "\n"
# Determine if reboot is required
if resp['needs_reboot'] != "0":
message += "Reboot required for this upgrade\n"
message += resp['status_msg'] + "\n"
# They match, just package updates maybe?
if resp['product']['product_version'] == resp['product']['product_latest']:
message = 'OPNsense updates available!\n'
# Check if there are any new packages to install as part of this update
new_pkgs = resp['new_packages']
if args.verbose and args.config and len(new_pkgs) > 0:
print(f"[INFO] Number of new packages: {len(new_pkgs)}")
if len(new_pkgs) > 0:
# Looks like we have new packages, let's provide them in the notification
message += 'New:\n'
# Check if we have many or just one, then provide a package name and versioning
if type(new_pkgs) == dict:
for pkg in new_pkgs:
message += f"{new_pkgs[pkg]['name']} {new_pkgs[pkg]['version']}\n"
else:
for pkg in new_pkgs:
message += f"{pkg['name']} {pkg['version']}\n"
# Check if there are any updated packages to install as part of this update
upg_pkgs = resp['upgrade_packages']
if args.verbose and args.config and len(upg_pkgs) > 0:
print(f"[INFO] Number of upgraded packages: {len(upg_pkgs)}")
if len(upg_pkgs) > 0:
# Looks like we have upgraded packages, let's provide them in the notification
message += 'Upgrade:\n'
# Check if we have many or just one, then provide a package name, new version and old version
if type(upg_pkgs) == dict:
for pkg in upg_pkgs:
message += f"{new_pkgs[pkg]['name']} from {new_pkgs[pkg]['current_version']}" + \
f" to {new_pkgs[pkg]['new_version']}\n"
else:
for pkg in upg_pkgs:
message += f"{pkg['name']} from {pkg['current_version']}" + \
f" to {pkg['new_version']}\n"
# Check if there are any packages to reinstall as part of this update
reinst_pkgs = resp['reinstall_packages']
if args.verbose and args.config and len(reinst_pkgs) > 0:
print(f"[INFO] Number of packages to reinstall: {len(reinst_pkgs)}")
if len(reinst_pkgs) > 0:
# Looks like we have packages to reinstall, let's provide them in the notification
message += 'Reinstall:\n'
# Check if we have many or just one, then provide a package name and old version
if type(reinst_pkgs) == dict:
for pkg in reinst_pkgs:
message += f"{new_pkgs[pkg]['name']} {new_pkgs[pkg]['version']}\n"
else:
for pkg in reinst_pkgs:
message += f"{pkg['name']} {pkg['version']}\n"
# All done here so let's return the notification message
return message
else:
return 0
## Function: send_telegram
## Description: This will send a notification via Telegram
def send_telegram(msg, chatid, token):
url = f'https://api.telegram.org/bot{token}/sendMessage?text={msg}&chat_id={chatid}'
r = requests.get(url)
return r
## Function: send_pushover
## Description: This will send a notification via Pushover
def send_pushover(msg, token, user):
payload = {"message": msg, "user": user, "token": token }
r = requests.post('https://api.pushover.net/1/messages.json', data=payload, headers={'User-Agent': 'Python'})
return r
# End of Functions
# Main
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="store_true", help="Increase output verbosity / debug")
parser.add_argument(
"-c",
"--config",
help="Full path location to config.ini configuration file",
type=str,
required=True,
)
args = parser.parse_args()
if args.verbose and args.config:
print(f"[INFO] Received configuration file path as {args.config}")
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Parse Configuration files
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read(args.config)
# Setup OPNsense variables based on the configuration
url = config["opnsense"]["url"]
# verify is false if self signed
verify = not config["opnsense"]["self_signed"]
api_key = config["opnsense"]["api_key"]
api_secret = config["opnsense"]["api_secret"]
if args.verbose and args.config:
print(f"[INFO] URL: {config['opnsense']['url']}")
print(f"[INFO] Verify: {config['opnsense']['self_signed']}")
print(f"[INFO] API Key: {config['opnsense']['api_key']}")
print(f"[INFO] API Secret: {config['opnsense']['api_secret']}")
# Request the Update status via API from OPNsense
r = requests.get(url, verify=verify, auth=(api_key, api_secret))
if r.status_code == 200:
res = json.loads(r.text)
message = parse_res(res)
if message:
if args.verbose and args.config:
# Since we craft the message as a long string with new lines, break them for verbose mode
for msg in message.splitlines():
print(f"[INFO] Message: {msg}")
# Parse the emitters, this supports more than one emitter!
for emit in config['emitter']['emitter'].strip().splitlines():
if args.verbose and args.config:
print(f"[INFO] Detected Emitter: {emit}")
if emit == "telegram":
if args.verbose and args.config:
print(f"[INFO] Sending Telegram notification")
response = send_telegram(message,config['telegram']['chatid'],config['telegram']['token'])
if args.verbose and args.config:
print(f"[INFO] Telegram response: {response}")
if emit == "pushover":
if args.verbose and args.config:
print(f"[INFO] Sending Pushover notification")
response = send_pushover(message,config['pushover']['app_token'],config['pushover']['user_token'])
if args.verbose and args.config:
print(f"[INFO] Pushover response: {response}")
if message == 0:
print("[INFO] There are no updates or major upgrades available")
if args.verbose and args.config:
# Since we craft the message as a long string with new lines, break them for verbose mode
print("[INFO] There are no updates or major upgrades available")
sys.exit(0)
# End of Main