from dataclasses import dataclass, field from typing import List, Dict, Set import threading from enum import Enum, auto class ResourceType(Enum): """ 表示不同类型的系统资源 """ PRINTER = auto() DISK = auto() MEMORY = auto() NETWORK = auto() @dataclass class Resource: """ 表示系统资源 """ id: int type: ResourceType is_available: bool = True current_owner: int = None # 当前使用该资源的进程ID class DeadlockAvoidanceStrategy: """ 死锁预防和解决的策略 """ RESOURCE_ALLOCATION_GRAPH = 'resource_allocation_graph' BANKER_ALGORITHM = 'banker_algorithm' class ResourceManager: """ 管理系统资源分配和死锁预防 """ def __init__(self): # 资源池 self.resources: Dict[ResourceType, List[Resource]] = { ResourceType.PRINTER: [Resource(i, ResourceType.PRINTER) for i in range(3)], ResourceType.DISK: [Resource(i, ResourceType.DISK) for i in range(2)], ResourceType.NETWORK: [Resource(i, ResourceType.NETWORK) for i in range(1)], } # 跟踪资源请求和分配 self.resource_requests: Dict[int, Set[Resource]] = {} self.resource_allocations: Dict[int, Set[Resource]] = {} # 死锁预防 self.allocation_lock = threading.Lock() self.deadlock_strategy = DeadlockAvoidanceStrategy.BANKER_ALGORITHM def request_resource( self, process_id: int, resource_type: ResourceType ) -> bool: """ 为进程请求资源 :param process_id: 请求资源的进程ID :param resource_type: 要请求的资源类型 :return: 如果资源分配成功,返回True,否则返回False """ with self.allocation_lock: # 查找指定类型的可用资源 available_resources = [ res for res in self.resources[resource_type] if res.is_available ] if not available_resources: # 没有可用资源 return False # 分配第一个可用资源 resource = available_resources[0] resource.is_available = False resource.current_owner = process_id # 跟踪资源分配 if process_id not in self.resource_allocations: self.resource_allocations[process_id] = set() self.resource_allocations[process_id].add(resource) return True def release_resource( self, process_id: int, resource: Resource ): """ 释放先前分配给进程的资源 :param process_id: 释放资源的进程ID :param resource: 要释放的资源 """ with self.allocation_lock: # 验证进程是否拥有该资源 if (resource.current_owner != process_id or resource.type not in self.resources): return # 标记资源为可用 resource.is_available = True resource.current_owner = None # 从分配中移除 if process_id in self.resource_allocations: self.resource_allocations[process_id].discard(resource) def detect_deadlock(self) -> bool: """ 使用资源分配图检测潜在的死锁 :return: 如果检测到死锁,返回True,否则返回False """ # 使用资源分配图进行简单的死锁检测 for process, resources in self.resource_allocations.items(): for resource in resources: # 检查资源是否循环依赖 if resource.current_owner != process: return True return False def resolve_deadlock(self): """ 使用资源抢占解决死锁 """ if not self.detect_deadlock(): return # 从进程中抢占资源 for process, resources in list(self.resource_allocations.items()): for resource in list(resources): self.release_resource(process, resource) break # 一次释放一个资源