Skip to content

Commit

Permalink
Merge pull request #4 from Qetesh/deny-allow-list-wildcard-url
Browse files Browse the repository at this point in the history
Support wildcard URLs in deny/allow lists
  • Loading branch information
Qetesh authored Oct 11, 2024
2 parents e775f7b + 92fa745 commit 999afd4
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 20 deletions.
10 changes: 10 additions & 0 deletions common/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging
from yaml import safe_load

# Read the configuration once at import time. The context manager closes the
# file handle deterministically instead of leaving it to the garbage collector.
with open('config.yml', encoding='utf8') as config_file:
    # An empty config.yml parses to None; fall back to an empty mapping so the
    # default log level below still applies instead of raising AttributeError.
    config = safe_load(config_file) or {}

# Shared logger for the application; its level comes from the optional
# ``log_level`` config key and defaults to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(config.get('log_level', 'INFO'))

# Each record shows timestamp, source file, line number, level and message.
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(lineno)d - %(levelname)s - %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
logger.addHandler(console)
Empty file added core/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions core/entry_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import fnmatch

def _matches_any(url, patterns):
    """Return True if ``url`` matches at least one shell-style pattern."""
    # fnmatchcase (unlike fnmatch) does not run os.path.normcase on its
    # arguments, so URL matching behaves the same on every platform.
    return any(fnmatch.fnmatchcase(url, pattern) for pattern in patterns)


def filter_entry(config, agent, entry):
    """Decide whether ``agent`` should process ``entry``.

    The entry is rejected when its content starts with the title of any
    configured agent, or with ``<pre`` when at least one agent has
    ``style_block`` enabled.  Otherwise the feed's ``site_url`` is checked
    against the agent's allow list; the deny list is consulted only when no
    allow list is configured.  Patterns are shell-style wildcards (``*``,
    ``?``, ``[seq]``) as implemented by :mod:`fnmatch`.

    Args:
        config: Mapping with an ``'agents'`` mapping whose values each
            provide ``'title'`` and ``'style_block'``.
        agent: ``(name, settings)`` pair such as one item of
            ``config['agents'].items()``; ``settings`` may contain
            ``allow_list``/``deny_list`` (or the legacy
            ``whitelist``/``blacklist``) lists of URL patterns.
        entry: Mapping providing ``entry['content']`` and
            ``entry['feed']['site_url']``.

    Returns:
        bool: ``True`` if the entry should be processed, ``False`` otherwise.

    Raises:
        KeyError: If a required key is missing from ``config`` or ``entry``.
    """
    agent_settings = config['agents'].values()
    skip_prefixes = [settings['title'] for settings in agent_settings]
    if any(settings['style_block'] for settings in agent_settings):
        skip_prefixes.append('<pre')

    settings = agent[1]
    # TODO: ``whitelist``/``blacklist`` are only read for backward
    # compatibility with older configs and are to be removed.
    allow_list = settings.get('allow_list')
    if allow_list is None:
        allow_list = settings.get('whitelist')
    deny_list = settings.get('deny_list')
    if deny_list is None:
        deny_list = settings.get('blacklist')

    # Content that already starts with one of the markers is never processed.
    if entry['content'].startswith(tuple(skip_prefixes)):
        return False

    # An allow list takes precedence: only matching feeds pass, and the deny
    # list is ignored entirely.
    if allow_list is not None:
        return _matches_any(entry['feed']['site_url'], allow_list)

    # With only a deny list, every feed passes except the matching ones.
    if deny_list is not None:
        return not _matches_any(entry['feed']['site_url'], deny_list)

    # No lists configured: every feed is eligible.
    return True
24 changes: 4 additions & 20 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import concurrent.futures
import time
import logging
import traceback
from common.logger import logger
from core.entry_filter import filter_entry

import miniflux
from markdownify import markdownify as md
Expand All @@ -13,41 +14,24 @@
miniflux_client = miniflux.Client(config['miniflux']['base_url'], api_key=config['miniflux']['api_key'])
llm_client = OpenAI(base_url=config['llm']['base_url'], api_key=config['llm']['api_key'])

logger = logging.getLogger(__name__)
logger.setLevel(config.get('log_level', 'INFO'))
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(lineno)d - %(levelname)s - %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
logger.addHandler(console)

def process_entry(entry):
llm_result = ''
start_with_list = [name[1]['title'] for name in config['agents'].items()]
style_block = [name[1]['style_block'] for name in config['agents'].items()]
[start_with_list.append('<pre') for i in style_block if i]

for agent in config['agents'].items():
# Todo Compatible with whitelist/blacklist parameter, to be removed
allow_list = agent[1].get('allow_list') if agent[1].get('allow_list') is not None else agent[1].get('whitelist')
deny_list = agent[1]['deny_list'] if agent[1].get('deny_list') is not None else agent[1].get('blacklist')

messages = [
{"role": "system", "content": agent[1]['prompt']},
{"role": "user", "content": "The following is the input content:\n---\n " + md(entry['content']) }
]
# filter, if AI is not generating, and in allow_list, or not in deny_list
if ((not entry['content'].startswith(tuple(start_with_list))) and
(((allow_list is not None) and (entry['feed']['site_url'] in allow_list)) or
(deny_list is not None and entry['feed']['site_url'] not in deny_list) or
(allow_list is None and deny_list is None))):
if filter_entry(config, agent, entry):
completion = llm_client.chat.completions.create(
model=config['llm']['model'],
messages= messages,
timeout=config.get('llm', {}).get('timeout', 60)
)

response_content = completion.choices[0].message.content
logger.info(f"\nagents:{agent[0]} \nfeed_title:{entry['title']} \nresult:{response_content}")
logger.info(f"agents:{agent[0]} feed_title:{entry['title']} result:{response_content}")

if agent[1]['style_block']:
llm_result = (llm_result + '<pre style="white-space: pre-wrap;"><code>\n'
Expand Down
125 changes: 125 additions & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import unittest
from yaml import safe_load
from core.entry_filter import filter_entry

# Fixture configurations, one per scenario name. Each scenario holds an
# 'agents' mapping with a single agent called "test".
# NOTE(review): the values left empty after "allow_list":/"deny_list": are
# presumably loaded as None by the YAML parser -- confirm against PyYAML.
test_config = '''
{
"test_style_block": {
"agents": {
"test": {
"title": "🌐AI 翻译",
"style_block": true,
"allow_list": ,
"deny_list":
}
}
},
"test_allow_list": {
"agents": {
"test": {
"title": "🌐AI 翻译",
"style_block": false,
"allow_list": [
"https://9to5mac.com/",
"https://home.kpmg/*"
],
"deny_list":
}
}
},
"test_deny_list": {
"agents": {
"test": {
"title": "🌐AI 翻译",
"style_block": false,
"allow_list": ,
"deny_list": [
"https://9to5mac.com/",
"https://home.kpmg/cn/zh/home/insights.html"
]
}
}
},
"test_None": {
"agents": {
"test": {
"title": "🌐AI 翻译",
"style_block": false,
"allow_list": ,
"deny_list":
}
}
}
}
'''

# Fixture entries, keyed by the same scenario names as test_config. Each item
# carries the 'entry' passed to filter_entry (content plus feed site_url) and
# the 'result' the test compares the return value against.
test_entries = '''
{
"test_style_block":
{
"entry":
{
"content": '<pre',
"feed":
{
"site_url": "https://weibo.com/1906286443/OAih1wghK",
},
},
"result": False,
},
"test_allow_list":
{
"entry":
{
"content": '123',
"feed":
{
"site_url": "https://home.kpmg/cn/zh/home/insights.html",
},
},
"result": True,
},
"test_deny_list":
{
"entry":
{
"content": '123',
"feed":
{
"site_url": "https://weibo.com/1906286443/OAih1wghK",
},
},
"result": True,
},
"test_None":
{
"entry":
{
"content": '123',
"feed":
{
"site_url": "https://weibo.com/1906286443/OAih1wghK",
},
},
"result": True,
},
}
'''

# Parse both fixture documents once at import time; the test case below looks
# scenarios up in these two mappings.
configs = safe_load(test_config)
entries = safe_load(test_entries)

class MyTestCase(unittest.TestCase):
    """Runs filter_entry over every scenario in the module-level fixtures."""

    def test_entry_filter(self):
        """Each scenario's own config and agent must yield its expected result."""
        for scenario, config in configs.items():
            # subTest reports every failing scenario by name instead of
            # stopping at the first one.
            with self.subTest(scenario=scenario):
                case = entries[scenario]
                # filter_entry expects a (name, settings) pair taken from the
                # scenario's 'agents' mapping; every fixture declares exactly
                # one agent, so take the first item. Passing the scenario item
                # itself would hide the allow/deny lists from the filter.
                agent = next(iter(config['agents'].items()))
                result = filter_entry(config, agent, case['entry'])
                self.assertEqual(result, case['result'])


# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()

0 comments on commit 999afd4

Please sign in to comment.