Pexpect 笔记和应用
参考资料
Pexpect 简介
Pexpect 是一个纯 Python 模块,用于自动化控制和交互命令行应用程序,如 ssh
, ftp
, telnet
等。它的工作原理类似于 Tcl 语言的 Expect
库。
Pexpect 的基本工作流程可以分为三个步骤:
启动 (Spawn): 使用
pexpect.spawn()
方法执行一个子程序。等待 (Expect): 使用
child.expect()
方法等待子程序输出特定的模式(如字符串、正则表达式)。发送 (Send): 当
expect()
匹配到期望的模式后,使用child.send()
或child.sendline()
方法向子程序发送指令。
这三步,尤其是后两步,会循环执行,从而完成复杂的自动化交互任务。
Pexpect 核心类与方法
pexpect.spawn()
类
spawn()
方法用于启动一个子程序,并返回一个子程序的句柄(child
对象),后续的所有交互都通过这个句柄进行。
xxxxxxxxxx
import pexpect
# 执行一个简单的命令
child = pexpect.spawn("ls -l")
# 等待子程序执行结束 (EOF: End Of File)
child.expect(pexpect.EOF)
# afrer 属性存储了匹配到的内容,before 属性存储了匹配之前的所有输出
print("--- child.before ---")
print(child.before.decode()) # 使用 .decode() 将字节串转为字符串,更易读
输出:
xxxxxxxxxx
--- child.before ---
total 8
-rw-r--r-- 1 user user 1234 Aug 05 10:00 some_file.txt
drwxr-xr-x 2 user user 4096 Aug 05 10:00 some_dir
注意: spawn
的 command
参数被视为一个整体。如果需要使用管道 |
或重定向 >
等 shell 特性,需要直接调用 shell 来执行命令:
xxxxxxxxxx
# 使用 /bin/bash -c 来执行包含管道的命令
child = pexpect.spawn('/bin/bash', args=['-c', 'ls -l | grep .py'])
child.expect(pexpect.EOF)
print(child.before.decode())
child.expect()
方法
expect()
是 Pexpect 的核心,它会读取子程序的输出,直到匹配到指定的模式。
参数
pattern
: 可以是字符串、正则表达式、pexpect.EOF
(文件结束)、pexpect.TIMEOUT
(超时),或者由这些元素组成的列表。返回值:
如果
pattern
是单个模式,成功匹配返回0
。如果
pattern
是一个列表,成功匹配则返回该模式在列表中的索引。
异常: 如果在指定时间内(默认为30秒)没有匹配到任何模式,会抛出
pexpect.TIMEOUT
异常。
为了避免程序因超时而卡死,通常会将 pexpect.EOF
或 pexpect.TIMEOUT
作为模式列表的最后一个元素,用于异常处理。
xxxxxxxxxx
child = pexpect.spawn('ls -l')
# expect() 会持续匹配,直到缓冲区内容耗尽
# 第一次匹配
index = child.expect(['main.py', 'test.py', pexpect.EOF])
if index < 2:
print(f"第一次匹配到: {child.match.decode()}")
# 第二次匹配
index = child.expect(['main.py', 'test.py', pexpect.EOF])
if index < 2:
print(f"第二次匹配到: {child.match.decode()}")
else:
print("没有更多匹配项了,到达文件末尾。")
此外,还有一些更具体的匹配方法:
child.expect_exact()
: 只匹配纯字符串,不解释为正则表达式。child.expect_list()
: 只匹配正则表达式列表。expect()
内部其实调用了此方法。
child.send()
与 child.sendline()
child.send(str)
: 向子程序发送一个字符串,不带换行符。child.sendline(str)
: 发送一个字符串,并在末尾自动添加换行符 (\r\n
),模拟用户按下回车。这在交互式应用中最为常用。
child.interact()
方法
interact()
方法会将终端的控制权完全交给用户,允许用户直接与子程序交互。这在需要手动操作的场景(如调试)中非常有用。
xxxxxxxxxx
# 启动一个 ssh 会话
child = pexpect.spawn('ssh user@some_host')
# ... 自动输入密码等 ...
# 将控制权交给用户
print("已登录,现在您可以手动操作。按 Ctrl+] 退出。")
child.interact()
# 当用户按下 Ctrl+] (默认的退出字符) 后,程序会继续执行
print("已退出交互模式。")
应用案例:自动化 CTF 答题
案例背景:
通过 SSH (ssh noname.plus -p2023
) 连接到一个 CTF 挑战。程序要求输入玩家ID,然后开始一个记忆数字的游戏。游戏规则是在数字出现后,立即原样输入,坚持11轮以上即可获胜。
挑战难点:
远程终端为了美观,使用了 ANSI 转义序列来显示彩色文本。这些颜色代码会混在正常的数字字符中,直接读取 child.before
会得到类似 \x1b[31m1\x1b[0m\x1b[32m2\x1b[0m
这样的乱码,需要过滤掉这些非数字字符。
解决方案: 编写一个函数,使用正则表达式过滤掉 ANSI 颜色代码,提取出纯净的数字字符串,然后发送给服务器。
xxxxxxxxxx
import pexpect
import re
import threading
import time
def filter_ansi_and_get_numbers(byte_string: bytes) -> str:
"""
使用正则表达式过滤掉 ANSI 颜色代码,并提取数字。
"""
# 将字节串解码为字符串
text = byte_string.decode('utf-8', errors='ignore')
# 正则表达式匹配 ANSI 转义序列
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# 替换掉所有 ANSI 序列,得到纯净文本
clean_text = ansi_escape.sub('', text)
# 提取所有数字
numbers = ''.join(re.findall(r'\d+', clean_text))
return numbers
def run_game(player_id: str, rounds: int):
"""
执行一轮完整的游戏。
"""
try:
# 启动 SSH 连接
child = pexpect.spawn("ssh noname.plus -p2023", timeout=10)
# 1. 输入玩家ID
child.expect("Input Your player ID:")
child.sendline(player_id)
# 2. 选择开始游戏
child.expect("Choose an option:")
child.sendline("1")
# 3. 循环答题
for i in range(rounds):
child.expect("Enter numbers now:")
# 从 before 缓冲区获取带颜色的数字,并过滤
numbers_to_send = filter_ansi_and_get_numbers(child.before)
if numbers_to_send:
child.sendline(numbers_to_send)
print(f"[{player_id}] Round {i+1}/{rounds}: Sent '{numbers_to_send}'")
else:
print(f"[{player_id}] Round {i+1}/{rounds}: No numbers found, skipping.")
child.sendline("") # 发送空行以继续
# 4. 获取最终结果
child.expect(pexpect.EOF)
print(f"--- Game Over for {player_id} ---")
print(child.before.decode())
print("---------------------------------")
except pexpect.exceptions.TIMEOUT:
print(f"Error: Connection or game timed out for {player_id}")
except pexpect.exceptions.EOF:
print(f"Error: Connection closed unexpectedly for {player_id}")
finally:
if 'child' in locals() and child.isalive():
child.close()
if __name__ == "__main__":
# 使用多线程同时运行多个实例
player_name = input("Enter the base name for your player ID: ")
min_rounds = int(input("Enter the minimum rounds to play: "))
max_rounds = int(input("Enter the maximum rounds to play: "))
threads = []
for r in range(min_rounds, max_rounds + 1):
# 为每个线程创建唯一的玩家ID
unique_id = f"{player_name}|{r}"
thread = threading.Thread(target=run_game, args=(unique_id, r))
threads.append(thread)
thread.start()
time.sleep(0.1) # 稍微错开启动时间,避免瞬间大量连接
for i, t in enumerate(threads):
t.join()
print(f"Thread for {min_rounds + i} rounds has finished.")
print("\nAll threads have completed.")
0 评论:
发表评论