#!/usr/bin/env python3 from bs4 import BeautifulSoup import time import requests import pdb import os import random import re import json from enum import Enum # ============================================================================== # ENUMs # ============================================================================== class API_CMD_IDS(Enum): READ_SYS_IP = 516 READ_PORTS = 768 WRITE_PORT = 770 READ_VLANS = 1283 CREATE_VLAN = 1285 UPDATE_VLAN = 1287 DELETE_VLAN = 1289 READ_VLAN_PORTS = 1290 UPDATE_VLAN_PORT = 1292 # READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports READ_VLAN_PORT_CFG = 1293 READ_STP_PORTS = 4098 class PORT_VLAN_MODE(Enum): GENERAL = 0 ACCESS = 1 TRUNK = 2 DOT1QTUNNEL = 3 class PORT_VLAN_ACL(Enum): EXCLUDED = 0 FORBIDDEN = 1 TAGGED = 2 UNTAGGED = 3 class PORT_STATE(Enum): ENABLED = 1 DISABLED = 0 class PORT_SPEED(Enum): AUTO = 0 E10M = 1 E100M = 2 E1000M = 3 class PORT_DUPLEX(Enum): AUTO = 0 FULL = 1 HALF = 2 class PORT_FLOW_CONTROL(Enum): ENABLED = 1 DISABLED = 0 # ============================================================================== # Util functions # ============================================================================== def zyxel_password(pw: str) -> str: """Generates an obfuscated password from a cleartext pw""" alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" result = "" l = len(pw) for i in range(1, 322 - len(pw)): if i % 5 == 0 and l > 0: l -= 1 result += pw[l] elif i == 123: if len(pw) < 10: result += "0" else: c = str(len(pw) // 10) result += c elif i == 289: result += str(len(pw) % 10) else: rnd = random.choice(list(alphabet)) result += rnd return result def dictify(data: list, key: str) -> dict: """ converts a list of dictionaries to a single dictionary by using the `key`'s value of each element as the new key """ result = {} for d in data: k = d[key] del d[key] result.update({k: d}) return result # ============================================================================== # Parsing functions # ============================================================================== def parse_xssid(response: str) -> str: """parses a xssid value from html source containing it""" soup = BeautifulSoup(response, "html.parser") xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"}) try: inp = xssid_input[0] except IndexError: return None return inp.attrs.get("value") def parse_vlan_ports(response: str) -> dict: """parses the response of a READ_VLAN_PORTS get-command""" soup = BeautifulSoup(response, "html.parser") data = {} port_settings = soup.find_all( "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} ) for port_setting in port_settings: port = port_setting.find_previous().text.strip() port_type = int(port_setting.attrs.get("value")) acl = list( filter( lambda p: "checked" in p.attrs, port_setting.find_next().find_all("input"), ) )[0] data.update( { port: { "mode": PORT_VLAN_MODE(port_type), "acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))), } } ) return data def extract_data_from_table(response: str) -> list: """ parses the _last_ table from a HTML text and returns a list of dicts, each dict contains one line from the table """ soup = BeautifulSoup(response, "html.parser") table_body = soup.find_all("table")[-1] rows = table_body.find_all("tr") keys = [ele.text.strip() for ele in rows[0].find_all("td")] entries = [] for row in rows[1:]: cols = row.find_all("td") cols = [ele.text.strip() for ele in cols] entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) return list(filter(lambda e: e, entries)) # ============================================================================== # "API" request functions # ============================================================================== def get_login_cookie(host: str, username: str, password: str) -> str: """ logs into the webservice using username and password for authentication. - `host` - hostname or IP address of the device - `username` - username for login - `password` - cleartext password, it will be encoded using `zyxel_password` """ auth_id = requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", params={"login": 1, "username": username, "password": zyxel_password(password)}, ) if auth_id.status_code != 200: raise Exception("error while getting auth_id {auth_id.text}") time.sleep(0.5) cookie = requests.post( f"http://{host}/cgi-bin/dispatcher.cgi", data={"authId": auth_id.text.strip(), "login_chk": "true"}, ) if cookie.text.strip() != "OK,": raise Exception("error while getting cookie {cookie.text}") return cookie.cookies.get("HTTP_XSSID") def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: """sends a get command, returns the plaintext answer""" params.update({"cmd": cmd}) return requests.get( f"http://{host}/cgi-bin/dispatcher.cgi", params=params, cookies={"HTTP_XSSID": xssid}, ).text def set_cmd(host: str, xssid: str, cmd: int, **params) -> str: """ sends a set command, returns the plaintext answer. You can supply params for the command as keyword arguments. """ # inject command into data params.update({"cmd": cmd}) # get "CSRF" token for submitting a form token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value)) params.update({"XSSID": token}) if token is None: raise Exception("unable to get XSSID token") # call api return requests.post( f"http://{host}/cgi-bin/dispatcher.cgi", data=params, cookies={"HTTP_XSSID": xssid}, ).text # ============================================================================== # Set command wrapper functions # ============================================================================== def update_port( host: str, xssid: str, port: str, description: str, state: PORT_STATE, speed: PORT_SPEED, duplex: PORT_DUPLEX, fc: PORT_FLOW_CONTROL, ): """configures a port, returns true on success""" return "window.location.replace" in set_cmd( host, xssid, API_CMD_IDS.WRITE_PORT.value, portlist=port, descp=description, state=str(state.value), speed=str(speed.value), duplex=str(duplex.value), fc=str(fc.value), sysSubmit="Apply", ) def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"): return "window.location.replace" in set_cmd( host, cookie, API_CMD_IDS.CREATE_VLAN.value, vlanlist=str(vlan_id), vlanAction="0", name=vlan_name_prefix, sysSubmit="Apply", ) def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str): return "window.location.replace" in set_cmd( host, cookie, API_CMD_IDS.UPDATE_VLAN.value, vidValue=str(vlan_id), editName=vlan_name, sysSubmit="Apply", ) def delete_vlan(host: str, cookie: str, vlan_id: int): return "window.location.replace" in get_cmd( host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id ) def update_port_vlan(host: str, cookie: str, port: str): pass # ============================================================================== # Get command wrapper functions # ============================================================================== def read_ports(host: str, xssid: str) -> dict: return dictify( extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)), "Port", ) def read_vlans(host: str, xssid: str) -> dict: return dictify( extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)), "VLAN ID", ) def read_vlan_ports(host: str, xssid: str) -> dict: return dictify( extract_data_from_table( get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value) ), "Port", ) def read_stp_ports(host: str, xssid: str) -> dict: return dictify( extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)), "Port", ) def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict: return dictify( parse_vlan_ports( get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid) ), "VLAN", ) if __name__ == "__main__": IP = "192.168.42.100" cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW")) print(read_stp_ports(IP, cookie))