diff --git a/__pycache__/main.cpython-311.pyc b/__pycache__/main.cpython-311.pyc new file mode 100644 index 0000000..460408d Binary files /dev/null and b/__pycache__/main.cpython-311.pyc differ diff --git a/main.py b/main.py index b2d537f..61e7ac1 100755 --- a/main.py +++ b/main.py @@ -7,33 +7,75 @@ import pdb import os import random import re +import json from enum import Enum -IP = "192.168.42.100" + +# ============================================================================== +# ENUMs +# ============================================================================== + class API_CMD_IDS(Enum): + READ_SYS_IP = 516 READ_PORTS = 768 + WRITE_PORT = 770 READ_VLANS = 1283 + CREATE_VLAN = 1285 + UPDATE_VLAN = 1287 + DELETE_VLAN = 1289 READ_VLAN_PORTS = 1290 - READ_VLAN_PORT_CFG = 1293 # this needs vid as a parameter and is parsed by parse_vlan_ports + UPDATE_VLAN_PORT = 1292 + # READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports + READ_VLAN_PORT_CFG = 1293 READ_STP_PORTS = 4098 -class VLAN_PORT_MODE(Enum): +class PORT_VLAN_MODE(Enum): GENERAL = 0 ACCESS = 1 TRUNK = 2 DOT1QTUNNEL = 3 -class VLAN_PORT_ACL(Enum): +class PORT_VLAN_ACL(Enum): EXCLUDED = 0 FORBIDDEN = 1 TAGGED = 2 UNTAGGED = 3 +class PORT_STATE(Enum): + ENABLED = 1 + DISABLED = 0 + + +class PORT_SPEED(Enum): + AUTO = 0 + E10M = 1 + E100M = 2 + E1000M = 3 + + +class PORT_DUPLEX(Enum): + AUTO = 0 + FULL = 1 + HALF = 2 + + +class PORT_FLOW_CONTROL(Enum): + ENABLED = 1 + DISABLED = 0 + + +# ============================================================================== +# Util functions +# ============================================================================== + + def zyxel_password(pw: str) -> str: + """Generates an obfuscated password from a cleartext pw""" + alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" result = "" l = len(pw) @@ -56,7 +98,98 @@ def zyxel_password(pw: str) -> str: return result +def dictify(data: list, key: str) -> dict: + """ + converts a list of dictionaries to a single dictionary by using + the `key`'s value of each element as the new key + """ + result = {} + for d in data: + k = d[key] + del d[key] + result.update({k: d}) + return result + + +# ============================================================================== +# Parsing functions +# ============================================================================== + + +def parse_xssid(response: str) -> str: + """parses a xssid value from html source containing it""" + soup = BeautifulSoup(response, "html.parser") + xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"}) + try: + inp = xssid_input[0] + except IndexError: + return None + return inp.attrs.get("value") + + +def parse_vlan_ports(response: str) -> dict: + """parses the response of a READ_VLAN_PORTS get-command""" + soup = BeautifulSoup(response, "html.parser") + + data = {} + port_settings = soup.find_all( + "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} + ) + for port_setting in port_settings: + port = port_setting.find_previous().text.strip() + port_type = int(port_setting.attrs.get("value")) + + acl = list( + filter( + lambda p: "checked" in p.attrs, + port_setting.find_next().find_all("input"), + ) + )[0] + + data.update( + { + port: { + "mode": PORT_VLAN_MODE(port_type), + "acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))), + } + } + ) + + return data + + +def extract_data_from_table(response: str) -> list: + """ + parses the _last_ table from a HTML text and returns a list of dicts, each + dict contains one line from the table + """ + soup = BeautifulSoup(response, "html.parser") + table_body = soup.find_all("table")[-1] + + rows = table_body.find_all("tr") + keys = [ele.text.strip() for ele in rows[0].find_all("td")] + entries = [] + for row in rows[1:]: + cols = row.find_all("td") + cols = [ele.text.strip() for ele in cols] + entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) + + return list(filter(lambda e: e, entries)) + + +# ============================================================================== +# "API" request functions +# ============================================================================== + + def get_login_cookie(host: str, username: str, password: str) -> str: + """ + logs into the webservice using username and password for authentication. + + - `host` - hostname or IP address of the device + - `username` - username for login + - `password` - cleartext password, it will be encoded using `zyxel_password` + """ auth_id = requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", params={"login": 1, "username": username, "password": zyxel_password(password)}, @@ -79,6 +212,7 @@ def get_login_cookie(host: str, username: str, password: str) -> str: def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: + """sends a get command, returns the plaintext answer""" params.update({"cmd": cmd}) return requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", @@ -87,68 +221,137 @@ def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: ).text -def parse_vlan_ports(response: str) -> dict: - soup = BeautifulSoup(response, "html.parser") +def set_cmd(host: str, xssid: str, cmd: int, **params) -> str: + """ + sends a set command, returns the plaintext answer. + You can supply params for the command as keyword arguments. + """ + # inject command into data + params.update({"cmd": cmd}) - data = {} - port_settings = soup.find_all( - "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} + # get "CSRF" token for submitting a form + token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value)) + params.update({"XSSID": token}) + if token is None: + raise Exception("unable to get XSSID token") + + # call api + return requests.post( + f"http://{host}/cgi-bin/dispatcher.cgi", + data=params, + cookies={"HTTP_XSSID": xssid}, + ).text + + +# ============================================================================== +# Set command wrapper functions +# ============================================================================== + + +def update_port( + host: str, + xssid: str, + port: str, + description: str, + state: PORT_STATE, + speed: PORT_SPEED, + duplex: PORT_DUPLEX, + fc: PORT_FLOW_CONTROL, +): + """configures a port, returns true on success""" + return "window.location.replace" in set_cmd( + host, + xssid, + API_CMD_IDS.WRITE_PORT.value, + portlist=port, + descp=description, + state=str(state.value), + speed=str(speed.value), + duplex=str(duplex.value), + fc=str(fc.value), + sysSubmit="Apply", ) - for port_setting in port_settings: - port = port_setting.find_previous().text.strip() - port_type = int(port_setting.attrs.get("value")) - - acl = list( - filter( - lambda p: "checked" in p.attrs, - port_setting.find_next().find_all("input"), - ) - )[0] - - data.update( - { - port: { - "mode": VLAN_PORT_MODE(port_type), - "acl": VLAN_PORT_ACL(int(acl.attrs.get("value"))), - } - } - ) - - return data -def extract_data_from_table(response: str) -> dict: - soup = BeautifulSoup(response, "html.parser") - - data = {} - table_body = soup.find_all("table")[-1] - - rows = table_body.find_all("tr") - keys = [ele.text.strip() for ele in rows[0].find_all("td")] - entries = [] - for row in rows[1:]: - cols = row.find_all("td") - cols = [ele.text.strip() for ele in cols] - entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) - - return list(filter(lambda e: e, entries)) +def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"): + return "window.location.replace" in set_cmd( + host, + cookie, + API_CMD_IDS.CREATE_VLAN.value, + vlanlist=str(vlan_id), + vlanAction="0", + name=vlan_name_prefix, + sysSubmit="Apply", + ) -def dictify(data: list, key: str) -> dict: - result = {} - for d in data: - k = d[key] - del d[key] - result.update({k: d}) - return result +def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str): + return "window.location.replace" in set_cmd( + host, + cookie, + API_CMD_IDS.UPDATE_VLAN.value, + vidValue=str(vlan_id), + editName=vlan_name, + sysSubmit="Apply", + ) + + +def delete_vlan(host: str, cookie: str, vlan_id: int): + return "window.location.replace" in get_cmd( + host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id + ) + + +def update_port_vlan(host: str, cookie: str, port: str): + pass + + +# ============================================================================== +# Get command wrapper functions +# ============================================================================== + + +def read_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)), + "Port", + ) + + +def read_vlans(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)), + "VLAN ID", + ) + + +def read_vlan_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table( + get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value) + ), + "Port", + ) + + +def read_stp_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)), + "Port", + ) + + +def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict: + return dictify( + parse_vlan_ports( + get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid) + ), + "VLAN", + ) if __name__ == "__main__": - + IP = "192.168.42.100" cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW")) - print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_PORTS.value)), "Port")) - print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLANS.value)), "VLAN ID")) - print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORTS.value)), "Port")) - print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_STP_PORTS.value)), "Port")) - print(parse_vlan_ports(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=2))) + print(read_stp_ports(IP, cookie))