LOADING

加载过慢请开启缓存 浏览器默认开启

Z0Scan - 开发指南

2025/11/9 book z0scan

愿你星河常明

前言说明#

  1. 插件划分

Z0Scan 的 Scanners 分类为:

1. PerPage: 基于页面层面检测的插件集
2. PerDir: 基于子路径层面检测的插件集
3. PerDomain: 基于域名层面检测的插件集
4. PerHost: 基于主机层面检测的插件集
  1. 指纹划分

Z0Scan 指纹集包括 waf 以及 finger, 它们被存储在 data/fingers.db 中

其中 finger 是为 Scanner 的精准切入而诞生的

fingers.db 指纹开发#

语法规则:

匹配:

1. body: 在响应体文本中匹配
2. title: 在标题中匹配
3. header: 在响应头部中匹配(包括键与值)
4. path: 在路径中匹配,如:/admin/main.php
5. body_regex: 在响应体文本中使用正则表达式匹配
6. title_regex: 在标题中使用正则表达式匹配
7. header_regex: 在响应头部中使用正则表达式匹配(包括键与值)
8. path_regex: 在路径中使用正则表达式匹配,如:/admin/main.php

逻辑符:

1. &&: 并且
2. ||: 或
3. (): 优先

示例:

headers="Server: thinkphp" || body="Thinkphp"

关于WAF指纹
在lib/core/finger.py中会将特殊响应状态码及响应超时判断为存在WAF

Scanner 命名#

按照以下命名规则命名:

漏洞类型(简写) + (指纹对象)+ 简述

如 PerPage/PerDir/PerDomain 下存在同名插件,
在单个插件名后按照PerPage:1,PerDir:2,PerDomain:3在名末下划线连接,如:

PerPage 中的 sensi-backup:sensi-backup_1

漏洞类型分类与简写形式可参考api/VulType部分

内置模块#

api#

名称描述使用必选/可选
generateResponse用于报告中响应体的生成generateResponse(r) # r=request.get(…)必选
random_num生成随机数/可选
random_str生成随机字符串/可选
VulType对漏洞类型的选定见api/VulType必选
Type对扫描类型的选定见api/Type必选
PluginBase被继承以获取数据/解析数据/返回结果class Z0SCAN(PluginBase)PerPage等必选
_PluginBase被继承以获取数据/解析数据/返回结果class Z0SCAN(_PluginBase)PerHost必选
conf储存一些命令行参数值主要使用level,见api/conf.level必选
PLACE对漏洞注入点(请求中的可控点)的选定见api/PLACE必选
Threads插件内置线程(针对参数的多线程)见api/Threads可选
  • api/VulType:
名称描述(插件命名)简写
CMD_INNJECTION命令注入漏洞cmdi
CODE_INJECTION代码注入漏洞codei
XSSXSS跨站脚本攻击xss
SQLISQL注入漏洞sqli
TRAVERSAL遍历漏洞trave
XXEXML外部实体注入xxe
SSRF服务器端请求伪造ssrf
CSRFCSRFcsrf
REDIRECT重定向漏洞redirect
WEAK_PASSWORD弱口令weakpwd
CRLF换行注入crlf
SENSITIVE敏感信息泄露漏洞sensi
SSTI服务器端模板注入ssti
UNAUTH未授权访问unauth
FILEUPLOAD文件上传upload
CORSCORS漏洞cors
OTHER其它漏洞rce,jndi…
  • api/Type :
名称描述
ANALYZE被动分析发现
REQUEST主动请求发现
  • api/conf.level :

扫描深度(反映请求量)

描述
0纯被动分析模式,不做任何请求
1最低请求量的扫描,最低的业务影响
2中等请求量的扫描,Payload多为通用Top5
3大量请求扫描,Payload覆盖面更广
  • api/conf.risk :

需要扫描的漏洞危害程度

描述
-1难以利用的极低危常见漏洞
0可能产生1~3级危害的辅助性信息
1低危漏洞
2中危漏洞
3高危漏洞
  • api/conf.dicts :

被动态导入的字典(dicts)

# 在 dicts 目录下放置 example.txt
> print(conf.dicts["example"])
["1", "2"]

根据你的参考代码,我修改文档如下:

  • api/PLACE

参数位置枚举类,定义参数在HTTP请求中的不同位置。

基础位置类型

名称描述
PARAMURL后参数部分 (如: ?key=value)
DATA在BODY中传递的参数
COOKIECOOKIE中传递的参数
URL伪静态参数 (如: /user/123)

DATA子类型

DATA类型下包含不同数据格式的参数位置:

名称描述
NORMAL_DATA常规POST传参格式中的参数 (application/x-www-form-urlencoded)
JSON_DATAJSON格式中的参数 (application/json)
XML_DATAXML格式中的参数 (text/xml, application/xml)
MULTIPART_DATAMULTIPART格式中的参数 (multipart/form-data)
ARRAY_LIKE_DATAARRAY_LIKE格式中的参数
SOAP_DATASOAP_DATA格式中的参数

嵌套参数位置

值中的二级参数:

名称描述
FORM_VALUE_JSON表单值中的JSON参数
PARAM_VALUE_JSONURL参数值中的JSON参数
COOKIE_JSON_VALUECookie值中的JSON参数
  • api/Threads :
z0thread = Threads(name="sqli-error") # name : 插件名
z0thread.submit(task_func, task_data, args, kwargs)
# task_func: 要执行的任务函数
# task_data: 任务数据迭代器,每个元素会作为task_func的第一个参数
# args: 传递给task_func的额外位置参数
# kwargs: 传递给task_func的额外关键字参数 (可选)
  • api/chat :

AI交互模块

def _ai_validate_with_context(self, context, name, pattern):
    # 输入尽量使用英文并规范其输出
    prompt = f"""
        Please analyze whether the marked sensitive information in the following JavaScript code is genuine:

        {context}


        Rule Name: {name}
        Matching mode: {pattern}

        Key points to consider:
        1. The context in which the information appears
        2. Whether it's in comments or test data
        3. Whether it matches common patterns for this type of sensitive information

        Respond strictly in the following JSON format:
        {{
            "valid": boolean,    // Whether it's valid sensitive info
            "confidence": float, // Confidence level (0.0~1.0)
            "type": string,     // Type of sensitive info (e.g., API key, password)
            "reason": string    // Detailed analysis rationale
        }}
    """
    
    try:
        # 发送对话
        response = chat(prompt)
        if not response:
            return False
            
        try:
            analysis = json.loads(response.content) # 解析回答
            # 置信度阈值到0.8
            if analysis.get('valid', False) and analysis.get('confidence', 0) > 0.8:
                return True, analysis.get('type', ''), analysis.get('reason', '')
        except json.JSONDecodeError:
            logger.error(f"AI response format error, raw response: {response.content}")
            return False, None, None
            
    except Exception as e:
        logger.error(f"Error during AI validation: {e}")
        return False, None, None
        
    return False, None, None

继承PluginBase#

用于编写 PerPage, PerDir, PerDomain

数据#

  • self.fingerprints :
名称类型描述
wafstr/boolWAF名称(未检测到WAF时为False)
fingersdict指纹集(包括os、language、cms…)
>> print(self.fingerprints.waf)
"WTS" # 没有WAF时为False
>> print(self.fingerprints.fingers)
["thinkphp", "nginx"]
  • self.requests :
名称类型描述示例
urlstr完整的URL(包含GET参数)https://www.example.com:443/a/file.php?id=1
suffixstr文件后缀.php
protocolstr请求协议http
portint服务端口8888
hostnamestr域名(不包括端口)www.myscantest.com
netlocstr包含协议与端口信息的域名https://www.example.com:443
rawstr原始的请求包/
methodstr请求方法GET
headersdict请求头字典/
cookiesdictCOOKIE/
paramsdict在URL中包含的参数(不含伪静态){‘id’: ‘1’}
post_hintstrPOST请求包类型/
datadictPOST数据/
pathstr路径/a/file.php
bodystr原始请求头/
ipstr由域名解析为IP59.20.13.14

注: data、params、cookies、headers与body允许直接requests构造请求

self.requests 仅负责常规解析,如需解析伪静态及xml等格式请求中的参数,见self.generateItemdatas()

  • self.response :
名称类型描述示例
status_codeint返回状态码200
contentbyte返回字节类型文本/
headersdict请求头/
rawstr原始的返回包/
textstr解码后的返回文本/

解析#

  • self.generateItemdatas() :

generateItemdatas()会将参数名、参数值及其所处的可控点整理后返回

注意: 它会额外地解析出伪静态及xml等包含的参数,
并按照用户要求选择是否将cookie参数作为解析对象。

>> iterdatas = self.generateItemdatas()
>> print(iterdatas)
[
    ["id", "1", "URL"],
    ["username", "admin", "DATA"],
]
  • self.insertPayload({“key”: k, “value”: v, “position”: position, “payload”: _payload}) :

令参数名为key,参数值为value(value为可选值),并向参数值后添加payload

最终返回其对应可控点修改后的数据

建议配合self.generateItemdatas()使用

>> datas = self.insertPayload({"key": "username", "value": "admin", "position": "DATA", "payload": "'--+"})
>> print(datas)
{"username": "admin'--+", "passwd": "admin"}
>> r = request.get(url, data=datas)
  • self.req(position, payload) :

payload为对应可控点修改后的整体数据,可以为self.insertPayload的返回

需配合self.generateItemdatas()使用

>> datas = {"username": "admin'--+", "passwd": "admin"}
>> r = self.req("DATA", datas)
>> print(r)
… # r为request的返回

返回#

  • self.generate_result() :

报告生成

result = self.generate_result() # 初始化
result.main({
    "type": Type.REQUEST, # 扫描类型
    "url": self.requests.url, # 漏洞URL
    "vultype": VulType.SQLI, # 漏洞类型
    "show": { # 你希望向命令行展示的信息
        "Position": f"{position} > {k}", # 建议键名首字母大写
        "Payload": payload, 
        "Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
        }
    })
# 验证步骤(可以添加多个过程,如二次验证)
result.step("Request1", { # 这是验证步骤的标题
    "request": r.reqinfo, # 验证步骤的请求体
    "response": generateResponse(r), # 验证步骤的响应体
    "desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group()) # 验证步骤描述/结论/推断
    })
self.success(result) # 生成

requests#

在 z0scan 的 lib/patch/request_patch.py 中向 requests 库增添了新的可选项

quote:是否进行URL编码(默认为True)

record:是否记录本次请求状态到数据库(默认为True)

编写示范#

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from api import generateResponse, random_num, random_str, VulType, Type, PluginBase, conf, logger, Thread
from helper.helper_sensitive import sensitive_page_error_message_check
import re

sqli_errors = {
    "Microsoft SQL": [
    .... # 省略
    ]
}

class Z0SCAN(PluginBase):
    name = "sqli-error" # 插件名
    desc = 'SQL Error Finder' # 描述
    version = "2025.3.13" # 版本(最后更新日期)
    risk = 2 # 危害等级
    # require_reverse = False # 是否需要启用反连平台验证(省略默认False)
        
    def audit(self):
        if not self.fingerprints.waf and 2 in conf.risk and conf.level != 0:
            _payloads = [
                ## 宽字节
                r'鎈\'"\(',
                ## 通用报错
                r';)\\\'\\"',
                r'\' oRdeR bY 500 ',
                r';`)',
                r'\\', 
                r"%%2727", 
                r"%25%27", 
                r"%60", 
                r"%5C",
            ]
            if conf.level == 3: 
                _payloads += [
                ## 强制报错
                # MySQL
                r'\' AND 0xG1#',
                # PostgreSQL  
                r"' AND 'a' ~ 'b\[' -- ",
                # MSSQL
                r"; RAISERROR('Error generated', 16, 1) -- ", 
                # Oracle
                r"' UNION SELECT XMLType('<invalid><xml>') FROM dual -- ",  
                # SQLite
                r"' UNION SELECT SUBSTR('o', -1, 1) -- ",
                ]
    
            iterdatas = self.generateItemdatas()
            # 内置的线程并发
            z0thread = Threads(name="sqli-error")
            z0thread.submit(self.process, iterdatas, _payloads)
    
    def Get_sql_errors(self):
        sql_errors = []
        for database, re_strings in rules.items():
            for re_string in re_strings:
                sql_errors.append((re.compile(re_string, re.IGNORECASE), database))
        return sql_errors
    
    def process(self, _, _payloads):
        k, v, position = _
        for _payload in _payloads:
            payload = self.insertPayload({
                "key": k, 
                "value": v, 
                "position": position, 
                "payload": _payload
                })
            r = self.req(position, payload)
            if not r:
                continue
            html = r.text
            for sql_regex, dbms_type in self.Get_sql_errors():
                match = sql_regex.search(html)
                if match:
                    # 生成报告
                    result = self.generate_result()
                    result.main({
                        "type": Type.REQUEST, # 扫描类型
                        "url": self.requests.url, # 漏洞URL
                        "vultype": VulType.SQLI, # 漏洞类型
                        "show": { # 你希望向命令行展示的信息
                            "Position": f"{position} > {k}", # 建议键名首字母大写
                            "Payload": payload, 
                            "Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
                            }
                        })
                    # 验证步骤(可以添加多个过程,如二次验证)
                    result.step("Request1", {
                        "request": r.reqinfo, 
                        "response": generateResponse(r), 
                        "desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group())
                        })
                    self.success(result)
                    return True
            message_lists = sensitive_page_error_message_check(html)
            if message_lists:
                result = self.generate_result()
                result.main({
                    "type": Type.REQUEST, 
                    "url": self.requests.url, 
                    "vultype": VulType.SQLI, 
                    "show": {
                        "Position": f"{position} > {k}", 
                        "Payload": payload, 
                        "Msg": "Receive Error Msg {}".format(repr(message_lists))
                        }
                    })
                result.step("Request1", { # 步骤标题
                    "request": r.reqinfo, # 请求体
                    "response": generateResponse(r), # 响应体(由generateResponse生成)
                    "desc": "Receive Error Msg {}".format(repr(message_lists)) # 步骤的关键信息
                    })
                self.success(result)
                break
    

继承_PluginBase#

用于编写 PerHost

数据#

  • self.host :

它的值类似于:5.9.2.0:1314

你可以通过splite它以取得ip与端口:self.ip, self.port = self.host.split(":")

  • self.sockrecv :

由socket对端口请求后接收的返回

关键代码:

response = sock.recv(256).decode('utf-8', 'ignore')

返回#

  • self.generate_result() :

报告生成

result = self.generate_result() # 初始化
result.main({
    "type": Type.REQUEST, # 扫描类型
    "url": self.requests.url, # 漏洞URL
    "vultype": VulType.SQLI, # 漏洞类型
    "show": { # 你希望向命令行展示的信息
        "Position": f"{position} > {k}", # 建议键名首字母大写
        "Payload": payload, 
        "Msg": "DBMS_TYPE Maybe {}; Match {}".format(dbms_type, match.group())
        }
    })
# 验证步骤(可以添加多个过程,如二次验证)
result.step("Request1", { # 这是验证步骤的标题
    "request": r.reqinfo, # 验证步骤的请求体
    "response": generateResponse(r), # 验证步骤的响应体
    "desc": "Dbms Maybe {}; Match {}".format(dbms_type, match.group()) # 验证步骤描述/结论/推断
    })
self.success(result) # 生成

编写示范#

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# JiuZero 2025/9/12

import socket, binascii
from api import generateResponse, random_num, conf, PLACE, VulType, POST_HINT, Type, _PluginBase, KB, Threads

class Z0SCAN(_PluginBase):
    name = "leakpwd-mssql" # 插件名
    version = "2025.9.12" # 版本
    desc = "Weak Password on MSSQL Server" # 描述
    ports = [1433] # 目标端口(可以多个)
    fingers = [b'MSSQLSERVER'] # 目标服务的指纹(可以多个)
    
    def __init__(self):
        self.right_pwd = None
    
    def audit(self):
        self.ip, self.port = self.host.split(":")
        userpass = []
        for user in conf.lists["mssql-username"]:
            for pwd in conf.lists["mssql-password"]:
                userpass.append((user, pwd))
        z0thread = Threads(name="leakpwd-redis")
        z0thread.submit(self.process, userpass)
        if self.right_pwd is not None:
            result = self.generate_result()
            result.main({
                "type": Type.REQUEST, 
                "url": "tcp://" + self.host, 
                "vultype": VulType.WEAK_PASSWORD, 
                "show": {
                    "User/Password": "/".join(self.right_pwd)
                }
            })
            self.success(result)

    def process(self, userpwd):
        user, pass_ = userpwd
        if self.right_pwd is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.settimeout(8)
                sock.connect((self.ip, self.port))
                hh = binascii.b2a_hex(self.ip.encode()).decode()
                husername = binascii.b2a_hex(user.encode()).decode()
                lusername = len(user)
                lpassword = len(pass_)
                ladd = len(self.ip) + len(str(self.port)) + 1
                hpwd = binascii.b2a_hex(pass_.encode()).decode()
                pp = binascii.b2a_hex(str(self.port).encode()).decode()
                address = hh + '3a' + pp
                # hhost = binascii.b2a_hex(ip.encode()).decode()
                data = "0200020000000000123456789000000000000000000000000000000000000000000000000000ZZ5440000000000000000000000000000000000000000000000000000000000X3360000000000000000000000000000000000000000000000000000000000Y373933340000000000000000000000000000000000000000000000000000040301060a09010000000002000000000070796d7373716c000000000000000000000000000000000000000000000007123456789000000000000000000000000000000000000000000000000000ZZ3360000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000Y0402000044422d4c6962726172790a00000000000d1175735f656e676c69736800000000000000000000000000000201004c000000000000000000000a000000000000000000000000000069736f5f31000000000000000000000000000000000000000000000000000501353132000000030000000000000000"
                data1 = data.replace(data[16:16 + len(address)], address)
                data2 = data1.replace(data1[78:78 + len(husername)], husername)
                data3 = data2.replace(data2[140:140 + len(hpwd)], hpwd)
                if lusername >= 16:
                    data4 = data3.replace('0X', str(hex(lusername)).replace('0x', ''))
                else:
                    data4 = data3.replace('X', str(hex(lusername)).replace('0x', ''))
                if lpassword >= 16:
                    data5 = data4.replace('0Y', str(hex(lpassword)).replace('0x', ''))
                else:
                    data5 = data4.replace('Y', str(hex(lpassword)).replace('0x', ''))
                hladd = hex(ladd).replace('0x', '')
                data6 = data5.replace('ZZ', str(hladd))
                data7 = binascii.unhexlify(data6)
                sock.send(data7)
                packet = sock.recv(1024)
                if b'master' in packet:
                    self.right_pwd = userpwd
            except Exception as e:
                pass
            finally:
                sock.close()

使用AI编写#

Z0Scan 虽不能像 afrog与nuclei 那般能够快速借助静态平台快速生成Scanner,
但得益于AI 对Python的掌握,
您可以通过引用本文并提供需求描述以生成高可用性插件。

插件开发需求描述示例:

阅读以下关于 Z0Scan 插件的编写说明,
# 这里复制文本/引用链接(仅对部分AI有效)
请你为我编写满足以下要求的针对XXX漏洞检测的Scanner插件:
1. 需求1
2. 需求2

扫描器集成#

Z0Scan 是开源的,我也希望各位大师傅们能够将 Z0Scan 集成到自己的扫描器中

api/__init__.py中给出了外部调用方案:

def scan(url, module_name, conf={}, headers={}):
    root = modulePath()
    cmdline = {
        "level": 3
    }
    cmdline.update(conf)
    init(root, cmdline)
    r = requests.get(url, headers=headers)
    req = FakeReq(url, headers, HTTPMETHOD.GET)
    resp = FakeResp(r.status_code, r.content, r.headers)
    poc_module = copy.deepcopy(KB["registered"][module_name])
    poc_module.execute(req, resp)