Skip to content

Commit

Permalink
Merge pull request #4 from tavallaie/local file
Browse files Browse the repository at this point in the history
local file test added
  • Loading branch information
tavallaie authored Aug 4, 2024
2 parents 1e72f25 + 4dd5423 commit 2c0e638
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 23 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/test-file.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: File Protocol Tests

on:
push:
branches:
- main
paths:
- 'connectiva/protocols/file_protocol.py'
- 'tests/test_file_protocol.py'
- 'pyproject.toml'
pull_request:
branches:
- main
paths:
- 'connectiva/protocols/file_protocol.py'
- 'tests/test_file_protocol.py'
- 'pyproject.toml'

jobs:
test:
runs-on: ubuntu-latest

strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11','3.12']

steps:
- name: Check out the code
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install poetry
poetry install
- name: Run tests
run: |
echo "Running tests on Python ${{ matrix.python-version }}..."
poetry run python -m unittest discover -s tests -p 'test_file_protocol.py'
2 changes: 2 additions & 0 deletions connectiva/protocol_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ProtocolDetector:
"ws://": "WebSocket",
"wss://": "WebSocket",
"graphql://": "GraphQL",
"file://": "File",
}

@staticmethod
Expand All @@ -45,3 +46,4 @@ def detect_protocol(endpoint: str) -> str:

# Default fallback
return "REST"

131 changes: 108 additions & 23 deletions connectiva/protocols/file_protocol.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,129 @@
import os
import json
import fcntl
import logging
from uuid import uuid4
from typing import Dict, Any
from ..interfaces import CommunicationMethod
from ..message import Message
from connectiva import CommunicationMethod, Message


class FileProtocol(CommunicationMethod):
"""
File sharing communication class.
File sharing communication class with atomic file naming and processing.
"""

def __init__(self, **kwargs):
self.file_path = kwargs.get("file_path")
self.directory = kwargs.get("directory", ".")
self.prefix = kwargs.get("prefix", "msg_")
self.processed_prefix = kwargs.get("processed_prefix", "processed_") # Fixed parameter name
self.logger = logging.getLogger(self.__class__.__name__)

# Ensure the directory exists
if not os.path.exists(self.directory):
os.makedirs(self.directory)
self.logger.debug(f"Directory {self.directory} created.")

def connect(self):
print(f"Accessing file at {self.file_path}...")
self.logger.info(f"Accessing directory at {self.directory}...")

def _generate_filename(self) -> str:
"""
Generate a unique filename for each message.
"""
return f"{self.prefix}{uuid4().hex}.json"

def _lock_file(self, file):
"""
Lock the file to ensure exclusive access.
"""
self.logger.debug(f"Locking file {file.name}...")
try:
fcntl.flock(file, fcntl.LOCK_EX)
self.logger.debug(f"File {file.name} locked successfully!")
except Exception as e:
self.logger.error(f"Failed to lock file: {e}")
raise

def _unlock_file(self, file):
"""
Unlock the file to release access.
"""
self.logger.debug(f"Unlocking file {file.name}...")
try:
fcntl.flock(file, fcntl.LOCK_UN)
self.logger.debug(f"File {file.name} unlocked successfully!")
except Exception as e:
self.logger.error(f"Failed to unlock file: {e}")
raise

def send(self, message: Message) -> Dict[str, Any]:
print(f"Writing message to file {self.file_path}...")
"""
Write a message to a uniquely named file.
:param message: Message object containing data to be written.
:return: Dictionary indicating the status of the file operation.
"""
filename = self._generate_filename()
file_path = os.path.join(self.directory, filename)
self.logger.info(f"Writing message to file {file_path}...")

try:
with open(self.file_path, 'w') as file:
with open(file_path, 'w') as file:
self._lock_file(file)
json.dump(message.__dict__, file)
print("Message written successfully!")
return {"status": "file_written"}
self._unlock_file(file)
self.logger.info("Message written successfully!")
return {"status": "file_written", "file_path": file_path}
except Exception as e:
print(f"Failed to write message: {e}")
self.logger.error(f"Failed to write message: {e}")
return {"error": str(e)}

def receive(self) -> Message:
print(f"Reading message from file {self.file_path}...")
try:
if os.path.exists(self.file_path):
with open(self.file_path, 'r') as file:
print("Message read successfully!")
return Message(action="receive", data=json.load(file))
else:
print("File does not exist.")
return Message(action="error", data={}, metadata={"error": "File not found"})
except Exception as e:
print(f"Failed to read message: {e}")
return Message(action="error", data={}, metadata={"error": str(e)})
"""
Read and process the oldest unprocessed message file.
:return: Message object containing data read from the file.
"""
self.logger.info(f"Scanning directory {self.directory} for messages...")
files = sorted(
[f for f in os.listdir(self.directory) if f.startswith(self.prefix)],
key=lambda f: os.path.getctime(os.path.join(self.directory, f))
)

if not files:
self.logger.info("No new messages found.")
return Message(action="error", data={}, metadata={"error": "No message found"})

for filename in files:
file_path = os.path.join(self.directory, filename)
new_file_path = os.path.join(self.directory, self.processed_prefix + filename)

try:
# Lock the file and rename it to indicate processing
with open(file_path, 'r+') as file:
self._lock_file(file)

# Check if the file has already been processed
if filename.startswith(self.processed_prefix):
self._unlock_file(file)
continue

os.rename(file_path, new_file_path)
self.logger.info(f"Renamed file to {new_file_path} for processing.")

# Read the message
file.seek(0) # Reset file pointer to the beginning
data = json.load(file)
self._unlock_file(file)
self.logger.info("Message read successfully!")
return Message(**data)

except Exception as e:
self.logger.error(f"Failed to read message: {e}")
return Message(action="error", data={}, metadata={"error": str(e)})

return Message(action="error", data={}, metadata={"error": "No message found"})

def disconnect(self):
print("Closing file access...")
self.logger.info("Closing directory access...")

152 changes: 152 additions & 0 deletions tests/test_file_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# tests/test_file_protocol.py

import unittest
import os
import shutil
import logging
import threading
from connectiva.connectiva import Connectiva
from connectiva.message import Message


class TestFileProtocolWithConnectiva(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.test_dir = "test_messages"
cls.endpoint = f"file://{os.path.abspath(cls.test_dir)}" # Use file:// scheme

# Ensure the directory is clean before starting
if os.path.exists(cls.test_dir):
shutil.rmtree(cls.test_dir)
os.makedirs(cls.test_dir)

cls.connectiva = Connectiva(
log=True,
log_level="DEBUG",
endpoint=cls.endpoint,
protocol="File", # Specify protocol directly
directory=cls.test_dir, # Ensure directory is passed correctly
prefix="msg_",
processed_prefix="processed_"
)

# Log the setup process
cls.logger = logging.getLogger('TestFileProtocolWithConnectiva')
cls.logger.info("Test setup complete. Test directory created.")

@classmethod
def tearDownClass(cls):
# Clean up the test directory after all tests
if os.path.exists(cls.test_dir):
shutil.rmtree(cls.test_dir)
cls.logger.info("Test teardown complete. Test directory removed.")

def test_send_message(self):
"""
Test sending a message and ensure the file is created correctly.
"""
message = Message(action="send", data={"key": "value"})
result = self.connectiva.send(message)
self.assertIn("status", result)
self.assertEqual(result["status"], "file_written")

# Check if a file was created
files = [f for f in os.listdir(self.test_dir) if f.startswith("msg_")]
self.assertTrue(len(files) > 0, "No files created by send method.")

def test_receive_message(self):
"""
Test receiving a message and ensure the content matches the sent data.
"""
# Create a test message file
message_data = {"key": "value"}
message = Message(action="send", data=message_data)
self.connectiva.send(message)

received_message = self.connectiva.receive()
self.assertEqual(received_message.data, message_data, "Received data should match sent message.")

def test_receive_no_message(self):
"""
Test receiving when no message is available and ensure proper error handling.
"""
# Ensure directory is empty before the test
files = os.listdir(self.test_dir)
for f in files:
os.remove(os.path.join(self.test_dir, f))

received_message = self.connectiva.receive()
self.assertEqual(received_message.action, "error", "Action should be 'error' when no message is found.")

def test_concurrent_access(self):
"""
Test concurrent access to ensure that multiple receivers can process messages
without conflicts. This test will simulate concurrent access by using multiple threads.
"""
def receiver(connectiva, results, index):
message = connectiva.receive()
if message.action != "error": # Only store successful results
results[index] = message.data

# Create multiple message files
for _ in range(5):
self.connectiva.send(Message(action="send", data={"content": "Hello!"}))

results = [None] * 5
threads = []

# Start multiple receiver threads
for i in range(5):
thread = threading.Thread(target=receiver, args=(self.connectiva, results, i))
thread.start()
threads.append(thread)

# Wait for all threads to finish
for thread in threads:
thread.join()

# Filter out None results
filtered_results = [result for result in results if result is not None]

# Verify that each message was processed exactly once
expected_results = [{"content": "Hello!"}] * len(filtered_results)
self.assertEqual(filtered_results, expected_results, "Each receiver should get the correct message content.")

def test_locking_mechanism(self):
"""
Test the file locking mechanism by attempting concurrent writes and reads
"""
def write_message(connectiva, message_data):
message = Message(action="send", data=message_data)
connectiva.send(message)

# Write a message to the file
message_data_1 = {"content": "Test Lock"}
message_data_2 = {"content": "Hello!"}

thread_1 = threading.Thread(target=write_message, args=(self.connectiva, message_data_1))
thread_2 = threading.Thread(target=write_message, args=(self.connectiva, message_data_2))

thread_1.start()
thread_2.start()

thread_1.join()
thread_2.join()

# Ensure both messages are read correctly
received_messages = []
while True:
received_message = self.connectiva.receive()
if received_message.action == "error":
break
received_messages.append(received_message.data)

# Verify that both messages are in the received list
expected_results = [message_data_1, message_data_2]
for expected in expected_results:
self.assertIn(expected, received_messages, "Locking mechanism failed; message not read correctly.")


if __name__ == "__main__":
unittest.main()

0 comments on commit 2c0e638

Please sign in to comment.