diff --git a/configuration.py b/configuration.py index 633443ed..7b85ae99 100644 --- a/configuration.py +++ b/configuration.py @@ -22,6 +22,7 @@ def get_project_root() -> str: chart_folder_path = os.path.join(project_path, "content", "charts") sql_file_path = os.path.join(project_path, "content", "database", "confluence_pages_sql.db") +db_url = 'sqlite:///' + sql_file_path vector_folder_path = os.path.join(project_path, "content", "vectors", "confluence_pages") interactions_folder_path = os.path.join(project_path, "content", "vectors", "confluence_interactions") diff --git a/confluence/importer.py b/confluence/importer.py index 1dbf78ce..30d45111 100644 --- a/confluence/importer.py +++ b/confluence/importer.py @@ -4,14 +4,13 @@ import time from configuration import api_host, api_port -from database.page_manager import get_page_ids_missing_embeds, store_pages_data +from database.page_manager import PageManager from database.space_manager import SpaceManager from vector.create_vector_db import add_embeds_to_vector_db from .client import ConfluenceClient from .retriever import retrieve_space - # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -34,7 +33,7 @@ def submit_embedding_creation_request(page_id: str): def generate_missing_page_embeddings(retry_limit: int = 3, wait_time: int = 5) -> None: for attempt in range(retry_limit): # Retrieve the IDs of pages that are still missing embeddings. - page_ids = get_page_ids_missing_embeds() + page_ids = PageManager().get_page_ids_missing_embeds() # If there are no pages missing embeddings, exit the loop and end the process. if not page_ids: logging.info("All pages have embeddings. Process complete.") @@ -52,13 +51,12 @@ def generate_missing_page_embeddings(retry_limit: int = 3, wait_time: int = 5) - # After waiting, retrieve the list of pages still missing embeddings to see if the list has decreased. 
# This retrieval is crucial to ensure that the loop only continues if there are still pages that need processing. - if (page_ids := get_page_ids_missing_embeds()): + if page_ids := PageManager().get_page_ids_missing_embeds(): logging.info(f"After attempt {attempt + 1}, {len(page_ids)} pages are still missing embeds.") else: logging.info("All pages now have embeddings. Process complete.") break # Break out of the loop if there are no more pages missing embeddings. - # After exhausting the retry limit, check if there are still pages without embeddings. if page_ids: logging.info("Some pages still lack embeddings after all attempts.") @@ -82,10 +80,10 @@ def tui_choose_space(): def import_space(space_key, space_name): - import_date=datetime.now().strftime('%Y-%m-%d %H:%M:%S') + import_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') pages = retrieve_space(space_key) - store_pages_data(space_key, pages) + PageManager().store_pages_data(space_key, pages) generate_missing_page_embeddings() SpaceManager().upsert_space_info( diff --git a/database/bookmarked_conversation_manager.py b/database/bookmarked_conversation_manager.py index dbab53d2..24d42a9d 100644 --- a/database/bookmarked_conversation_manager.py +++ b/database/bookmarked_conversation_manager.py @@ -1,41 +1,32 @@ -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import SQLAlchemyError -from configuration import sql_file_path -from database.bookmarked_conversation import BookmarkedConversation, Base +from models.bookmarked_conversation import BookmarkedConversation from datetime import datetime, timezone +from database.database import Database + class BookmarkedConversationManager: def __init__(self): - self.engine = create_engine('sqlite:///' + sql_file_path) - Base.metadata.create_all(self.engine) - self.Session = sessionmaker(bind=self.engine) + self.db = Database() def add_bookmarked_conversation(self, title, body, thread_id): - try: - with self.Session() as 
session: - new_conversation = BookmarkedConversation(title=title, body=body, thread_id=thread_id) - session.add(new_conversation) - session.commit() - return new_conversation.id - except SQLAlchemyError as e: - print(f"Error adding bookmarked conversation: {e}") - return None + new_conversation = BookmarkedConversation(title=title, body=body, thread_id=thread_id) + self.db.add_object(new_conversation) def update_posted_on_confluence(self, thread_id): try: - with self.Session() as session: + with self.db.get_session() as session: conversation = session.query(BookmarkedConversation).filter_by(thread_id=thread_id).first() if conversation: conversation.posted_on_confluence = datetime.now(timezone.utc) session.commit() except SQLAlchemyError as e: print(f"Error updating conversation with Confluence timestamp: {e}") + return None def get_unposted_conversations(self): try: - with self.Session() as session: + with self.db.get_session() as session: return session.query(BookmarkedConversation).filter_by(posted_on_confluence=None).all() except SQLAlchemyError as e: print(f"Error getting unposted conversations: {e}") - return None \ No newline at end of file + return None diff --git a/database/database.py b/database/database.py new file mode 100644 index 00000000..8ab022fd --- /dev/null +++ b/database/database.py @@ -0,0 +1,90 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import SQLAlchemyError +from configuration import db_url +from models.qa_interaction import QAInteraction +from models.space_info import SpaceInfo +from models.page_data import PageData +from models.bookmarked_conversation import BookmarkedConversation +from models.quiz_question import QuizQuestion +from models.user_score import UserScore + + +class Database: + """ + Class providing access to a SQLAlchemy database. + + This class implements the Singleton pattern for creating and managing a connection to a SQLAlchemy database. 
+ It provides methods for getting database sessions and accessing the SQLAlchemy Engine object. + + Attributes: + engine (sqlalchemy.engine.Engine): The SQLAlchemy Engine object representing the connection to the database. + Session (sqlalchemy.orm.Session): The SQLAlchemy session factory used for creating database sessions. + """ + + _instance = None + + def __new__(cls): + """ + Create a new instance of the Database class. + + If an instance of the class has not been created yet, it is created; otherwise, the existing instance is returned. + + Returns: + Database: An instance of the Database class. + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._init_engine() + cls._instance._create_tables() + return cls._instance + + def _create_tables(self): + """ + Create tables in the database if they do not exist. + """ + for model in [QAInteraction, SpaceInfo, PageData, BookmarkedConversation, QuizQuestion, UserScore]: + model.metadata.create_all(self.engine) + + def _init_engine(self): + """ + Initialize the SQLAlchemy Engine object and session factory. + + Creates the Engine object for connecting to the database and the session factory for creating database sessions. + """ + self.engine = create_engine(db_url) + self.Session = sessionmaker(bind=self.engine) + + def get_session(self): + """ + Get a new database session. + + Returns: + sqlalchemy.orm.Session: A new database session. + """ + return self.Session() + + def add_object(self, obj): + """ + Adds the given object to the database. + + Args: + obj: The object to add to the database. + + Returns: + object or None: The added object if it was committed successfully, None otherwise. 
+ + Raises: + None + + """ + try: + with self.get_session() as session: + session.add(obj) + session.commit() + return obj + except SQLAlchemyError as e: + class_name = obj.__class__.__name__ + print(f"Error adding object of type {class_name}: {e}") + session.rollback() + return None diff --git a/database/interaction_manager.py b/database/interaction_manager.py index ff3c3ccd..8f977b67 100644 --- a/database/interaction_manager.py +++ b/database/interaction_manager.py @@ -1,161 +1,106 @@ # ./database/interaction_manager.py -from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text -from sqlalchemy.orm import sessionmaker, declarative_base from datetime import datetime, timezone +from models.qa_interaction import QAInteraction +from sqlalchemy.exc import SQLAlchemyError +from database.database import Database import json -from configuration import sql_file_path - - -# Define the base class for SQLAlchemy models -Base = declarative_base() - - -class QAInteractions(Base): - __tablename__ = 'qa_interactions' - interaction_id = Column(Integer, primary_key=True) - question_text = Column(Text) - thread_id = Column(String) - assistant_thread_id = Column(String) - answer_text = Column(Text) - channel_id = Column(String) - slack_user_id = Column(String) - question_timestamp = Column(DateTime) - answer_timestamp = Column(DateTime) - comments = Column(Text, default=json.dumps([])) - last_embedded = Column(DateTime) - embed = Column(Text, default=json.dumps([])) - class QAInteractionManager: def __init__(self): - self.engine = create_engine('sqlite:///' + sql_file_path) - self.Session = sessionmaker(bind=self.engine) + self.db = Database() def add_question_and_answer(self, question, answer, thread_id, assistant_thread_id, channel_id, question_ts, answer_ts, slack_user_id): - session = self.Session() - try: - serialized_answer = json.dumps(answer.__dict__) if not isinstance(answer, str) else answer - interaction = QAInteractions( - question_text=question, - 
thread_id=thread_id, - assistant_thread_id=assistant_thread_id, - answer_text=serialized_answer, - channel_id=channel_id, - question_timestamp=question_ts, - answer_timestamp=answer_ts, - comments=json.dumps([]), # Initialize an empty list of comments - slack_user_id=slack_user_id # Store the Slack user ID - ) - session.add(interaction) - session.commit() - except Exception as e: - session.rollback() - raise e - finally: - session.close() + + serialized_answer = json.dumps(answer.__dict__) if not isinstance(answer, str) else answer + interaction = QAInteraction( + question_text=question, + thread_id=thread_id, + assistant_thread_id=assistant_thread_id, + answer_text=serialized_answer, + channel_id=channel_id, + question_timestamp=question_ts, + answer_timestamp=answer_ts, + comments=json.dumps([]), + slack_user_id=slack_user_id + ) + self.db.add_object(interaction) def add_comment_to_interaction(self, thread_id, comment): - session = self.Session() try: - interaction = session.query(QAInteractions).filter_by(thread_id=thread_id).first() - if interaction: - if interaction.comments is None: - interaction.comments = json.dumps([]) - comments = json.loads(interaction.comments) - comments.append(comment) - interaction.comments = json.dumps(comments) - session.commit() - except Exception as e: - session.rollback() + with self.db.get_session() as session: + interaction = session.query(QAInteraction).filter_by(thread_id=thread_id).first() + if interaction: + if interaction.comments is None: + interaction.comments = json.dumps([]) + comments = json.loads(interaction.comments) + comments.append(comment) + interaction.comments = json.dumps(comments) + session.commit() + except SQLAlchemyError as e: + print(f"Error adding comment to interaction: {e}") raise e - finally: - session.close() def get_interaction_by_thread_id(self, thread_id): - session = self.Session() try: - return session.query(QAInteractions).filter_by(thread_id=thread_id).first() - finally: - session.close() + 
with self.db.get_session() as session: + return session.query(QAInteraction).filter_by(thread_id=thread_id).first() + except SQLAlchemyError as e: + print(f"Error getting interaction by thread ID: {e}") def get_interaction_by_interaction_id(self, interaction_id): - session = self.Session() try: - return session.query(QAInteractions).filter_by(interaction_id=interaction_id).first() - finally: - session.close() + with self.db.get_session() as session: + return session.query(QAInteraction).filter_by(id=interaction_id).first() + except SQLAlchemyError as e: + print(f"Error getting interaction by interaction ID: {e}") def get_interactions_by_interaction_ids(self, interaction_ids): - session = self.Session() try: - # The query filters QAInteractions by checking if the interaction_id is in the list of interaction_ids - return session.query(QAInteractions).filter(QAInteractions.interaction_id.in_(interaction_ids)).all() - except Exception as e: - session.rollback() - raise e - finally: - session.close() + with self.db.get_session() as session: + return session.query(QAInteraction).filter(QAInteraction.id.in_(interaction_ids)).all() + except SQLAlchemyError as e: + print(f"Error getting interactions by interaction IDs: {e}") def get_qa_interactions(self): - session = self.Session() - try: - return session.query(QAInteractions).all() - finally: - session.close() - - def get_all_interactions(self): - session = self.Session() try: - return session.query(QAInteractions).all() - finally: - session.close() + with self.db.get_session() as session: + return session.query(QAInteraction).all() + except SQLAlchemyError as e: + print(f"Error getting QA interactions: {e}") def add_embed_to_interaction(self, interaction_id, embed): - session = self.Session() try: - interaction = session.query(QAInteractions).filter_by(interaction_id=interaction_id).first() - if interaction: - interaction.embed = json.dumps(embed) - interaction.last_embedded = datetime.now(timezone.utc) - session.commit() 
- except Exception as e: + with self.db.get_session() as session: + interaction = session.query(QAInteraction).filter_by(id=interaction_id).first() + if interaction: + interaction.embed = json.dumps(embed) + interaction.last_embedded = datetime.now(timezone.utc) + session.commit() + except SQLAlchemyError as e: + print(f"Error adding embed to interaction: {e}") session.rollback() - raise e - finally: - session.close() + return def get_interactions_without_embeds(self): - session = self.Session() try: - # Filter interactions where embed is either None, the JSON representation of an empty list, or an empty string - return session.query(QAInteractions).filter( - (QAInteractions.embed.is_(None)) | - (QAInteractions.embed == json.dumps([])) | - (QAInteractions.embed == '') - ).all() - finally: - session.close() + with self.db.get_session() as session: + return session.query(QAInteraction).filter( + (QAInteraction.embed.is_(None)) | + (QAInteraction.embed == json.dumps([])) | + (QAInteraction.embed == '') + ).all() + except SQLAlchemyError as e: + print(f"Error getting interactions without embeds: {e}") def get_interactions_with_embeds(self): - session = self.Session() try: - # Filter interactions where embed is either None, the JSON representation of an empty list, or an empty string - return session.query(QAInteractions).filter( - (QAInteractions.embed.is_not(None)) | - (QAInteractions.embed != json.dumps([])) | - (QAInteractions.embed != '') - ).all() - finally: - session.close() - - -# Set up the database engine and create tables if they don't exist -engine = create_engine('sqlite:///' + sql_file_path) -Base.metadata.bind = engine -Base.metadata.create_all(engine) - -# Create a session maker object to manage database sessions -Session = sessionmaker(bind=engine) -session = Session() + with self.db.get_session() as session: + return session.query(QAInteraction).filter( + (QAInteraction.embed.is_not(None)) & + (QAInteraction.embed != json.dumps([])) & + 
(QAInteraction.embed != '') + ).all() + except SQLAlchemyError as e: + print(f"Error getting interactions with embeds: {e}") diff --git a/database/page_manager.py b/database/page_manager.py index 8557f858..b47dab28 100644 --- a/database/page_manager.py +++ b/database/page_manager.py @@ -1,284 +1,153 @@ # ./database/nur_database.py from typing import List, Optional -from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Boolean -from sqlalchemy.orm import sessionmaker, declarative_base # Updated import -import sqlite3 -from configuration import sql_file_path +from sqlalchemy.exc import SQLAlchemyError +from models.page_data import PageData +from database.database import Database from datetime import datetime, timezone import json -from sqlalchemy.exc import SQLAlchemyError - - -# Define the base class for SQLAlchemy models -Base = declarative_base() - - -# Define the PageData model -class PageData(Base): - """ - SQLAlchemy model for storing Confluence page data. - """ - __tablename__ = 'page_data' - id = Column(Integer, primary_key=True) - page_id = Column(String) - space_key = Column(String) - title = Column(String) - author = Column(String) - createdDate = Column(DateTime) - lastUpdated = Column(DateTime) - content = Column(Text) - comments = Column(Text, default=json.dumps([])) - last_embedded = Column(DateTime) - embed = Column(Text, default=json.dumps([])) +class PageManager: + def __init__(self): + self.db = Database() -def parse_datetime(date_string): - """ - Convert an ISO format datetime string to a datetime object. + def parse_datetime(self, date_string): + """ + Convert an ISO format datetime string to a datetime object. - Args: - date_string (str): ISO format datetime string. + Args: + date_string (str): ISO format datetime string. - Returns: - datetime: A datetime object. - """ - return datetime.fromisoformat(date_string.replace('Z', '+00:00')) + Returns: + datetime: A datetime object. 
+ """ + return datetime.fromisoformat(date_string.replace('Z', '+00:00')) + def store_pages_data(self, space_key, pages): + """ + Store Confluence page data into the database. -def store_pages_data(space_key, pages): - """ - Store Confluence page data into the database. - - Args: - space_key (str): The key of the Confluence space. - pages (lits): A list of page data - """ - with Session() as session: + Args: + space_key (str): The key of the Confluence space. + pages (list): A list of page data + """ for page in pages: page_id = page['pageId'] - created_date = parse_datetime(page['createdDate']) - last_updated = parse_datetime(page['lastUpdated']) - new_page = PageData(page_id=page_id, space_key=space_key, title=page['title'], author=page['author'], - createdDate=created_date, - lastUpdated=last_updated, + createdDate=self.parse_datetime(page['createdDate']), + lastUpdated=self.parse_datetime(page['lastUpdated']), content=page['content'], comments=page['comments'] ) - session.add(new_page) - + self.db.add_object(new_page) print(f"Page with ID {page_id} written to database") - session.commit() - - -def get_page_ids_missing_embeds(): - """ - Retrieve the page IDs of pages that are missing embeddings. - :return: A list of page IDs. - """ - session = Session() - records = session.query(PageData).filter( - (PageData.lastUpdated > PageData.last_embedded) | - (PageData.last_embedded.is_(None)) - ).all() - page_ids = [record.page_id for record in records] - session.close() - return page_ids - - -def get_all_page_data_from_db(space_key=None): - """ - Retrieve all page data and embeddings from the database. If a space_key is provided, - filter the records to only include pages from that specific space. - :param space_key: Optional; the specific space key to filter pages by. 
- :return: Tuple of page_ids (list of page IDs), all_documents (list of document strings), and embeddings (list of embeddings as strings) - """ - session = Session() - - if space_key: - records = session.query(PageData).filter(PageData.space_key == space_key).all() - else: - records = session.query(PageData).all() - - page_ids = [record.page_id for record in records] - embeddings = [record.embed for record in records] # Assuming embed is directly stored as a string - all_documents = [ - f"Page id: {record.page_id}, space key: {record.space_key}, title: {record.title}, " - f"author: {record.author}, created date: {record.createdDate}, last updated: {record.lastUpdated}, " - f"content: {record.content}, comments: {record.comments}" - for record in records - ] - - session.close() - return page_ids, all_documents, embeddings - -def get_page_data_from_db(): - """ - Retrieve all page data and embeddings from the database. - This query filter does the following: - PageData.lastUpdated > PageData.last_embedded: - It selects records where the lastUpdated timestamp is more recent than the last_embedded timestamp. This would typically mean that the page has been updated since the last time its embedding was generated and stored. - PageData.last_embedded.is_(None): - It selects records where the last_embedded field is None, which likely indicates that an embedding has never been generated for the page. 
- :return: Tuple of page_ids (list of page IDs), all_documents (list of document strings), and embeddings (list of embeddings as strings) - """ - session = Session() - records = session.query(PageData).filter( - (PageData.lastUpdated > PageData.last_embedded) | - (PageData.last_embedded.is_(None)) - ).all() - - page_ids = [record.page_id for record in records] - embeddings = [record.embed for record in records] # Assuming embed is directly stored as a string - all_documents = [ - f"Page id: {record.page_id}, space key: {record.space_key}, title: {record.title}, " - f"author: {record.author}, created date: {record.createdDate}, last updated: {record.lastUpdated}, " - f"content: {record.content}, comments: {record.comments}" - for record in records - ] - - session.close() - return page_ids, all_documents, embeddings - - -def add_or_update_embed_vector(page_id, embed_vector): - """ - Add or update the embed vector data for a specific page in the database, and update the last_embedded timestamp. - - Args: - page_id (str): The ID of the page to update. - embed_vector: The embed vector data to be added or updated, expected to be a list of floats. - """ - # Serialize the embed_vector to a JSON string here - embed_vector_json = json.dumps(embed_vector) - with Session() as session: - try: - page = session.query(PageData).filter_by(page_id=page_id).first() - - if page: - page.embed = embed_vector_json # Store the serialized list - page.last_embedded = datetime.now(timezone.utc) - print(f"Embed vector and last_embedded timestamp for page ID {page_id} have been updated.") + def get_page_ids_missing_embeds(self): + """ + Retrieve the page IDs of pages that are missing embeddings. + :return: A list of page IDs. 
+ """ + with self.db.get_session() as session: + records = session.query(PageData).filter( + (PageData.lastUpdated > PageData.last_embedded) | + (PageData.last_embedded.is_(None)) + ).all() + page_ids = [record.page_id for record in records] + return page_ids + + def get_all_page_data_from_db(self, space_key=None): + """ + Retrieve all page data and embeddings from the database. If a space_key is provided, + filter the records to only include pages from that specific space. + :param space_key: Optional; the specific space key to filter pages by. + :return: Tuple of page_ids (list of page IDs), all_documents (list of document strings), and embeddings (list of embeddings as strings) + """ + with self.db.get_session() as session: + if space_key: + records = session.query(PageData).filter(PageData.space_key == space_key).all() else: - print(f"No page found with ID {page_id}. Consider handling this case as needed.") + records = session.query(PageData).all() + + formatted = self.format_page_data(records) + return formatted + + def format_page_data(self, records): + page_ids = [record.page_id for record in records] + embeddings = [record.embed for record in records] # Assuming embed is directly stored as a string + all_documents = [ + f"Page id: {record.page_id}, space key: {record.space_key}, title: {record.title}, " + f"author: {record.author}, created date: {record.createdDate}, last updated: {record.lastUpdated}, " + f"content: {record.content}, comments: {record.comments}" + for record in records + ] + return page_ids, all_documents, embeddings + + def add_or_update_embed_vector(self, page_id, embed_vector): + """ + Add or update the embed vector data for a specific page in the database, and update the last_embedded timestamp. + + Args: + page_id (str): The ID of the page to update. + embed_vector: The embed vector data to be added or updated, expected to be a list of floats. 
+ """ + embed_vector_json = json.dumps(embed_vector) - session.commit() + try: + with self.db.get_session() as session: + page = session.query(PageData).filter_by(page_id=page_id).first() + + if page: + page.embed = embed_vector_json # Store the serialized list + page.last_embedded = datetime.now(timezone.utc) + print(f"Embed vector and last_embedded timestamp for page ID {page_id} have been updated.") + session.commit() + else: + print(f"No page found with ID {page_id}. Consider handling this case as needed.") except SQLAlchemyError as e: session.rollback() raise e - -def get_page_data_by_ids(page_ids): - """ - Retrieve specific page data from the database by page IDs. - :param page_ids: A list of page IDs to retrieve data for. - :return: Tuple of all_documents (list of document strings) and page_ids (list of page IDs) - """ - if not page_ids: - return [], [] - - # Connect to the SQLite database - conn = sqlite3.connect(sql_file_path) - cursor = conn.cursor() - - # Prepare the query with placeholders for page IDs - placeholders = ','.join('?' for _ in page_ids) - query = f"SELECT * FROM page_data WHERE page_id IN ({placeholders})" - - # Execute the query with the list of page IDs - cursor.execute(query, page_ids) - records = cursor.fetchall() - - # Process each record into a string - all_documents = [] - retrieved_page_ids = [] - for record in records: - document = ( - f"Page id: {record[1]}, space key: {record[2]}, title: {record[3]}, " - f"author: {record[4]}, created date: {record[5]}, last updated: {record[6]}, " - f"content: {record[7]}, comments: {record[8]}" - ) - all_documents.append(document) - retrieved_page_ids.append(record[1]) - - # Close the SQLite connection - conn.close() - return all_documents, retrieved_page_ids - - -def update_embed_date(page_ids): - """ - Update the last_embedded timestamp in the database for the given page IDs. 
- :param page_ids: - :return: - """ - conn = sqlite3.connect(sql_file_path) - cursor = conn.cursor() - current_time = datetime.now() - for page_id in page_ids: - cursor.execute("UPDATE page_data SET last_embedded = ? WHERE page_id = ?", (current_time, page_id)) - conn.commit() - conn.close() - return True - - -def find_page(page_id) -> Optional[PageData]: - """ - Find a page in the database by its ID. - :param page_id: The ID of the page to find. - :return: The page data if found, or None if not found. - """ - session = Session() - page_record = session.query(PageData).filter_by(page_id=page_id).first() - session.close() - return page_record - - -def find_pages(page_ids) -> List[PageData]: - """ - Find multiple pages in the database by their IDs. - :param page_ids: A list of page IDs to find. - :return: A list of page data if found, or an empty list if not found. - """ - session = Session() - pages = session.query(PageData).filter(PageData.page_id.in_(page_ids)).all() - session.close() - return pages - - -def format_page_for_llm(page: PageData) -> str: - """ - Format a page for use with the LLM. - :param page: The page data. - :return: The formatted page content as a string. - """ - page_data = { - 'spaceKey': page.space_key, - 'pageId': page.page_id, - 'title': page.title, - 'author': page.author, - 'createdDate': page.createdDate, - 'lastUpdated': page.lastUpdated, - 'content': page.content, - 'comments': page.comments - } - - content = "" - for key, value in page_data.items(): - content += f"{key}: {value}\n" - return content - - -# Set up the database engine and create tables if they don't exist -engine = create_engine('sqlite:///' + sql_file_path) -Base.metadata.bind = engine -Base.metadata.create_all(engine) - -# Create a session maker object to manage database sessions -Session = sessionmaker(bind=engine) -session = Session() + def find_page(self, page_id) -> Optional[PageData]: + """ + Find a page in the database by its ID. 
+ :param page_id: The ID of the page to find. + :return: The page data if found, or None if not found. + """ + with self.db.get_session() as session: + page_record = session.query(PageData).filter_by(page_id=page_id).first() + return page_record + + def find_pages(self, page_ids) -> List[PageData]: + """ + Find multiple pages in the database by their IDs. + :param page_ids: A list of page IDs to find. + :return: A list of page data if found, or an empty list if not found. + """ + with self.db.get_session() as session: + pages = session.query(PageData).filter(PageData.page_id.in_(page_ids)).all() + return pages + + def format_page_for_llm(self, page: PageData) -> str: + """ + Format a page for use with the LLM. + :param page: The page data. + :return: The formatted page content as a string. + """ + page_data = { + 'spaceKey': page.space_key, + 'pageId': page.page_id, + 'title': page.title, + 'author': page.author, + 'createdDate': page.createdDate, + 'lastUpdated': page.lastUpdated, + 'content': page.content, + 'comments': page.comments + } + + content = "" + for key, value in page_data.items(): + content += f"{key}: {value}\n" + return content diff --git a/database/quiz_question_manager.py b/database/quiz_question_manager.py index ae8643dc..925a4972 100644 --- a/database/quiz_question_manager.py +++ b/database/quiz_question_manager.py @@ -1,53 +1,40 @@ -from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text -from sqlalchemy.orm import declarative_base -from sqlalchemy.orm import sessionmaker -from configuration import sql_file_path from datetime import datetime, timezone from sqlalchemy.exc import SQLAlchemyError from interactions.quiz_question_dto import QuizQuestionDTO - -Base = declarative_base() - - -class QuizQuestion(Base): - __tablename__ = 'quiz_questions' - id = Column(Integer, primary_key=True) - question_text = Column(Text) - thread_id = Column(String) # Slack thread ID for tracking conversations - summary = Column(Text) # Summary 
of the conversation - posted_on_slack = Column(DateTime) # Timestamp when posted on Slack - posted_on_confluence = Column(DateTime, nullable=True) # Timestamp when posted on Confluence +from models.quiz_question import QuizQuestion +from database.database import Database class QuizQuestionManager: def __init__(self): - # Initialize the database engine - self.engine = create_engine('sqlite:///' + sql_file_path) - Base.metadata.create_all(self.engine) # Create tables if they don't exist - self.Session = sessionmaker(bind=self.engine) + self.db = Database() def add_quiz_question(self, question_text): try: - with self.Session() as session: + with self.db.get_session() as session: new_question = QuizQuestion(question_text=question_text, posted_on_slack=datetime.now(timezone.utc)) session.add(new_question) session.commit() - # Convert and return a QuizQuestionDTO object - return QuizQuestionDTO( - id=new_question.id, + print(f"New question ID: {new_question.id}") + dto = QuizQuestionDTO( + question_id=new_question.id, question_text=new_question.question_text, thread_id=new_question.thread_id, summary=new_question.summary, posted_on_slack=new_question.posted_on_slack, posted_on_confluence=new_question.posted_on_confluence ) + + print(f"DTO: {dto.id}") + return dto + except SQLAlchemyError as e: print(f"Error adding quiz question: {e}") return None def update_with_summary(self, question_id, summary): try: - with self.Session() as session: + with self.db.get_session() as session: question = session.query(QuizQuestion).filter_by(id=question_id).first() if question: question.summary = summary @@ -57,7 +44,7 @@ def update_with_summary(self, question_id, summary): def update_with_summary_by_thread_id(self, thread_id, summary): try: - with self.Session() as session: + with self.db.get_session() as session: question = session.query(QuizQuestion).filter_by(thread_id=thread_id).first() print(f"question: {question}") if question: @@ -69,7 +56,7 @@ def 
update_with_summary_by_thread_id(self, thread_id, summary): def update_with_thread_id(self, question_id, thread_id): try: - with self.Session() as session: + with self.db.get_session() as session: question = session.query(QuizQuestion).filter_by(id=question_id).first() if question: question.thread_id = thread_id @@ -78,20 +65,9 @@ def update_with_thread_id(self, question_id, thread_id): except SQLAlchemyError as e: print(f"Error updating quiz question with thread ID: {e}") - def update_confluence_timestamp(self, question_id): - try: - with self.Session() as session: - question = session.query(QuizQuestion).filter_by(id=question_id).first() - if question: - question.posted_on_confluence = datetime.now(timezone.utc) - session.commit() - except SQLAlchemyError as e: - print(f"Error updating Confluence timestamp: {e}") - - # get all thread_ids for questions that have not been posted on Confluence def get_unposted_questions_timestamps(self): try: - with self.Session() as session: + with self.db.get_session() as session: questions = session.query(QuizQuestion).filter_by(posted_on_confluence=None).all() return [question.thread_id for question in questions] except SQLAlchemyError as e: diff --git a/database/score_manager.py b/database/score_manager.py new file mode 100644 index 00000000..553ba753 --- /dev/null +++ b/database/score_manager.py @@ -0,0 +1,55 @@ +from models.user_score import UserScore +from slack.client import get_bot_user_id +from database.database import Database +from sqlalchemy.exc import SQLAlchemyError + + +class ScoreManager: + def __init__(self): + self.db = Database() + self.bot_user_id = get_bot_user_id() + + def add_or_update_score(self, slack_user_id, category, points=1): + """ + Adds or updates the score for a user in a given category. If the user does not exist, creates a new record. 
+ """ + if slack_user_id == self.bot_user_id: + print("Skipping score update for bot user.") + return + try: + with self.db.get_session() as session: + user_score = session.query(UserScore).filter_by(slack_user_id=slack_user_id).first() + if not user_score: + # If the user score does not exist, create a new one with default scores initialized to 0 + user_score = UserScore(slack_user_id=slack_user_id, seeker_score=0, revealer_score=0, luminary_score=0) + session.add(user_score) + session.flush() # Flush here to ensure user_score is persisted before we try to update it + + if category == 'seeker': + user_score.seeker_score += points + elif category == 'revealer': + user_score.revealer_score += points + elif category == 'luminary': + user_score.luminary_score += points + else: + raise ValueError("Invalid category provided.") + + session.commit() + except SQLAlchemyError as e: + print(f"Error adding or updating score: {e}") + session.rollback() + raise e + + def get_top_users(self, category, top_n=10): + """ + Retrieves the top N users for a given category. 
+ """ + with self.db.get_session() as session: + if category == 'seeker': + return session.query(UserScore).order_by(UserScore.seeker_score.desc()).limit(top_n).all() + elif category == 'revealer': + return session.query(UserScore).order_by(UserScore.revealer_score.desc()).limit(top_n).all() + elif category == 'luminary': + return session.query(UserScore).order_by(UserScore.luminary_score.desc()).limit(top_n).all() + else: + raise ValueError("Invalid category provided.") diff --git a/database/space_manager.py b/database/space_manager.py index 7f2f1173..71b62ed1 100644 --- a/database/space_manager.py +++ b/database/space_manager.py @@ -1,35 +1,11 @@ -# ./database/space_manager.py - -from sqlalchemy import create_engine, Column, Integer, String, DateTime -from sqlalchemy.orm import sessionmaker, declarative_base from datetime import datetime -from configuration import sql_file_path - - -Base = declarative_base() - - -class SpaceInfo(Base): - """ - SQLAlchemy model for storing Confluence space data. 
- """ - __tablename__ = 'space_info' - - id = Column(Integer, primary_key=True) - space_key = Column(String, nullable=False) - space_name = Column(String, nullable=False) - last_import_date = Column(DateTime, nullable=False) - -def init_db(): - engine = create_engine('sqlite:///' + sql_file_path) - Base.metadata.bind = engine - Base.metadata.create_all(engine) - return sessionmaker(bind=engine)() +from models.space_info import SpaceInfo +from database.database import Database class SpaceManager: def __init__(self): - self.session = init_db() + self.db = Database() def add_space_info(self, space_key, space_name, last_import_date): """Add a new space to the database.""" @@ -38,33 +14,32 @@ def add_space_info(self, space_key, space_name, last_import_date): space_name=space_name, last_import_date=datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') ) - self.session.add(new_space) - self.session.commit() + self.db.add_object(new_space) def update_space_info(self, space_key, last_import_date): """Update the last import date of an existing space.""" - space = self.session.query(SpaceInfo).filter_by(space_key=space_key).first() - if space: - space.last_import_date = datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') - self.session.commit() - else: - print(f"Space with key {space_key} not found.") + with self.db.get_session() as session: + space = session.query(SpaceInfo).filter_by(space_key=space_key).first() + if space: + space.last_import_date = datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') + session.commit() + else: + print(f"Space with key {space_key} not found.") def upsert_space_info(self, space_key, space_name, last_import_date): """Insert or update space information based on the existence of the space key.""" - existing_space = self.session.query(SpaceInfo).filter_by(space_key=space_key).first() - if existing_space: - # The space exists, update the last import date. 
- existing_space.last_import_date = datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') - operation = 'Updated' - else: - # The space does not exist, create a new record. - new_space = SpaceInfo( - space_key=space_key, - space_name=space_name, - last_import_date=datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') - ) - self.session.add(new_space) - operation = 'Added' - self.session.commit() - return operation + with self.db.get_session() as session: + existing_space = session.query(SpaceInfo).filter_by(space_key=space_key).first() + if existing_space: + existing_space.last_import_date = datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') + operation = 'Updated' + else: + new_space = SpaceInfo( + space_key=space_key, + space_name=space_name, + last_import_date=datetime.strptime(last_import_date, '%Y-%m-%d %H:%M:%S') + ) + session.add(new_space) + operation = 'Added' + session.commit() + return operation diff --git a/gamification/models/__init__.py b/gamification/models/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/gamification/score_manager.py b/gamification/score_manager.py deleted file mode 100644 index 999d41e1..00000000 --- a/gamification/score_manager.py +++ /dev/null @@ -1,64 +0,0 @@ -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from gamification.models.user_score import UserScore, Base -from configuration import sql_file_path # Assuming there's a configuration file with DB path -from slack.client import get_bot_user_id - - -class ScoreManager: - def __init__(self): - self.engine = create_engine('sqlite:///' + sql_file_path) - Base.metadata.create_all(self.engine) - self.Session = sessionmaker(bind=self.engine) - self.bot_user_id = get_bot_user_id() - - def add_or_update_score(self, slack_user_id, category, points=1): - """ - Adds or updates the score for a user in a given category. If the user does not exist, creates a new record. 
- """ - # Check if the user ID is the bot's user ID - if slack_user_id == self.bot_user_id: - print("Skipping score update for bot user.") - return - session = self.Session() - try: - user_score = session.query(UserScore).filter_by(slack_user_id=slack_user_id).first() - if not user_score: - # If the user score does not exist, create a new one with default scores initialized to 0 - user_score = UserScore(slack_user_id=slack_user_id, seeker_score=0, revealer_score=0, luminary_score=0) - session.add(user_score) - session.flush() # Flush here to ensure user_score is persisted before we try to update it - - # Increment the score based on the category - if category == 'seeker': - user_score.seeker_score += points - elif category == 'revealer': - user_score.revealer_score += points - elif category == 'luminary': - user_score.luminary_score += points - else: - raise ValueError("Invalid category provided.") - - session.commit() - except Exception as e: - session.rollback() - raise e - finally: - session.close() - - def get_top_users(self, category, top_n=10): - """ - Retrieves the top N users for a given category. 
- """ - session = self.Session() - try: - if category == 'seeker': - return session.query(UserScore).order_by(UserScore.seeker_score.desc()).limit(top_n).all() - elif category == 'revealer': - return session.query(UserScore).order_by(UserScore.revealer_score.desc()).limit(top_n).all() - elif category == 'luminary': - return session.query(UserScore).order_by(UserScore.luminary_score.desc()).limit(top_n).all() - else: - raise ValueError("Invalid category provided.") - finally: - session.close() diff --git a/interactions/identify_knowledge_gap.py b/interactions/identify_knowledge_gap.py index b9c80800..42e8ea92 100644 --- a/interactions/identify_knowledge_gap.py +++ b/interactions/identify_knowledge_gap.py @@ -10,8 +10,8 @@ from open_ai.assistants.utility import extract_assistant_response, initiate_client from open_ai.assistants.thread_manager import ThreadManager from open_ai.assistants.assistant_manager import AssistantManager -from database.interaction_manager import QAInteractionManager, QAInteractions -from database.quiz_question_manager import QuizQuestionManager, QuizQuestion +from database.interaction_manager import QAInteractionManager, QAInteraction +from database.quiz_question_manager import QuizQuestionManager from slack.message_manager import post_questions_to_slack @@ -60,7 +60,7 @@ def retrieve_relevant_interaction_ids(query: str) -> List[str]: return interaction_ids -def format_interactions(interactions: List['QAInteractions']) -> Tuple[str, List[str]]: +def format_interactions(interactions: List['QAInteraction']) -> Tuple[str, List[str]]: """ Format a list of QAInteraction objects into a human-readable string and collect user IDs. @@ -69,7 +69,7 @@ def format_interactions(interactions: List['QAInteractions']) -> Tuple[str, List If comments deserialization fails, it defaults to an empty list. Parameters: - interactions (List[QAInteractions]): A list of QAInteraction objects. + interactions (List[QAInteraction]): A list of QAInteraction objects. 
Returns: Tuple[str, List[str]]: A tuple containing a string of formatted interactions and a list of user IDs. @@ -182,25 +182,21 @@ def process_and_store_questions(assistant_response_json): Returns: list[QuizQuestionDTO]: A list of QuizQuestionDTO objects added to the database. """ - # Parse the JSON response try: questions_data = json.loads(assistant_response_json) except json.JSONDecodeError as e: logging.error(f"Error decoding JSON: {e}") return [] - # Initialize the QuizQuestionManager quiz_question_manager = QuizQuestionManager() quiz_question_dtos = [] for item in questions_data: question_text = item.get("Question") if question_text: - # Add the question to the database and directly collect the QuizQuestion object quiz_question_dto = quiz_question_manager.add_quiz_question(question_text=question_text) if quiz_question_dto: quiz_question_dtos.append(quiz_question_dto) - return quiz_question_dtos @@ -234,8 +230,7 @@ def strip_json(assistant_response): def identify_knowledge_gaps(context): query = f"no information in context: {context}" interaction_ids = retrieve_relevant_interaction_ids(query) - qa_interaction_manager = QAInteractionManager() - relevant_qa_interactions = qa_interaction_manager.get_interactions_by_interaction_ids(interaction_ids) + relevant_qa_interactions = QAInteractionManager().get_interactions_by_interaction_ids(interaction_ids) formatted_interactions, user_ids = format_interactions(relevant_qa_interactions) assistant_response, thread_ids = query_assistant_with_context(context, formatted_interactions) questions_json = strip_json(assistant_response) diff --git a/interactions/quiz_question_dto.py b/interactions/quiz_question_dto.py index d5a30523..f37bc528 100644 --- a/interactions/quiz_question_dto.py +++ b/interactions/quiz_question_dto.py @@ -1,11 +1,11 @@ # ./interactions/quiz_question_dto.py class QuizQuestionDTO: - def __init__(self, id, question_text, thread_id=None, + def __init__(self, question_id, question_text, thread_id=None, 
summary=None, posted_on_slack=None, posted_on_confluence=None ): - self.id = id + self.id = question_id self.question_text = question_text self.thread_id = thread_id self.summary = summary self.posted_on_slack = posted_on_slack - self.posted_on_confluence = posted_on_confluence \ No newline at end of file + self.posted_on_confluence = posted_on_confluence diff --git a/interactions/vectorize_and_store.py b/interactions/vectorize_and_store.py index 7f6f2e6d..2113e9ec 100644 --- a/interactions/vectorize_and_store.py +++ b/interactions/vectorize_and_store.py @@ -15,12 +15,7 @@ def get_qna_interactions_from_database(): Returns: list: A list of QAInteraction objects. """ - - # Initialize QAInteractionManager with the session - qa_manager = QAInteractionManager() - - # Fetch all Q&A interactions from the database - all_interactions = qa_manager.get_qa_interactions() + all_interactions = QAInteractionManager().get_qa_interactions() return all_interactions @@ -31,11 +26,7 @@ def get_qna_interactions_without_embeds(): Returns: list: A list of QAInteraction objects. """ - # Initialize QAInteractionManager with the session - qa_manager = QAInteractionManager() - - # Fetch all Q&A interactions from the database - all_interactions = qa_manager.get_interactions_without_embeds() + all_interactions = QAInteractionManager().get_interactions_without_embeds() return all_interactions @@ -101,14 +92,12 @@ def store_interaction_embed_in_db(interaction_id, embed_response_json): :param embed_response_json: The JSON string response containing the embedding. 
:return: None """ - interaction_manager = QAInteractionManager() # Directly use the JSON string of the embedding vector as received from embed_text - interaction_manager.add_embed_to_interaction(interaction_id, embed_response_json) + QAInteractionManager().add_embed_to_interaction(interaction_id, embed_response_json) def vectorize_interaction_and_store_in_db(interaction_id): - interaction_manager = QAInteractionManager() - interaction = interaction_manager.get_interaction_by_interaction_id(interaction_id) # Assuming this method exists and correctly fetches the interaction + interaction = QAInteractionManager().get_interaction_by_interaction_id(interaction_id) if interaction: formatted_interaction = format_interaction(interaction) embed = vectorize_interaction(formatted_interaction, embedding_model_id) @@ -162,11 +151,11 @@ def vectorize_interactions_and_store_in_db(retry_limit: int = 3, wait_time: int for interaction in interactions: try: # Attempt to vectorize and store each interaction. - submit_create_interaction_embeds_request(str(interaction.interaction_id)) + submit_create_interaction_embeds_request(str(interaction.id)) # A brief delay between processing to manage load. 
time.sleep(0.5) except Exception as e: - logging.error(f"An error occurred while vectorizing interaction with ID {interaction.interaction_id}: {e}") + logging.error(f"An error occurred while vectorizing interaction with ID {interaction.id}: {e}") print(f"Waiting for {wait_time} seconds for embeddings to be processed...") time.sleep(wait_time) diff --git a/main.py b/main.py index d97b2d89..9be9b794 100644 --- a/main.py +++ b/main.py @@ -57,8 +57,7 @@ def main_menu(): elif choice == "3": print("Creating vector db for interactions") vectorize_interactions_and_store_in_db() - vector_interaction_manager = VectorInteractionManager() - vector_interaction_manager.add_to_vector() + VectorInteractionManager().add_to_vector() elif choice == "4": load_manage_assistants() @@ -79,4 +78,4 @@ def main_menu(): if __name__ == "__main__": - main_menu() \ No newline at end of file + main_menu() diff --git a/gamification/__init__.py b/models/__init__.py similarity index 100% rename from gamification/__init__.py rename to models/__init__.py diff --git a/database/bookmarked_conversation.py b/models/bookmarked_conversation.py similarity index 85% rename from database/bookmarked_conversation.py rename to models/bookmarked_conversation.py index 8ba85bfd..1b43b486 100644 --- a/database/bookmarked_conversation.py +++ b/models/bookmarked_conversation.py @@ -1,11 +1,13 @@ -from sqlalchemy import Column, Integer, String, DateTime, Text from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, Text, DateTime from datetime import datetime, timezone Base = declarative_base() + class BookmarkedConversation(Base): __tablename__ = 'bookmarked_conversations' + id = Column(Integer, primary_key=True) title = Column(Text) body = Column(Text) @@ -14,4 +16,4 @@ class BookmarkedConversation(Base): posted_on_confluence = Column(DateTime, nullable=True) def __repr__(self): - return f"" \ No newline at end of file + return f"" diff --git a/models/page_data.py 
b/models/page_data.py new file mode 100644 index 00000000..2a3e36b9 --- /dev/null +++ b/models/page_data.py @@ -0,0 +1,24 @@ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, Text, DateTime +import json + +Base = declarative_base() + + +class PageData(Base): + """ + SQLAlchemy model for storing Confluence page data. + """ + __tablename__ = 'page_data' + + id = Column(Integer, primary_key=True) + page_id = Column(String) + space_key = Column(String) + title = Column(String) + author = Column(String) + createdDate = Column(DateTime) + lastUpdated = Column(DateTime) + content = Column(Text) + comments = Column(Text, default=json.dumps([])) + last_embedded = Column(DateTime) + embed = Column(Text, default=json.dumps([])) diff --git a/models/qa_interaction.py b/models/qa_interaction.py new file mode 100644 index 00000000..62b9e55f --- /dev/null +++ b/models/qa_interaction.py @@ -0,0 +1,22 @@ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, Text, DateTime +import json + +Base = declarative_base() + + +class QAInteraction(Base): + __tablename__ = 'qa_interactions' + + id = Column(Integer, primary_key=True) + question_text = Column(Text) + thread_id = Column(String) + assistant_thread_id = Column(String) + answer_text = Column(Text) + channel_id = Column(String) + slack_user_id = Column(String) + question_timestamp = Column(DateTime) + answer_timestamp = Column(DateTime) + comments = Column(Text, default=json.dumps([])) + last_embedded = Column(DateTime) + embed = Column(Text, default=json.dumps([])) diff --git a/models/quiz_question.py b/models/quiz_question.py new file mode 100644 index 00000000..65957b32 --- /dev/null +++ b/models/quiz_question.py @@ -0,0 +1,16 @@ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, Text, DateTime + +Base = declarative_base() + + +class QuizQuestion(Base): + __tablename__ = 
'quiz_questions' + + id = Column(Integer, primary_key=True) + question_text = Column(Text) + thread_id = Column(String) + summary = Column(Text) + posted_on_slack = Column(DateTime) + posted_on_confluence = Column(DateTime, nullable=True) + diff --git a/models/space_info.py b/models/space_info.py new file mode 100644 index 00000000..98470c01 --- /dev/null +++ b/models/space_info.py @@ -0,0 +1,16 @@ +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, DateTime + +Base = declarative_base() + + +class SpaceInfo(Base): + """ + SQLAlchemy model for storing Confluence space data. + """ + __tablename__ = 'space_info' + + id = Column(Integer, primary_key=True) + space_key = Column(String, nullable=False) + space_name = Column(String, nullable=False) + last_import_date = Column(DateTime, nullable=False) diff --git a/gamification/models/user_score.py b/models/user_score.py similarity index 73% rename from gamification/models/user_score.py rename to models/user_score.py index 505c7c16..e33c3d95 100644 --- a/gamification/models/user_score.py +++ b/models/user_score.py @@ -1,13 +1,15 @@ -from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker -from configuration import sql_file_path +from sqlalchemy import Column, Integer, String Base = declarative_base() class UserScore(Base): + """ + SQLAlchemy model for storing user scores. 
+ """ __tablename__ = 'user_scores' + id = Column(Integer, primary_key=True) slack_user_id = Column(String, unique=True, nullable=False) seeker_score = Column(Integer, default=0) diff --git a/open_ai/assistants/query_assistant_from_documents.py b/open_ai/assistants/query_assistant_from_documents.py index 730579f6..9d8ff492 100644 --- a/open_ai/assistants/query_assistant_from_documents.py +++ b/open_ai/assistants/query_assistant_from_documents.py @@ -6,7 +6,7 @@ from configuration import qa_assistant_id import logging -from database.page_manager import PageData, find_pages, format_page_for_llm +from database.page_manager import PageManager, PageData logging.basicConfig(level=logging.INFO) @@ -28,7 +28,7 @@ def format_pages_as_context(pages: List[PageData], max_length=30000, truncation_ for page in pages: title = page.title space_key = page.space_key - file_content = format_page_for_llm(page) + file_content = PageManager().format_page_for_llm(page) page_data = f"\nDocument Title: {title}\nSpace Key: {space_key}\n\n{file_content}" # Truncate and stop if the total length exceeds the maximum allowed @@ -71,7 +71,7 @@ def query_assistant_with_context(question, page_ids, thread_id=None): print(f"IDs of pages to load in context : {page_ids}\n") # Format the context - pages = find_pages(page_ids) + pages = PageManager().find_pages(page_ids) context = format_pages_as_context(pages) print(f"\n\nContext formatted: {context}\n") diff --git a/slack/channel_message_handler.py b/slack/channel_message_handler.py index 9163a4f9..4a3f4f54 100644 --- a/slack/channel_message_handler.py +++ b/slack/channel_message_handler.py @@ -24,7 +24,7 @@ def __init__(self): def load_processed_data(self): """ Load processed messages and questions from the database """ try: - interactions = self.interaction_manager.get_all_interactions() + interactions = self.interaction_manager.get_qa_interactions() except Exception as e: logging.error(f"Error loading processed messages and questions: {e}") 
interactions = [] diff --git a/slack/event_consumer.py b/slack/event_consumer.py index 85ddf2d0..9bb52e69 100644 --- a/slack/event_consumer.py +++ b/slack/event_consumer.py @@ -11,7 +11,7 @@ from vector.chroma import retrieve_relevant_documents from open_ai.assistants.query_assistant_from_documents import query_assistant_with_context from database.interaction_manager import QAInteractionManager -from gamification.score_manager import ScoreManager +from database.score_manager import ScoreManager from slack_sdk.errors import SlackApiError @@ -63,13 +63,13 @@ def add_question_and_response_to_database(self, question_event, response_text, a def process_question(self, question_event: QuestionEvent): channel_id = question_event.channel message_ts = question_event.ts - user_id = question_event.user try: context_page_ids = retrieve_relevant_documents(question_event.text) response_text, assistant_thread_id = query_assistant_with_context(question_event.text, context_page_ids, None) except Exception as e: print(f"Error processing question: {e}") response_text = None + if response_text: print(f"Response from assistant: {response_text}\n") try: diff --git a/slack/message_manager.py b/slack/message_manager.py index 07b729f2..a26bc8ea 100644 --- a/slack/message_manager.py +++ b/slack/message_manager.py @@ -1,11 +1,9 @@ # ./slack/message_manager.py from slack_sdk import WebClient from credentials import slack_bot_user_oauth_token +from database.score_manager import ScoreManager from database.quiz_question_manager import QuizQuestionManager from interactions.quiz_question_dto import QuizQuestionDTO - -# Assuming the ScoreManager class and add_or_update_score method exist -from gamification.score_manager import ScoreManager from slack_sdk.errors import SlackApiError @@ -26,7 +24,7 @@ def post_questions_to_slack(channel_id, quiz_question_dtos, user_ids): client = WebClient(token=slack_bot_user_oauth_token) quiz_question_manager = QuizQuestionManager() - score_manager = ScoreManager() 
# Initialize the ScoreManager + score_manager = ScoreManager() for quiz_question_dto in quiz_question_dtos: try: @@ -86,4 +84,4 @@ def get_message_replies(client, channel, ts): except SlackApiError as e: print(f"Slack API Error: {e.response['error']}") - return [] \ No newline at end of file + return [] diff --git a/slack/reaction_manager.py b/slack/reaction_manager.py index 8d69fe5a..7da6af49 100644 --- a/slack/reaction_manager.py +++ b/slack/reaction_manager.py @@ -3,12 +3,13 @@ import json import re from confluence.system_knowledge_manager import create_page_on_confluence -from gamification.score_manager import ScoreManager +from database.score_manager import ScoreManager from slack.event_consumer import get_user_name_from_id from slack_sdk.errors import SlackApiError from database.bookmarked_conversation_manager import BookmarkedConversationManager from slack.message_manager import get_message_replies + def get_top_users_by_category(slack_web_client): score_manager = ScoreManager() categories = ['seeker', 'revealer', 'luminary'] @@ -68,7 +69,6 @@ def process_checkmark_added_event(slack_web_client, event): channel = event.get("item", {}).get("channel") knowledge_gathering_messages = get_message_replies(slack_web_client, channel, item_ts) - # Assuming ScoreManager or similar exists for managing scores score_manager = ScoreManager() # Extract unique user IDs from the replies @@ -140,7 +140,6 @@ def process_bookmark_added_event(slack_web_client, event): bookmarked_conversation_manager.add_bookmarked_conversation(title=title, body=body, thread_id=item_ts) print(f"Bookmarked conversation added to the database: {title}") - # Add conversation on confluence create_page_on_confluence(title, body) bookmarked_conversation_manager.update_posted_on_confluence(item_ts) @@ -148,4 +147,4 @@ def process_bookmark_added_event(slack_web_client, event): else: print("No messages found for the bookmarked conversation.") except SlackApiError as e: - print(f"Failed to fetch conversation 
replies: {e.response['error']}") \ No newline at end of file + print(f"Failed to fetch conversation replies: {e.response['error']}") diff --git a/vector/chroma.py b/vector/chroma.py index eaa7b4ee..d4c9b181 100644 --- a/vector/chroma.py +++ b/vector/chroma.py @@ -4,7 +4,7 @@ import logging from typing import List from open_ai.embedding.embed_manager import embed_text -from database.page_manager import add_or_update_embed_vector, find_page, format_page_for_llm +from database.page_manager import PageManager def generate_document_embedding(page_id, page_content, model=embedding_model_id): @@ -73,17 +73,17 @@ def vectorize_document_and_store_in_db(page_id): :param page_id: The ID of the page to vectorize. :return: None """ - page = find_page(page_id) + page = PageManager().find_page(page_id) if not page: logging.error(f"Page content for page ID {page_id} could not be retrieved.") return - page_content = format_page_for_llm(page) + page_content = PageManager().format_page_for_llm(page) embedding, error_message = generate_document_embedding(page_id, page_content) if embedding: if len(embedding) > 0: # Store the embedding in the database - add_or_update_embed_vector(page_id, embedding) + PageManager().add_or_update_embed_vector(page_id, embedding) logging.info(f"Embedding for page ID {page_id} stored in the database.") else: logging.error(f"Embedding for page ID {page_id} is empty.") diff --git a/vector/create_interaction_db.py b/vector/create_interaction_db.py index d15d73b4..a6472189 100644 --- a/vector/create_interaction_db.py +++ b/vector/create_interaction_db.py @@ -13,15 +13,12 @@ def __init__(self): def add_to_vector(self): collection_name = interactions_collection_name - - # Get interactions with embedings - qa_interaction_manager = QAInteractionManager() - interactions = qa_interaction_manager.get_interactions_with_embeds() + interactions = QAInteractionManager().get_interactions_with_embeds() interaction_ids = [] interaction_embeddings = [] for interaction in 
interactions: - interaction_ids.append(str(interaction.interaction_id)) + interaction_ids.append(str(interaction.id)) interaction_embeddings.append(json.loads(interaction.embed)) # Ensure the collection exists diff --git a/vector/create_vector_db.py b/vector/create_vector_db.py index 0fd84ce0..2ad298c4 100644 --- a/vector/create_vector_db.py +++ b/vector/create_vector_db.py @@ -3,7 +3,7 @@ import json from configuration import pages_collection_name, vector_folder_path -from database.page_manager import get_all_page_data_from_db +from database.page_manager import PageManager # Initialize the Chroma PersistentClient for disk persistence client = chromadb.PersistentClient(path=vector_folder_path) @@ -18,7 +18,7 @@ def add_to_vector(collection_name, space_key=None): space_key (str): The key of the space to retrieve page data from. If None, retrieves data from all spaces. """ # Retrieve all documents, their corresponding IDs, and embeddings - page_ids, _, embeddings = get_all_page_data_from_db(space_key=space_key) + page_ids, _, embeddings = PageManager().get_all_page_data_from_db(space_key=space_key) # Deserialize the embeddings and filter out None values valid_embeddings = [] diff --git a/visualize/pages.py b/visualize/pages.py index 31d3c5c2..94808c02 100644 --- a/visualize/pages.py +++ b/visualize/pages.py @@ -1,5 +1,5 @@ # Import necessary libraries -from database.page_manager import get_all_page_data_from_db +from database.page_manager import PageManager import numpy as np import json import plotly.graph_objects as go # Import Plotly Graph Objects for 3D plotting @@ -9,7 +9,7 @@ def import_data(): # Step 1: Retrieve all page data, including embeddings, titles, and space keys - page_ids, all_documents, embeddings_json = get_all_page_data_from_db() + page_ids, all_documents, embeddings_json = PageManager().get_all_page_data_from_db() print(f"Retrieved {len(embeddings_json)} embeddings from the database.") if not embeddings_json: