Skip to content

Commit

Permalink
feat: add file:// url
Browse files Browse the repository at this point in the history
  • Loading branch information
tavallaie committed Aug 4, 2024
1 parent 265e933 commit 201204a
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 164 deletions.
2 changes: 2 additions & 0 deletions connectiva/protocol_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ProtocolDetector:
"ws://": "WebSocket",
"wss://": "WebSocket",
"graphql://": "GraphQL",
"file://": "File",
}

@staticmethod
Expand All @@ -45,3 +46,4 @@ def detect_protocol(endpoint: str) -> str:

# Default fallback
return "REST"

241 changes: 112 additions & 129 deletions connectiva/protocols/file_protocol.py
Original file line number Diff line number Diff line change
@@ -1,144 +1,127 @@
import unittest
# connectiva/protocols/file_protocol.py

import os
import shutil
import json
import fcntl
import logging
import threading
import time
from connectiva.connectiva import Connectiva
from connectiva.message import Message

# Configure logging
logging.basicConfig(level=logging.DEBUG)


class TestFileProtocolWithConnectiva(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.test_dir = "test_messages"
cls.connectiva = Connectiva(
log=True,
directory=cls.test_dir,
protocol="File",
prefix="msg_",
processed_prefix="processed_"
)

# Ensure the directory is clean before starting
if os.path.exists(cls.test_dir):
shutil.rmtree(cls.test_dir)
os.makedirs(cls.test_dir)

# Log the setup process
cls.logger = logging.getLogger('TestFileProtocolWithConnectiva')
cls.logger.info("Test setup complete. Test directory created.")

@classmethod
def tearDownClass(cls):
# Clean up the test directory after all tests
if os.path.exists(cls.test_dir):
shutil.rmtree(cls.test_dir)
cls.logger.info("Test teardown complete. Test directory removed.")

def test_send_message(self):
from uuid import uuid4
from typing import Dict, Any
from connectiva import CommunicationMethod, Message

class FileProtocol(CommunicationMethod):
"""
File sharing communication class with atomic file naming and processing.
"""

def __init__(self, **kwargs):
self.directory = kwargs.get("directory", ".")
self.prefix = kwargs.get("prefix", "msg_")
self.processed_prefix = kwargs.get("processed_", "processed_")
self.logger = logging.getLogger(self.__class__.__name__)

# Ensure the directory exists
if not os.path.exists(self.directory):
os.makedirs(self.directory)
self.logger.debug(f"Directory {self.directory} created.")

def connect(self):
self.logger.info(f"Accessing directory at {self.directory}...")

def _generate_filename(self) -> str:
"""
Test sending a message and ensure the file is created correctly.
Generate a unique filename for each message.
"""
message = Message(action="send", data={"key": "value"})
result = self.connectiva.send(message)
self.assertIn("status", result)
self.assertEqual(result["status"], "file_written")

# Check if a file was created
files = [f for f in os.listdir(self.test_dir) if f.startswith(self.connectiva.config.get('prefix'))]
self.assertTrue(len(files) > 0, "No files created by send method.")
return f"{self.prefix}{uuid4().hex}.json"

def test_receive_message(self):
def _lock_file(self, file):
"""
Test receiving a message and ensure the content matches the sent data.
Lock the file to ensure exclusive access.
"""
# Create a test message file
message_data = {"key": "value"}
message = Message(action="send", data=message_data)
self.connectiva.send(message)

received_message = self.connectiva.receive()
self.assertEqual(received_message.data, message_data, "Received data should match sent message.")

def test_receive_no_message(self):
"""
Test receiving when no message is available and ensure proper error handling.
self.logger.debug(f"Locking file {file.name}...")
try:
fcntl.flock(file, fcntl.LOCK_EX)
self.logger.debug(f"File {file.name} locked successfully!")
except Exception as e:
self.logger.error(f"Failed to lock file: {e}")
raise

def _unlock_file(self, file):
"""
# Ensure directory is empty before the test
files = os.listdir(self.test_dir)
for f in files:
os.remove(os.path.join(self.test_dir, f))

received_message = self.connectiva.receive()
self.assertEqual(received_message.action, "error", "Action should be 'error' when no message is found.")

def test_concurrent_access(self):
Unlock the file to release access.
"""
Test concurrent access to ensure that multiple receivers can process messages
without conflicts. This test will simulate concurrent access by using multiple threads.
self.logger.debug(f"Unlocking file {file.name}...")
try:
fcntl.flock(file, fcntl.LOCK_UN)
self.logger.debug(f"File {file.name} unlocked successfully!")
except Exception as e:
self.logger.error(f"Failed to unlock file: {e}")
raise

def send(self, message: Message) -> Dict[str, Any]:
"""
def receiver(connectiva, results, index):
message = connectiva.receive()
results[index] = message.data

# Create multiple message files
for _ in range(5):
self.connectiva.send(Message(action="send", data={"content": "Hello!"}))
Write a message to a uniquely named file.
results = [None] * 5
threads = []

# Start multiple receiver threads
for i in range(5):
thread = threading.Thread(target=receiver, args=(self.connectiva, results, i))
thread.start()
threads.append(thread)

# Wait for all threads to finish
for thread in threads:
thread.join()

# Verify that each message was processed exactly once
expected_results = [{"content": "Hello!"}] * 5
self.assertEqual(sorted(results), sorted(expected_results), "Each receiver should get the correct message content.")

def test_locking_mechanism(self):
:param message: Message object containing data to be written.
:return: Dictionary indicating the status of the file operation.
"""
Test the file locking mechanism by attempting concurrent writes and reads,
ensuring that the lock is acquired and released properly.
filename = self._generate_filename()
file_path = os.path.join(self.directory, filename)
self.logger.info(f"Writing message to file {file_path}...")

try:
with open(file_path, 'w') as file:
self._lock_file(file)
json.dump(message.__dict__, file)
self._unlock_file(file)
self.logger.info("Message written successfully!")
return {"status": "file_written", "file_path": file_path}
except Exception as e:
self.logger.error(f"Failed to write message: {e}")
return {"error": str(e)}

def receive(self) -> Message:
"""
lock_acquired = threading.Event()

def write_with_lock(connectiva, message_data):
nonlocal lock_acquired
message = Message(action="send", data=message_data)
connectiva.send(message)
lock_acquired.set() # Indicate that the lock was acquired

def read_with_lock(connectiva, result):
nonlocal lock_acquired
# Wait until the lock is acquired by the write operation
lock_acquired.wait()
time.sleep(0.5) # Wait for a bit to ensure the write lock is held
result.append(connectiva.receive().data)

write_thread = threading.Thread(target=write_with_lock, args=(self.connectiva, {"content": "Test Lock"}))
result = []
read_thread = threading.Thread(target=read_with_lock, args=(self.connectiva, result))

write_thread.start()
read_thread.start()

write_thread.join()
read_thread.join()

# Check if the message was read successfully
self.assertEqual(result[0], {"content": "Test Lock"}, "Locking mechanism failed; message not read correctly.")
Read and process the oldest unprocessed message file.
:return: Message object containing data read from the file.
"""
self.logger.info(f"Scanning directory {self.directory} for messages...")
files = sorted(
[f for f in os.listdir(self.directory) if f.startswith(self.prefix)],
key=lambda f: os.path.getctime(os.path.join(self.directory, f))
)

if __name__ == '__main__':
unittest.main()
if not files:
self.logger.info("No new messages found.")
return Message(action="error", data={}, metadata={"error": "No message found"})

for filename in files:
file_path = os.path.join(self.directory, filename)
try:
# Lock the file and rename it to indicate processing
with open(file_path, 'r+') as file:
self._lock_file(file)

# Check if the file has already been processed
if file_path.startswith(self.processed_prefix):
self._unlock_file(file)
continue

new_file_path = os.path.join(self.directory, self.processed_prefix + filename)
os.rename(file_path, new_file_path)
self.logger.info(f"Renamed file to {new_file_path} for processing.")

# Read the message
file.seek(0) # Reset file pointer to the beginning
data = json.load(file)
self._unlock_file(file)
self.logger.info("Message read successfully!")
return Message(**data)
except Exception as e:
self.logger.error(f"Failed to read message: {e}")
return Message(action="error", data={}, metadata={"error": str(e)})

return Message(action="error", data={}, metadata={"error": "No message found"})

def disconnect(self):
self.logger.info("Closing directory access...")
67 changes: 32 additions & 35 deletions tests/test_file_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,29 @@
import shutil
import logging
import threading
import time
from connectiva.connectiva import Connectiva
from connectiva.message import Message

# Configure logging
logging.basicConfig(level=logging.DEBUG)


class TestFileProtocolWithConnectiva(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.test_dir = "test_messages"
cls.connectiva = Connectiva(
log=True,
directory=cls.test_dir,
protocol="File",
prefix="msg_",
processed_prefix="processed_"
)
cls.endpoint = f"file://{os.path.abspath(cls.test_dir)}" # Use file:// scheme

# Ensure the directory is clean before starting
if os.path.exists(cls.test_dir):
shutil.rmtree(cls.test_dir)
os.makedirs(cls.test_dir)

cls.connectiva = Connectiva(
log=True,
endpoint=cls.endpoint,
protocol="File", # Specify protocol directly
prefix="msg_",
processed_prefix="processed_"
)

# Log the setup process
cls.logger = logging.getLogger('TestFileProtocolWithConnectiva')
cls.logger.info("Test setup complete. Test directory created.")
Expand All @@ -51,7 +49,7 @@ def test_send_message(self):
self.assertEqual(result["status"], "file_written")

# Check if a file was created
files = [f for f in os.listdir(self.test_dir) if f.startswith(self.connectiva.config.get('prefix'))]
files = [f for f in os.listdir(self.test_dir) if f.startswith("msg_")]
self.assertTrue(len(files) > 0, "No files created by send method.")

def test_receive_message(self):
Expand Down Expand Up @@ -110,37 +108,36 @@ def receiver(connectiva, results, index):

def test_locking_mechanism(self):
"""
Test the file locking mechanism by attempting concurrent writes and reads,
ensuring that the lock is acquired and released properly.
Test the file locking mechanism by attempting concurrent writes and reads
"""
lock_acquired = threading.Event()

def write_with_lock(connectiva, message_data):
nonlocal lock_acquired
def write_message(connectiva, message_data):
message = Message(action="send", data=message_data)
connectiva.send(message)
lock_acquired.set() # Indicate that the lock was acquired

def read_with_lock(connectiva, result):
nonlocal lock_acquired
# Wait until the lock is acquired by the write operation
lock_acquired.wait()
time.sleep(0.5) # Wait for a bit to ensure the write lock is held
result.append(connectiva.receive().data)
# Write a message to the file
message_data_1 = {"content": "Test Lock"}
message_data_2 = {"content": "Hello!"}

thread_1 = threading.Thread(target=write_message, args=(self.connectiva, message_data_1))
thread_2 = threading.Thread(target=write_message, args=(self.connectiva, message_data_2))

write_thread = threading.Thread(target=write_with_lock, args=(self.connectiva, {"content": "Test Lock"}))
result = []
read_thread = threading.Thread(target=read_with_lock, args=(self.connectiva, result))
thread_1.start()
thread_2.start()

write_thread.start()
read_thread.start()
thread_1.join()
thread_2.join()

write_thread.join()
read_thread.join()
# Ensure both messages are read correctly
received_messages = []
while True:
received_message = self.connectiva.receive()
if received_message.action == "error":
break
received_messages.append(received_message.data)

# Check if the message was read successfully
self.assertEqual(result[0], {"content": "Test Lock"}, "Locking mechanism failed; message not read correctly.")
expected_results = [message_data_1, message_data_2]
self.assertEqual(sorted(received_messages), sorted(expected_results), "Locking mechanism failed; message not read correctly.")


if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

0 comments on commit 201204a

Please sign in to comment.