diff --git a/__pycache__/main.cpython-311.pyc b/__pycache__/main.cpython-311.pyc deleted file mode 100644 index 460408d..0000000 Binary files a/__pycache__/main.cpython-311.pyc and /dev/null differ diff --git a/main.py b/main.py index 61e7ac1..b2d537f 100755 --- a/main.py +++ b/main.py @@ -7,75 +7,33 @@ import pdb import os import random import re -import json from enum import Enum - -# ============================================================================== -# ENUMs -# ============================================================================== - +IP = "192.168.42.100" class API_CMD_IDS(Enum): - READ_SYS_IP = 516 READ_PORTS = 768 - WRITE_PORT = 770 READ_VLANS = 1283 - CREATE_VLAN = 1285 - UPDATE_VLAN = 1287 - DELETE_VLAN = 1289 READ_VLAN_PORTS = 1290 - UPDATE_VLAN_PORT = 1292 - # READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports - READ_VLAN_PORT_CFG = 1293 + READ_VLAN_PORT_CFG = 1293 # this needs vid as a parameter and is parsed by parse_vlan_ports READ_STP_PORTS = 4098 -class PORT_VLAN_MODE(Enum): +class VLAN_PORT_MODE(Enum): GENERAL = 0 ACCESS = 1 TRUNK = 2 DOT1QTUNNEL = 3 -class PORT_VLAN_ACL(Enum): +class VLAN_PORT_ACL(Enum): EXCLUDED = 0 FORBIDDEN = 1 TAGGED = 2 UNTAGGED = 3 -class PORT_STATE(Enum): - ENABLED = 1 - DISABLED = 0 - - -class PORT_SPEED(Enum): - AUTO = 0 - E10M = 1 - E100M = 2 - E1000M = 3 - - -class PORT_DUPLEX(Enum): - AUTO = 0 - FULL = 1 - HALF = 2 - - -class PORT_FLOW_CONTROL(Enum): - ENABLED = 1 - DISABLED = 0 - - -# ============================================================================== -# Util functions -# ============================================================================== - - def zyxel_password(pw: str) -> str: - """Generates an obfuscated password from a cleartext pw""" - alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" result = "" l = len(pw) @@ -98,98 +56,7 @@ def zyxel_password(pw: str) -> str: return result -def dictify(data: list, key: str) -> dict: - """ - converts a list of dictionaries to a single dictionary by using - the `key`'s value of each element as the new key - """ - result = {} - for d in data: - k = d[key] - del d[key] - result.update({k: d}) - return result - - -# ============================================================================== -# Parsing functions -# ============================================================================== - - -def parse_xssid(response: str) -> str: - """parses a xssid value from html source containing it""" - soup = BeautifulSoup(response, "html.parser") - xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"}) - try: - inp = xssid_input[0] - except IndexError: - return None - return inp.attrs.get("value") - - -def parse_vlan_ports(response: str) -> dict: - """parses the response of a READ_VLAN_PORTS get-command""" - soup = BeautifulSoup(response, "html.parser") - - data = {} - port_settings = soup.find_all( - "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} - ) - for port_setting in port_settings: - port = port_setting.find_previous().text.strip() - port_type = int(port_setting.attrs.get("value")) - - acl = list( - filter( - lambda p: "checked" in p.attrs, - port_setting.find_next().find_all("input"), - ) - )[0] - - data.update( - { - port: { - "mode": PORT_VLAN_MODE(port_type), - "acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))), - } - } - ) - - return data - - -def extract_data_from_table(response: str) -> list: - """ - parses the _last_ table from a HTML text and returns a list of dicts, each - dict contains one line from the table - """ - soup = BeautifulSoup(response, "html.parser") - table_body = soup.find_all("table")[-1] - - rows = table_body.find_all("tr") - keys = [ele.text.strip() for ele in rows[0].find_all("td")] - entries = [] - for row in rows[1:]: - cols = row.find_all("td") - cols = [ele.text.strip() for ele in cols] - entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) - - return list(filter(lambda e: e, entries)) - - -# ============================================================================== -# "API" request functions -# ============================================================================== - - def get_login_cookie(host: str, username: str, password: str) -> str: - """ - logs into the webservice using username and password for authentication. - - - `host` - hostname or IP address of the device - - `username` - username for login - - `password` - cleartext password, it will be encoded using `zyxel_password` - """ auth_id = requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", params={"login": 1, "username": username, "password": zyxel_password(password)}, @@ -212,7 +79,6 @@ def get_login_cookie(host: str, username: str, password: str) -> str: def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: - """sends a get command, returns the plaintext answer""" params.update({"cmd": cmd}) return requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", @@ -221,137 +87,68 @@ def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: ).text -def set_cmd(host: str, xssid: str, cmd: int, **params) -> str: - """ - sends a set command, returns the plaintext answer. - You can supply params for the command as keyword arguments. - """ - # inject command into data - params.update({"cmd": cmd}) +def parse_vlan_ports(response: str) -> dict: + soup = BeautifulSoup(response, "html.parser") - # get "CSRF" token for submitting a form - token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value)) - params.update({"XSSID": token}) - if token is None: - raise Exception("unable to get XSSID token") - - # call api - return requests.post( - f"http://{host}/cgi-bin/dispatcher.cgi", - data=params, - cookies={"HTTP_XSSID": xssid}, - ).text - - -# ============================================================================== -# Set command wrapper functions -# ============================================================================== - - -def update_port( - host: str, - xssid: str, - port: str, - description: str, - state: PORT_STATE, - speed: PORT_SPEED, - duplex: PORT_DUPLEX, - fc: PORT_FLOW_CONTROL, -): - """configures a port, returns true on success""" - return "window.location.replace" in set_cmd( - host, - xssid, - API_CMD_IDS.WRITE_PORT.value, - portlist=port, - descp=description, - state=str(state.value), - speed=str(speed.value), - duplex=str(duplex.value), - fc=str(fc.value), - sysSubmit="Apply", + data = {} + port_settings = soup.find_all( + "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} ) + for port_setting in port_settings: + port = port_setting.find_previous().text.strip() + port_type = int(port_setting.attrs.get("value")) + + acl = list( + filter( + lambda p: "checked" in p.attrs, + port_setting.find_next().find_all("input"), + ) + )[0] + + data.update( + { + port: { + "mode": VLAN_PORT_MODE(port_type), + "acl": VLAN_PORT_ACL(int(acl.attrs.get("value"))), + } + } + ) + + return data -def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"): - return "window.location.replace" in set_cmd( - host, - cookie, - API_CMD_IDS.CREATE_VLAN.value, - vlanlist=str(vlan_id), - vlanAction="0", - name=vlan_name_prefix, - sysSubmit="Apply", - ) +def extract_data_from_table(response: str) -> dict: + soup = BeautifulSoup(response, "html.parser") + + data = {} + table_body = soup.find_all("table")[-1] + + rows = table_body.find_all("tr") + keys = [ele.text.strip() for ele in rows[0].find_all("td")] + entries = [] + for row in rows[1:]: + cols = row.find_all("td") + cols = [ele.text.strip() for ele in cols] + entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) + + return list(filter(lambda e: e, entries)) -def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str): - return "window.location.replace" in set_cmd( - host, - cookie, - API_CMD_IDS.UPDATE_VLAN.value, - vidValue=str(vlan_id), - editName=vlan_name, - sysSubmit="Apply", - ) - - -def delete_vlan(host: str, cookie: str, vlan_id: int): - return "window.location.replace" in get_cmd( - host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id - ) - - -def update_port_vlan(host: str, cookie: str, port: str): - pass - - -# ============================================================================== -# Get command wrapper functions -# ============================================================================== - - -def read_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)), - "Port", - ) - - -def read_vlans(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)), - "VLAN ID", - ) - - -def read_vlan_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table( - get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value) - ), - "Port", - ) - - -def read_stp_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)), - "Port", - ) - - -def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict: - return dictify( - parse_vlan_ports( - get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid) - ), - "VLAN", - ) +def dictify(data: list, key: str) -> dict: + result = {} + for d in data: + k = d[key] + del d[key] + result.update({k: d}) + return result if __name__ == "__main__": - IP = "192.168.42.100" + cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW")) - print(read_stp_ports(IP, cookie)) + print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_PORTS.value)), "Port")) + print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLANS.value)), "VLAN ID")) + print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORTS.value)), "Port")) + print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_STP_PORTS.value)), "Port")) + print(parse_vlan_ports(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=2)))