Compare commits
2 commits
cb712cdd04
...
510d8c7321
Author | SHA1 | Date | |
---|---|---|---|
510d8c7321 | |||
194540ccd4 |
2 changed files with 261 additions and 58 deletions
BIN
__pycache__/main.cpython-311.pyc
Normal file
BIN
__pycache__/main.cpython-311.pyc
Normal file
Binary file not shown.
313
main.py
313
main.py
|
@ -7,33 +7,75 @@ import pdb
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
import re
|
import re
|
||||||
|
import json
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
IP = "192.168.42.100"
|
|
||||||
|
# ==============================================================================
|
||||||
|
# ENUMs
|
||||||
|
# ==============================================================================
|
||||||
|
|
||||||
|
|
||||||
class API_CMD_IDS(Enum):
|
class API_CMD_IDS(Enum):
|
||||||
|
READ_SYS_IP = 516
|
||||||
READ_PORTS = 768
|
READ_PORTS = 768
|
||||||
|
WRITE_PORT = 770
|
||||||
READ_VLANS = 1283
|
READ_VLANS = 1283
|
||||||
|
CREATE_VLAN = 1285
|
||||||
|
UPDATE_VLAN = 1287
|
||||||
|
DELETE_VLAN = 1289
|
||||||
READ_VLAN_PORTS = 1290
|
READ_VLAN_PORTS = 1290
|
||||||
READ_VLAN_PORT_CFG = 1293 # this needs vid as a parameter and is parsed by parse_vlan_ports
|
UPDATE_VLAN_PORT = 1292
|
||||||
|
# READ_VLAN_PORT_CFG needs vid as a parameter and is parsed by parse_vlan_ports
|
||||||
|
READ_VLAN_PORT_CFG = 1293
|
||||||
READ_STP_PORTS = 4098
|
READ_STP_PORTS = 4098
|
||||||
|
|
||||||
|
|
||||||
class VLAN_PORT_MODE(Enum):
|
class PORT_VLAN_MODE(Enum):
|
||||||
GENERAL = 0
|
GENERAL = 0
|
||||||
ACCESS = 1
|
ACCESS = 1
|
||||||
TRUNK = 2
|
TRUNK = 2
|
||||||
DOT1QTUNNEL = 3
|
DOT1QTUNNEL = 3
|
||||||
|
|
||||||
|
|
||||||
class VLAN_PORT_ACL(Enum):
|
class PORT_VLAN_ACL(Enum):
|
||||||
EXCLUDED = 0
|
EXCLUDED = 0
|
||||||
FORBIDDEN = 1
|
FORBIDDEN = 1
|
||||||
TAGGED = 2
|
TAGGED = 2
|
||||||
UNTAGGED = 3
|
UNTAGGED = 3
|
||||||
|
|
||||||
|
|
||||||
|
class PORT_STATE(Enum):
|
||||||
|
ENABLED = 1
|
||||||
|
DISABLED = 0
|
||||||
|
|
||||||
|
|
||||||
|
class PORT_SPEED(Enum):
|
||||||
|
AUTO = 0
|
||||||
|
E10M = 1
|
||||||
|
E100M = 2
|
||||||
|
E1000M = 3
|
||||||
|
|
||||||
|
|
||||||
|
class PORT_DUPLEX(Enum):
|
||||||
|
AUTO = 0
|
||||||
|
FULL = 1
|
||||||
|
HALF = 2
|
||||||
|
|
||||||
|
|
||||||
|
class PORT_FLOW_CONTROL(Enum):
|
||||||
|
ENABLED = 1
|
||||||
|
DISABLED = 0
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Util functions
|
||||||
|
# ==============================================================================
|
||||||
|
|
||||||
|
|
||||||
def zyxel_password(pw: str) -> str:
|
def zyxel_password(pw: str) -> str:
|
||||||
|
"""Generates an obfuscated password from a cleartext pw"""
|
||||||
|
|
||||||
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
|
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
|
||||||
result = ""
|
result = ""
|
||||||
l = len(pw)
|
l = len(pw)
|
||||||
|
@ -56,7 +98,98 @@ def zyxel_password(pw: str) -> str:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def dictify(data: list, key: str) -> dict:
|
||||||
|
"""
|
||||||
|
converts a list of dictionaries to a single dictionary by using
|
||||||
|
the `key`'s value of each element as the new key
|
||||||
|
"""
|
||||||
|
result = {}
|
||||||
|
for d in data:
|
||||||
|
k = d[key]
|
||||||
|
del d[key]
|
||||||
|
result.update({k: d})
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Parsing functions
|
||||||
|
# ==============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def parse_xssid(response: str) -> str:
|
||||||
|
"""parses a xssid value from html source containing it"""
|
||||||
|
soup = BeautifulSoup(response, "html.parser")
|
||||||
|
xssid_input = soup.find_all("input", attrs={"type": "hidden", "name": "XSSID"})
|
||||||
|
try:
|
||||||
|
inp = xssid_input[0]
|
||||||
|
except IndexError:
|
||||||
|
return None
|
||||||
|
return inp.attrs.get("value")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_vlan_ports(response: str) -> dict:
|
||||||
|
"""parses the response of a READ_VLAN_PORTS get-command"""
|
||||||
|
soup = BeautifulSoup(response, "html.parser")
|
||||||
|
|
||||||
|
data = {}
|
||||||
|
port_settings = soup.find_all(
|
||||||
|
"input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")}
|
||||||
|
)
|
||||||
|
for port_setting in port_settings:
|
||||||
|
port = port_setting.find_previous().text.strip()
|
||||||
|
port_type = int(port_setting.attrs.get("value"))
|
||||||
|
|
||||||
|
acl = list(
|
||||||
|
filter(
|
||||||
|
lambda p: "checked" in p.attrs,
|
||||||
|
port_setting.find_next().find_all("input"),
|
||||||
|
)
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
data.update(
|
||||||
|
{
|
||||||
|
port: {
|
||||||
|
"mode": PORT_VLAN_MODE(port_type),
|
||||||
|
"acl": PORT_VLAN_ACL(int(acl.attrs.get("value"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def extract_data_from_table(response: str) -> list:
|
||||||
|
"""
|
||||||
|
parses the _last_ table from a HTML text and returns a list of dicts, each
|
||||||
|
dict contains one line from the table
|
||||||
|
"""
|
||||||
|
soup = BeautifulSoup(response, "html.parser")
|
||||||
|
table_body = soup.find_all("table")[-1]
|
||||||
|
|
||||||
|
rows = table_body.find_all("tr")
|
||||||
|
keys = [ele.text.strip() for ele in rows[0].find_all("td")]
|
||||||
|
entries = []
|
||||||
|
for row in rows[1:]:
|
||||||
|
cols = row.find_all("td")
|
||||||
|
cols = [ele.text.strip() for ele in cols]
|
||||||
|
entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""})
|
||||||
|
|
||||||
|
return list(filter(lambda e: e, entries))
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# "API" request functions
|
||||||
|
# ==============================================================================
|
||||||
|
|
||||||
|
|
||||||
def get_login_cookie(host: str, username: str, password: str) -> str:
|
def get_login_cookie(host: str, username: str, password: str) -> str:
|
||||||
|
"""
|
||||||
|
logs into the webservice using username and password for authentication.
|
||||||
|
|
||||||
|
- `host` - hostname or IP address of the device
|
||||||
|
- `username` - username for login
|
||||||
|
- `password` - cleartext password, it will be encoded using `zyxel_password`
|
||||||
|
"""
|
||||||
auth_id = requests.get(
|
auth_id = requests.get(
|
||||||
f"http://{host}/cgi-bin/dispatcher.cgi",
|
f"http://{host}/cgi-bin/dispatcher.cgi",
|
||||||
params={"login": 1, "username": username, "password": zyxel_password(password)},
|
params={"login": 1, "username": username, "password": zyxel_password(password)},
|
||||||
|
@ -79,6 +212,7 @@ def get_login_cookie(host: str, username: str, password: str) -> str:
|
||||||
|
|
||||||
|
|
||||||
def get_cmd(host: str, xssid: str, cmd: int, **params) -> str:
|
def get_cmd(host: str, xssid: str, cmd: int, **params) -> str:
|
||||||
|
"""sends a get command, returns the plaintext answer"""
|
||||||
params.update({"cmd": cmd})
|
params.update({"cmd": cmd})
|
||||||
return requests.get(
|
return requests.get(
|
||||||
f"http://{host}/cgi-bin/dispatcher.cgi",
|
f"http://{host}/cgi-bin/dispatcher.cgi",
|
||||||
|
@ -87,68 +221,137 @@ def get_cmd(host: str, xssid: str, cmd: int, **params) -> str:
|
||||||
).text
|
).text
|
||||||
|
|
||||||
|
|
||||||
def parse_vlan_ports(response: str) -> dict:
|
def set_cmd(host: str, xssid: str, cmd: int, **params) -> str:
|
||||||
soup = BeautifulSoup(response, "html.parser")
|
"""
|
||||||
|
sends a set command, returns the plaintext answer.
|
||||||
|
You can supply params for the command as keyword arguments.
|
||||||
|
"""
|
||||||
|
# inject command into data
|
||||||
|
params.update({"cmd": cmd})
|
||||||
|
|
||||||
data = {}
|
# get "CSRF" token for submitting a form
|
||||||
port_settings = soup.find_all(
|
token = parse_xssid(get_cmd(host, xssid, API_CMD_IDS.READ_SYS_IP.value))
|
||||||
"input", attrs={"type": "hidden", "id": re.compile(r"vlanMode_\d+")}
|
params.update({"XSSID": token})
|
||||||
)
|
if token is None:
|
||||||
for port_setting in port_settings:
|
raise Exception("unable to get XSSID token")
|
||||||
port = port_setting.find_previous().text.strip()
|
|
||||||
port_type = int(port_setting.attrs.get("value"))
|
|
||||||
|
|
||||||
acl = list(
|
# call api
|
||||||
filter(
|
return requests.post(
|
||||||
lambda p: "checked" in p.attrs,
|
f"http://{host}/cgi-bin/dispatcher.cgi",
|
||||||
port_setting.find_next().find_all("input"),
|
data=params,
|
||||||
)
|
cookies={"HTTP_XSSID": xssid},
|
||||||
)[0]
|
).text
|
||||||
|
|
||||||
data.update(
|
|
||||||
{
|
# ==============================================================================
|
||||||
port: {
|
# Set command wrapper functions
|
||||||
"mode": VLAN_PORT_MODE(port_type),
|
# ==============================================================================
|
||||||
"acl": VLAN_PORT_ACL(int(acl.attrs.get("value"))),
|
|
||||||
}
|
|
||||||
}
|
def update_port(
|
||||||
|
host: str,
|
||||||
|
xssid: str,
|
||||||
|
port: str,
|
||||||
|
description: str,
|
||||||
|
state: PORT_STATE,
|
||||||
|
speed: PORT_SPEED,
|
||||||
|
duplex: PORT_DUPLEX,
|
||||||
|
fc: PORT_FLOW_CONTROL,
|
||||||
|
):
|
||||||
|
"""configures a port, returns true on success"""
|
||||||
|
return "window.location.replace" in set_cmd(
|
||||||
|
host,
|
||||||
|
xssid,
|
||||||
|
API_CMD_IDS.WRITE_PORT.value,
|
||||||
|
portlist=port,
|
||||||
|
descp=description,
|
||||||
|
state=str(state.value),
|
||||||
|
speed=str(speed.value),
|
||||||
|
duplex=str(duplex.value),
|
||||||
|
fc=str(fc.value),
|
||||||
|
sysSubmit="Apply",
|
||||||
)
|
)
|
||||||
|
|
||||||
return data
|
|
||||||
|
def create_vlan(host: str, cookie: str, vlan_id: int, vlan_name_prefix="VLAN"):
|
||||||
|
return "window.location.replace" in set_cmd(
|
||||||
|
host,
|
||||||
|
cookie,
|
||||||
|
API_CMD_IDS.CREATE_VLAN.value,
|
||||||
|
vlanlist=str(vlan_id),
|
||||||
|
vlanAction="0",
|
||||||
|
name=vlan_name_prefix,
|
||||||
|
sysSubmit="Apply",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def extract_data_from_table(response: str) -> dict:
|
def update_vlan_name(host: str, cookie: str, vlan_id: int, vlan_name: str):
|
||||||
soup = BeautifulSoup(response, "html.parser")
|
return "window.location.replace" in set_cmd(
|
||||||
|
host,
|
||||||
data = {}
|
cookie,
|
||||||
table_body = soup.find_all("table")[-1]
|
API_CMD_IDS.UPDATE_VLAN.value,
|
||||||
|
vidValue=str(vlan_id),
|
||||||
rows = table_body.find_all("tr")
|
editName=vlan_name,
|
||||||
keys = [ele.text.strip() for ele in rows[0].find_all("td")]
|
sysSubmit="Apply",
|
||||||
entries = []
|
)
|
||||||
for row in rows[1:]:
|
|
||||||
cols = row.find_all("td")
|
|
||||||
cols = [ele.text.strip() for ele in cols]
|
|
||||||
entries.append({k: v for (k, v) in zip(keys, cols) if k.strip() != ""})
|
|
||||||
|
|
||||||
return list(filter(lambda e: e, entries))
|
|
||||||
|
|
||||||
|
|
||||||
def dictify(data: list, key: str) -> dict:
|
def delete_vlan(host: str, cookie: str, vlan_id: int):
|
||||||
result = {}
|
return "window.location.replace" in get_cmd(
|
||||||
for d in data:
|
host, cookie, API_CMD_IDS.DELETE_VLAN.value, _del=vlan_id
|
||||||
k = d[key]
|
)
|
||||||
del d[key]
|
|
||||||
result.update({k: d})
|
|
||||||
return result
|
def update_port_vlan(host: str, cookie: str, port: str):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ==============================================================================
|
||||||
|
# Get command wrapper functions
|
||||||
|
# ==============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def read_ports(host: str, xssid: str) -> dict:
|
||||||
|
return dictify(
|
||||||
|
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_PORTS.value)),
|
||||||
|
"Port",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def read_vlans(host: str, xssid: str) -> dict:
|
||||||
|
return dictify(
|
||||||
|
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_VLANS.value)),
|
||||||
|
"VLAN ID",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def read_vlan_ports(host: str, xssid: str) -> dict:
|
||||||
|
return dictify(
|
||||||
|
extract_data_from_table(
|
||||||
|
get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORTS.value)
|
||||||
|
),
|
||||||
|
"Port",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def read_stp_ports(host: str, xssid: str) -> dict:
|
||||||
|
return dictify(
|
||||||
|
extract_data_from_table(get_cmd(host, xssid, API_CMD_IDS.READ_STP_PORTS.value)),
|
||||||
|
"Port",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def read_vlan_port_cfg(host: str, xssid: str, vid: int) -> dict:
|
||||||
|
return dictify(
|
||||||
|
parse_vlan_ports(
|
||||||
|
get_cmd(host, xssid, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=vid)
|
||||||
|
),
|
||||||
|
"VLAN",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
IP = "192.168.42.100"
|
||||||
cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW"))
|
cookie = get_login_cookie(IP, "admin", os.environ.get("ADMIN_PW"))
|
||||||
|
|
||||||
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_PORTS.value)), "Port"))
|
print(read_stp_ports(IP, cookie))
|
||||||
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLANS.value)), "VLAN ID"))
|
|
||||||
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORTS.value)), "Port"))
|
|
||||||
print(dictify(extract_data_from_table(get_cmd(IP, cookie, API_CMD_IDS.READ_STP_PORTS.value)), "Port"))
|
|
||||||
print(parse_vlan_ports(get_cmd(IP, cookie, API_CMD_IDS.READ_VLAN_PORT_CFG.value, vid=2)))
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue