From a27c6324ffe2da8ad66b65e48f79265b3dd23d60 Mon Sep 17 00:00:00 2001 From: Gregor Michels Date: Thu, 3 Jul 2025 18:28:49 +0200 Subject: [PATCH] disaggregate code into lib --- gs1900.py | 350 +++++++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 355 +----------------------------------------------------- 2 files changed, 354 insertions(+), 351 deletions(-) create mode 100644 gs1900.py diff --git a/gs1900.py b/gs1900.py new file mode 100644 index 0000000..d7808af --- /dev/null +++ b/gs1900.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 + +from bs4 import BeautifulSoup +import time +import requests +import pdb +import os +import random +import re +import json +from enum import Enum + + +# ============================================================================== +# ENUMs +# ============================================================================== + + +class API_CMD_IDS(Enum): + READ_SYS_IP = 516 + READ_PORTS = 768 + WRITE_PORT = 770 + READ_VLANS = 1283 + CREATE_VLAN = 1285 + UPDATE_VLAN = 1287 + DELETE_VLAN = 1289 + READ_VLAN_PORTS = 1290 + UPDATE_VLAN_PORT = 1292 + # READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports + READ_VLAN_PORT_CFG = 1293 + READ_STP_PORTS = 4098 + + +class PORT_VLAN_MODE(Enum): + GENERAL = 0 + ACCESS = 1 + TRUNK = 2 + DOT1QTUNNEL = 3 + + +class PORT_VLAN_ACL(Enum): + EXCLUDED = 0 + FORBIDDEN = 1 + TAGGED = 2 + UNTAGGED = 3 + + +class PORT_STATE(Enum): + ENABLED = 1 + DISABLED = 0 + + +class PORT_SPEED(Enum): + AUTO = 0 + E10M = 1 + E100M = 2 + E1000M = 3 + + +class PORT_DUPLEX(Enum): + AUTO = 0 + FULL = 1 + HALF = 2 + + +class PORT_FLOW_CONTROL(Enum): + ENABLED = 1 + DISABLED = 0 + + +# ============================================================================== +# Util functions +# ============================================================================== + + +def zyxel_password(pw: str) -> str: + """Generates an obfuscated password from a cleartext pw""" + + alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + result = "" + l = len(pw) + + for i in range(1, 322 - len(pw)): + if i % 5 == 0 and l > 0: + l -= 1 + result += pw[l] + elif i == 123: + if len(pw) < 10: + result += "0" + else: + c = str(len(pw) // 10) + result += c + elif i == 289: + result += str(len(pw) % 10) + else: + rnd = random.choice(list(alphabet)) + result += rnd + return result + + +def dictify(data: list, key: str) -> dict: + """ + converts a list of dictionaries to a single dictionary by using + the `key`'s value of each element as the new key + """ + result = {} + for d in data: + k = d[key] + del d[key] + result.update({k: d}) + return result + + +# ============================================================================== +# Parsing functions +# ============================================================================== + + +def parse_xssid(response: str) -> str: + """parses a xssid value from html source containing it""" + soup = BeautifulSoup(response, "html.parser") + xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"}) + try: + inp = xssid_input[0] + except IndexError: + return None + return inp.attrs.get("value") + + +def parse_vlan_ports(response: str) -> dict: + """parses the response of a READ_VLAN_PORTS get-command""" + soup = BeautifulSoup(response, "html.parser") + + data = {} + port_settings = soup.find_all( + "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} + ) + for port_setting in port_settings: + port = port_setting.find_previous().text.strip() + port_type = int(port_setting.attrs.get("value")) + + acl = list( + filter( + lambda p: "checked" in p.attrs, + port_setting.find_next().find_all("input"), + ) + )[0] + + data.update( + { + port: { + "mode": PORT_VLAN_MODE(port_type), + "acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))), + } + } + ) + + return data + + +def extract_data_from_table(response: str) -> list: + """ + parses the _last_ table from a HTML text and returns a list of dicts, each + dict contains one line from the table + """ + soup = BeautifulSoup(response, "html.parser") + table_body = soup.find_all("table")[-1] + + rows = table_body.find_all("tr") + keys = [ele.text.strip() for ele in rows[0].find_all("td")] + entries = [] + for row in rows[1:]: + cols = row.find_all("td") + cols = [ele.text.strip() for ele in cols] + entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) + + return list(filter(lambda e: e, entries)) + + +# ============================================================================== +# "API" request functions +# ============================================================================== + + +def get_login_cookie(host: str, username: str, password: str) -> str: + """ + logs into the webservice using username and password for authentication. + + - `host` - hostname or IP address of the device + - `username` - username for login + - `password` - cleartext password, it will be encoded using `zyxel_password` + """ + auth_id = requests.get( + f"http://{host}/cgi-bin/dispatcher.cgi", + params={"login": 1, "username": username, "password": zyxel_password(password)}, + ) + + if auth_id.status_code != 200: + raise Exception("error while getting auth_id {auth_id.text}") + + time.sleep(0.5) + + cookie = requests.post( + f"http://{host}/cgi-bin/dispatcher.cgi", + data={"authId": auth_id.text.strip(), "login_chk": "true"}, + ) + + if cookie.text.strip() != "OK,": + raise Exception("error while getting cookie {cookie.text}") + + return cookie.cookies.get("HTTP_XSSID") + + +def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: + """sends a get command, returns the plaintext answer""" + params.update({"cmd": cmd}) + return requests.get( + f"http://{host}/cgi-bin/dispatcher.cgi", + params=params, + cookies={"HTTP_XSSID": xssid}, + ).text + + +def set_cmd(host: str, xssid: str, cmd: int, **params) -> str: + """ + sends a set command, returns the plaintext answer. + You can supply params for the command as keyword arguments. + """ + # inject command into data + params.update({"cmd": cmd}) + + # get "CSRF" token for submitting a form + token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value)) + params.update({"XSSID": token}) + if token is None: + raise Exception("unable to get XSSID token") + + # call api + return requests.post( + f"http://{host}/cgi-bin/dispatcher.cgi", + data=params, + cookies={"HTTP_XSSID": xssid}, + ).text + + +# ============================================================================== +# Set command wrapper functions +# ============================================================================== + + +def update_port( + host: str, + xssid: str, + port: str, + description: str, + state: PORT_STATE, + speed: PORT_SPEED, + duplex: PORT_DUPLEX, + fc: PORT_FLOW_CONTROL, +): + """configures a port, returns true on success""" + return "window.location.replace" in set_cmd( + host, + xssid, + API_CMD_IDS.WRITE_PORT.value, + portlist=port, + descp=description, + state=str(state.value), + speed=str(speed.value), + duplex=str(duplex.value), + fc=str(fc.value), + sysSubmit="Apply", + ) + + +def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"): + return "window.location.replace" in set_cmd( + host, + cookie, + API_CMD_IDS.CREATE_VLAN.value, + vlanlist=str(vlan_id), + vlanAction="0", + name=vlan_name_prefix, + sysSubmit="Apply", + ) + + +def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str): + return "window.location.replace" in set_cmd( + host, + cookie, + API_CMD_IDS.UPDATE_VLAN.value, + vidValue=str(vlan_id), + editName=vlan_name, + sysSubmit="Apply", + ) + + +def delete_vlan(host: str, cookie: str, vlan_id: int): + return "window.location.replace" in get_cmd( + host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id + ) + + +def update_port_vlan(host: str, cookie: str, port: str): + pass + + +# ============================================================================== +# Get command wrapper functions +# ============================================================================== + + +def read_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)), + "Port", + ) + + +def read_vlans(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)), + "VLAN ID", + ) + + +def read_vlan_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table( + get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value) + ), + "Port", + ) + + +def read_stp_ports(host: str, xssid: str) -> dict: + return dictify( + extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)), + "Port", + ) + + +def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict: + return dictify( + parse_vlan_ports( + get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid) + ), + "VLAN", + ) diff --git a/main.py b/main.py index 61e7ac1..7fb6e55 100755 --- a/main.py +++ b/main.py @@ -1,357 +1,10 @@ #!/usr/bin/env python3 -from bs4 import BeautifulSoup -import time -import requests -import pdb -import os -import random -import re -import json -from enum import Enum - - -# ============================================================================== -# ENUMs -# ============================================================================== - - -class API_CMD_IDS(Enum): - READ_SYS_IP = 516 - READ_PORTS = 768 - WRITE_PORT = 770 - READ_VLANS = 1283 - CREATE_VLAN = 1285 - UPDATE_VLAN = 1287 - DELETE_VLAN = 1289 - READ_VLAN_PORTS = 1290 - UPDATE_VLAN_PORT = 1292 - # READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports - READ_VLAN_PORT_CFG = 1293 - READ_STP_PORTS = 4098 - - -class PORT_VLAN_MODE(Enum): - GENERAL = 0 - ACCESS = 1 - TRUNK = 2 - DOT1QTUNNEL = 3 - - -class PORT_VLAN_ACL(Enum): - EXCLUDED = 0 - FORBIDDEN = 1 - TAGGED = 2 - UNTAGGED = 3 - - -class PORT_STATE(Enum): - ENABLED = 1 - DISABLED = 0 - - -class PORT_SPEED(Enum): - AUTO = 0 - E10M = 1 - E100M = 2 - E1000M = 3 - - -class PORT_DUPLEX(Enum): - AUTO = 0 - FULL = 1 - HALF = 2 - - -class PORT_FLOW_CONTROL(Enum): - ENABLED = 1 - DISABLED = 0 - - -# ============================================================================== -# Util functions -# ============================================================================== - - -def zyxel_password(pw: str) -> str: - """Generates an obfuscated password from a cleartext pw""" - - alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" - result = "" - l = len(pw) - - for i in range(1, 322 - len(pw)): - if i % 5 == 0 and l > 0: - l -= 1 - result += pw[l] - elif i == 123: - if len(pw) < 10: - result += "0" - else: - c = str(len(pw) // 10) - result += c - elif i == 289: - result += str(len(pw) % 10) - else: - rnd = random.choice(list(alphabet)) - result += rnd - return result - - -def dictify(data: list, key: str) -> dict: - """ - converts a list of dictionaries to a single dictionary by using - the `key`'s value of each element as the new key - """ - result = {} - for d in data: - k = d[key] - del d[key] - result.update({k: d}) - return result - - -# ============================================================================== -# Parsing functions -# ============================================================================== - - -def parse_xssid(response: str) -> str: - """parses a xssid value from html source containing it""" - soup = BeautifulSoup(response, "html.parser") - xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"}) - try: - inp = xssid_input[0] - except IndexError: - return None - return inp.attrs.get("value") - - -def parse_vlan_ports(response: str) -> dict: - """parses the response of a READ_VLAN_PORTS get-command""" - soup = BeautifulSoup(response, "html.parser") - - data = {} - port_settings = soup.find_all( - "input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")} - ) - for port_setting in port_settings: - port = port_setting.find_previous().text.strip() - port_type = int(port_setting.attrs.get("value")) - - acl = list( - filter( - lambda p: "checked" in p.attrs, - port_setting.find_next().find_all("input"), - ) - )[0] - - data.update( - { - port: { - "mode": PORT_VLAN_MODE(port_type), - "acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))), - } - } - ) - - return data - - -def extract_data_from_table(response: str) -> list: - """ - parses the _last_ table from a HTML text and returns a list of dicts, each - dict contains one line from the table - """ - soup = BeautifulSoup(response, "html.parser") - table_body = soup.find_all("table")[-1] - - rows = table_body.find_all("tr") - keys = [ele.text.strip() for ele in rows[0].find_all("td")] - entries = [] - for row in rows[1:]: - cols = row.find_all("td") - cols = [ele.text.strip() for ele in cols] - entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""}) - - return list(filter(lambda e: e, entries)) - - -# ============================================================================== -# "API" request functions -# ============================================================================== - - -def get_login_cookie(host: str, username: str, password: str) -> str: - """ - logs into the webservice using username and password for authentication. - - - `host` - hostname or IP address of the device - - `username` - username for login - - `password` - cleartext password, it will be encoded using `zyxel_password` - """ - auth_id = requests.get( - f"http://{host}/cgi-bin/dispatcher.cgi", - params={"login": 1, "username": username, "password": zyxel_password(password)}, - ) - - if auth_id.status_code != 200: - raise Exception("error while getting auth_id {auth_id.text}") - - time.sleep(0.5) - - cookie = requests.post( - f"http://{host}/cgi-bin/dispatcher.cgi", - data={"authId": auth_id.text.strip(), "login_chk": "true"}, - ) - - if cookie.text.strip() != "OK,": - raise Exception("error while getting cookie {cookie.text}") - - return cookie.cookies.get("HTTP_XSSID") - - -def get_cmd(host: str, xssid: str, cmd: int, **params) -> str: - """sends a get command, returns the plaintext answer""" - params.update({"cmd": cmd}) - return requests.get( - f"http://{host}/cgi-bin/dispatcher.cgi", - params=params, - cookies={"HTTP_XSSID": xssid}, - ).text - - -def set_cmd(host: str, xssid: str, cmd: int, **params) -> str: - """ - sends a set command, returns the plaintext answer. - You can supply params for the command as keyword arguments. - """ - # inject command into data - params.update({"cmd": cmd}) - - # get "CSRF" token for submitting a form - token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value)) - params.update({"XSSID": token}) - if token is None: - raise Exception("unable to get XSSID token") - - # call api - return requests.post( - f"http://{host}/cgi-bin/dispatcher.cgi", - data=params, - cookies={"HTTP_XSSID": xssid}, - ).text - - -# ============================================================================== -# Set command wrapper functions -# ============================================================================== - - -def update_port( - host: str, - xssid: str, - port: str, - description: str, - state: PORT_STATE, - speed: PORT_SPEED, - duplex: PORT_DUPLEX, - fc: PORT_FLOW_CONTROL, -): - """configures a port, returns true on success""" - return "window.location.replace" in set_cmd( - host, - xssid, - API_CMD_IDS.WRITE_PORT.value, - portlist=port, - descp=description, - state=str(state.value), - speed=str(speed.value), - duplex=str(duplex.value), - fc=str(fc.value), - sysSubmit="Apply", - ) - - -def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"): - return "window.location.replace" in set_cmd( - host, - cookie, - API_CMD_IDS.CREATE_VLAN.value, - vlanlist=str(vlan_id), - vlanAction="0", - name=vlan_name_prefix, - sysSubmit="Apply", - ) - - -def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str): - return "window.location.replace" in set_cmd( - host, - cookie, - API_CMD_IDS.UPDATE_VLAN.value, - vidValue=str(vlan_id), - editName=vlan_name, - sysSubmit="Apply", - ) - - -def delete_vlan(host: str, cookie: str, vlan_id: int): - return "window.location.replace" in get_cmd( - host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id - ) - - -def update_port_vlan(host: str, cookie: str, port: str): - pass - - -# ============================================================================== -# Get command wrapper functions -# ============================================================================== - - -def read_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)), - "Port", - ) - - -def read_vlans(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)), - "VLAN ID", - ) - - -def read_vlan_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table( - get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value) - ), - "Port", - ) - - -def read_stp_ports(host: str, xssid: str) -> dict: - return dictify( - extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)), - "Port", - ) - - -def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict: - return dictify( - parse_vlan_ports( - get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid) - ), - "VLAN", - ) - +from gs1900 import * if __name__ == "__main__": - IP = "192.168.42.100" - cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW")) + IP = os.environ.get("IP") + ADMIN_PW = os.environ.get("ADMIN_PW") + cookie = get_login_cookie(IP, "admin", ADMIN_PW) print(read_stp_ports(IP, cookie))