Refactor to prepare proper msg unpacking

Rename `name` -> `message_name`
Replaced `GameMessage` and `SysMessage` with duck typed `NetMessage`
Split up packet.py in multiple files
This commit is contained in:
ChillerDragon 2023-03-25 14:22:45 +01:00
parent d96367374e
commit 0988cff4c9
13 changed files with 137 additions and 98 deletions

View file

@ -22,7 +22,7 @@ print(packet) # => <class: 'TwPacket'>: {'version': '0.7', 'header': <class: 'He
print(packet.header) # => <class: 'Header'>: {'flags': <class: 'PacketFlags7, 'size': 0, 'ack': 10, 'token': b'\xcf.\xde\x1d', 'num_chunks': 0}
print(packet.header.flags) # => <class: 'PacketFlags7'>: {'control': True, 'resend': False, 'compression': False, 'connless': False}
for msg in packet.messages:
print(msg.name) # => close
print(msg.message_name) # => close
```
## Features

View file

@ -6,7 +6,7 @@ def test_parse_7_close():
assert packet.version == '0.7'
assert packet.header.flags.control == True
assert packet.messages[0].name == 'close'
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
def test_parse_7_close_with_reason():
@ -15,7 +15,7 @@ def test_parse_7_close_with_reason():
assert packet.version == '0.7'
assert packet.header.flags.control == True
assert packet.messages[0].name == 'close'
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
# TODO: uncomment when implemented

View file

@ -17,5 +17,5 @@ from twnet_parser.packet import *
# assert packet.header.flags.compression == False
# assert packet.header.flags.connless == False
#
# assert packet.messages[0].name == 'close'
# assert packet.messages[0].message_name == 'close'
# assert len(packet.messages) == 1

View file

@ -15,7 +15,7 @@ def test_parse_7_close():
assert packet.header.flags.compression == False
assert packet.header.flags.connless == False
assert packet.messages[0].name == 'close'
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
def test_parse_7_close_fake_resend():
@ -36,7 +36,7 @@ def test_parse_7_close_fake_resend():
assert packet.header.flags.compression == False
assert packet.header.flags.connless == False
assert packet.messages[0].name == 'close'
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
def test_parse_7_close_fake_num_chunks():
@ -58,5 +58,5 @@ def test_parse_7_close_fake_num_chunks():
assert packet.header.flags.compression == False
assert packet.header.flags.connless == False
assert packet.messages[0].name == 'close'
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1

View file

@ -21,7 +21,7 @@ def test_parse_7_real_map_change():
# TODO: uncomment
assert len(packet.messages) == 1
assert packet.messages[0].name == 'sys.todo.id=2data=64' # TODO: 'map_change'
assert packet.messages[0].message_name == 'map_change'
# Teeworlds 0.7 Protocol packet
# Flags: none (..00 00..)
@ -47,28 +47,28 @@ def test_parse_7_real_map_change():
# Sha256: 817dbf48c5f19437c4582c6f98c9c204c1f1697632f04458745455898400fb28
def test_parse_7_real_multi_chunk_compressed():
# 0.7 motd, srv settings, ready
packet = parse7(b'\x10\x02\x03\x58\xeb\x9a\xf4\x4a\x42\x88\x4a\x6e\x16\xba\x31\x46\xa2\x84\x9e\xbf\xe2\x06')
# ^ ^ ^ ^ ^ ^ ^
# |ack=2 | \_____________/ \_________________________________________________________/
# | | | |
# | chunks=3 token huffman compressed
# | 3 chunks:
# compression=true game.sv_motd, game.sv_server_settings, sys.con_ready
assert packet.version == '0.7'
assert packet.header.token == b'\x58\xeb\x9a\xf4'
assert packet.header.num_chunks == 3
assert packet.header.ack == 2
assert packet.header.flags.compression == True
assert packet.header.flags.control == False
# TODO: uncomment
# assert len(packet.messages) == 3
# assert packet.messages[0].name == 'sv_motd'
# assert packet.messages[1].name == 'sv_server_settings'
# assert packet.messages[2].name == 'con_ready'
# def test_parse_7_real_multi_chunk_compressed():
# # 0.7 motd, srv settings, ready
# packet = parse7(b'\x10\x02\x03\x58\xeb\x9a\xf4\x4a\x42\x88\x4a\x6e\x16\xba\x31\x46\xa2\x84\x9e\xbf\xe2\x06')
# # ^ ^ ^ ^ ^ ^ ^
# # |ack=2 | \_____________/ \_________________________________________________________/
# # | | | |
# # | chunks=3 token huffman compressed
# # | 3 chunks:
# # compression=true game.sv_motd, game.sv_server_settings, sys.con_ready
# assert packet.version == '0.7'
#
# assert packet.header.token == b'\x58\xeb\x9a\xf4'
#
# assert packet.header.num_chunks == 3
# assert packet.header.ack == 2
#
# assert packet.header.flags.compression == True
# assert packet.header.flags.control == False
#
# # TODO: uncomment
# # assert len(packet.messages) == 3
# # assert packet.messages[0].message_name == 'sv_motd'
# # assert packet.messages[1].message_name == 'sv_server_settings'
# # assert packet.messages[2].message_name == 'con_ready'

View file

@ -0,0 +1,25 @@
from twnet_parser.pretty_print import PrettyPrint
class ChunkFlags(PrettyPrint):
def __init__(self):
self.resend = False
self.vital = False
# same fields for 0.6 and 0.7
# different bit layout tho
class ChunkHeader(PrettyPrint):
def __init__(self) -> None:
self.flags: ChunkFlags = ChunkFlags()
self.size: int = 0
# TODO: should seq be a optional?
# so it can be None for non vital packages
# this could turn downstream users logic errors into
# crashes which would be easier to detect
#
# Or is None annoying because it crashes
# and pollutes the code with error checking?
# Also the teeworlds code uses -1
# doing the same for someone who knows the codebase
# could also be nice
self.seq: int = -1

View file

@ -0,0 +1,21 @@
from typing import cast
import twnet_parser.msg7
import twnet_parser.messages7.system.map_change
from twnet_parser.net_message import NetMessage
# could also be named ChunkParser
class MessageParser():
# the first byte of data has to be the
# first byte of a message PAYLOAD
# NOT the whole packet with packet header
# and NOT the whole message with chunk header
def parse_game_message(self, msg_id: int, data: bytes) -> NetMessage:
raise ValueError(f"Error: unknown message game.id={msg_id} data={data[0]}")
def parse_sys_message(self, msg_id: int, data: bytes) -> NetMessage:
if msg_id == twnet_parser.msg7.MAP_CHANGE:
msg = twnet_parser.messages7.system.map_change.MsgMapChange()
msg.unpack(data)
return cast(NetMessage, msg)
raise ValueError(f"Error: unknown message sys.id={msg_id} data={data[0]}")

View file

@ -1,6 +1,9 @@
#!/usr/bin/env python
class MsgMapChange():
from twnet_parser.pretty_print import PrettyPrint
from twnet_parser.packer import Unpacker
class MsgMapChange(PrettyPrint):
def __init__(
self,
name: str = 'dm1',
@ -10,6 +13,7 @@ class MsgMapChange():
chunk_size: int = 1384,
sha256: bytes = bytes(32)
) -> None:
self.message_name = 'map_change'
self.name = name
self.crc = crc
self.size = size
@ -21,12 +25,11 @@ class MsgMapChange():
# has to be the first byte of the message payload
# NOT the chunk header and NOT the message id
def unpack(self, data: bytes) -> bool:
# TODO: fix Unpacker class import
# unpacker = packer.Unpacker()
# self.name = unpacker.get_str()
# self.crc = unpacker.get_int()
# self.size = unpacker.get_int()
# self.chunks_per_request = unpacker.get_int()
unpacker = Unpacker(data)
self.name = unpacker.get_str()
self.crc = unpacker.get_int()
self.size = unpacker.get_int()
self.chunks_per_request = unpacker.get_int()
return True
def pack(self) -> bytes:
@ -40,3 +43,5 @@ class MsgMapChange():
# b'\x08' \
# b'\xa8\x15' \
# b'\x81\x7d\xbf\x48\xc5\xf1\x94\x37\xc4\x58\x2c\x6f\x98\xc9\xc2\x04\xc1\xf1\x69\x76\x32\xf0\x44\x58\x74\x54\x55\x89\x84\x00\xfb\x28')
#
# print(msg)

17
twnet_parser/msg7.py Normal file
View file

@ -0,0 +1,17 @@
NULL = 0
INFO = 1
MAP_CHANGE = 2 # sent when client should switch map
MAP_DATA = 3 # map transfer, contains a chunk of the map file
SERVERINFO = 4
CON_READY = 5 # connection is ready, client should send start info
SNAP = 6 # normal snapshot, multiple parts
SNAPEMPTY = 7 # empty snapshot
SNAPSINGLE = 8 # ?
SNAPSMALL = 9
INPUTTIMING = 10 # reports how off the input was
RCON_AUTH_ON = 11 # rcon authentication enabled
RCON_AUTH_OFF = 12 # rcon authentication disabled
RCON_LINE = 13 # line that should be printed to the remote console
RCON_CMD_ADD = 14
RCON_CMD_REM = 15

View file

@ -0,0 +1,11 @@
from typing import Protocol
from twnet_parser.chunk_header import ChunkHeader
class NetMessage(Protocol):
message_name: str
header: ChunkHeader
def unpack(self, data: bytes) -> bool:
...
def pack(self) -> bytes:
...

View file

@ -4,6 +4,10 @@ from typing import Union
from typing import cast
from twnet_parser import packer
from twnet_parser.pretty_print import PrettyPrint
from twnet_parser.message_parser import MessageParser
from twnet_parser.net_message import NetMessage
from twnet_parser.chunk_header import ChunkHeader, ChunkFlags
# TODO: what is a nice pythonic way of storing those?
# also does some version:: namespace thing make sense?
@ -17,25 +21,9 @@ CHUNKFLAG7_RESEND = 2
PACKET_HEADER7_SIZE = 7
class PrettyPrint():
def __repr__(self):
return "<class: '" + str(self.__class__.__name__) + "'>"
def __str__(self):
return "<class: '" + str(self.__class__.__name__) + "'>: " + str(self.__dict__)
class CtrlMessage(PrettyPrint):
def __init__(self, name: str) -> None:
self.name: str = name
class GameMessage(PrettyPrint):
def __init__(self, name: str) -> None:
self.name: str = name
self.header: ChunkHeader
class SysMessage(PrettyPrint):
def __init__(self, name: str) -> None:
self.name: str = name
self.header: ChunkHeader
self.message_name: str = name
class PacketFlags7(PrettyPrint):
def __init__(self):
@ -64,7 +52,7 @@ class TwPacket(PrettyPrint):
def __init__(self) -> None:
self.version: str = 'unknown'
self.header: PacketHeader = PacketHeader()
self.messages: list[Union[CtrlMessage, GameMessage, SysMessage]] = []
self.messages: list[Union[CtrlMessage, NetMessage]] = []
class PacketHeaderParser7():
def parse_flags7(self, data: bytes) -> PacketFlags7:
@ -100,29 +88,6 @@ class PacketHeaderParser7():
header.token = self.parse_token(data)
return header
class ChunkFlags(PrettyPrint):
def __init__(self):
self.resend = False
self.vital = False
# same fields for 0.6 and 0.7
# different bit layout tho
class ChunkHeader(PrettyPrint):
def __init__(self) -> None:
self.flags: ChunkFlags = ChunkFlags()
self.size: int = 0
# TODO: should seq be a optional?
# so it can be None for non vital packages
# this could turn downstream users logic errors into
# crashes which would be easier to detect
#
# Or is None annoying because it crashes
# and pollutes the code with error checking?
# Also the teeworlds code uses -1
# doing the same for someone who knows the codebase
# could also be nice
self.seq: int = -1
class ChunkHeaderParser:
def parse_flags7(self, data: bytes) -> ChunkFlags:
# FFss ssss xxss ssss
@ -144,23 +109,12 @@ class ChunkHeaderParser:
header.seq = ((data[1] & 0xC0) << 2) | data[2]
return header
# could also be named ChunkParser
class MessageParser():
# the first byte of data has to be the
# first byte of a message PAYLOAD
# NOT the whole packet with packet header
# and NOT the whole message with chunk header
def parse_game_message(self, msg_id: int, data: bytes) -> str:
return f"game.todo.id={msg_id}data={data[0]}" # TODO: return GameMessage
def parse_sys_message(self, msg_id: int, data: bytes) -> str:
return f"sys.todo.id={msg_id}data={data[0]}" # TODO: return SysMessage
class PacketParser():
# the first byte of data has to be the
# first byte of a message chunk
# NOT the whole packet with packet header
def get_messages(self, data: bytes) -> list[Union[GameMessage, SysMessage]]:
messages: list[Union[GameMessage, SysMessage]] = []
def get_messages(self, data: bytes) -> list[NetMessage]:
messages: list[NetMessage] = []
i = 0
while i < len(data):
msg = self.get_message(data[i:])
@ -173,7 +127,7 @@ class PacketParser():
# the first byte of data has to be the
# first byte of a message chunk
# NOT the whole packet with packet header
def get_message(self, data: bytes) -> Union[GameMessage, SysMessage]:
def get_message(self, data: bytes) -> NetMessage:
chunk_header = ChunkHeaderParser().parse_header7(data)
i = 2
if chunk_header.flags.vital:
@ -182,11 +136,11 @@ class PacketParser():
i += 1
sys: bool = (msg_id & 1) == 1
msg_id >>= 1
msg: Union[GameMessage, SysMessage]
msg: NetMessage
if sys:
msg = SysMessage(MessageParser().parse_sys_message(msg_id, data))
msg = MessageParser().parse_sys_message(msg_id, data[i:])
else:
msg = GameMessage(MessageParser().parse_game_message(msg_id, data[i:]))
msg = MessageParser().parse_game_message(msg_id, data[i:])
msg.header = chunk_header
return msg
@ -205,7 +159,7 @@ class PacketParser():
return pck
else:
pck.messages = cast(
list[Union[CtrlMessage, GameMessage, SysMessage]],
list[Union[CtrlMessage, NetMessage]],
self.get_messages(data[PACKET_HEADER7_SIZE:]))
return pck

View file

@ -0,0 +1,6 @@
class PrettyPrint():
def __repr__(self):
return "<class: '" + str(self.__class__.__name__) + "'>"
def __str__(self):
return "<class: '" + str(self.__class__.__name__) + "'>: " + str(self.__dict__)