愿你星河常明
前言说明#
- 插件划分
Z0Scan 的 Scanners 分类为:
1. PerPage: 基于页面层面检测的插件集
2. PerDir: 基于子路径层面检测的插件集
3. PerDomain: 基于域名层面检测的插件集
4. PerHost: 基于主机层面检测的插件集
- 指纹划分
Z0Scan 指纹集包括 waf 以及 finger, 它们被存储在 data/fingers.db 中
其中 finger 是为 Scanner 的精准切入而诞生的
fingers.db 指纹开发#
语法规则:
匹配:
1. body: 在响应体文本中匹配
2. title: 在标题中匹配
3. header: 在响应头部中匹配(包括键与值)
4. path: 在路径中匹配,如:/admin/main.php
5. body_regex: 在响应体文本中使用正则表达式匹配
6. title_regex: 在标题中使用正则表达式匹配
7. header_regex: 在响应头部中使用正则表达式匹配(包括键与值)
8. path_regex: 在路径中使用正则表达式匹配,如:/admin/main.php
逻辑符:
1. &&: 并且
2. ||: 或
3. (): 优先
示例:
headers="Server: thinkphp" || body="Thinkphp"
关于WAF指纹
在lib/core/finger.py中会将特殊响应状态码及响应超时判断为存在WAF
Scanner 命名#
按照以下命名规则命名:
漏洞类型(简写) + (指纹对象)+ 简述
如 PerPage/PerDir/PerDomain 下存在同名插件,
在单个插件名后按照PerPage:1,PerDir:2,PerDomain:3在名末下划线连接,如:
PerPage 中的 sensi-backup:sensi-backup_1
漏洞类型分类与简写形式可参考api/VulType部分
内置模块#
api#
| 名称 | 描述 | 使用 | 必选/可选 |
|---|---|---|---|
| generateResponse | 用于报告中响应体的生成 | generateResponse(r) # r=request.get(…) | 必选 |
| random_num | 生成随机数 | / | 可选 |
| random_str | 生成随机字符串 | / | 可选 |
| VulType | 对漏洞类型的选定 | 见api/VulType | 必选 |
| Type | 对扫描类型的选定 | 见api/Type | 必选 |
| PluginBase | 被继承以获取数据/解析数据/返回结果 | class Z0SCAN(PluginBase) | PerPage等必选 |
| _PluginBase | 被继承以获取数据/解析数据/返回结果 | class Z0SCAN(_PluginBase) | PerHost必选 |
| conf | 储存一些命令行参数值 | 主要使用level,见api/conf.level | 必选 |
| PLACE | 对漏洞注入点(请求中的可控点)的选定 | 见api/PLACE | 必选 |
| Threads | 插件内置线程(针对参数的多线程) | 见api/Threads | 可选 |
- api/VulType:
| 名称 | 描述 | (插件命名)简写 |
|---|---|---|
| CMD_INNJECTION | 命令注入漏洞 | cmdi |
| CODE_INJECTION | 代码注入漏洞 | codei |
| XSS | XSS跨站脚本攻击 | xss |
| SQLI | SQL注入漏洞 | sqli |
| TRAVERSAL | 遍历漏洞 | trave |
| XXE | XML外部实体注入 | xxe |
| SSRF | 服务器端请求伪造 | ssrf |
| CSRF | CSRF | csrf |
| REDIRECT | 重定向漏洞 | redirect |
| WEAK_PASSWORD | 弱口令 | weakpwd |
| CRLF | 换行注入 | crlf |
| SENSITIVE | 敏感信息泄露漏洞 | sensi |
| SSTI | 服务器端模板注入 | ssti |
| UNAUTH | 未授权访问 | unauth |
| FILEUPLOAD | 文件上传 | upload |
| CORS | CORS漏洞 | cors |
| OTHER | 其它漏洞 | rce,jndi… |
- api/Type :
| 名称 | 描述 |
|---|---|
| ANALYZE | 被动分析发现 |
| REQUEST | 主动请求发现 |
- api/conf.level :
扫描深度(反映请求量)
| 值 | 描述 |
|---|---|
| 0 | 纯被动分析模式,不做任何请求 |
| 1 | 最低请求量的扫描,最低的业务影响 |
| 2 | 中等请求量的扫描,Payload多为通用Top5 |
| 3 | 大量请求扫描,Payload覆盖面更广 |
- api/conf.risk :
需要扫描的漏洞危害程度
| 值 | 描述 |
|---|---|
| -1 | 难以利用的极低危常见漏洞 |
| 0 | 可能产生1~3级危害的辅助性信息 |
| 1 | 低危漏洞 |
| 2 | 中危漏洞 |
| 3 | 高危漏洞 |
- api/conf.dicts :
被动态导入的字典(dicts)
# 在 dicts 目录下放置 example.txt
> print(conf.dicts["example"])
["1", "2"]
根据你的参考代码,我修改文档如下:
- api/PLACE
参数位置枚举类,定义参数在HTTP请求中的不同位置。
基础位置类型
| 名称 | 描述 |
|---|---|
| PARAM | URL后参数部分 (如: ?key=value) |
| DATA | 在BODY中传递的参数 |
| COOKIE | COOKIE中传递的参数 |
| URL | 伪静态参数 (如: /user/123) |
DATA子类型
DATA类型下包含不同数据格式的参数位置:
| 名称 | 描述 |
|---|---|
| NORMAL_DATA | 常规POST传参格式中的参数 (application/x-www-form-urlencoded) |
| JSON_DATA | JSON格式中的参数 (application/json) |
| XML_DATA | XML格式中的参数 (text/xml, application/xml) |
| MULTIPART_DATA | MULTIPART格式中的参数 (multipart/form-data) |
| ARRAY_LIKE_DATA | ARRAY_LIKE格式中的参数 |
| SOAP_DATA | SOAP_DATA格式中的参数 |
嵌套参数位置
值中的二级参数:
| 名称 | 描述 |
|---|---|
| FORM_VALUE_JSON | 表单值中的JSON参数 |
| PARAM_VALUE_JSON | URL参数值中的JSON参数 |
| COOKIE_JSON_VALUE | Cookie值中的JSON参数 |
- api/Threads :
z0thread = Threads(name="sqli-error") # name : 插件名
z0thread.submit(task_func, task_data, args, kwargs)
# task_func: 要执行的任务函数
# task_data: 任务数据迭代器,每个元素会作为task_func的第一个参数
# args: 传递给task_func的额外位置参数
# kwargs: 传递给task_func的额外关键字参数 (可选)
- api/chat :
AI交互模块
def _ai_validate_with_context(self, context, name, pattern):
# 输入尽量使用英文并规范其输出
prompt = f"""
Please analyze whether the marked sensitive information in the following JavaScript code is genuine:
{context}
Rule Name: {name}
Matching mode: {pattern}
Key points to consider:
1. The context in which the information appears
2. Whether it's in comments or test data
3. Whether it matches common patterns for this type of sensitive information
Respond strictly in the following JSON format:
{{
"valid": boolean, // Whether it's valid sensitive info
"confidence": float, // Confidence level (0.0~1.0)
"type": string, // Type of sensitive info (e.g., API key, password)
"reason": string // Detailed analysis rationale
}}
"""
try:
# 发送对话
response = chat(prompt)
if not response:
return False
try:
analysis = json.loads(response.content) # 解析回答
# 置信度阈值到0.8
if analysis.get('valid', False) and analysis.get('confidence', 0) > 0.8:
return True, analysis.get('type', ''), analysis.get('reason', '')
except json.JSONDecodeError:
logger.error(f"AI response format error, raw response: {response.content}")
return False, None, None
except Exception as e:
logger.error(f"Error during AI validation: {e}")
return False, None, None
return False, None, None
继承PluginBase#
用于编写 PerPage, PerDir, PerDomain
数据#
- self.fingerprints :
| 名称 | 类型 | 描述 |
|---|---|---|
| waf | str/bool | WAF名称(未检测到WAF时为False) |
| fingers | dict | 指纹集(包括os、language、cms…) |
>> print(self.fingerprints.waf)
"WTS" # 没有WAF时为False
>> print(self.fingerprints.fingers)
["thinkphp", "nginx"]
- self.requests :
| 名称 | 类型 | 描述 | 示例 |
|---|---|---|---|
| url | str | 完整的URL(包含GET参数) | https://www.example.com:443/a/file.php?id=1 |
| suffix | str | 文件后缀 | .php |
| protocol | str | 请求协议 | http |
| port | int | 服务端口 | 8888 |
| hostname | str | 域名(不包括端口) | www.myscantest.com |
| netloc | str | 包含协议与端口信息的域名 | https://www.example.com:443 |
| raw | str | 原始的请求包 | / |
| method | str | 请求方法 | GET |
| headers | dict | 请求头字典 | / |
| cookies | dict | COOKIE | / |
| params | dict | 在URL中包含的参数(不含伪静态) | {‘id’: ‘1’} |
| post_hint | str | POST请求包类型 | / |
| data | dict | POST数据 | / |
| path | str | 路径 | /a/file.php |
| body | str | 原始请求头 | / |
| ip | str | 由域名解析为IP | 59.20.13.14 |
注: data、params、cookies、headers与body允许直接requests构造请求
self.requests 仅负责常规解析,如需解析伪静态及xml等格式请求中的参数,见self.generateItemdatas()
- self.response :
| 名称 | 类型 | 描述 | 示例 |
|---|---|---|---|
| status_code | int | 返回状态码 | 200 |
| content | byte | 返回字节类型文本 | / |
| headers | dict | 请求头 | / |
| raw | str | 原始的返回包 | / |
| text | str | 解码后的返回文本 | / |
解析#
- self.generateItemdatas() :
generateItemdatas()会将参数名、参数值及其所处的可控点整理后返回
注意: 它会额外地解析出伪静态及xml等包含的参数,
并按照用户要求选择是否将cookie参数作为解析对象。
>> iterdatas = self.generateItemdatas()
>> print(iterdatas)
[
["id", "1", "URL"],
["username", "admin", "DATA"],
]
- self.insertPayload({“key”: k, “value”: v, “position”: position, “payload”: _payload}) :
令参数名为key,参数值为value(value为可选值),并向参数值后添加payload
最终返回其对应可控点修改后的数据
建议配合self.generateItemdatas()使用
>> datas = self.insertPayload({"key": "username", "value": "admin", "position": "DATA", "payload": "'--+"})
>> print(datas)
{"username": "admin'--+", "passwd": "admin"}
>> r = request.get(url, data=datas)
- self.req(position, payload) :
payload为对应可控点修改后的整体数据,可以为self.insertPayload的返回
需配合self.generateItemdatas()使用
>> datas = {"username": "admin'--+", "passwd": "admin"}
>> r = self.req("DATA", datas)
>> print(r)
… # r为request的返回
返回#
- self.generate_result() :
报告生成
result = self.generate_result() # 初始化
result.main({
"type": Type.REQUEST, # 扫描类型
"url": self.requests.url, # 漏洞URL
"vultype": VulType.SQLI, # 漏洞类型
"show": { # 你希望向命令行展示的信息
"Position": f"{position} > {k}", # 建议键名首字母大写
"Payload": payload,
"Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
}
})
# 验证步骤(可以添加多个过程,如二次验证)
result.step("Request1", { # 这是验证步骤的标题
"request": r.reqinfo, # 验证步骤的请求体
"response": generateResponse(r), # 验证步骤的响应体
"desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group()) # 验证步骤描述/结论/推断
})
self.success(result) # 生成
requests#
在 z0scan 的 lib/patch/request_patch.py 中向 requests 库增添了新的可选项
quote:是否进行URL编码(默认为True)
record:是否记录本次请求状态到数据库(默认为True)
编写示范#
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from api import generateResponse, random_num, random_str, VulType, Type, PluginBase, conf, logger, Thread
from helper.helper_sensitive import sensitive_page_error_message_check
import re
sqli_errors = {
"Microsoft SQL": [
.... # 省略
]
}
class Z0SCAN(PluginBase):
name = "sqli-error" # 插件名
desc = 'SQL Error Finder' # 描述
version = "2025.3.13" # 版本(最后更新日期)
risk = 2 # 危害等级
# require_reverse = False # 是否需要启用反连平台验证(省略默认False)
def audit(self):
if not self.fingerprints.waf and 2 in conf.risk and conf.level != 0:
_payloads = [
## 宽字节
r'鎈\'"\(',
## 通用报错
r';)\\\'\\"',
r'\' oRdeR bY 500 ',
r';`)',
r'\\',
r"%%2727",
r"%25%27",
r"%60",
r"%5C",
]
if conf.level == 3:
_payloads += [
## 强制报错
# MySQL
r'\' AND 0xG1#',
# PostgreSQL
r"' AND 'a' ~ 'b\[' -- ",
# MSSQL
r"; RAISERROR('Error generated', 16, 1) -- ",
# Oracle
r"' UNION SELECT XMLType('<invalid><xml>') FROM dual -- ",
# SQLite
r"' UNION SELECT SUBSTR('o', -1, 1) -- ",
]
iterdatas = self.generateItemdatas()
# 内置的线程并发
z0thread = Threads(name="sqli-error")
z0thread.submit(self.process, iterdatas, _payloads)
def Get_sql_errors(self):
sql_errors = []
for database, re_strings in rules.items():
for re_string in re_strings:
sql_errors.append((re.compile(re_string, re.IGNORECASE), database))
return sql_errors
def process(self, _, _payloads):
k, v, position = _
for _payload in _payloads:
payload = self.insertPayload({
"key": k,
"value": v,
"position": position,
"payload": _payload
})
r = self.req(position, payload)
if not r:
continue
html = r.text
for sql_regex, dbms_type in self.Get_sql_errors():
match = sql_regex.search(html)
if match:
# 生成报告
result = self.generate_result()
result.main({
"type": Type.REQUEST, # 扫描类型
"url": self.requests.url, # 漏洞URL
"vultype": VulType.SQLI, # 漏洞类型
"show": { # 你希望向命令行展示的信息
"Position": f"{position} > {k}", # 建议键名首字母大写
"Payload": payload,
"Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
}
})
# 验证步骤(可以添加多个过程,如二次验证)
result.step("Request1", {
"request": r.reqinfo,
"response": generateResponse(r),
"desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group())
})
self.success(result)
return True
message_lists = sensitive_page_error_message_check(html)
if message_lists:
result = self.generate_result()
result.main({
"type": Type.REQUEST,
"url": self.requests.url,
"vultype": VulType.SQLI,
"show": {
"Position": f"{position} > {k}",
"Payload": payload,
"Msg": "Receive Error Msg {}".format(repr(message_lists))
}
})
result.step("Request1", { # 步骤标题
"request": r.reqinfo, # 请求体
"response": generateResponse(r), # 响应体(由generateResponse生成)
"desc": "Receive Error Msg {}".format(repr(message_lists)) # 步骤的关键信息
})
self.success(result)
break
继承_PluginBase#
用于编写 PerHost
数据#
- self.host :
它的值类似于:5.9.2.0:1314
你可以通过splite它以取得ip与端口:self.ip, self.port = self.host.split(":")
- self.sockrecv :
由socket对端口请求后接收的返回
关键代码:
response = sock.recv(256).decode('utf-8', 'ignore')
返回#
- self.generate_result() :
报告生成
result = self.generate_result() # 初始化
result.main({
"type": Type.REQUEST, # 扫描类型
"url": self.requests.url, # 漏洞URL
"vultype": VulType.SQLI, # 漏洞类型
"show": { # 你希望向命令行展示的信息
"Position": f"{position} > {k}", # 建议键名首字母大写
"Payload": payload,
"Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
}
})
# 验证步骤(可以添加多个过程,如二次验证)
result.step("Request1", { # 这是验证步骤的标题
"request": r.reqinfo, # 验证步骤的请求体
"response": generateResponse(r), # 验证步骤的响应体
"desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group()) # 验证步骤描述/结论/推断
})
self.success(result) # 生成
编写示范#
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# JiuZero 2025/9/12
import socket, binascii
from api import generateResponse, random_num, conf, PLACE, VulType, POST_HINT, Type, _PluginBase, KB, Threads
class Z0SCAN(_PluginBase):
name = "leakpwd-mssql" # 插件名
version = "2025.9.12" # 版本
desc = "Weak Password on MSSQL Server" # 描述
ports = [1433] # 目标端口(可以多个)
fingers = [b'MSSQLSERVER'] # 目标服务的指纹(可以多个)
def __init__(self):
self.right_pwd = None
def audit(self):
self.ip, self.port = self.host.split(":")
userpass = []
for user in conf.lists["mssql-username"]:
for pwd in conf.lists["mssql-password"]:
userpass.append((user, pwd))
z0thread = Threads(name="leakpwd-redis")
z0thread.submit(self.process, userpass)
if self.right_pwd is not None:
result = self.generate_result()
result.main({
"type": Type.REQUEST,
"url": "tcp://" + self.host,
"vultype": VulType.WEAK_PASSWORD,
"show": {
"User/Password": "/".join(self.right_pwd)
}
})
self.success(result)
def process(self, userpwd):
user, pass_ = userpwd
if self.right_pwd is None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(8)
sock.connect((self.ip, self.port))
hh = binascii.b2a_hex(self.ip.encode()).decode()
husername = binascii.b2a_hex(user.encode()).decode()
lusername = len(user)
lpassword = len(pass_)
ladd = len(self.ip) + len(str(self.port)) + 1
hpwd = binascii.b2a_hex(pass_.encode()).decode()
pp = binascii.b2a_hex(str(self.port).encode()).decode()
address = hh + '3a' + pp
# hhost = binascii.b2a_hex(ip.encode()).decode()
data = "0200020000000000123456789000000000000000000000000000000000000000000000000000ZZ5440000000000000000000000000000000000000000000000000000000000X3360000000000000000000000000000000000000000000000000000000000Y373933340000000000000000000000000000000000000000000000000000040301060a09010000000002000000000070796d7373716c000000000000000000000000000000000000000000000007123456789000000000000000000000000000000000000000000000000000ZZ3360000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000Y0402000044422d4c6962726172790a00000000000d1175735f656e676c69736800000000000000000000000000000201004c000000000000000000000a000000000000000000000000000069736f5f31000000000000000000000000000000000000000000000000000501353132000000030000000000000000"
data1 = data.replace(data[16:16 + len(address)], address)
data2 = data1.replace(data1[78:78 + len(husername)], husername)
data3 = data2.replace(data2[140:140 + len(hpwd)], hpwd)
if lusername >= 16:
data4 = data3.replace('0X', str(hex(lusername)).replace('0x', ''))
else:
data4 = data3.replace('X', str(hex(lusername)).replace('0x', ''))
if lpassword >= 16:
data5 = data4.replace('0Y', str(hex(lpassword)).replace('0x', ''))
else:
data5 = data4.replace('Y', str(hex(lpassword)).replace('0x', ''))
hladd = hex(ladd).replace('0x', '')
data6 = data5.replace('ZZ', str(hladd))
data7 = binascii.unhexlify(data6)
sock.send(data7)
packet = sock.recv(1024)
if b'master' in packet:
self.right_pwd = userpwd
except Exception as e:
pass
finally:
sock.close()
使用AI编写#
Z0Scan 虽不能像 afrog与nuclei 那般能够快速借助静态平台快速生成Scanner,
但得益于AI 对Python的掌握,
您可以通过引用本文并提供需求描述以生成高可用性插件。
插件开发需求描述示例:
阅读以下关于 Z0Scan 插件的编写说明,
# 这里复制文本/引用链接(仅对部分AI有效)
请你为我编写满足以下要求的针对XXX漏洞检测的Scanner插件:
1. 需求1
2. 需求2
扫描器集成#
Z0Scan 是开源的,我也希望各位大师傅们能够将 Z0Scan 集成到自己的扫描器中
在api/__init__.py中给出了外部调用方案:
def scan(url, module_name, conf={}, headers={}):
root = modulePath()
cmdline = {
"level": 3
}
cmdline.update(conf)
init(root, cmdline)
r = requests.get(url, headers=headers)
req = FakeReq(url, headers, HTTPMETHOD.GET)
resp = FakeResp(r.status_code, r.content, r.headers)
poc_module = copy.deepcopy(KB["registered"][module_name])
poc_module.execute(req, resp)
