First draft of chunk header parsing
This commit is contained in:
parent
32ba361c04
commit
cec1edcaa5
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -3,4 +3,5 @@ dist/
|
|||
__pycache__/
|
||||
*.pyc
|
||||
*.egg-info
|
||||
.mypy_cache/
|
||||
.env
|
||||
|
|
|
@ -67,7 +67,7 @@ ignored-modules=
|
|||
|
||||
# Python code to execute, usually for sys.path manipulation such as
|
||||
# pygtk.require().
|
||||
#init-hook=
|
||||
init-hook='import sys; sys.path.append("./twnet_parser")'
|
||||
|
||||
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
|
||||
# number of processors available to use, and will cap the count on Windows to
|
||||
|
|
3
mypy.ini
Normal file
3
mypy.ini
Normal file
|
@ -0,0 +1,3 @@
|
|||
[mypy]
|
||||
# ignore_missing_imports = True
|
||||
# mypy_path = $MYPY_CONFIG_FILE_DIR/twnet_parser
|
|
@ -1,2 +1,2 @@
|
|||
[pytest]
|
||||
pythonpath = .
|
||||
pythonpath = . twnet_parser
|
||||
|
|
|
@ -20,8 +20,8 @@ def test_parse_7_real_map_change():
|
|||
assert packet.header.flags.compression == False
|
||||
|
||||
# TODO: uncomment
|
||||
# assert len(packet.messages) == 1
|
||||
# assert packet.messages[0].name = 'map_change'
|
||||
assert len(packet.messages) == 1
|
||||
assert packet.messages[0].name == 'sys.todo.id=2data=64' # TODO: 'map_change'
|
||||
|
||||
# Teeworlds 0.7 Protocol packet
|
||||
# Flags: none (..00 00..)
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from typing import Union
|
||||
from typing import cast
|
||||
|
||||
# TODO: fix mypy
|
||||
import packer # type: ignore
|
||||
|
||||
# TODO: what is a nice pythonic way of storing those?
|
||||
# also does some version:: namespace thing make sense?
|
||||
|
@ -9,24 +13,30 @@ PACKETFLAG7_RESEND = 2
|
|||
PACKETFLAG7_COMPRESSION = 4
|
||||
PACKETFLAG7_CONNLESS = 8
|
||||
|
||||
CHUNKFLAG7_VITAL = 1
|
||||
CHUNKFLAG7_RESEND = 2
|
||||
|
||||
PACKET_HEADER7_SIZE = 7
|
||||
|
||||
class PrettyPrint():
|
||||
def __repr__(self):
|
||||
return "<class: '" + str(self.__class__.__name__) + "'>"
|
||||
def __str__(self):
|
||||
return "<class: '" + str(self.__class__.__name__) + "'>: " + str(self.__dict__)
|
||||
|
||||
class BaseMessage(PrettyPrint):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
class CtrlMessage(PrettyPrint):
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name: str = name
|
||||
|
||||
class CtrlMessage(BaseMessage):
|
||||
pass
|
||||
class GameMessage(PrettyPrint):
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name: str = name
|
||||
self.header: ChunkHeader
|
||||
|
||||
class GameMessage(BaseMessage):
|
||||
pass
|
||||
|
||||
class SysMessage(BaseMessage):
|
||||
pass
|
||||
class SysMessage(PrettyPrint):
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name: str = name
|
||||
self.header: ChunkHeader
|
||||
|
||||
class PacketFlags7(PrettyPrint):
|
||||
def __init__(self):
|
||||
|
@ -91,7 +101,96 @@ class PacketHeaderParser7():
|
|||
header.token = self.parse_token(data)
|
||||
return header
|
||||
|
||||
class ChunkFlags(PrettyPrint):
|
||||
def __init__(self):
|
||||
self.resend = False
|
||||
self.vital = False
|
||||
|
||||
# same fields for 0.6 and 0.7
|
||||
# different bit layout tho
|
||||
class ChunkHeader(PrettyPrint):
|
||||
def __init__(self) -> None:
|
||||
self.flags: ChunkFlags = ChunkFlags()
|
||||
self.size: int = 0
|
||||
# TODO: should seq be a optional?
|
||||
# so it can be None for non vital packages
|
||||
# this could turn downstream users logic errors into
|
||||
# crashes which would be easier to detect
|
||||
#
|
||||
# Or is None annoying because it crashes
|
||||
# and pollutes the code with error checking?
|
||||
# Also the teeworlds code uses -1
|
||||
# doing the same for someone who knows the codebase
|
||||
# could also be nice
|
||||
self.seq: int = -1
|
||||
|
||||
class ChunkHeaderParser:
|
||||
def parse_flags7(self, data: bytes) -> ChunkFlags:
|
||||
# FFss ssss xxss ssss
|
||||
flag_bits = (data[0] >> 6) & 0x03
|
||||
flags = ChunkFlags()
|
||||
flags.resend = (flag_bits & CHUNKFLAG7_RESEND) != 0
|
||||
flags.vital = (flag_bits & CHUNKFLAG7_VITAL) != 0
|
||||
return flags
|
||||
|
||||
# the first byte of data has to be the
|
||||
# first byte of the chunk header
|
||||
def parse_header7(self, data: bytes) -> ChunkHeader:
|
||||
header = ChunkHeader()
|
||||
header.flags = self.parse_flags7(data)
|
||||
# ffSS SSSS xxSS SSSS
|
||||
header.size = ((data[0] & 0x3F) << 6) | (data[1] & 0x3F)
|
||||
if header.flags.vital:
|
||||
# ffss ssss XXss ssss
|
||||
header.seq = ((data[1] & 0xC0) << 2) | data[2]
|
||||
return header
|
||||
|
||||
# could also be named ChunkParser
|
||||
class MessageParser():
|
||||
# the first byte of data has to be the
|
||||
# first byte of a message PAYLOAD
|
||||
# NOT the whole packet with packet header
|
||||
# and NOT the whole message with chunk header
|
||||
def parse_game_message(self, msg_id: int, data: bytes) -> str:
|
||||
return f"game.todo.id={msg_id}data={data[0]}" # TODO: return GameMessage
|
||||
def parse_sys_message(self, msg_id: int, data: bytes) -> str:
|
||||
return f"sys.todo.id={msg_id}data={data[0]}" # TODO: return SysMessage
|
||||
|
||||
class PacketParser():
|
||||
# the first byte of data has to be the
|
||||
# first byte of a message chunk
|
||||
# NOT the whole packet with packet header
|
||||
def get_messages(self, data: bytes) -> list[Union[GameMessage, SysMessage]]:
|
||||
messages: list[Union[GameMessage, SysMessage]] = []
|
||||
i = 0
|
||||
while i < len(data):
|
||||
msg = self.get_message(data[i:])
|
||||
i = msg.header.size + 3 # header + msg id = 3
|
||||
if msg.header.flags.vital:
|
||||
i += 1
|
||||
messages.append(msg)
|
||||
return messages
|
||||
|
||||
# the first byte of data has to be the
|
||||
# first byte of a message chunk
|
||||
# NOT the whole packet with packet header
|
||||
def get_message(self, data: bytes) -> Union[GameMessage, SysMessage]:
|
||||
chunk_header = ChunkHeaderParser().parse_header7(data)
|
||||
i = 2
|
||||
if chunk_header.flags.vital:
|
||||
i += 1
|
||||
msg_id: int = packer.unpack_int(data[i:])
|
||||
i += 1
|
||||
sys: bool = (msg_id & 1) == 1
|
||||
msg_id >>= 1
|
||||
msg: Union[GameMessage, SysMessage]
|
||||
if sys:
|
||||
msg = SysMessage(MessageParser().parse_sys_message(msg_id, data))
|
||||
else:
|
||||
msg = GameMessage(MessageParser().parse_game_message(msg_id, data[i:]))
|
||||
msg.header = chunk_header
|
||||
return msg
|
||||
|
||||
def parse7(self, data: bytes) -> TwPacket:
|
||||
pck = TwPacket()
|
||||
pck.version = '0.7'
|
||||
|
@ -105,6 +204,10 @@ class PacketParser():
|
|||
msg_dc = CtrlMessage('close')
|
||||
pck.messages.append(msg_dc)
|
||||
return pck
|
||||
else:
|
||||
pck.messages = cast(
|
||||
list[Union[CtrlMessage, GameMessage, SysMessage]],
|
||||
self.get_messages(data[PACKET_HEADER7_SIZE:]))
|
||||
return pck
|
||||
|
||||
def parse6(data: bytes) -> TwPacket:
|
||||
|
|
Loading…
Reference in a new issue