From 201204a517a636fac81a4c988c66716a67ed27e8 Mon Sep 17 00:00:00 2001 From: Ali Tavallaie Date: Sun, 4 Aug 2024 10:38:40 +0330 Subject: [PATCH] feat: add file:// url --- connectiva/protocol_detector.py | 2 + connectiva/protocols/file_protocol.py | 241 ++++++++++++-------------- tests/test_file_protocol.py | 67 ++++--- 3 files changed, 146 insertions(+), 164 deletions(-) diff --git a/connectiva/protocol_detector.py b/connectiva/protocol_detector.py index 747b180..5b5b5f5 100644 --- a/connectiva/protocol_detector.py +++ b/connectiva/protocol_detector.py @@ -19,6 +19,7 @@ class ProtocolDetector: "ws://": "WebSocket", "wss://": "WebSocket", "graphql://": "GraphQL", + "file://": "File", } @staticmethod @@ -45,3 +46,4 @@ def detect_protocol(endpoint: str) -> str: # Default fallback return "REST" + diff --git a/connectiva/protocols/file_protocol.py b/connectiva/protocols/file_protocol.py index 5cd114f..76bcc04 100644 --- a/connectiva/protocols/file_protocol.py +++ b/connectiva/protocols/file_protocol.py @@ -1,144 +1,127 @@ -import unittest +# connectiva/protocols/file_protocol.py + import os -import shutil +import json +import fcntl import logging -import threading -import time -from connectiva.connectiva import Connectiva -from connectiva.message import Message - -# Configure logging -logging.basicConfig(level=logging.DEBUG) - - -class TestFileProtocolWithConnectiva(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.test_dir = "test_messages" - cls.connectiva = Connectiva( - log=True, - directory=cls.test_dir, - protocol="File", - prefix="msg_", - processed_prefix="processed_" - ) - - # Ensure the directory is clean before starting - if os.path.exists(cls.test_dir): - shutil.rmtree(cls.test_dir) - os.makedirs(cls.test_dir) - - # Log the setup process - cls.logger = logging.getLogger('TestFileProtocolWithConnectiva') - cls.logger.info("Test setup complete. Test directory created.") - - @classmethod - def tearDownClass(cls): - # Clean up the test directory after all tests - if os.path.exists(cls.test_dir): - shutil.rmtree(cls.test_dir) - cls.logger.info("Test teardown complete. Test directory removed.") - - def test_send_message(self): +from uuid import uuid4 +from typing import Dict, Any +from connectiva import CommunicationMethod, Message + +class FileProtocol(CommunicationMethod): + """ + File sharing communication class with atomic file naming and processing. + """ + + def __init__(self, **kwargs): + self.directory = kwargs.get("directory", ".") + self.prefix = kwargs.get("prefix", "msg_") + self.processed_prefix = kwargs.get("processed_", "processed_") + self.logger = logging.getLogger(self.__class__.__name__) + + # Ensure the directory exists + if not os.path.exists(self.directory): + os.makedirs(self.directory) + self.logger.debug(f"Directory {self.directory} created.") + + def connect(self): + self.logger.info(f"Accessing directory at {self.directory}...") + + def _generate_filename(self) -> str: """ - Test sending a message and ensure the file is created correctly. + Generate a unique filename for each message. """ - message = Message(action="send", data={"key": "value"}) - result = self.connectiva.send(message) - self.assertIn("status", result) - self.assertEqual(result["status"], "file_written") - - # Check if a file was created - files = [f for f in os.listdir(self.test_dir) if f.startswith(self.connectiva.config.get('prefix'))] - self.assertTrue(len(files) > 0, "No files created by send method.") + return f"{self.prefix}{uuid4().hex}.json" - def test_receive_message(self): + def _lock_file(self, file): """ - Test receiving a message and ensure the content matches the sent data. + Lock the file to ensure exclusive access. """ - # Create a test message file - message_data = {"key": "value"} - message = Message(action="send", data=message_data) - self.connectiva.send(message) - - received_message = self.connectiva.receive() - self.assertEqual(received_message.data, message_data, "Received data should match sent message.") - - def test_receive_no_message(self): - """ - Test receiving when no message is available and ensure proper error handling. + self.logger.debug(f"Locking file {file.name}...") + try: + fcntl.flock(file, fcntl.LOCK_EX) + self.logger.debug(f"File {file.name} locked successfully!") + except Exception as e: + self.logger.error(f"Failed to lock file: {e}") + raise + + def _unlock_file(self, file): """ - # Ensure directory is empty before the test - files = os.listdir(self.test_dir) - for f in files: - os.remove(os.path.join(self.test_dir, f)) - - received_message = self.connectiva.receive() - self.assertEqual(received_message.action, "error", "Action should be 'error' when no message is found.") - - def test_concurrent_access(self): + Unlock the file to release access. """ - Test concurrent access to ensure that multiple receivers can process messages - without conflicts. This test will simulate concurrent access by using multiple threads. + self.logger.debug(f"Unlocking file {file.name}...") + try: + fcntl.flock(file, fcntl.LOCK_UN) + self.logger.debug(f"File {file.name} unlocked successfully!") + except Exception as e: + self.logger.error(f"Failed to unlock file: {e}") + raise + + def send(self, message: Message) -> Dict[str, Any]: """ - def receiver(connectiva, results, index): - message = connectiva.receive() - results[index] = message.data - - # Create multiple message files - for _ in range(5): - self.connectiva.send(Message(action="send", data={"content": "Hello!"})) + Write a message to a uniquely named file. - results = [None] * 5 - threads = [] - - # Start multiple receiver threads - for i in range(5): - thread = threading.Thread(target=receiver, args=(self.connectiva, results, i)) - thread.start() - threads.append(thread) - - # Wait for all threads to finish - for thread in threads: - thread.join() - - # Verify that each message was processed exactly once - expected_results = [{"content": "Hello!"}] * 5 - self.assertEqual(sorted(results), sorted(expected_results), "Each receiver should get the correct message content.") - - def test_locking_mechanism(self): + :param message: Message object containing data to be written. + :return: Dictionary indicating the status of the file operation. """ - Test the file locking mechanism by attempting concurrent writes and reads, - ensuring that the lock is acquired and released properly. + filename = self._generate_filename() + file_path = os.path.join(self.directory, filename) + self.logger.info(f"Writing message to file {file_path}...") + + try: + with open(file_path, 'w') as file: + self._lock_file(file) + json.dump(message.__dict__, file) + self._unlock_file(file) + self.logger.info("Message written successfully!") + return {"status": "file_written", "file_path": file_path} + except Exception as e: + self.logger.error(f"Failed to write message: {e}") + return {"error": str(e)} + + def receive(self) -> Message: """ - lock_acquired = threading.Event() - - def write_with_lock(connectiva, message_data): - nonlocal lock_acquired - message = Message(action="send", data=message_data) - connectiva.send(message) - lock_acquired.set() # Indicate that the lock was acquired - - def read_with_lock(connectiva, result): - nonlocal lock_acquired - # Wait until the lock is acquired by the write operation - lock_acquired.wait() - time.sleep(0.5) # Wait for a bit to ensure the write lock is held - result.append(connectiva.receive().data) - - write_thread = threading.Thread(target=write_with_lock, args=(self.connectiva, {"content": "Test Lock"})) - result = [] - read_thread = threading.Thread(target=read_with_lock, args=(self.connectiva, result)) - - write_thread.start() - read_thread.start() - - write_thread.join() - read_thread.join() - - # Check if the message was read successfully - self.assertEqual(result[0], {"content": "Test Lock"}, "Locking mechanism failed; message not read correctly.") + Read and process the oldest unprocessed message file. + :return: Message object containing data read from the file. + """ + self.logger.info(f"Scanning directory {self.directory} for messages...") + files = sorted( + [f for f in os.listdir(self.directory) if f.startswith(self.prefix)], + key=lambda f: os.path.getctime(os.path.join(self.directory, f)) + ) -if __name__ == '__main__': - unittest.main() + if not files: + self.logger.info("No new messages found.") + return Message(action="error", data={}, metadata={"error": "No message found"}) + + for filename in files: + file_path = os.path.join(self.directory, filename) + try: + # Lock the file and rename it to indicate processing + with open(file_path, 'r+') as file: + self._lock_file(file) + + # Check if the file has already been processed + if file_path.startswith(self.processed_prefix): + self._unlock_file(file) + continue + + new_file_path = os.path.join(self.directory, self.processed_prefix + filename) + os.rename(file_path, new_file_path) + self.logger.info(f"Renamed file to {new_file_path} for processing.") + + # Read the message + file.seek(0) # Reset file pointer to the beginning + data = json.load(file) + self._unlock_file(file) + self.logger.info("Message read successfully!") + return Message(**data) + except Exception as e: + self.logger.error(f"Failed to read message: {e}") + return Message(action="error", data={}, metadata={"error": str(e)}) + + return Message(action="error", data={}, metadata={"error": "No message found"}) + + def disconnect(self): + self.logger.info("Closing directory access...") diff --git a/tests/test_file_protocol.py b/tests/test_file_protocol.py index 1badd8a..1088f3f 100644 --- a/tests/test_file_protocol.py +++ b/tests/test_file_protocol.py @@ -5,31 +5,29 @@ import shutil import logging import threading -import time from connectiva.connectiva import Connectiva from connectiva.message import Message -# Configure logging -logging.basicConfig(level=logging.DEBUG) - class TestFileProtocolWithConnectiva(unittest.TestCase): @classmethod def setUpClass(cls): cls.test_dir = "test_messages" - cls.connectiva = Connectiva( - log=True, - directory=cls.test_dir, - protocol="File", - prefix="msg_", - processed_prefix="processed_" - ) + cls.endpoint = f"file://{os.path.abspath(cls.test_dir)}" # Use file:// scheme # Ensure the directory is clean before starting if os.path.exists(cls.test_dir): shutil.rmtree(cls.test_dir) os.makedirs(cls.test_dir) + cls.connectiva = Connectiva( + log=True, + endpoint=cls.endpoint, + protocol="File", # Specify protocol directly + prefix="msg_", + processed_prefix="processed_" + ) + # Log the setup process cls.logger = logging.getLogger('TestFileProtocolWithConnectiva') cls.logger.info("Test setup complete. Test directory created.") @@ -51,7 +49,7 @@ def test_send_message(self): self.assertEqual(result["status"], "file_written") # Check if a file was created - files = [f for f in os.listdir(self.test_dir) if f.startswith(self.connectiva.config.get('prefix'))] + files = [f for f in os.listdir(self.test_dir) if f.startswith("msg_")] self.assertTrue(len(files) > 0, "No files created by send method.") def test_receive_message(self): @@ -110,37 +108,36 @@ def receiver(connectiva, results, index): def test_locking_mechanism(self): """ - Test the file locking mechanism by attempting concurrent writes and reads, - ensuring that the lock is acquired and released properly. + Test the file locking mechanism by attempting concurrent writes and reads """ - lock_acquired = threading.Event() - - def write_with_lock(connectiva, message_data): - nonlocal lock_acquired + def write_message(connectiva, message_data): message = Message(action="send", data=message_data) connectiva.send(message) - lock_acquired.set() # Indicate that the lock was acquired - def read_with_lock(connectiva, result): - nonlocal lock_acquired - # Wait until the lock is acquired by the write operation - lock_acquired.wait() - time.sleep(0.5) # Wait for a bit to ensure the write lock is held - result.append(connectiva.receive().data) + # Write a message to the file + message_data_1 = {"content": "Test Lock"} + message_data_2 = {"content": "Hello!"} + + thread_1 = threading.Thread(target=write_message, args=(self.connectiva, message_data_1)) + thread_2 = threading.Thread(target=write_message, args=(self.connectiva, message_data_2)) - write_thread = threading.Thread(target=write_with_lock, args=(self.connectiva, {"content": "Test Lock"})) - result = [] - read_thread = threading.Thread(target=read_with_lock, args=(self.connectiva, result)) + thread_1.start() + thread_2.start() - write_thread.start() - read_thread.start() + thread_1.join() + thread_2.join() - write_thread.join() - read_thread.join() + # Ensure both messages are read correctly + received_messages = [] + while True: + received_message = self.connectiva.receive() + if received_message.action == "error": + break + received_messages.append(received_message.data) - # Check if the message was read successfully - self.assertEqual(result[0], {"content": "Test Lock"}, "Locking mechanism failed; message not read correctly.") + expected_results = [message_data_1, message_data_2] + self.assertEqual(sorted(received_messages), sorted(expected_results), "Locking mechanism failed; message not read correctly.") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()