# engine.py
from misc.utils import Colors
from service import AIService, ChecklistService, ContentService, EmailService
class Engine:
    """Run the checklist -> article -> publication -> e-mail pipeline.

    The engine owns no logic of its own beyond orchestration: every step
    is delegated to one of the four injected services, and progress is
    reported on standard output.
    """

    def __init__(
        self,
        ai_service: AIService,
        checklist_service: ChecklistService,
        content_service: ContentService,
        email_service: EmailService,
    ):
        """Store the collaborating services.

        Args:
            ai_service: Used to write an article from checklist data.
            checklist_service: Used to list, read, compare and mark
                checklists.
            content_service: Used to publish the written articles.
            email_service: Used to run the e-mail campaign.
        """
        self.ai = ai_service
        self.checklists = checklist_service
        self.content = content_service
        self.emails = email_service

    def _get_checklists(self) -> list:
        """Fetch every checklist and keep the usable ones.

        A checklist is skipped when the service returns a falsy detail
        ("incomplete") or when the service reports it as a duplicate of
        the most recently kept checklist ("duplicate"). All skipped IDs
        are handed to ``mark_skipped`` in a single call after the loop.

        Returns:
            The kept checklist details, in the order their IDs were
            returned by the service.
        """
        ids = self.checklists.get_checklist_ids()
        total = len(ids)
        print("Got", total, "checklist IDs.")

        checklists = []
        skipped_checklist_ids = []
        for i, checklist_id in enumerate(ids, 1):
            print(f"[{i}/{total}]", f"{checklist_id}: reading checklist.")
            checklist = self.checklists.get_checklist_detail(checklist_id)

            # Only the previously kept checklist is compared, so this
            # catches consecutive duplicates only.
            if not checklist or (
                checklists
                and self.checklists.is_duplicate(checklist, checklists[-1])
            ):
                which = "incomplete" if not checklist else "duplicate"
                print(
                    f"[{i}/{total}]",
                    f"{checklist_id}: skipping {which} checklist.",
                )
                skipped_checklist_ids.append(checklist_id)
            else:
                checklists.append(checklist)

        self.checklists.mark_skipped(skipped_checklist_ids)
        print("Got", len(checklists), "complete checklists.")
        return checklists

    def _write_articles(self, checklists) -> list:
        """Write one article per checklist via the AI service.

        The ``"id"`` and ``"source"`` entries are withheld from the data
        passed to ``write_article`` and re-attached, together with the
        generated ``"content"``, to the resulting article. Checklists for
        which the service returns a falsy value produce no article.

        Args:
            checklists: Mappings that each contain at least the keys
                ``"id"`` and ``"source"``. They are not modified.

        Returns:
            A list of new dicts: the remaining checklist data followed by
            ``"content"``, ``"source"`` and ``"id"``.

        Raises:
            KeyError: If a checklist lacks ``"id"`` or ``"source"``.
        """
        total = len(checklists)
        articles = []
        for i, checklist in enumerate(checklists, 1):
            # Work on a shallow copy so the caller's checklist keeps its
            # "id" and "source" even when no article gets written.
            data = dict(checklist)
            checklist_id = data.pop("id")
            source = data.pop("source")

            print(f"[{i}/{total}]", f"{checklist_id}: writing article.")
            if content := self.ai.write_article(data):
                data.update(content=content, source=source, id=checklist_id)
                articles.append(data)
        return articles

    def _publish_articles(self, articles) -> None:
        """Publish the articles and report the count on success.

        Nothing is printed when ``publish`` returns a falsy value.

        Args:
            articles: Articles passed to ``publish`` as positional
                arguments.
        """
        if self.content.publish(*articles):
            # Yellow flags a "successful" publish of zero articles.
            color = Colors.green if articles else Colors.yellow
            print(color("Published", len(articles), "articles."))

    def run(self) -> None:
        """Execute the whole pipeline once."""
        print("Starting engine...")

        if checklists := self._get_checklists():
            if articles := self._write_articles(checklists):
                self._publish_articles(articles)

        # NOTE(review): the source this was restored from had lost its
        # indentation, so the nesting of this step is an assumption: it
        # is treated as independent of the article pipeline. Confirm that
        # the campaign should also run when nothing new was published.
        if self.emails.run_campaign():
            print(Colors.green("Sent newsletter."))