Skip to content

Commit

Permalink
Enhance error handling in DemandBasedReplication and update tests
Browse files Browse the repository at this point in the history
- Updated adapt_based_on_demand in DemandBasedReplication to wrap replication failures in a ValueError, ensuring consistent error signaling.
- Enhanced error logging to include specific segment details for easier debugging.
- Refactored test_replication_failure_handling to validate ValueError raising on replication failure.
- Confirmed successful test runs for all demand-based replication conditions, including demand scaling and error handling.
  • Loading branch information
sergism77 committed Nov 7, 2024
1 parent 8438654 commit 3a6231a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 13 deletions.
35 changes: 22 additions & 13 deletions src/dot_seigr/replication_demand.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,35 @@ def adapt_based_on_demand(self, segment: SegmentMetadata, access_count: int, dem
bool: True if replication was triggered, False otherwise.
Raises:
ValueError: If replication request fails.
ValueError: If replication request fails due to an error in the replication process or if replication
could not be completed successfully.
"""
if access_count < demand_threshold:
logger.info(f"Segment {segment.segment_hash} access below threshold ({access_count}/{demand_threshold}). No replication needed.")
return False

# Calculate the required replication count based on demand
new_replication_count = self.calculate_demand_scale(access_count, min_replication)
logger.info(f"Demand-based replication adjustment for segment {segment.segment_hash}. "
f"New replication count: {new_replication_count}")

# Attempt replication
# Attempt replication, handling potential exceptions and errors in the process
try:
# Request replication through the replication manager
success = self.replication_manager.replicate_segment(segment.segment_hash, new_replication_count)

# If replication is successful, log and return True
if success:
logger.info(f"Demand-based replication completed for segment {segment.segment_hash} with replication count: {new_replication_count}")
return True
else:
# Raise a ValueError if replication did not succeed, with detailed information
raise ValueError(f"Replication failed for segment {segment.segment_hash}. Requested count: {new_replication_count}")

except Exception as e:
# Log the error and wrap it in a ValueError to signal replication failure to calling functions
logger.error(f"Error during demand-based replication for segment {segment.segment_hash}: {e}")
raise
raise ValueError(f"Replication failed for segment {segment.segment_hash}") from e # Raise with context

def calculate_demand_scale(self, access_count: int, min_replication: int) -> int:
"""
Expand Down Expand Up @@ -90,14 +97,16 @@ def monitor_and_replicate_by_demand(self, segments_status: dict, demand_threshol
min_replication (int): Minimum replication level required for all segments.
"""
for segment_hash, status in segments_status.items():
access_count = status.get("access_count", 0)
segment_metadata = status.get("segment_metadata")

if not segment_metadata:
logger.warning(f"Missing metadata for segment {segment_hash}. Skipping demand-based replication check.")
continue
access_count = status.get("access_count", 0)
segment_metadata = status.get("segment_metadata")
if not segment_metadata:
logger.warning(f"Missing metadata for segment {segment_hash}. Skipping demand-based replication check.")
continue

try:
self.adapt_based_on_demand(segment_metadata, access_count, demand_threshold, min_replication)
except Exception as e:
logger.error(f"Demand-based replication failed for segment {segment_hash}: {e}")
# Only call adapt_based_on_demand if access count exceeds demand_threshold
if access_count >= demand_threshold:
try:
self.adapt_based_on_demand(segment_metadata, access_count, demand_threshold, min_replication)
except Exception as e:
logger.error(f"Demand-based replication failed for segment {segment_hash}: {e}")
104 changes: 104 additions & 0 deletions src/tests/test_replication_demand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import unittest
from unittest.mock import MagicMock, patch
from src.dot_seigr.replication_demand import DemandBasedReplication
from src.dot_seigr.replication_manager import ReplicationManager
from src.dot_seigr.seigr_protocol.seed_dot_seigr_pb2 import SegmentMetadata

class TestDemandBasedReplication(unittest.TestCase):

def setUp(self):
# Mock ReplicationManager
self.replication_manager = MagicMock(spec=ReplicationManager)
self.demand_replicator = DemandBasedReplication(self.replication_manager)

# Default test parameters
self.demand_threshold = 10
self.min_replication = 2
self.segment_metadata = SegmentMetadata(segment_hash="test_segment_hash")

def test_no_replication_below_threshold(self):
# Verify no replication occurs if access count is below threshold
access_count = 5 # Below demand threshold
result = self.demand_replicator.adapt_based_on_demand(
self.segment_metadata, access_count, self.demand_threshold, self.min_replication
)

# Assert that no replication was triggered
self.assertFalse(result)
self.replication_manager.replicate_segment.assert_not_called()

def test_replication_triggered_above_threshold(self):
# Test that replication is triggered if access count exceeds threshold
access_count = 15 # Above demand threshold
self.replication_manager.replicate_segment.return_value = True

result = self.demand_replicator.adapt_based_on_demand(
self.segment_metadata, access_count, self.demand_threshold, self.min_replication
)

# Assert that replication was triggered
self.assertTrue(result)
self.replication_manager.replicate_segment.assert_called_once()

def test_calculate_demand_scale_low_demand(self):
# Test scaling function for low demand
access_count = 15
expected_replication_count = 3 # Corresponds to low demand level
replication_count = self.demand_replicator.calculate_demand_scale(access_count, self.min_replication)

self.assertEqual(replication_count, expected_replication_count)

def test_calculate_demand_scale_high_demand(self):
# Test scaling function for high demand
access_count = 1500
expected_replication_count = 12 # High demand level
replication_count = self.demand_replicator.calculate_demand_scale(access_count, self.min_replication)

self.assertEqual(replication_count, expected_replication_count)

def test_replication_failure_handling(self):
# Test that a ValueError is raised if replication fails
access_count = 20 # Above threshold
self.replication_manager.replicate_segment.side_effect = Exception("Replication failed")

# Expect adapt_based_on_demand to raise a ValueError when replicate_segment fails
with self.assertRaises(ValueError) as context:
self.demand_replicator.adapt_based_on_demand(
self.segment_metadata, access_count, self.demand_threshold, self.min_replication
)

# Verify that the exception message includes the segment hash and failure message
self.assertIn("Replication failed for segment", str(context.exception))
self.replication_manager.replicate_segment.assert_called_once_with(
self.segment_metadata.segment_hash, 3 # Expected replication count for low demand
)

def test_monitor_and_replicate_by_demand(self):
# Test monitor_and_replicate_by_demand with multiple segments
segments_status = {
"segment_1": {
"access_count": 5, # Below threshold
"segment_metadata": SegmentMetadata(segment_hash="hash1")
},
"segment_2": {
"access_count": 20, # Above threshold
"segment_metadata": SegmentMetadata(segment_hash="hash2")
}
}

self.replication_manager.replicate_segment.return_value = True

# Run the monitoring method
with patch.object(self.demand_replicator, 'adapt_based_on_demand', wraps=self.demand_replicator.adapt_based_on_demand) as mock_adapt:
self.demand_replicator.monitor_and_replicate_by_demand(segments_status, self.demand_threshold, self.min_replication)

# Verify that adapt_based_on_demand was called only for the segment above threshold
mock_adapt.assert_called_once_with(
segments_status["segment_2"]["segment_metadata"],
segments_status["segment_2"]["access_count"],
self.demand_threshold,
self.min_replication
)

if __name__ == '__main__':
unittest.main()

0 comments on commit 3a6231a

Please sign in to comment.