Start drafting control message layout

This commit is contained in:
ChillerDragon 2023-04-02 21:01:44 +02:00
parent a75c95d2ad
commit 37730deaa2
6 changed files with 111 additions and 11 deletions

View file

@ -0,0 +1,22 @@
from twnet_parser.packet import *
def test_parse_7_close_with_reason():
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04shutdown\x00') # 0.7 close
assert packet.version == '0.7'
assert packet.header.size == 0
assert packet.header.ack == 10
assert packet.header.token == b'\xcf.\xde\x1d'
assert packet.header.num_chunks == 0
assert packet.header.flags.control is True
assert packet.header.flags.resend is False
assert packet.header.flags.compression is False
assert packet.header.flags.connless is False
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
assert packet.messages[0].reason == 'shutdown'

View file

@ -0,0 +1,11 @@
# from twnet_parser.packet import parse7
def test_parse_7_close_with_invalid_packet_header():
# Usually the first two bytes should be something like
# \x04\0a
# TODO: this is currently crashing
# packet = parse7(b'\x80\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
pass

View file

@ -0,0 +1,41 @@
from typing import Optional
from twnet_parser.pretty_print import PrettyPrint
from twnet_parser.packer import Unpacker
from twnet_parser.packer import pack_str
from twnet_parser.chunk_header import ChunkHeader
class CtrlClose(PrettyPrint):
def __init__(
self,
reason: Optional[str] = None
) -> None:
self.message_name = 'close'
self.system_message = False
self.control_message = True
# TODO: do something about ChunkHeader
# control messages do not really have a chunk header
# but we need it here so we can duck type it into a
# NetMessage
# maybe it should be an optional field?
# or there should be a union again instead of just
# NetMessage
#
# related issue
# https://gitlab.com/teeworlds-network/twnet_parser/-/issues/1
self.header: ChunkHeader
self.reason: Optional[str] = reason
# first byte of data
# has to be the first byte of the message payload
# NOT the chunk header and NOT the message id
def unpack(self, data: bytes) -> bool:
unpacker = Unpacker(data)
self.reason = unpacker.get_str() # TODO: this is an optional field
return True
def pack(self) -> bytes:
if self.reason:
return pack_str(self.reason)
return b''

View file

@ -1,3 +1,12 @@
# control
CTRL_KEEPALIVE = 0
CTRL_CONNECT = 1
CTRL_ACCEPT = 2
# yes control message 3 is missing in 0.7
CTRL_CLOSE = 4
CTRL_TOKEN = 5
# system
NULL = 0
INFO = 1

View file

@ -0,0 +1,18 @@
from typing import Optional
import twnet_parser.msg7
from twnet_parser.net_message import NetMessage
import twnet_parser.messages7.control.close as close7
def match_control7(msg_id: int, data: bytes) -> NetMessage:
msg: Optional[NetMessage] = None
if msg_id == twnet_parser.msg7.CTRL_CLOSE:
msg = close7.CtrlClose()
if msg is None:
raise ValueError(f"Error: unknown control message id={msg_id} data={data[0]}")
msg.unpack(data)
return msg

View file

@ -8,6 +8,7 @@ from twnet_parser.pretty_print import PrettyPrint
from twnet_parser.message_parser import MessageParser
from twnet_parser.net_message import NetMessage
from twnet_parser.chunk_header import ChunkHeader, ChunkFlags
from twnet_parser.msg_matcher.control7 import match_control7
from twnet_parser.external.huffman import huffman
@ -159,17 +160,15 @@ class PacketParser():
pck.payload_raw = data[PACKET_HEADER7_SIZE:]
pck.payload_decompressed = pck.payload_raw
if pck.header.flags.control:
if data[7] == 0x04: # close
msg_dc = CtrlMessage('close')
pck.messages.append(msg_dc)
return pck
else:
if pck.header.flags.compression:
payload = bytearray(pck.payload_raw)
pck.payload_decompressed = huffman.decompress(payload)
pck.messages = cast(
list[Union[CtrlMessage, NetMessage]],
self.get_messages(pck.payload_decompressed))
ctrl_msg: NetMessage = match_control7(data[7], data[8:])
pck.messages.append(ctrl_msg)
return pck
if pck.header.flags.compression:
payload = bytearray(pck.payload_raw)
pck.payload_decompressed = huffman.decompress(payload)
pck.messages = cast(
list[Union[CtrlMessage, NetMessage]],
self.get_messages(pck.payload_decompressed))
return pck
def parse6(data: bytes) -> TwPacket: