Compare commits

...

2 commits

Author SHA1 Message Date
510d8c7321 dicitfy read_ commands 2025-07-03 18:10:17 +02:00
194540ccd4 did things 2025-07-03 17:51:42 +02:00
2 changed files with 261 additions and 58 deletions

Binary file not shown.

313
main.py
View file

@ -7,33 +7,75 @@ import pdb
import os
import random
import re
import json
from enum import Enum
IP = "192.168.42.100"
# ==============================================================================
# ENUMs
# ==============================================================================
class API_CMD_IDS(Enum):
READ_SYS_IP = 516
READ_PORTS = 768
WRITE_PORT = 770
READ_VLANS = 1283
CREATE_VLAN = 1285
UPDATE_VLAN = 1287
DELETE_VLAN = 1289
READ_VLAN_PORTS = 1290
READ_VLAN_PORT_CFG = 1293 # this needs vid as a parameter and is parsed by parse_vlan_ports
UPDATE_VLAN_PORT = 1292
# READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports
READ_VLAN_PORT_CFG = 1293
READ_STP_PORTS = 4098
class VLAN_PORT_MODE(Enum):
class PORT_VLAN_MODE(Enum):
GENERAL = 0
ACCESS = 1
TRUNK = 2
DOT1QTUNNEL = 3
class VLAN_PORT_ACL(Enum):
class PORT_VLAN_ACL(Enum):
EXCLUDED = 0
FORBIDDEN = 1
TAGGED = 2
UNTAGGED = 3
class PORT_STATE(Enum):
ENABLED = 1
DISABLED = 0
class PORT_SPEED(Enum):
AUTO = 0
E10M = 1
E100M = 2
E1000M = 3
class PORT_DUPLEX(Enum):
AUTO = 0
FULL = 1
HALF = 2
class PORT_FLOW_CONTROL(Enum):
ENABLED = 1
DISABLED = 0
# ==============================================================================
# Util functions
# ==============================================================================
def zyxel_password(pw: str) -> str:
"""Generates an obfuscated password from a cleartext pw"""
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
result = ""
l = len(pw)
@ -56,7 +98,98 @@ def zyxel_password(pw: str) -> str:
return result
def dictify(data: list, key: str) -> dict:
"""
converts a list of dictionaries to a single dictionary by using
the `key`'s value of each element as the new key
"""
result = {}
for d in data:
k = d[key]
del d[key]
result.update({k: d})
return result
# ==============================================================================
# Parsing functions
# ==============================================================================
def parse_xssid(response: str) -> str:
"""parses a xssid value from html source containing it"""
soup = BeautifulSoup(response, "html.parser")
xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"})
try:
inp = xssid_input[0]
except IndexError:
return None
return inp.attrs.get("value")
def parse_vlan_ports(response: str) -> dict:
"""parses the response of a READ_VLAN_PORTS get-command"""
soup = BeautifulSoup(response, "html.parser")
data = {}
port_settings = soup.find_all(
"input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")}
)
for port_setting in port_settings:
port = port_setting.find_previous().text.strip()
port_type = int(port_setting.attrs.get("value"))
acl = list(
filter(
lambda p: "checked" in p.attrs,
port_setting.find_next().find_all("input"),
)
)[0]
data.update(
{
port: {
"mode": PORT_VLAN_MODE(port_type),
"acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))),
}
}
)
return data
def extract_data_from_table(response: str) -> list:
"""
parses the _last_ table from a HTML text and returns a list of dicts, each
dict contains one line from the table
"""
soup = BeautifulSoup(response, "html.parser")
table_body = soup.find_all("table")[-1]
rows = table_body.find_all("tr")
keys = [ele.text.strip() for ele in rows[0].find_all("td")]
entries = []
for row in rows[1:]:
cols = row.find_all("td")
cols = [ele.text.strip() for ele in cols]
entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""})
return list(filter(lambda e: e, entries))
# ==============================================================================
# "API" request functions
# ==============================================================================
def get_login_cookie(host: str, username: str, password: str) -> str:
"""
logs into the webservice using username and password for authentication.
- `host` - hostname or IP address of the device
- `username` - username for login
- `password` - cleartext password, it will be encoded using `zyxel_password`
"""
auth_id = requests.get(
f"http://{host}/cgi-bin/dispatcher.cgi",
params={"login": 1, "username": username, "password": zyxel_password(password)},
@ -79,6 +212,7 @@ def get_login_cookie(host: str, username: str, password: str) -> str:
def get_cmd(host: str, xssid: str, cmd: int, **params) -> str:
"""sends a get command, returns the plaintext answer"""
params.update({"cmd": cmd})
return requests.get(
f"http://{host}/cgi-bin/dispatcher.cgi",
@ -87,68 +221,137 @@ def get_cmd(host: str, xssid: str, cmd: int, **params) -> str:
).text
def parse_vlan_ports(response: str) -> dict:
soup = BeautifulSoup(response, "html.parser")
def set_cmd(host: str, xssid: str, cmd: int, **params) -> str:
"""
sends a set command, returns the plaintext answer.
You can supply params for the command as keyword arguments.
"""
# inject command into data
params.update({"cmd": cmd})
data = {}
port_settings = soup.find_all(
"input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")}
)
for port_setting in port_settings:
port = port_setting.find_previous().text.strip()
port_type = int(port_setting.attrs.get("value"))
# get "CSRF" token for submitting a form
token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value))
params.update({"XSSID": token})
if token is None:
raise Exception("unable to get XSSID token")
acl = list(
filter(
lambda p: "checked" in p.attrs,
port_setting.find_next().find_all("input"),
)
)[0]
# call api
return requests.post(
f"http://{host}/cgi-bin/dispatcher.cgi",
data=params,
cookies={"HTTP_XSSID": xssid},
).text
data.update(
{
port: {
"mode": VLAN_PORT_MODE(port_type),
"acl": VLAN_PORT_ACL(int(acl.attrs.get("value"))),
}
}
# ==============================================================================
# Set command wrapper functions
# ==============================================================================
def update_port(
host: str,
xssid: str,
port: str,
description: str,
state: PORT_STATE,
speed: PORT_SPEED,
duplex: PORT_DUPLEX,
fc: PORT_FLOW_CONTROL,
):
"""configures a port, returns true on success"""
return "window.location.replace" in set_cmd(
host,
xssid,
API_CMD_IDS.WRITE_PORT.value,
portlist=port,
descp=description,
state=str(state.value),
speed=str(speed.value),
duplex=str(duplex.value),
fc=str(fc.value),
sysSubmit="Apply",
)
return data
def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"):
return "window.location.replace" in set_cmd(
host,
cookie,
API_CMD_IDS.CREATE_VLAN.value,
vlanlist=str(vlan_id),
vlanAction="0",
name=vlan_name_prefix,
sysSubmit="Apply",
)
def extract_data_from_table(response: str) -> dict:
soup = BeautifulSoup(response, "html.parser")
data = {}
table_body = soup.find_all("table")[-1]
rows = table_body.find_all("tr")
keys = [ele.text.strip() for ele in rows[0].find_all("td")]
entries = []
for row in rows[1:]:
cols = row.find_all("td")
cols = [ele.text.strip() for ele in cols]
entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""})
return list(filter(lambda e: e, entries))
def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str):
return "window.location.replace" in set_cmd(
host,
cookie,
API_CMD_IDS.UPDATE_VLAN.value,
vidValue=str(vlan_id),
editName=vlan_name,
sysSubmit="Apply",
)
def dictify(data: list, key: str) -> dict:
result = {}
for d in data:
k = d[key]
del d[key]
result.update({k: d})
return result
def delete_vlan(host: str, cookie: str, vlan_id: int):
return "window.location.replace" in get_cmd(
host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id
)
def update_port_vlan(host: str, cookie: str, port: str):
pass
# ==============================================================================
# Get command wrapper functions
# ==============================================================================
def read_ports(host: str, xssid: str) -> dict:
return dictify(
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)),
"Port",
)
def read_vlans(host: str, xssid: str) -> dict:
return dictify(
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)),
"VLAN ID",
)
def read_vlan_ports(host: str, xssid: str) -> dict:
return dictify(
extract_data_from_table(
get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value)
),
"Port",
)
def read_stp_ports(host: str, xssid: str) -> dict:
return dictify(
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)),
"Port",
)
def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict:
return dictify(
parse_vlan_ports(
get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid)
),
"VLAN",
)
if __name__ == "__main__":
IP = "192.168.42.100"
cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW"))
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_PORTS.value)), "Port"))
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLANS.value)), "VLAN ID"))
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORTS.value)), "Port"))
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_STP_PORTS.value)), "Port"))
print(parse_vlan_ports(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=2)))
print(read_stp_ports(IP, cookie))