s3dgraphy Caching and Performance
This document covers s3dgraphy’s performance optimization systems, including caching strategies, indexing mechanisms, and memory management techniques.
Indexing System
GraphIndices Architecture
s3dgraphy uses a sophisticated indexing system (GraphIndices) that provides O(1) lookup performance for common graph operations.
class GraphIndices:
    """
    Lookup tables that let graph queries avoid scanning every node or edge.

    Indices maintained:
        nodes_by_type: node type -> nodes (fast node type lookups)
        property_nodes_by_name: property name -> property nodes
        property_values_by_name: property name -> set of property values
        strat_to_properties: stratigraphic unit -> properties
        properties_to_strat: property values -> stratigraphic units
        edges_by_type: edge type -> edges
        edges_by_source: source node -> edges
        edges_by_target: target node -> edges
    """
Lazy Loading Implementation
The Graph class implements lazy loading with automatic index rebuilding:
@property
def indices(self):
    """Return the graph indices, creating them on first use and rebuilding when stale."""
    # First access: nothing has been allocated yet.
    if self._indices is None:
        self._indices = GraphIndices()

    # A mutation since the last build flagged the indices as dirty.
    needs_rebuild = self._indices_dirty
    if needs_rebuild:
        self._rebuild_indices()

    return self._indices
def _rebuild_indices(self):
    """Repopulate every index from the graph's current nodes and edges."""
    if self._indices is None:
        self._indices = GraphIndices()
    self._indices.clear()

    # Nodes: group by type; property nodes are additionally indexed by name.
    for current in self.nodes:
        kind = getattr(current, 'node_type', 'unknown')
        self._indices.add_node_by_type(kind, current)
        if kind == 'property' and hasattr(current, 'name'):
            self._indices.add_property_node(current.name, current)

    # Edges: index each edge, then record property relations.
    for link in self.edges:
        self._indices.add_edge(link)
        if link.edge_type != 'has_property':
            continue

        owner = self.find_node_by_id(link.edge_source)
        prop = self.find_node_by_id(link.edge_target)
        if not (owner and prop and hasattr(prop, 'name')):
            # Dangling endpoint or unnamed property node: nothing to relate.
            continue

        # The property node's description serves as the indexed value.
        value = getattr(prop, 'description', 'empty')
        self._indices.add_property_relation(prop.name, link.edge_source, value)

    # Indices now reflect the graph; clear the stale flag.
    self._indices_dirty = False
Index Invalidation Strategy
def add_node(self, node: Node, overwrite=False) -> Node:
    """Insert a node into the graph and flag the indices as stale."""
    # ... node addition logic ...

    # Only the flag is set here; the `indices` property rebuilds on next access.
    self._indices_dirty = True
    return node
def add_edge(self, edge_id: str, edge_source: str, edge_target: str, edge_type: str) -> Edge:
    """Insert an edge into the graph and flag the indices as stale."""
    # ... edge addition logic ...

    # Only the flag is set here; the `indices` property rebuilds on next access.
    self._indices_dirty = True
    return edge
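Performance Characteristics:
- Index Build Time: O(n + m), where n = number of nodes and m = number of edges (this assumes find_node_by_id, which the rebuild calls for each has_property edge, is itself O(1))
- Lookup Time: O(1) for indexed operations
- Memory Overhead: ~20-30% additional memory for indices
- Rebuild Trigger: Only when the graph structure changes; the rebuild itself is deferred until the indices are next accessed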
Performance Optimization Techniques
Batch Operations
Efficient Node Addition
def add_node(self, node: Node, overwrite=False) -> Node:
    """
    Insert one node into the graph; index invalidation is automatic.

    Args:
        node: Node to add
        overwrite: Whether to overwrite if node already exists (default: False)

    Note:
        There is no dedicated batch API for nodes: call add_node() in a
        loop. Every call marks the indices as dirty.
    """
    # ... node addition logic ...

    self._indices_dirty = True
    return node
# Usage example: build 1000 stratigraphic units up front
nodes_to_add = [StratigraphicUnit(f"US{i:03d}") for i in range(1000)]

# Add them one by one; each add_node() call marks the indices dirty
for node in nodes_to_add:
    graph.add_node(node)

print(f"Added {len(nodes_to_add)} nodes")
Efficient Edge Addition
def add_edges_batch(self, edge_definitions: List[tuple], validate_connections=True):
    """
    Add multiple edges, invalidating the indices once instead of per edge.

    Args:
        edge_definitions: List of (edge_id, source, target, edge_type) tuples
        validate_connections: When True, an edge is added only if both
            endpoints resolve through find_node_by_id() and
            validate_connection() accepts the (source type, target type,
            edge type) combination.

    Returns:
        Tuple of (successful_edges, failed_edges). successful_edges holds
        the created Edge objects; failed_edges holds (edge_id, reason)
        tuples. For a definition that cannot be unpacked into four fields
        the first element is the offending definition itself.
    """
    # NOTE(review): edges are appended straight to self.edges, so whatever
    # checks add_edge() performs are skipped here - confirm this is intended.
    successful = []
    failed = []

    try:
        for definition in edge_definitions:
            # A malformed entry must not abort the rest of the batch.
            try:
                edge_id, source, target, edge_type = definition
            except (TypeError, ValueError):
                failed.append((definition, "Malformed edge definition"))
                continue

            try:
                if validate_connections:
                    source_node = self.find_node_by_id(source)
                    target_node = self.find_node_by_id(target)
                    if not source_node or not target_node:
                        failed.append((edge_id, "Node not found"))
                        continue
                    if not self.validate_connection(source_node.node_type,
                                                    target_node.node_type,
                                                    edge_type):
                        failed.append((edge_id, "Invalid connection"))
                        continue

                edge = Edge(edge_id, source, target, edge_type)
                self.edges.append(edge)
                successful.append(edge)
            except Exception as e:
                # Best effort: record the failure and keep going.
                failed.append((edge_id, str(e)))
    finally:
        # Single index invalidation. Done in `finally` so the indices are
        # never left stale if iteration is interrupted after some edges
        # were already appended.
        if successful:
            self._indices_dirty = True

    return successful, failed
# Usage example: each tuple is (edge_id, source, target, edge_type)
edge_definitions = [
    ("rel001", "US001", "US002", "is_before"),
    ("rel002", "US002", "US003", "is_before"),
    ("rel003", "US003", "US004", "is_before"),
    ("doc001", "US001", "DOC001", "has_data_provenance"),
]

# The call returns the created edges and the (edge_id, reason) failures
successful, failed = graph.add_edges_batch(edge_definitions)
print(f"Added {len(successful)} edges, {len(failed)} failed")
Memory Management
Memory Usage Monitoring
import psutil
import sys
class MemoryProfiler:
    """Monitor process memory usage (via psutil) during graph operations."""

    def __init__(self):
        self.process = psutil.Process()
        self.baseline = 0  # MB; set by start_monitoring()

    def _rss_mb(self):
        """Return the process's current resident set size in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def start_monitoring(self):
        """Record the current memory usage as the baseline and print it."""
        self.baseline = self._rss_mb()
        print(f"Baseline memory: {self.baseline:.1f} MB")

    def check_memory(self, operation=""):
        """
        Print current memory usage and its change relative to the baseline.

        Args:
            operation: Label describing what just ran (used in the message).

        Returns:
            Current resident set size in MB.
        """
        current = self._rss_mb()
        delta = current - self.baseline
        # The '+' format flag emits the correct sign; a literal "+" prefix
        # would render a decrease as "+-2.0 MB".
        print(f"Memory after {operation}: {current:.1f} MB ({delta:+.1f} MB)")
        return current

    def get_graph_memory_estimate(self, graph):
        """
        Estimate memory usage of graph components.

        sys.getsizeof() is shallow: it counts each container and each
        directly contained object, but not what those objects reference,
        so the figures are a lower bound rather than an exact measurement.

        Returns:
            Dict with "nodes_mb", "edges_mb", "indices_mb" and "total_mb".
        """
        node_memory = sys.getsizeof(graph.nodes) + sum(sys.getsizeof(node) for node in graph.nodes)
        edge_memory = sys.getsizeof(graph.edges) + sum(sys.getsizeof(edge) for edge in graph.edges)

        # Estimate index memory (approximation); only counted once built.
        index_memory = 0
        indices = getattr(graph, '_indices', None)
        if indices:
            for index_dict in (indices.nodes_by_type,
                               indices.edges_by_type,
                               indices.edges_by_source,
                               indices.edges_by_target):
                index_memory += sys.getsizeof(index_dict)
                for key, value in index_dict.items():
                    index_memory += sys.getsizeof(key) + sys.getsizeof(value)

        total_mb = (node_memory + edge_memory + index_memory) / 1024 / 1024
        return {
            "nodes_mb": node_memory / 1024 / 1024,
            "edges_mb": edge_memory / 1024 / 1024,
            "indices_mb": index_memory / 1024 / 1024,
            "total_mb": total_mb,
        }
# Usage example: take the baseline before anything large is loaded
profiler = MemoryProfiler()
profiler.start_monitoring()

# Load large graph, then report how much the process grew
manager = MultiGraphManager()
graph_id = manager.load_graph("large_excavation.graphml")
profiler.check_memory("graph loading")

# Per-component estimate for the loaded graph
graph = manager.get_graph(graph_id)
memory_breakdown = profiler.get_graph_memory_estimate(graph)

print("\nMemory breakdown:")
for component, size_mb in memory_breakdown.items():
    print(f" {component}: {size_mb:.2f} MB")
Memory Optimization Strategies
def optimize_graph_memory(graph):
    """
    Apply memory optimization strategies to a graph.

    Steps: drop empty-string attributes, share identical attribute
    dictionaries between nodes, trim the warning list, run the garbage
    collector, and rebuild the indices if they are dirty.

    WARNING: after step 2, nodes with equal attributes reference the SAME
    dict object, so mutating one node's attributes in place changes all of
    them. Only use this on graphs whose attributes are no longer edited.

    Returns:
        List of human-readable descriptions of the optimizations applied.
    """
    import gc

    optimizations_applied = []

    # 1. Compact attribute dictionaries
    for node in graph.nodes:
        attributes = getattr(node, 'attributes', None)
        if not attributes:
            continue
        # Remove empty string values
        empty_keys = [k for k, v in attributes.items() if v == ""]
        for k in empty_keys:
            del attributes[k]
        if empty_keys:
            optimizations_applied.append(f"Removed {len(empty_keys)} empty attributes from {node.node_id}")

    # 2. Deduplicate identical attribute dictionaries
    attribute_cache = {}
    for node in graph.nodes:
        attributes = getattr(node, 'attributes', None)
        if attributes is None:
            # Step 1 tolerates a missing/None attributes value; so must this step.
            continue
        attr_key = str(sorted(attributes.items()))
        shared = attribute_cache.setdefault(attr_key, attributes)
        if shared is not attributes:
            # Reuse existing dictionary. The identity check keeps repeated
            # runs from re-reporting dictionaries that are already shared.
            node.attributes = shared
            optimizations_applied.append(f"Deduplicated attributes for {node.node_id}")

    # 3. Compact warning messages
    warnings = getattr(graph, 'warnings', None)
    if warnings is not None and len(warnings) > 100:
        # Keep only recent warnings
        graph.warnings = warnings[-50:]
        optimizations_applied.append("Compacted warning messages")

    # 4. Force garbage collection
    collected = gc.collect()
    optimizations_applied.append(f"Garbage collection freed {collected} objects")

    # 5. Rebuild indices if dirty (consolidates memory)
    if getattr(graph, '_indices_dirty', False):
        graph._rebuild_indices()
        optimizations_applied.append("Rebuilt indices")

    return optimizations_applied
# Usage: run the optimizer and list what it did
optimizations = optimize_graph_memory(graph)

print("Applied optimizations:")
for opt in optimizations:
    print(f" - {opt}")
Query Performance Optimization
Indexed Query Patterns
class PerformantQueries:
    """Static helpers that answer common graph queries from the indices."""

    @staticmethod
    def fast_node_by_type(graph, node_type):
        """Return the nodes indexed under node_type ([] if there are none)."""
        by_type = graph.indices.nodes_by_type
        return by_type.get(node_type, [])

    @staticmethod
    def fast_edges_from_node(graph, node_id):
        """Return the edges whose source is node_id ([] if there are none)."""
        outgoing = graph.indices.edges_by_source
        return outgoing.get(node_id, [])

    @staticmethod
    def fast_edges_to_node(graph, node_id):
        """Return the edges whose target is node_id ([] if there are none)."""
        incoming = graph.indices.edges_by_target
        return incoming.get(node_id, [])

    @staticmethod
    def fast_property_search(graph, property_name, property_value):
        """Return the index entries recorded for a property name/value pair."""
        values_for_name = graph.indices.properties_to_strat.get(property_name, {})
        return values_for_name.get(property_value, [])

    @staticmethod
    def fast_connected_nodes(graph, node_id, edge_type=None):
        """
        Return the nodes linked to node_id, outgoing neighbours first.

        When edge_type is given, only edges of that type are followed.
        Endpoints that find_node_by_id() cannot resolve are skipped.
        """
        # (index to scan, edge attribute naming the neighbour at the far end)
        directions = (
            ("edges_by_source", "edge_target"),  # Outgoing edges
            ("edges_by_target", "edge_source"),  # Incoming edges
        )

        connected = []
        for index_name, far_end in directions:
            candidates = getattr(graph.indices, index_name).get(node_id, [])
            for edge in candidates:
                if edge_type is not None and edge.edge_type != edge_type:
                    continue
                neighbour = graph.find_node_by_id(getattr(edge, far_end))
                if neighbour:
                    connected.append(neighbour)
        return connected
# Performance comparison example
def compare_query_performance(graph):
    """
    Compare indexed vs non-indexed query performance.

    Times a linear scan of graph.nodes against an indexed lookup for node
    type "US" and prints both timings plus the speedup.

    Note: the indexed timing goes through graph.indices, so if the indices
    are dirty the first measurement also includes the lazy rebuild.
    """
    import time

    # Test data
    node_type = "US"

    # perf_counter() is the high-resolution monotonic clock intended for
    # benchmarking; time.time() can be too coarse to see an O(1) lookup.

    # Non-indexed approach (slow)
    start = time.perf_counter()
    slow_nodes = [node for node in graph.nodes if node.node_type == node_type]
    slow_time = time.perf_counter() - start

    # Indexed approach (fast)
    start = time.perf_counter()
    fast_nodes = PerformantQueries.fast_node_by_type(graph, node_type)
    fast_time = time.perf_counter() - start

    print("Query performance comparison:")
    print(f" Non-indexed: {slow_time:.4f}s ({len(slow_nodes)} results)")
    print(f" Indexed: {fast_time:.4f}s ({len(fast_nodes)} results)")
    if fast_time > 0:
        print(f" Speedup: {slow_time/fast_time:.2f}x faster")
    else:
        # Lookup finished below timer resolution; avoid dividing by zero.
        print(" Speedup: n/a (indexed query too fast to measure)")
# Usage (assumes `graph` has already been created)
compare_query_performance(graph)
Query Result Caching
class QueryCache:
    """
    Simple LRU cache for expensive queries.

    Entries live in an OrderedDict kept in least-to-most recently used
    order, so lookups, updates and evictions are all O(1).

    Note: get() returns None on a miss, so a cached value of None cannot be
    told apart from a missing entry.
    """

    def __init__(self, max_size=100):
        """
        Args:
            max_size: Maximum number of entries kept. A value <= 0 disables
                caching (put() stores nothing).
        """
        # Local import keeps this snippet self-contained.
        from collections import OrderedDict

        self.cache = OrderedDict()
        self.max_size = max_size

    @property
    def access_order(self):
        """Keys from least to most recently used (read-only snapshot)."""
        return list(self.cache)

    def get(self, key):
        """Return the cached result for key, or None if it is not cached."""
        if key not in self.cache:
            return None
        # Move to end (most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        """Cache a result, evicting the least recently used entry if full."""
        if self.max_size <= 0:
            # Nothing can be stored; previously this raised IndexError.
            return
        if key in self.cache:
            # Update existing: refresh its position
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            # Remove least recently used
            self.cache.popitem(last=False)
        self.cache[key] = value

    def clear(self):
        """Clear all cached results."""
        self.cache.clear()
# Usage with complex queries
class CachedGraphAnalyzer:
    """Graph analyzer with query result caching."""

    def __init__(self, graph):
        self.graph = graph
        self.cache = QueryCache(max_size=50)

    def get_stratigraphic_sequence(self, start_node_id):
        """
        Get the stratigraphic sequence starting at start_node_id, with caching.

        The returned list is the cached object itself; copy it before
        mutating, or later calls will see the modification.
        """
        cache_key = f"sequence_{start_node_id}"

        result = self.cache.get(cache_key)
        if result is not None:
            return result

        # Expensive computation
        sequence = self._compute_sequence(start_node_id)
        self.cache.put(cache_key, sequence)
        return sequence

    def _compute_sequence(self, start_node_id):
        """
        Return node ids reachable from start_node_id over "is_before" edges.

        Depth-first, pre-order traversal. Implemented with an explicit
        stack rather than recursion so that long chains (about a thousand
        units or more) cannot exceed Python's recursion limit.
        """
        visited = set()
        sequence = []
        stack = [start_node_id]

        while stack:
            node_id = stack.pop()
            if node_id in visited:
                continue
            visited.add(node_id)
            sequence.append(node_id)

            # Get temporal connections
            edges = PerformantQueries.fast_edges_from_node(self.graph, node_id)
            successors = [edge.edge_target for edge in edges
                          if edge.edge_type == "is_before"]
            # Push in reverse so the first successor is explored first,
            # matching the visit order of the recursive formulation.
            stack.extend(reversed(successors))

        return sequence

    def invalidate_cache(self):
        """Clear cache when graph changes."""
        self.cache.clear()
# Usage (assumes `graph` has already been created)
analyzer = CachedGraphAnalyzer(graph)
# First call - computed, then stored in the analyzer's cache
sequence1 = analyzer.get_stratigraphic_sequence("US001")
# Second call - same key, so the result is returned from the cache (fast)
sequence2 = analyzer.get_stratigraphic_sequence("US001")
# After graph modifications the cached sequences may be out of date
graph.add_edge("new_rel", "US001", "US999", "is_before")
analyzer.invalidate_cache() # Clear cache for consistency
Performance Monitoring and Benchmarking
Graph Performance Metrics
class GraphPerformanceMonitor:
    """Monitor and report graph performance metrics."""

    def __init__(self, graph):
        self.graph = graph
        self.metrics = {}

    def collect_metrics(self):
        """
        Collect size, index and benchmark metrics for the graph.

        Index metrics are recorded BEFORE the benchmarks run, so
        'indices_built' describes the state the graph was found in (the
        benchmarks themselves force an index rebuild).

        Returns:
            The monitor's metrics dict (the same object on every call).
        """
        # Basic metrics
        self.metrics['node_count'] = len(self.graph.nodes)
        self.metrics['edge_count'] = len(self.graph.edges)

        # Index metrics - only available once the indices object exists
        indices = getattr(self.graph, '_indices', None)
        if indices:
            self.metrics['indices_built'] = not self.graph._indices_dirty
            self.metrics['indexed_node_types'] = len(indices.nodes_by_type)
            self.metrics['indexed_edge_types'] = len(indices.edges_by_type)

        # Performance benchmarks
        self.metrics.update(self._benchmark_operations())
        return self.metrics

    @staticmethod
    def _elapsed_ms(operation, repeat=1):
        """Run operation() `repeat` times and return the elapsed time in ms."""
        import time

        # perf_counter() is monotonic and high-resolution; time.time() can
        # report 0 (or jump) for operations this short.
        start = time.perf_counter()
        for _ in range(repeat):
            operation()
        return (time.perf_counter() - start) * 1000

    def _benchmark_operations(self):
        """
        Benchmark common operations.

        Returns an empty dict for a graph without nodes. Side effect: the
        last benchmark calls graph._rebuild_indices().
        """
        benchmarks = {}
        if not self.graph.nodes:
            return benchmarks

        # Node lookup benchmark
        test_node_id = self.graph.nodes[0].node_id
        benchmarks['node_lookup_1000ops_ms'] = self._elapsed_ms(
            lambda: self.graph.find_node_by_id(test_node_id), repeat=1000)

        # Node type query benchmark
        benchmarks['type_query_100ops_ms'] = self._elapsed_ms(
            lambda: self.graph.get_nodes_by_type("US"), repeat=100)

        # Index rebuild benchmark
        benchmarks['index_rebuild_ms'] = self._elapsed_ms(self.graph._rebuild_indices)

        return benchmarks

    def generate_report(self):
        """Collect metrics and print a performance report with recommendations."""
        metrics = self.collect_metrics()

        print("Graph Performance Report")
        print("=" * 40)
        print(f"Nodes: {metrics.get('node_count', 0):,}")
        print(f"Edges: {metrics.get('edge_count', 0):,}")

        if metrics.get('indices_built'):
            print(f"Indexed node types: {metrics.get('indexed_node_types', 0)}")
            print(f"Indexed edge types: {metrics.get('indexed_edge_types', 0)}")
        else:
            print("Indices: Not built (will be built on next access)")

        print("\nBenchmarks:")
        print(f" Node lookup (1000 ops): {metrics.get('node_lookup_1000ops_ms', 0):.2f} ms")
        print(f" Type query (100 ops): {metrics.get('type_query_100ops_ms', 0):.2f} ms")
        print(f" Index rebuild: {metrics.get('index_rebuild_ms', 0):.2f} ms")

        # Performance recommendations
        self._generate_recommendations(metrics)

    def _generate_recommendations(self, metrics):
        """Print recommendations derived from fixed thresholds on the metrics."""
        print("\nRecommendations:")
        if metrics.get('node_count', 0) > 5000:
            print(" - Large graph detected. Consider using database backend for better performance.")
        if metrics.get('node_lookup_1000ops_ms', 0) > 100:
            print(" - Slow node lookups. Ensure indices are built and consider graph optimization.")
        if metrics.get('index_rebuild_ms', 0) > 1000:
            print(" - Slow index rebuild. Consider reducing graph complexity or using batch operations.")
        if not metrics.get('indices_built'):
            print(" - Indices not built. Performance will improve after first indexed operation.")
# Usage: collects the metrics, runs the benchmarks and prints the report
monitor = GraphPerformanceMonitor(graph)
monitor.generate_report()
Performance Best Practices
class PerformanceBestPractices:
    """Worked examples of the performance patterns described in this guide."""

    @staticmethod
    def efficient_graph_building():
        """Build a 1000-node chain graph using up-front creation and batch edges."""
        from s3dgraphy import Graph

        # Create graph
        graph = Graph("OptimizedGraph")

        # 1. Create every node first, then add them to the graph
        nodes = [StratigraphicUnit(f"US{i:03d}") for i in range(1000)]
        for node in nodes:
            graph.add_node(node)

        # 2. Describe all edges, then add them in a single batch call
        edge_definitions = [
            (f"rel{i}", f"US{i:03d}", f"US{i+1:03d}", "is_before")
            for i in range(999)
        ]
        successful, failed = graph.add_edges_batch(edge_definitions)

        print(f"Efficiently created graph with {len(nodes)} nodes and {len(successful)} edges")
        return graph

    @staticmethod
    def memory_conscious_iteration(graph):
        """Process nodes of one type, one node at a time."""
        # Good: fetch only the nodes of interest instead of scanning the graph
        for node in graph.get_nodes_by_type("US"):
            if node.get_attribute("material") != "stone":
                continue
            # Do something with stone nodes
            pass

    @staticmethod
    def efficient_queries(graph):
        """Show indexed lookups in place of linear searches."""
        # Good: O(1) indexed lookup (if property index exists)
        by_material = graph.indices.properties_to_strat.get("material", {})
        stone_node_ids = by_material.get("stone", [])
        stone_nodes = [graph.find_node_by_id(node_id) for node_id in stone_node_ids]

        # Resolve a known set of ids in one pass rather than scattered lookups
        node_ids_to_check = ["US001", "US002", "US003"]
        nodes = [graph.find_node_by_id(node_id) for node_id in node_ids_to_check]
        # Process all at once instead of individual lookups
# Performance testing utility
def performance_test_suite(graph):
    """Run the index, memory, query and best-practice checks against a graph."""
    print("Running performance test suite...")

    # Test 1: Index performance
    print("\n1. Testing index performance...")
    GraphPerformanceMonitor(graph).collect_metrics()

    # Test 2: Memory usage
    print("\n2. Testing memory usage...")
    MemoryProfiler().get_graph_memory_estimate(graph)

    # Test 3: Query performance
    print("\n3. Testing query performance...")
    compare_query_performance(graph)

    # Test 4: Best practices check
    print("\n4. Checking best practices...")
    is_large = len(graph.nodes) > 1000
    if is_large and not hasattr(graph, '_last_batch_operation'):
        print(" Warning: Large graph without evident batch operations")

    index_status = (" Info: Indices need rebuilding"
                    if graph._indices_dirty
                    else " Good: Indices are up to date")
    print(index_status)

    print("\nPerformance test suite completed.")
# Usage (assumes `graph` has already been created)
performance_test_suite(graph)
This comprehensive caching and performance guide provides all the tools needed to optimize s3dgraphy graphs for maximum efficiency and scalability.