#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Vue前端应用自动化部署脚本 功能:本地压缩Vue构建文件 -> SSH上传 -> 远程执行部署脚本 """ import os import sys import time import zipfile import paramiko import argparse import tempfile import hashlib import subprocess import logging from pathlib import Path from typing import Optional, Tuple, List import getpass import shutil logger = logging.getLogger(__name__) class VueAutoDeployer: def __init__(self, hostname: str, username: str, local_source_dir: str, remote_deploy_dir: str, remote_script_path: str, frontend_project_dir: Optional[str] = None, build_command: str = "npm run build:dev", password: Optional[str] = None, port: int = 22, key_filename: Optional[str] = None): """ 初始化自动部署器 Args: hostname: 服务器地址 username: 用户名 local_source_dir: 本地Vue构建文件目录(包含assets和index.html) remote_deploy_dir: 远程服务器部署目录(上传zip文件的目录) remote_script_path: 远程部署脚本路径 frontend_project_dir: 前端项目根目录(可选,如果提供则会先执行打包) build_command: 前端打包命令,默认为 npm run build:dev password: SSH密码(可选,密钥认证不需要) port: SSH端口,默认22 key_filename: SSH私钥文件路径(可选) """ self.hostname = hostname self.username = username self.local_source_dir = os.path.expanduser(local_source_dir) self.remote_deploy_dir = remote_deploy_dir self.remote_script_path = remote_script_path self.frontend_project_dir = os.path.expanduser(frontend_project_dir) if frontend_project_dir else None self.build_command = build_command self.password = password self.port = port self.key_filename = key_filename self.ssh_client = None self.sftp_client = None # 压缩文件名 self.zip_filename = "dist-dev.zip" # 验证目录 self._validate_local_directories() def build_frontend(self) -> bool: """ 执行前端打包 Returns: 是否打包成功 """ if not self.frontend_project_dir: print("⚠ 未指定前端项目目录,跳过打包步骤") return True print(f"\n正在执行前端打包...") print(f"项目目录: {self.frontend_project_dir}") print(f"打包命令: {self.build_command}") print("-" * 50) try: # 检查前端项目目录是否存在 if not os.path.exists(self.frontend_project_dir): raise FileNotFoundError(f"前端项目目录不存在: {self.frontend_project_dir}") # 检查package.json是否存在 package_json_path = os.path.join(self.frontend_project_dir, 'package.json') if not os.path.exists(package_json_path): raise FileNotFoundError(f"package.json不存在: {package_json_path}") # 检查npm是否可用 print("检查npm环境...") try: if os.name == 'nt': # Windows系统 npm_check = subprocess.run( "npm --version", capture_output=True, text=True, shell=True, timeout=10 ) else: # Unix/Linux系统 npm_check = subprocess.run( ["npm", "--version"], capture_output=True, text=True, timeout=10 ) if npm_check.returncode == 0: print(f"✓ npm版本: {npm_check.stdout.strip()}") else: print(f"⚠ npm检查失败: {npm_check.stderr}") print("请确保npm已正确安装并添加到PATH环境变量中") return False except Exception as e: print(f"✗ npm环境检查失败: {e}") print("请确保npm已正确安装并添加到PATH环境变量中") return False # 检查node_modules是否存在,如果不存在则先安装依赖 node_modules_path = os.path.join(self.frontend_project_dir, 'node_modules') if not os.path.exists(node_modules_path): print("⚠ node_modules不存在,正在安装依赖...") # 在Windows系统上,需要通过shell执行npm命令 if os.name == 'nt': # Windows系统 install_result = subprocess.run( "npm install", cwd=self.frontend_project_dir, capture_output=True, text=True, shell=True, # 在Windows上使用shell timeout=300 # 5分钟超时 ) else: # Unix/Linux系统 install_result = subprocess.run( ["npm", "install"], cwd=self.frontend_project_dir, capture_output=True, text=True, timeout=300 # 5分钟超时 ) if install_result.returncode != 0: print(f"✗ 依赖安装失败:") print(f"错误输出: {install_result.stderr}") return False print("✓ 依赖安装完成") # 清理之前的构建文件 dist_path = os.path.join(self.frontend_project_dir, 'dist-dev') if os.path.exists(dist_path): print(f"清理之前的构建文件: {dist_path}") shutil.rmtree(dist_path) # 执行打包命令 print(f"执行打包命令: {self.build_command}") start_time = time.time() # 在Windows系统上,需要通过shell执行npm命令 if os.name == 'nt': # Windows系统 build_result = subprocess.run( self.build_command, cwd=self.frontend_project_dir, capture_output=True, text=True, shell=True, # 在Windows上使用shell timeout=600 # 10分钟超时 ) else: # Unix/Linux系统 cmd_parts = self.build_command.split() build_result = subprocess.run( cmd_parts, cwd=self.frontend_project_dir, capture_output=True, text=True, timeout=600 # 10分钟超时 ) elapsed_time = time.time() - start_time if build_result.returncode != 0: print(f"✗ 前端打包失败 (耗时: {elapsed_time:.1f}秒)") print(f"返回码: {build_result.returncode}") print(f"错误输出:") print(build_result.stderr) if build_result.stdout: print(f"标准输出:") print(build_result.stdout) return False print(f"✓ 前端打包成功 (耗时: {elapsed_time:.1f}秒)") # 显示打包输出(如果有的话) if build_result.stdout: print("打包输出:") print(build_result.stdout) # 验证构建结果 if os.path.exists(dist_path): # 检查构建文件 files_count = len([f for f in os.listdir(dist_path) if os.path.isfile(os.path.join(dist_path, f))]) dirs_count = len([d for d in os.listdir(dist_path) if os.path.isdir(os.path.join(dist_path, d))]) print(f"✓ 构建文件验证通过: {files_count} 个文件, {dirs_count} 个目录") # 显示构建结果 print("构建文件列表:") for item in os.listdir(dist_path): item_path = os.path.join(dist_path, item) if os.path.isdir(item_path): print(f" 📁 {item}/") else: size = os.path.getsize(item_path) print(f" 📄 {item} ({size/1024:.1f} KB)") else: print(f"⚠ 构建目录不存在: {dist_path}") return False return True except subprocess.TimeoutExpired: print("✗ 前端打包超时") return False except FileNotFoundError as e: print(f"✗ 文件不存在: {e}") return False except Exception as e: print(f"✗ 前端打包失败: {e}") return False def _validate_local_directories(self): """验证本地目录是否存在且包含必要文件""" print(f"检查前端应用目录: {self.frontend_project_dir}") if not os.path.exists(self.frontend_project_dir): raise FileNotFoundError(f"前端应用目录不存在: {self.frontend_project_dir}") print("✓ 前端应用目录验证通过") def _validate_all_local_directories(self): """验证本地目录是否存在且包含必要文件""" logger.info("检查本地目录: %s", self.local_source_dir) if not os.path.exists(self.local_source_dir): raise FileNotFoundError(f"本地目录不存在: {self.local_source_dir}") # 检查必要文件 required_files = ['index.html'] required_dirs = ['assets'] missing_items = [] for file in required_files: file_path = os.path.join(self.local_source_dir, file) if not os.path.exists(file_path): missing_items.append(file) for dir_name in required_dirs: dir_path = os.path.join(self.local_source_dir, dir_name) if not os.path.exists(dir_path): missing_items.append(dir_name) if missing_items: raise FileNotFoundError( f"本地目录缺少必要的文件/目录: {', '.join(missing_items)}\n" f"请确保 {self.local_source_dir} 包含完整的Vue构建文件" ) # 显示目录内容 logger.info("本地目录内容:") for item in os.listdir(self.local_source_dir): item_path = os.path.join(self.local_source_dir, item) if os.path.isdir(item_path): logger.info(" 📁 %s/", item) else: logger.info(" 📄 %s", item) print("✓ 本地目录验证通过") def _create_zip_from_source(self) -> str: """ 从源目录创建zip压缩包 Returns: zip文件的临时路径 """ logger.info("正在创建压缩包: %s", self.zip_filename) # 创建临时文件 temp_dir = tempfile.mkdtemp(prefix="vue_deploy_") zip_path = os.path.join(temp_dir, self.zip_filename) try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # 添加index.html index_path = os.path.join(self.local_source_dir, 'index.html') zipf.write(index_path, 'index.html') logger.info(" ✓ 添加: index.html") # 添加assets目录 assets_dir = os.path.join(self.local_source_dir, 'assets') if os.path.exists(assets_dir): # 遍历assets目录中的所有文件 for root, dirs, files in os.walk(assets_dir): # 计算相对路径 rel_path = os.path.relpath(root, self.local_source_dir) for file in files: file_path = os.path.join(root, file) arcname = os.path.join(rel_path, file) zipf.write(file_path, arcname) logger.info(" ✓ 添加: %s", arcname) # 添加其他可能的文件(css, js文件) for item in os.listdir(self.local_source_dir): if item not in ['index.html', 'assets']: item_path = os.path.join(self.local_source_dir, item) if os.path.isfile(item_path) and item.endswith(('.css', '.js')): zipf.write(item_path, item) logger.info(" ✓ 添加: %s", item) # 获取压缩包信息 zip_size = os.path.getsize(zip_path) file_count = len(zipfile.ZipFile(zip_path, 'r').namelist()) logger.info("压缩包创建完成") logger.info("文件路径: %s", zip_path) logger.info("文件大小: %.2f MB", zip_size / 1024 / 1024) logger.info("包含文件: %s 个", file_count) return zip_path except Exception as e: # 清理临时目录 shutil.rmtree(temp_dir, ignore_errors=True) raise Exception(f"创建压缩包失败: {e}") def connect(self) -> bool: """连接到SSH服务器""" logger.info("正在连接到服务器 %s:%s...", self.hostname, self.port) try: self.ssh_client = paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接参数 connect_params = { 'hostname': self.hostname, 'port': self.port, 'username': self.username, } # 认证方式 if self.key_filename: connect_params['key_filename'] = self.key_filename auth_method = "密钥认证" elif self.password: connect_params['password'] = self.password auth_method = "密码认证" else: # 交互式输入密码 self.password = getpass.getpass(f"请输入用户 {self.username} 的密码: ") connect_params['password'] = self.password auth_method = "密码认证" logger.info("使用认证方式: %s", auth_method) self.ssh_client.connect(**connect_params, timeout=30) # 测试连接 stdin, stdout, stderr = self.ssh_client.exec_command('echo "连接成功" && whoami && hostname') output = stdout.read().decode().strip() user = output.split('\n')[1] if len(output.split('\n')) > 1 else '未知' hostname = output.split('\n')[2] if len(output.split('\n')) > 2 else '未知' logger.info("SSH连接成功") logger.info("服务器: %s", hostname) logger.info("用户: %s", user) # 创建SFTP客户端 self.sftp_client = self.ssh_client.open_sftp() return True except paramiko.AuthenticationException: logger.error("SSH认证失败,请检查用户名/密码/密钥") return False except paramiko.SSHException as e: logger.exception("SSH连接异常: %s", e) return False except Exception as e: logger.exception("连接失败: %s", e) return False def disconnect(self): """断开连接""" if self.sftp_client: self.sftp_client.close() if self.ssh_client: self.ssh_client.close() logger.info("已断开SSH连接") def execute_command(self, command: str, verbose: bool = True) -> Tuple[int, str, str]: """ 执行远程命令 Args: command: 要执行的命令 verbose: 是否显示输出 Returns: (返回码, 标准输出, 标准错误) """ try: if verbose: logger.info("执行命令: %s", command) stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=60) # 读取输出 stdout_str = stdout.read().decode('utf-8', errors='ignore').strip() stderr_str = stderr.read().decode('utf-8', errors='ignore').strip() # 等待命令完成并获取返回码 exit_status = stdout.channel.recv_exit_status() if verbose: if stdout_str: logger.info("输出:\n%s", stdout_str) if stderr_str and exit_status != 0: logger.warning("错误:\n%s", stderr_str) logger.info("返回码: %s", exit_status) return exit_status, stdout_str, stderr_str except Exception as e: logger.exception("执行命令失败: %s", e) return -1, "", str(e) def upload_file(self, local_path: str, remote_path: str) -> bool: """ 上传文件到服务器 Args: local_path: 本地文件路径 remote_path: 远程文件路径 Returns: 是否成功 """ try: if not os.path.exists(local_path): logger.error("本地文件不存在: %s", local_path) return False logger.info("本地文件: %s", local_path) file_size = os.path.getsize(local_path) logger.info("正在上传文件: %s (%.2f MB)", os.path.basename(local_path), file_size / 1024 / 1024) # 确保远程目录存在并检查权限 remote_dir = os.path.dirname(remote_path) logger.info("远程文件目录: %s", remote_dir) # 创建目录 exit_code, stdout, stderr = self.execute_command(f"mkdir -p {remote_dir}", verbose=False) if exit_code != 0: logger.error("创建远程目录失败: %s", stderr) return False # 检查目录权限 exit_code, stdout, stderr = self.execute_command(f"ls -ld {remote_dir}", verbose=False) if exit_code == 0: logger.info("目录权限: %s", stdout) # 检查写入权限 test_file = os.path.join(remote_dir, "test_write_permission.tmp") exit_code, stdout, stderr = self.execute_command(f"touch {test_file} && rm -f {test_file}", verbose=False) if exit_code != 0: logger.error("远程目录没有写入权限: %s", remote_dir) logger.error("错误: %s", stderr) return False logger.info("远程目录写入权限检查通过") # 使用SFTP上传文件(显示进度) start_time = time.time() def progress_callback(transferred, total): elapsed = time.time() - start_time if elapsed > 0: speed = transferred / elapsed / 1024 # KB/s percent = (transferred / total) * 100 sys.stdout.write(f"\r 进度: {percent:.1f}% ({transferred/1024/1024:.2f}/{total/1024/1024:.2f} MB) 速度: {speed:.1f} KB/s") sys.stdout.flush() self.sftp_client.put(local_path, remote_path, callback=progress_callback) # 验证上传 exit_code, stdout, stderr = self.execute_command(f"ls -lh {remote_path}", verbose=False) if exit_code == 0 and stdout: logger.info("文件验证成功: %s", stdout) else: logger.warning("文件验证失败: 文件可能未正确上传") if stderr: logger.warning("错误: %s", stderr) # 检查目录内容 logger.info("检查目录内容: %s", remote_dir) self.execute_command(f"ls -la {remote_dir}", verbose=True) return False elapsed = time.time() - start_time logger.info("文件上传成功,耗时: %.1f秒", elapsed) return True except paramiko.SFTPError as e: logger.exception("SFTP上传失败: %s", e) logger.info("可能的原因:") logger.info(" 1. 远程目录权限不足") logger.info(" 2. 磁盘空间不足") logger.info(" 3. 网络连接中断") return False except Exception as e: logger.exception("文件上传失败: %s", e) return False def check_remote_prerequisites(self) -> bool: """ 检查远程服务器是否满足部署条件 Returns: 是否满足条件 """ logger.info("检查远程服务器部署条件...") checks = [] # 检查远程目录是否存在 exit_code, stdout, stderr = self.execute_command( f"ls -ld {self.remote_deploy_dir} 2>/dev/null || echo '目录不存在'", verbose=False ) if "目录不存在" in stdout: checks.append(("部署目录", "✗", f"{self.remote_deploy_dir} 不存在")) else: checks.append(("部署目录", "✓", f"{self.remote_deploy_dir}")) # 检查部署脚本是否存在且有执行权限 exit_code, stdout, stderr = self.execute_command( f"ls -la {self.remote_script_path} 2>/dev/null || echo '脚本不存在'", verbose=False ) if "脚本不存在" in stdout: checks.append(("部署脚本", "✗", f"{self.remote_script_path} 不存在")) else: # 检查执行权限 exit_code, stdout, stderr = self.execute_command( f"test -x {self.remote_script_path} && echo '可执行' || echo '不可执行'", verbose=False ) if "可执行" in stdout: checks.append(("部署脚本", "✓", "存在且可执行")) else: checks.append(("部署脚本", "⚠", "存在但不可执行")) # 检查unzip命令 exit_code, stdout, stderr = self.execute_command( "which unzip 2>/dev/null && unzip -v 2>/dev/null | head -1", verbose=False ) if exit_code == 0: checks.append(("unzip工具", "✓", stdout.strip())) else: checks.append(("unzip工具", "✗", "未安装")) # 检查zip命令 exit_code, stdout, stderr = self.execute_command( "which zip 2>/dev/null && zip -v 2>/dev/null | head -1", verbose=False ) if exit_code == 0: checks.append(("zip工具", "✓", stdout.strip())) else: checks.append(("zip工具", "⚠", "未安装(备份功能可能受影响)")) # 检查nginx目录(通常的部署目录) exit_code, stdout, stderr = self.execute_command( "ls -ld /usr/share/nginx/html 2>/dev/null || ls -ld /var/www/html 2>/dev/null || echo '未找到nginx目录'", verbose=False ) if "未找到nginx目录" in stdout: checks.append(("nginx目录", "⚠", "未找到标准nginx目录")) else: checks.append(("nginx目录", "✓", stdout.strip().split()[-1])) # 显示检查结果 logger.info("=" * 60) logger.info("服务器环境检查结果:") logger.info("=" * 60) all_passed = True for check_name, status, message in checks: if status == "✓": logger.info(" %s %s: %s", status, check_name, message) elif status == "⚠": logger.info(" %s %s: %s", status, check_name, message) else: logger.info(" %s %s: %s", status, check_name, message) all_passed = False logger.info("=" * 60) if not all_passed: logger.warning("警告: 部分检查未通过,部署可能会失败") response = input("是否继续部署?(y/N): ").strip().lower() return response == 'y' return True def deploy(self, cleanup_temp: bool = True) -> bool: """ 执行完整的部署流程 Args: cleanup_temp: 是否清理临时文件 Returns: 是否部署成功 """ logger.info("=" * 70) logger.info("Vue前端应用自动化部署流程") logger.info("=" * 70) temp_zip_path = None temp_dir = None try: # 步骤0: 前端打包(如果指定了前端项目目录) if self.frontend_project_dir: print("\n[步骤 0/5] 前端应用打包") print("-"*40) if not self.build_frontend(): return False # 步骤1: 本地压缩文件 step_num = "1/5" if self.frontend_project_dir else "1/4" print(f"\n[步骤 {step_num}] 本地压缩Vue构建文件") print("-"*40) temp_zip_path = self._create_zip_from_source() temp_dir = os.path.dirname(temp_zip_path) # 验证所有目录 self._validate_all_local_directories() # 步骤2: 连接到服务器 step_num = "2/5" if self.frontend_project_dir else "2/4" print(f"\n[步骤 {step_num}] 连接到远程服务器") print("-"*40) if not self.connect(): return False # 检查服务器环境 if not self.check_remote_prerequisites(): return False # 步骤3: 上传文件 step_num = "3/5" if self.frontend_project_dir else "3/4" print(f"\n[步骤 {step_num}] 上传文件到服务器") print("-"*40) remote_zip_path = os.path.join(self.remote_deploy_dir, self.zip_filename) if not self.upload_file(temp_zip_path, remote_zip_path): return False # 步骤4: 执行部署脚本 step_num = "4/5" if self.frontend_project_dir else "4/4" print(f"\n[步骤 {step_num}] 执行远程部署脚本") # 构建部署命令,传递上传的zip文件路径作为参数 {remote_zip_path} deploy_command = f"{self.remote_script_path}" logger.info("执行部署命令: %s", deploy_command) logger.info("-" * 40) start_time = time.time() exit_code, stdout, stderr = self.execute_command(deploy_command, verbose=True) elapsed_time = time.time() - start_time logger.info("-" * 40) logger.info("部署执行完成,耗时: %.1f秒", elapsed_time) if exit_code != 0: logger.error("部署失败,返回码: %s", exit_code) if stderr: logger.error("错误信息:\n%s", stderr) return False logger.info("部署成功完成") # 可选: 验证部署结果 print("\n验证部署结果...") self.execute_command("ls -la /home/lq/nginx/html/ 2>/dev/null | head -10", verbose=True) return True except Exception as e: logger.exception("部署过程中发生错误: %s", e) return False finally: # 清理临时文件 if cleanup_temp and temp_dir and os.path.exists(temp_dir): try: shutil.rmtree(temp_dir) logger.info("已清理临时文件: %s", temp_dir) except: logger.warning("清理临时文件失败: %s", temp_dir) # 断开连接 self.disconnect() def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser( description='Vue前端应用自动化部署工具', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 基本用法(仅上传已构建的文件) %(prog)s --host 192.168.1.100 --user deploy --source ./dist-dev # 完整流程(先打包再部署) %(prog)s --host 192.168.1.100 --user deploy --source ./dist-dev --frontend-dir ./LQAdminFront # 自定义打包命令 %(prog)s --host 192.168.1.100 --user deploy --source ./dist-dev --frontend-dir ./LQAdminFront --build-cmd "npm run build:prod" # 使用SSH密钥认证 %(prog)s --host example.com --user ubuntu --key ~/.ssh/id_rsa --source ./build --frontend-dir ./frontend """ ) # 必需参数 parser.add_argument('--host', required=True, help='服务器地址 (例如: 192.168.1.100 或 example.com)') parser.add_argument('--user', required=True, help='SSH用户名') parser.add_argument('--source', required=True, help='本地Vue构建文件目录(包含assets和index.html)') # 可选参数 parser.add_argument('--frontend-dir', help='前端项目根目录(如果提供则会先执行打包)') parser.add_argument('--build-cmd', default='npm run build:dev', help='前端打包命令 (默认: npm run build:dev)') parser.add_argument('--remote-dir', default='/home/lq/lq_workspace/deploy_workspace/put/', help='远程服务器上传目录 (默认: /home/lq/lq_workspace/deploy_workspace/put/)') parser.add_argument('--script', default='/home/lq/lq_workspace/deploy_workspace/admin_front_deploy.sh', help='远程部署脚本路径 (默认: /home/lq/lq_workspace/deploy_workspace/admin_front_deploy.sh)') parser.add_argument('--port', type=int, default=22, help='SSH端口 (默认: 22)') parser.add_argument('--key', help='SSH私钥文件路径(如果使用密钥认证)') parser.add_argument('--password', help='SSH密码(如果使用密码认证)') parser.add_argument('--no-cleanup', action='store_true', help='不清理临时文件(用于调试)') parser.add_argument('--verbose', '-v', action='store_true', help='显示详细输出') return parser.parse_args() def main(): """主函数""" logging.basicConfig(level=logging.INFO, format="%(message)s") args = parse_arguments() # 显示欢迎信息 logger.info("=" * 70) logger.info("🚀 Vue前端应用自动化部署工具") logger.info("=" * 70) print(f"服务器: {args.host}:{args.port}") print(f"用户: {args.user}") print(f"本地源目录: {args.source}") if args.frontend_dir: print(f"前端项目目录: {args.frontend_dir}") print(f"打包命令: {args.build_cmd}") print(f"远程目录: {args.remote_dir}") print(f"部署脚本: {args.script}") print("="*70) try: # 创建部署器实例 deployer = VueAutoDeployer( hostname=args.host, username=args.user, local_source_dir=args.source, remote_deploy_dir=args.remote_dir, remote_script_path=args.script, frontend_project_dir=args.frontend_dir, build_command=args.build_cmd, password=args.password, port=args.port, key_filename=args.key ) # 执行部署 success = deployer.deploy(cleanup_temp=not args.no_cleanup) if success: logger.info("=" * 70) logger.info("🎉 部署流程全部完成!") logger.info("=" * 70) logger.info("建议操作:") logger.info(" 1. 访问网站检查是否正常显示") logger.info(" 2. 检查nginx错误日志: sudo tail -f /var/log/nginx/error.log") logger.info(" 3. 如果需要重启nginx: sudo systemctl restart nginx") logger.info("=" * 70) return 0 else: logger.error("=" * 70) logger.error("❌ 部署失败!") logger.error("=" * 70) return 1 except FileNotFoundError as e: logger.error("文件错误: %s", e) return 1 except KeyboardInterrupt: logger.warning("用户中断操作") return 130 except Exception as e: logger.exception("发生未预期的错误: %s", e) return 1 if __name__ == "__main__": # 检查必要依赖 try: import paramiko except ImportError: logging.basicConfig(level=logging.INFO, format="%(message)s") logger.error("错误: 未安装paramiko库") logger.error("请安装依赖: pip install paramiko") sys.exit(1) sys.exit(main())