Python 三方模块 paramiko
paramiko 是一个用于 SSH(Secure Shell)连接的 Python 第三方库,它提供了 SSH2 协议的纯 Python 实现。通过 paramiko,我们可以实现远程服务器连接、命令执行、文件传输等功能,广泛应用于自动化运维、远程管理等场景。
pip install paramiko
基础连接
SSH 客户端连接
最基本的 SSH 连接方式:
import paramiko
# 创建 SSH 客户端
ssh = paramiko.SSHClient()
# 自动添加主机密钥(仅用于测试环境)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接服务器
ssh.connect(
hostname='192.168.1.100',
port=22,
username='root',
password='your_password'
)
print("SSH 连接成功")
except Exception as e:
print(f"连接失败: {e}")
finally:
ssh.close()
使用密钥文件连接
更安全的连接方式是使用 SSH 密钥:
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 使用私钥文件连接
ssh.connect(
hostname='192.168.1.100',
port=22,
username='root',
key_filename='/path/to/private_key'
)
print("使用密钥连接成功")
except Exception as e:
print(f"连接失败: {e}")
finally:
ssh.close()
执行远程命令
单条命令执行
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100', username='root', password='your_password')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls -la /home')
# 获取命令输出
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
print("命令输出:")
print(output)
if error:
print("错误信息:")
print(error)
except Exception as e:
print(f"执行失败: {e}")
finally:
ssh.close()
交互式命令执行
对于需要交互的命令,可以通过 stdin 传入参数:
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100', username='root', password='your_password')
# 执行需要交互的命令
stdin, stdout, stderr = ssh.exec_command('sudo apt update')
# 如果需要输入密码
stdin.write('your_sudo_password\n')
stdin.flush()
# 等待命令执行完成
exit_status = stdout.channel.recv_exit_status()
output = stdout.read().decode('utf-8')
print(f"命令退出状态: {exit_status}")
print("命令输出:")
print(output)
except Exception as e:
print(f"执行失败: {e}")
finally:
ssh.close()
文件传输(SFTP)
上传文件
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100', username='root', password='your_password')
# 创建 SFTP 客户端
sftp = ssh.open_sftp()
# 上传文件
local_file = '/local/path/file.txt'
remote_file = '/remote/path/file.txt'
sftp.put(local_file, remote_file)
print(f"文件上传成功: {local_file} -> {remote_file}")
sftp.close()
except Exception as e:
print(f"上传失败: {e}")
finally:
ssh.close()
下载文件
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100', username='root', password='your_password')
sftp = ssh.open_sftp()
# 下载文件
remote_file = '/remote/path/file.txt'
local_file = '/local/path/downloaded_file.txt'
sftp.get(remote_file, local_file)
print(f"文件下载成功: {remote_file} -> {local_file}")
sftp.close()
except Exception as e:
print(f"下载失败: {e}")
finally:
ssh.close()
文件操作
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostname='192.168.1.100', username='root', password='your_password')
sftp = ssh.open_sftp()
# 列出目录内容
files = sftp.listdir('/home')
print("目录内容:", files)
# 获取文件属性
file_attr = sftp.stat('/home/user/file.txt')
print(f"文件大小: {file_attr.st_size} 字节")
# 创建目录
sftp.mkdir('/home/user/new_directory')
# 删除文件
sftp.remove('/home/user/old_file.txt')
# 重命名文件
sftp.rename('/home/user/old_name.txt', '/home/user/new_name.txt')
sftp.close()
except Exception as e:
print(f"操作失败: {e}")
finally:
ssh.close()
最佳实践
连接池管理
import paramiko
from contextlib import contextmanager
class SSHConnectionPool:
def __init__(self):
self.connections = {}
@contextmanager
def get_connection(self, host, username, password):
key = f"{host}:{username}"
if key not in self.connections:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, username=username, password=password)
self.connections[key] = ssh
try:
yield self.connections[key]
except Exception as e:
# 连接出错时重新创建
if key in self.connections:
self.connections[key].close()
del self.connections[key]
raise e
def close_all(self):
for ssh in self.connections.values():
ssh.close()
self.connections.clear()
# 使用连接池
pool = SSHConnectionPool()
try:
with pool.get_connection('192.168.1.100', 'root', 'password') as ssh:
stdin, stdout, stderr = ssh.exec_command('uptime')
print(stdout.read().decode('utf-8'))
finally:
pool.close_all()