From cec1edcaa52848dcbbc2db007f053cb7147c6615 Mon Sep 17 00:00:00 2001 From: ChillerDragon Date: Sun, 19 Mar 2023 18:03:13 +0100 Subject: [PATCH] First draft of chunk header parsing --- .gitignore | 1 + .pylintrc | 2 +- mypy.ini | 3 + pytest.ini | 2 +- tests/packet_with_chunks7_test.py | 4 +- twnet_parser/packet.py | 123 +++++++++++++++++++++++++++--- 6 files changed, 121 insertions(+), 14 deletions(-) create mode 100644 mypy.ini diff --git a/.gitignore b/.gitignore index 2cf4737..869bbb2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ dist/ __pycache__/ *.pyc *.egg-info +.mypy_cache/ .env diff --git a/.pylintrc b/.pylintrc index 116a128..8448def 100644 --- a/.pylintrc +++ b/.pylintrc @@ -67,7 +67,7 @@ ignored-modules= # Python code to execute, usually for sys.path manipulation such as # pygtk.require(). -#init-hook= +init-hook='import sys; sys.path.append("./twnet_parser")' # Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the # number of processors available to use, and will cap the count on Windows to diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..1f7b633 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +# ignore_missing_imports = True +# mypy_path = $MYPY_CONFIG_FILE_DIR/twnet_parser diff --git a/pytest.ini b/pytest.ini index a635c5c..b8035f0 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,2 @@ [pytest] -pythonpath = . +pythonpath = . twnet_parser diff --git a/tests/packet_with_chunks7_test.py b/tests/packet_with_chunks7_test.py index e7d2d32..72375a9 100644 --- a/tests/packet_with_chunks7_test.py +++ b/tests/packet_with_chunks7_test.py @@ -20,8 +20,8 @@ def test_parse_7_real_map_change(): assert packet.header.flags.compression == False # TODO: uncomment - # assert len(packet.messages) == 1 - # assert packet.messages[0].name = 'map_change' + assert len(packet.messages) == 1 + assert packet.messages[0].name == 'sys.todo.id=2data=64' # TODO: 'map_change' # Teeworlds 0.7 Protocol packet # Flags: none (..00 00..) diff --git a/twnet_parser/packet.py b/twnet_parser/packet.py index 3c687ee..6e58241 100644 --- a/twnet_parser/packet.py +++ b/twnet_parser/packet.py @@ -1,6 +1,10 @@ #!/usr/bin/env python from typing import Union +from typing import cast + +# TODO: fix mypy +import packer # type: ignore # TODO: what is a nice pythonic way of storing those? # also does some version:: namespace thing make sense? @@ -9,24 +13,30 @@ PACKETFLAG7_RESEND = 2 PACKETFLAG7_COMPRESSION = 4 PACKETFLAG7_CONNLESS = 8 +CHUNKFLAG7_VITAL = 1 +CHUNKFLAG7_RESEND = 2 + +PACKET_HEADER7_SIZE = 7 + class PrettyPrint(): def __repr__(self): return "" def __str__(self): return ": " + str(self.__dict__) -class BaseMessage(PrettyPrint): - def __init__(self, name): - self.name = name +class CtrlMessage(PrettyPrint): + def __init__(self, name: str) -> None: + self.name: str = name -class CtrlMessage(BaseMessage): - pass +class GameMessage(PrettyPrint): + def __init__(self, name: str) -> None: + self.name: str = name + self.header: ChunkHeader -class GameMessage(BaseMessage): - pass - -class SysMessage(BaseMessage): - pass +class SysMessage(PrettyPrint): + def __init__(self, name: str) -> None: + self.name: str = name + self.header: ChunkHeader class PacketFlags7(PrettyPrint): def __init__(self): @@ -91,7 +101,96 @@ class PacketHeaderParser7(): header.token = self.parse_token(data) return header +class ChunkFlags(PrettyPrint): + def __init__(self): + self.resend = False + self.vital = False + +# same fields for 0.6 and 0.7 +# different bit layout tho +class ChunkHeader(PrettyPrint): + def __init__(self) -> None: + self.flags: ChunkFlags = ChunkFlags() + self.size: int = 0 + # TODO: should seq be a optional? + # so it can be None for non vital packages + # this could turn downstream users logic errors into + # crashes which would be easier to detect + # + # Or is None annoying because it crashes + # and pollutes the code with error checking? + # Also the teeworlds code uses -1 + # doing the same for someone who knows the codebase + # could also be nice + self.seq: int = -1 + +class ChunkHeaderParser: + def parse_flags7(self, data: bytes) -> ChunkFlags: + # FFss ssss xxss ssss + flag_bits = (data[0] >> 6) & 0x03 + flags = ChunkFlags() + flags.resend = (flag_bits & CHUNKFLAG7_RESEND) != 0 + flags.vital = (flag_bits & CHUNKFLAG7_VITAL) != 0 + return flags + + # the first byte of data has to be the + # first byte of the chunk header + def parse_header7(self, data: bytes) -> ChunkHeader: + header = ChunkHeader() + header.flags = self.parse_flags7(data) + # ffSS SSSS xxSS SSSS + header.size = ((data[0] & 0x3F) << 6) | (data[1] & 0x3F) + if header.flags.vital: + # ffss ssss XXss ssss + header.seq = ((data[1] & 0xC0) << 2) | data[2] + return header + +# could also be named ChunkParser +class MessageParser(): + # the first byte of data has to be the + # first byte of a message PAYLOAD + # NOT the whole packet with packet header + # and NOT the whole message with chunk header + def parse_game_message(self, msg_id: int, data: bytes) -> str: + return f"game.todo.id={msg_id}data={data[0]}" # TODO: return GameMessage + def parse_sys_message(self, msg_id: int, data: bytes) -> str: + return f"sys.todo.id={msg_id}data={data[0]}" # TODO: return SysMessage + class PacketParser(): + # the first byte of data has to be the + # first byte of a message chunk + # NOT the whole packet with packet header + def get_messages(self, data: bytes) -> list[Union[GameMessage, SysMessage]]: + messages: list[Union[GameMessage, SysMessage]] = [] + i = 0 + while i < len(data): + msg = self.get_message(data[i:]) + i = msg.header.size + 3 # header + msg id = 3 + if msg.header.flags.vital: + i += 1 + messages.append(msg) + return messages + + # the first byte of data has to be the + # first byte of a message chunk + # NOT the whole packet with packet header + def get_message(self, data: bytes) -> Union[GameMessage, SysMessage]: + chunk_header = ChunkHeaderParser().parse_header7(data) + i = 2 + if chunk_header.flags.vital: + i += 1 + msg_id: int = packer.unpack_int(data[i:]) + i += 1 + sys: bool = (msg_id & 1) == 1 + msg_id >>= 1 + msg: Union[GameMessage, SysMessage] + if sys: + msg = SysMessage(MessageParser().parse_sys_message(msg_id, data)) + else: + msg = GameMessage(MessageParser().parse_game_message(msg_id, data[i:])) + msg.header = chunk_header + return msg + def parse7(self, data: bytes) -> TwPacket: pck = TwPacket() pck.version = '0.7' @@ -105,6 +204,10 @@ class PacketParser(): msg_dc = CtrlMessage('close') pck.messages.append(msg_dc) return pck + else: + pck.messages = cast( + list[Union[CtrlMessage, GameMessage, SysMessage]], + self.get_messages(data[PACKET_HEADER7_SIZE:])) return pck def parse6(data: bytes) -> TwPacket: