-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdetect_drift
executable file
·61 lines (54 loc) · 2.39 KB
/
detect_drift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
import subprocess
import argparse
from terraform_drift_detector import TerraformDriftDetector
from terraform_plan_results import TerraformPlanResults
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--on-new-drift', '-d', help = "Bash expression to be executed when new drift is detected", type = str)
parser.add_argument('--on-resolve', '-r', help = "Bash expression to be executed when all drift is resolved", type = str)
parser.add_argument('--state-file-path', '-s', help = "Where to store the drift state. Default: ./previous_plan_results", type = str, default = "previous_plan_results")
args = parser.parse_args()
previous_plan_results = TerraformPlanResults.load(args.state_file_path)
current_plan_results = run_terraform_plan()
drift_detector = TerraformDriftDetector(
previous_plan_results = previous_plan_results,
current_plan_results = current_plan_results
)
current_plan_results.save(args.state_file_path)
if drift_detector.drift_resolved():
print('All drift has been resolved!')
if args.on_resolve != None:
print(f"Triggering: {args.on_resolve}")
subprocess.run(args.on_resolve, shell=True)
elif drift_detector.new_drift_detected():
print('New drift detected!')
print('====================')
print(drift_detector.diff())
print('====================')
if args.on_new_drift != None:
print(f"Triggering: {args.on_new_drift}")
subprocess.run(args.on_new_drift, shell=True)
elif current_plan_results.changes_present():
print('Drift exists but has not changed.')
else:
print('Nothing to report. There is no drift.')
def run_terraform_plan() -> TerraformPlanResults:
# By splitting this into two steps we can suppress the "Refreshing..."
# lines from the output.
# See https://github.com/hashicorp/terraform/issues/27214#issuecomment-742809948
PLAN_COMMAND = "terraform plan -detailed-exitcode -out=tfplan"
SHOW_COMMAND = "terraform show -no-color tfplan"
completed_plan_process = subprocess.run(
PLAN_COMMAND.split(), stdout=subprocess.DEVNULL
)
# TODO: Check that plan succeeded before running show
completed_show_process = subprocess.run(
SHOW_COMMAND.split(), capture_output=True
)
return TerraformPlanResults(
exit_code = completed_plan_process.returncode,
message = completed_show_process.stdout.decode("UTF-8")
)
if __name__ == "__main__":
main()