"""Minimal KVM/QEMU virtual machine manager built on the libvirt bindings.

Setup notes:
    python312 -m pip install --upgrade pip   # update the pip installer
    pip install libvirt-python               # install the KVM API library
"""

import ipaddress
import os
import random
import shutil
import sys
import uuid
from typing import Optional
from xml.sax.saxutils import escape, quoteattr

import libvirt

# Directory that receives the per-VM copies of the base disk image.
DEFAULT_IMAGE_DIR = '/datadisk/vm/pyauto'


class KVMManager:
    """Thin convenience wrapper around a libvirt connection to qemu:///system.

    The connection is opened when the instance is created and stored in
    ``self.conn``; call :meth:`close` (or use the instance as a context
    manager) to release it.
    """

    def __init__(self):
        # __init__ runs automatically on construction; every other method
        # reuses the connection stored here.
        self.conn = self.connect_to_libvirt()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def connect_to_libvirt(self):
        """Open and return a connection to the local system hypervisor.

        Prints the error and terminates the process with exit status 1 when
        libvirt reports a connection error.

        Raises:
            Exception: if libvirt returns no connection object without
                raising an error itself.
        """
        try:
            conn = libvirt.open('qemu:///system')
        except libvirt.libvirtError as e:
            print(f'Connection failed: {e}')
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to exist (e.g. under ``python -S``).
            sys.exit(1)
        if conn is None:
            raise Exception('Failed to connect to hypervisor')
        return conn

    def generate_mac(self) -> str:
        """Generate a random MAC address with the 52:54:00 prefix."""
        octets = [0x52, 0x54, 0x00]
        octets.extend(random.randint(0x00, 0x7f) for _ in range(3))
        return ':'.join(f'{octet:02x}' for octet in octets)

    def generate_ip(self, subnet) -> str:
        """Return a random usable host address inside ``subnet``.

        ``subnet`` is anything ``ipaddress.ip_network`` accepts; host bits
        are tolerated (``strict=False``).

        Raises:
            ValueError: if ``subnet`` is not a valid network.
        """
        network = ipaddress.ip_network(subnet, strict=False)
        total = network.num_addresses
        if total <= 2:
            # Tiny networks (/31, /32, /127, /128): let the stdlib decide
            # which addresses count as hosts; the list has at most 2 items.
            return str(random.choice(list(network.hosts())))
        # Index into the network instead of materialising every host, which
        # would allocate millions of objects for a large subnet.
        if network.version == 4:
            highest = total - 2  # skip network (offset 0) and broadcast (last)
        else:
            highest = total - 1  # IPv6 hosts() only skips offset 0
        return str(network[random.randint(1, highest)])

    def ensure_dir_exists(self, directory) -> None:
        """Create ``directory`` (and any missing parents) if it is absent."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(directory, exist_ok=True)

    def copy_image(self, source_path, dest_dir, new_name) -> Optional[str]:
        """Copy the VM image into ``dest_dir`` under the filename ``new_name``.

        Returns:
            The destination path on success, or ``None`` if the copy failed
            (the error is printed).
        """
        destination_path = os.path.join(dest_dir, new_name)
        try:
            self.ensure_dir_exists(dest_dir)
            shutil.copy(source_path, destination_path)
        except OSError as e:  # IOError is an alias of OSError
            print(f'Failed to copy image: {e}')
            return None
        print(f'Image copied to {destination_path}')
        return destination_path

    def create_vm(self, name, memory, vcpu, source_image_path, subnet,
                  dest_dir=DEFAULT_IMAGE_DIR, image_name=None):
        """Define a new persistent VM backed by a copy of a base image.

        Args:
            name: Domain name.
            memory: Memory size in KiB.
            vcpu: Number of virtual CPUs.
            source_image_path: Base qcow2 image to copy for this VM.
            subnet: Network from which a random IP (and the netmask) for the
                metadata section is derived.
            dest_dir: Directory that receives the copied image.
            image_name: Filename of the copied image. Defaults to
                ``<name>.qcow2`` so that each VM gets its own disk instead of
                every VM overwriting one shared file.

        Raises:
            ValueError: if ``subnet`` is not a valid network.
        """
        if image_name is None:
            image_name = f'{name}.qcow2'

        # Parse the subnet before copying so a bad value fails fast and does
        # not leave a stray image behind.
        network = ipaddress.ip_network(subnet, strict=False)

        destination_image_path = self.copy_image(
            source_image_path, dest_dir, image_name)
        if destination_image_path is None:
            print('Failed to copy image. VM creation aborted.')
            return

        mac_address = self.generate_mac()
        ip_address = self.generate_ip(subnet)

        # Escape caller-supplied text so it cannot break the XML document.
        xml_name = escape(str(name))
        xml_disk = quoteattr(destination_image_path)  # includes the quotes

        # NOTE(review): the VNC server listens on all interfaces with no
        # password configured here.
        # NOTE(review): libvirt documents that children of <metadata> should
        # live in a custom XML namespace; confirm this un-namespaced
        # <network> element is actually preserved.
        xml_config = f"""
        <domain type='kvm'>
          <name>{xml_name}</name>
          <memory unit='KiB'>{memory}</memory>
          <vcpu placement='static'>{vcpu}</vcpu>
          <os>
            <type arch='x86_64' machine='pc-i440fx-2.9'>hvm</type>
          </os>
          <devices>
            <disk type='file' device='disk'>
              <driver name='qemu' type='qcow2'/>
              <source file={xml_disk}/>
              <target dev='vda' bus='virtio'/>
              <address type='pci' domain='0x0000' bus='0x00' slot='0x04' function='0x0'/>
            </disk>
            <interface type='network'>
              <mac address='{mac_address}'/>
              <source network='default'/>
              <model type='virtio'/>
              <address type='pci' domain='0x0000' bus='0x00' slot='0x03' function='0x0'/>
            </interface>
            <graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'/>
          </devices>
          <metadata>
            <network>
              <ip address='{ip_address}' netmask='{network.netmask}'/>
            </network>
          </metadata>
        </domain>
        """
        try:
            self.conn.defineXML(xml_config)
            print(f'VM {name} created successfully with MAC {mac_address} and IP {ip_address}')
        except libvirt.libvirtError as e:
            print(f'Failed to create VM: {e}')

    def start_vm(self, name):
        """Boot the defined VM called ``name``."""
        try:
            vm = self.conn.lookupByName(name)
            vm.create()
            print(f'VM {name} started successfully')
        except libvirt.libvirtError as e:
            print(f'Failed to start VM: {e}')

    def stop_vm(self, name):
        """Send a shutdown request to the VM called ``name``."""
        try:
            vm = self.conn.lookupByName(name)
            vm.shutdown()
            print(f'VM {name} shutdown request sent')
        except libvirt.libvirtError as e:
            print(f'Failed to stop VM: {e}')

    def delete_vm(self, name):
        """Undefine the VM called ``name``; its disk image is not removed."""
        try:
            vm = self.conn.lookupByName(name)
            vm.undefine()
            print(f'VM {name} deleted successfully')
        except libvirt.libvirtError as e:
            print(f'Failed to delete VM: {e}')

    def list_vms(self):
        """Print the name of every domain returned by the hypervisor."""
        try:
            for vm in self.conn.listAllDomains():
                print(f'VM Name: {vm.name()}')
        except libvirt.libvirtError as e:
            print(f'Failed to list VMs: {e}')

    def close(self):
        """Close the libvirt connection; safe to call more than once."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None


def main():
    """Example workflow: create, start, list, stop and delete a test VM."""
    manager = KVMManager()
    try:
        manager.create_vm(
            'test-vm',
            '2048000',
            '2',
            '/datadisk/myfolder/iso/qcow2/linux/debian12-username-eisc-000000.qcow2',
            '192.168.122.0/24',
        )
        manager.start_vm('test-vm')
        manager.list_vms()
        manager.stop_vm('test-vm')
        manager.delete_vm('test-vm')
    finally:
        # Release the connection even if one of the steps above raised.
        manager.close()


if __name__ == '__main__':
    main()
Powered by ddoss.cn 12.0
©2015 - 2024 ddoss
渝公网安备50011302222260号 渝ICP备2024035333号 【实验平台安全承诺书】 小绿叶技术社区,优化网络中,点击查看配置信息
您的IP:192.168.122.82,2024-10-10 18:16:09,Processed in 0.02248 second(s).