Skip to content

Commit

Permalink
feat: Enhance segment integrity, senary encoding, CBOR support, and a…
Browse files Browse the repository at this point in the history
…dd robust testing

encoding_utils.py Enhancements:

Refined encode_to_senary and decode_from_senary for improved modularity and debuggability.
Added cbor_encode_senary and cbor_decode_senary for .seigr-specific CBOR support.
Updated utility functions to handle both dictionaries and lists for seamless encoding.
hash_utils.py Improvements:

Extended hypha_hash with senary-based hashing, enhancing .seigr ecosystem compatibility.
Integrated CBOR encoding for hashed outputs, improving data integrity checks.
hypha_crypt.py Additions:

Expanded HyphaCrypt to support both senary and hexadecimal hashing.
Introduced options for exporting logs in CBOR and JSON formats for greater flexibility.
Enhanced integrity verification processes with support for partial-depth checks.
Added extensive logging for detailed traceability in encryption and hashing.
immune_system.py Development:

Built immune_ping to handle multi-layered segment integrity verification using verify_segment_integrity.
Implemented rollback_segment to enable secure rollbacks with rollback availability verification.
Integrated adaptive monitoring and threat response mechanisms for high-risk segments.
Comprehensive logging added for threat logging, integrity verification, and rollback attempts.
rollback.py Refinements:

Created verify_rollback_availability to check rollback feasibility.
Developed revert_segment_data and helpers to restore metadata, links, and coordinate index from prior states.
Detailed logging added to support debugging during rollback operations.
integrity.py Updates:

Enhanced verify_segment_integrity with senary hash validation.
Improved verify_full_lineage_integrity and verify_file_metadata_integrity for lineage and file metadata continuity.
Testing Improvements:

Built test_immune_system.py to validate integrity, rollback, threat responses, and adaptive replication.
Mocked SeigrFile, SegmentMetadata, and ReplicationController for thorough, isolated unit testing.
Verified CBOR compatibility, senary encoding, and adaptive replication triggers.
Tested encryption, decryption, and full/partial verification across modules.
Ensured module stability across all functionalities, achieving full test pass.
All functionality is now fully tested and stable, with clear logging for efficient debugging and improved traceability across the .seigr ecosystem.
  • Loading branch information
sergism77 committed Nov 6, 2024
1 parent 142d82d commit 8a99744
Show file tree
Hide file tree
Showing 28 changed files with 1,151 additions and 523 deletions.
7 changes: 5 additions & 2 deletions project_tree.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
│   │   ├── immune_system.py
│   │   ├── integrity.py
│   │   ├── lineage.py
│   │   ├── replication.py
│   │   ├── replication_controller.py
│   │   ├── replication_demand.py
│   │   ├── replication_manager.py
│   │   ├── replication_self_heal.py
│   │   ├── replication_threat.py
│   │   ├── rollback.py
│   │   ├── seed_dot_seigr.py
│   │   ├── seigr_constants.py
Expand Down Expand Up @@ -83,4 +86,4 @@
│   └── home.html
└── uploads

21 directories, 63 files
21 directories, 66 files
117 changes: 71 additions & 46 deletions src/dot_seigr/dot_seigr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,81 @@ def create_segmented_seigr_files(self, directory: str, seed: SeedDotSeigrProto)
os.makedirs(directory, exist_ok=True)

for part_index in range(total_parts):
# Extract segment data and initialize encryption
start = part_index * segment_size
end = start + segment_size
segment_data = self.data[start:end]

# Initialize HyphaCrypt for segment cryptographic handling
hypha_crypt = HyphaCrypt(data=segment_data, segment_id=f"{self.creator_id}_{part_index}")
primary_hash = hypha_crypt.compute_primary_hash()

# Create SeigrFile instance
seigr_file = SeigrFile(
data=segment_data,
creator_id=self.creator_id,
index=part_index,
file_type=self.file_type
)

# Set up primary and secondary links
if last_primary_hash:
self.link_manager.set_primary_link(last_primary_hash)
seigr_file.set_links(
primary_link=self.link_manager.primary_link,
secondary_links=self.link_manager.secondary_links
)

# Add a temporal layer for the current state of the segment
seigr_file.add_temporal_layer()

# Save the .seigr segment as a Protobuf file
file_path = seigr_file.save_to_disk(directory)
logger.info(f"Saved .seigr file part {part_index + 1}/{total_parts} at {file_path}")

# Generate a secondary link for adaptive retrieval paths
secondary_link = hypha_crypt.compute_layered_hashes()
self.link_manager.add_secondary_link(secondary_link)

# Update last primary hash for linking the next segment
last_primary_hash = primary_hash

# Add the saved file path to the SeedDotSeigrProto instance
seed_file_metadata = seed.segments.add()
seed_file_metadata.segment_hash = primary_hash
seed_file_metadata.timestamp = datetime.now(timezone.utc).isoformat()

# Log hash tree and link for traceability
logger.debug(f"Hash tree for segment {part_index} and secondary links added.")
# Create and save each segment as a .seigr file
try:
primary_hash, file_path, secondary_link = self._create_and_save_segment(
directory, part_index, segment_size, last_primary_hash
)

# Update last primary hash for linking the next segment
last_primary_hash = primary_hash

# Add the saved file path to the SeedDotSeigrProto instance
seed_file_metadata = seed.segments.add()
seed_file_metadata.segment_hash = primary_hash
seed_file_metadata.timestamp = datetime.now(timezone.utc).isoformat()

# Log hash tree and link for traceability
logger.debug(f"Hash tree for segment {part_index} and secondary links added.")

except Exception as e:
logger.error(f"Failed to create and save segment {part_index}: {e}")
raise

logger.info("All segments created and saved successfully.")
return seed

def _create_and_save_segment(self, directory: str, part_index: int, segment_size: int, last_primary_hash: str):
"""
Creates and saves a single .seigr file segment.
Args:
directory (str): Directory to save the .seigr file.
part_index (int): The segment index.
segment_size (int): Size of each segment.
last_primary_hash (str): Hash of the previous segment for linking.
Returns:
tuple: Primary hash, file path, and secondary link for the segment.
"""
# Extract segment data and initialize encryption
start = part_index * segment_size
end = start + segment_size
segment_data = self.data[start:end]

# Initialize HyphaCrypt for segment cryptographic handling
hypha_crypt = HyphaCrypt(data=segment_data, segment_id=f"{self.creator_id}_{part_index}")
primary_hash = hypha_crypt.compute_primary_hash()

# Create SeigrFile instance
seigr_file = SeigrFile(
data=segment_data,
creator_id=self.creator_id,
index=part_index,
file_type=self.file_type
)

# Set up primary and secondary links
if last_primary_hash:
self.link_manager.set_primary_link(last_primary_hash)
seigr_file.set_links(
primary_link=self.link_manager.primary_link,
secondary_links=self.link_manager.secondary_links
)

# Add a temporal layer for the current state of the segment
seigr_file.add_temporal_layer()

# Save the .seigr segment as a Protobuf file
file_path = seigr_file.save_to_disk(directory)
logger.info(f"Saved .seigr file part {part_index + 1} at {file_path}")

# Generate a secondary link for adaptive retrieval paths
secondary_link = hypha_crypt.compute_layered_hashes()
self.link_manager.add_secondary_link(secondary_link)

return primary_hash, file_path, secondary_link

def save_seed_to_disk(self, seed: SeedDotSeigrProto, base_dir: str) -> str:
"""
Saves the seed cluster as a protobuf binary file.
Expand Down
120 changes: 56 additions & 64 deletions src/dot_seigr/immune_system.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,61 @@
import logging
from datetime import datetime, timezone
from src.dot_seigr.integrity import verify_segment_integrity
from src.dot_seigr.replication import trigger_security_replication, adaptive_replication
from src.dot_seigr.rollback import rollback_to_previous_state
from src.dot_seigr.seigr_protocol.seed_dot_seigr_pb2 import SegmentMetadata, FileMetadata
from src.dot_seigr.replication_controller import ReplicationController
from src.dot_seigr.rollback import rollback_to_previous_state, verify_rollback_availability
from src.dot_seigr.seigr_protocol.seed_dot_seigr_pb2 import SegmentMetadata
from src.dot_seigr.seigr_file import SeigrFile

logger = logging.getLogger(__name__)

class ImmuneSystem:
def __init__(self, monitored_segments, replication_threshold=3, adaptive_threshold=5, max_threat_log_size=1000):
def __init__(self, monitored_segments, replication_controller, replication_threshold=3, adaptive_threshold=5, max_threat_log_size=1000):
"""
Initializes the immune system with monitored segments, thresholds, and logging limits.
Args:
monitored_segments (dict): Dictionary of SegmentMetadata protobufs.
replication_threshold (int): Threshold for initiating basic replication.
adaptive_threshold (int): Threshold for initiating adaptive replication for high-risk segments.
max_threat_log_size (int): Maximum number of threat entries to retain for efficiency.
Initializes the Immune System with monitored segments, thresholds, and logging limits.
"""
self.monitored_segments = monitored_segments # Example: {segment_hash: SegmentMetadata}
self.monitored_segments = monitored_segments
self.replication_controller = replication_controller
self.threat_log = []
self.replication_threshold = replication_threshold
self.adaptive_threshold = adaptive_threshold
self.max_threat_log_size = max_threat_log_size

def immune_ping(self, segment_metadata: SegmentMetadata) -> bool:
def immune_ping(self, segment_metadata: SegmentMetadata, data: bytes) -> bool:
"""
Sends an integrity ping, performing multi-layered hash verification on a segment.
Args:
segment_metadata (SegmentMetadata): Protobuf segment metadata to check.
Returns:
bool: True if integrity check passes, False if failed.
"""
segment_hash = segment_metadata.segment_hash
is_valid = verify_segment_integrity(segment_metadata)

logger.debug(f"Starting immune_ping on segment {segment_hash} with provided data.")

is_valid = verify_segment_integrity(segment_metadata, data)
logger.debug(f"Integrity check for segment {segment_hash} returned: {is_valid}")

if not is_valid:
logger.warning(f"Integrity check failed for segment {segment_hash}. Recording threat.")
self.record_threat(segment_hash)
self.handle_threat_response(segment_hash)

return is_valid

def monitor_integrity(self):
"""
Continuously monitors the integrity of all segments in `monitored_segments`.
"""
for segment_metadata in self.monitored_segments.values():
self.immune_ping(segment_metadata)
data = b"" # Placeholder; in practice, retrieve or mock the actual data.
self.immune_ping(segment_metadata, data)

def record_threat(self, segment_hash: str):
"""
Records a threat instance and manages threat log size.
Args:
segment_hash (str): Hash of the segment that failed integrity.
"""
threat_entry = {
"segment_hash": segment_hash,
"timestamp": datetime.now(timezone.utc).isoformat()
}
self.threat_log.append(threat_entry)

# Enforce log size limit for performance
# Enforce log size limit
if len(self.threat_log) > self.max_threat_log_size:
self.threat_log.pop(0)

Expand All @@ -73,14 +64,8 @@ def record_threat(self, segment_hash: str):
def detect_high_risk_segments(self):
"""
Analyzes the threat log to identify high-risk segments with multiple failures.
Returns:
list: Segments flagged for high-risk adaptive replication.
"""
threat_counts = {}
for entry in self.threat_log:
threat_counts[entry["segment_hash"]] = threat_counts.get(entry["segment_hash"], 0) + 1

threat_counts = self._get_threat_counts()
high_risk_segments = [
seg for seg, count in threat_counts.items() if count >= self.adaptive_threshold
]
Expand All @@ -90,68 +75,75 @@ def detect_high_risk_segments(self):
def handle_threat_response(self, segment_hash: str):
"""
Manages responses to a detected threat, initiating replication or rollback as appropriate.
Args:
segment_hash (str): Hash of the segment that failed integrity.
"""
high_risk_segments = self.detect_high_risk_segments()

if segment_hash in high_risk_segments:
logger.warning(f"High-risk segment {segment_hash} detected; initiating adaptive replication.")
adaptive_replication(segment_hash)
elif len([t for t in self.threat_log if t["segment_hash"] == segment_hash]) >= self.replication_threshold:
self.replication_controller.threat_replicator.adaptive_threat_replication(
segment=segment_hash, threat_level=5, min_replication=self.replication_controller.min_replication
)
elif self._get_segment_threat_count(segment_hash) >= self.replication_threshold:
logger.info(f"Threshold for basic replication met for segment {segment_hash}. Initiating security replication.")
self.trigger_security_replication(segment_hash)
self.replication_controller.trigger_security_replication(segment_hash)
else:
logger.info(f"Segment {segment_hash} remains under regular monitoring with no immediate replication action.")

def trigger_security_replication(self, segment_hash: str):
"""
Initiates standard security replication to reinforce segment availability.
Args:
segment_hash (str): Hash of the segment to replicate.
"""
logger.info(f"Initiating security replication for segment {segment_hash}")
trigger_security_replication(segment_hash)

def rollback_segment(self, seigr_file: SeigrFile):
"""
Rolls back a segment to its last verified secure state if threats are detected.
Args:
seigr_file (SeigrFile): Instance of SeigrFile representing the segment to roll back.
"""
# Check for available temporal layers
if not seigr_file.temporal_layers:
logger.warning(f"No previous layers available for rollback on segment {seigr_file.hash}")
logger.warning(f"No previous layers available for rollback on segment {seigr_file.hash}. Skipping rollback.")
return

rollback_to_previous_state(seigr_file)
logger.info(f"Successfully rolled back segment {seigr_file.hash} to a secure state.")
# Log the current temporal layers and hash for debugging
logger.debug(f"Debug: Temporal layers available for segment {seigr_file.hash}: {[layer.layer_hash for layer in seigr_file.temporal_layers]}")
logger.debug(f"Debug: Current segment hash = {seigr_file.hash}")

# Check if rollback is allowed
rollback_allowed = verify_rollback_availability(seigr_file)
logger.debug(f"Debug: rollback_allowed for segment {seigr_file.hash} = {rollback_allowed}")

# Perform rollback if allowed and log the call attempt
if rollback_allowed:
logger.info(f"Rollback allowed for segment {seigr_file.hash}, attempting rollback.")

# Double-check the actual call to ensure this is executed
try:
rollback_to_previous_state(seigr_file)
logger.info(f"Successfully rolled back segment {seigr_file.hash} to a secure state.")
except Exception as e:
logger.error(f"Error during rollback execution: {e}")
else:
logger.warning(f"Rollback not allowed for segment {seigr_file.hash}.")

def adaptive_monitoring(self, critical_threshold: int):
"""
Executes an adaptive monitoring routine, handling threats that exceed critical thresholds.
Args:
critical_threshold (int): Threshold for immediately flagging segments as critical.
"""
critical_segments = [
seg for seg, count in self._get_threat_counts().items() if count >= critical_threshold
]

for segment in critical_segments:
logger.critical(f"Critical threat level reached for segment {segment}. Triggering urgent replication.")
adaptive_replication(segment)
logger.critical(f"Critical threat level reached for segment {segment}. Triggering urgent adaptive replication.")
self.replication_controller.threat_replicator.adaptive_threat_replication(
segment=segment, threat_level=5, min_replication=self.replication_controller.min_replication
)

def _get_threat_counts(self):
"""
Internal method to count occurrences of threats per segment.
Returns:
dict: Mapping of segment hashes to the count of recorded threats.
"""
threat_counts = {}
for entry in self.threat_log:
threat_counts[entry["segment_hash"]] = threat_counts.get(entry["segment_hash"], 0) + 1
return threat_counts

def _get_segment_threat_count(self, segment_hash: str) -> int:
"""
Returns the threat count for a specific segment.
"""
return sum(1 for entry in self.threat_log if entry["segment_hash"] == segment_hash)
Loading

0 comments on commit 8a99744

Please sign in to comment.