diff --git a/demo.py b/demo.py index 657bcd9..03a4902 100644 --- a/demo.py +++ b/demo.py @@ -27,7 +27,7 @@ def demo(): print("Correct!") else: print("Incorrect!") - similar = proofreader.get_similar(word, 0.5, chunks=20, upto=5) + similar = proofreader.get_similar(word, 0.5, chunks=20, upto=5, set_cache=True, use_cache=True) if similar is None: print("No similar words found.") elif showallsimilarities == "True": diff --git a/lesp/autocorrect.py b/lesp/autocorrect.py index b2241aa..d2b6ae6 100644 --- a/lesp/autocorrect.py +++ b/lesp/autocorrect.py @@ -1,12 +1,16 @@ from typing import List, Optional, Union - import concurrent.futures import os +import json class Proofreader: - def __init__(self, wordlist_path: str = "lesp-wordlist.txt") -> None: + def __init__(self, wordlist_path: str = "lesp-wordlist.txt", cache_file: str = "lesp_cache/lesp.cache") -> None: self.wordlist_path: str = wordlist_path self.load_wordlist() + self.cache_file: str = cache_file + self.cache: dict = {} + if cache_file: + self.load_cache(cache_file) def load_wordlist(self) -> None: try: @@ -20,6 +24,34 @@ def load_wordlist(self) -> None: raise ValueError("Invalid wordlist format. Words must contain only alphabetic characters.") except FileNotFoundError: raise FileNotFoundError(f"{self.wordlist_path} not found!") + + def load_cache(self, cache_file: str = "lesp.cache") -> None: + try: + with open(cache_file, "r") as f: + # Validate cache file format and how words are stored + temp_cache: dict = json.load(f) + # Must follow the format {"word": ["similar", "words"]} + if not all(isinstance(word, str) for word in temp_cache.keys() and not all(word.islower() and word.isalpha() for word in temp_cache.keys())): + raise ValueError("Invalid cache file format. Keys must be strings. Also the strings must be all-lowercase and contain only alphabetic characters.") + self.cache: dict = json.load(f) + except FileNotFoundError: + # Create the cache file (and directory. also possible if multiple directories are missing) + try: + os.makedirs(os.path.dirname(cache_file), exist_ok=True) + with open(cache_file, "w") as f: + json.dump({}, f) + except: + with open(cache_file, "w") as f: + json.dump({}, f) + except json.JSONDecodeError: + raise ValueError("Invalid cache file format. Must be a valid JSON file.") + + def save_cache(self) -> None: + try: + with open(self.cache_file, "w") as f: + json.dump(self.cache, f) + except FileNotFoundError: + raise FileNotFoundError(f"{self.cache_file} not found!") @staticmethod def get_similarity_score(word1: str, word2: str) -> float: @@ -56,7 +88,7 @@ def get_similar_worker(args: tuple) -> List[str]: def is_correct(self, word: str) -> bool: return word.lower() in self.wordlist - def get_similar(self, word: str, similarity_rate: float, chunks: int = 4, upto: int = 3) -> Optional[List[str]]: + def get_similar(self, word: str, similarity_rate: float, chunks: int = 4, upto: int = 3, use_cache: bool = False, set_cache: bool = False) -> Optional[List[str]]: if upto < 1: raise ValueError("Can only return 1 or more similar words.") if chunks < 1: @@ -68,6 +100,12 @@ def get_similar(self, word: str, similarity_rate: float, chunks: int = 4, upto: similar_words: List[str] = [] chunk_size: int = len(self.wordlist) // chunks + if use_cache and self.cache and self.cache_file and word in self.cache: + if self.cache[word] != []: + return self.cache[word][:upto] + else: + return None + chunks: List[tuple] = [(word, similarity_rate, self.wordlist[i:i + chunk_size]) for i in range(0, len(self.wordlist), chunk_size)] with concurrent.futures.ThreadPoolExecutor() as executor: @@ -78,6 +116,12 @@ def get_similar(self, word: str, similarity_rate: float, chunks: int = 4, upto: similar_words = list(set(similar_words)) + if set_cache and self.cache_file and word not in self.cache: + print("Setting cache for \"" + word + "\"") + self.cache[word] = similar_words + self.save_cache() + + if len(similar_words) == 0: return None else: @@ -202,3 +246,18 @@ def merge_delete(source: str, destination: str) -> None: raise FileNotFoundError(f"File not found: {str(e)}") except Exception as e: raise ValueError(f"Error during merge delete: {str(e)}") + + def clear_cache(self, cache_file: str = "lesp_cache/lesp.cache") -> None: + if cache_file: + try: + os.remove(cache_file) + self.cache = {} + # If there also was a directory, remove it + if os.path.isdir(os.path.dirname(cache_file)): + os.rmdir(os.path.dirname(cache_file)) + except FileNotFoundError: + raise FileNotFoundError(f"{cache_file} not found!") + else: + raise ValueError("Cache file not specified!") + +