Implement packing full tw packets (no ctrl messages yet)

This commit is contained in:
ChillerDragon 2023-04-29 09:07:06 +02:00
parent 06b751940f
commit 425506d2a3
3 changed files with 217 additions and 73 deletions

View file

@ -32,69 +32,194 @@ def test_parse_7_real_call_vote() -> None:
assert msg.reason == ''
assert msg.force is False
# TODO: uncomment when done
# def test_serialize_7_real_call_vote_all_fields_set() -> None:
# packet: TwPacket = TwPacket()
# packet.version == '0.7'
#
# packet.header.token = b'\x48\x1f\x93\xd7'
# packet.header.num_chunks = 1
# packet.header.ack = 638
#
# packet.header.flags.control = False
# packet.header.flags.compression = False
#
# msg: MsgClCallVote = MsgClCallVote()
#
# msg.type = 'option'
# msg.value = 'test'
# msg.reason = ''
# msg.force = False
#
# packet.messages.append(msg)
#
# data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
# b'\x40\x10\x0a' \
# b'\x80\x01' \
# b'\x6f\x70\x74\x69\x6f\x6e\x00' \
# b'\x74\x65\x73\x74\x00' \
# b'\x00' \
# b'\x00'
#
# print(packet.header)
# assert packet.pack() == data
assert msg.header.flags.resend is False
assert msg.header.flags.vital is True
assert msg.header.size == 16
assert msg.header.seq == 10
# def test_serialize_7_real_call_vote_should_allow_custom_num_chunks() -> None:
# packet: TwPacket = TwPacket()
# packet.version == '0.7'
#
# packet.header.token == b'\x48\x1f\x93\xd7'
#
# # this is technically wrong
# # but the library should allow sending
# # wrong values
# packet.header.num_chunks == 6
# packet.header.ack == 638
#
# packet.header.flags.control is False
# packet.header.flags.compression is False
#
# msg: MsgClCallVote = MsgClCallVote()
#
# msg.message_name = 'cl_call_vote'
#
# msg.type = 'option'
# msg.value = 'test'
# msg.reason = ''
# msg.force = False
#
# data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
# b'\x40\x10\x0a' \
# b'\x80\x06' \
# b'\x6f\x70\x74\x69\x6f\x6e\x00' \
# b'\x74\x65\x73\x74\x00' \
# b'\x00' \
# b'\x00'
#
# assert packet.pack() == data
def test_serialize_7_real_call_vote_all_fields_set() -> None:
packet: TwPacket = TwPacket()
packet.version == '0.7'
packet.header.token = b'\x48\x1f\x93\xd7'
packet.header.num_chunks = 1
packet.header.ack = 638
packet.header.flags.control = False
packet.header.flags.compression = False
msg: MsgClCallVote = MsgClCallVote()
msg.header.flags.resend = False
msg.header.flags.vital = True
msg.header.size = 16
msg.header.seq = 10
msg.type = 'option'
msg.value = 'test'
msg.reason = ''
msg.force = False
packet.messages.append(msg)
data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
b'\x40\x10\x0a' \
b'\x80\x01' \
b'\x6f\x70\x74\x69\x6f\x6e\x00' \
b'\x74\x65\x73\x74\x00' \
b'\x00' \
b'\x00'
assert packet.pack() == data
def test_serialize_7_real_call_vote_allow_custom_size() -> None:
packet: TwPacket = TwPacket()
packet.version == '0.7'
packet.header.token = b'\x48\x1f\x93\xd7'
packet.header.num_chunks = 1
packet.header.ack = 638
packet.header.flags.control = False
packet.header.flags.compression = False
msg: MsgClCallVote = MsgClCallVote()
msg.header.flags.resend = False
msg.header.flags.vital = True
# this is technically wrong
# but the library should allow sending
# wrong values
msg.header.size = 6
msg.header.seq = 10
msg.type = 'option'
msg.value = 'test'
msg.reason = ''
msg.force = False
packet.messages.append(msg)
data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
b'\x40\x06\x0a' \
b'\x80\x01' \
b'\x6f\x70\x74\x69\x6f\x6e\x00' \
b'\x74\x65\x73\x74\x00' \
b'\x00' \
b'\x00'
assert packet.pack() == data
def test_serialize_7_real_call_vote_should_allow_custom_num_chunks() -> None:
packet: TwPacket = TwPacket()
packet.version == '0.7'
packet.header.token = b'\x48\x1f\x93\xd7'
# this is technically wrong
# but the library should allow sending
# wrong values
packet.header.num_chunks = 6
packet.header.ack = 638
packet.header.flags.control = False
packet.header.flags.compression = False
msg: MsgClCallVote = MsgClCallVote()
msg.header.flags.resend = False
msg.header.flags.vital = True
msg.header.size = 16
msg.header.seq = 10
msg.type = 'option'
msg.value = 'test'
msg.reason = ''
msg.force = False
packet.messages.append(msg)
data: bytes = b'\x02\x7e\x06\x48\x1f\x93\xd7' \
b'\x40\x10\x0a' \
b'\x80\x01' \
b'\x6f\x70\x74\x69\x6f\x6e\x00' \
b'\x74\x65\x73\x74\x00' \
b'\x00' \
b'\x00'
assert packet.pack() == data
def test_serialize_7_real_call_vote_compute_size() -> None:
packet: TwPacket = TwPacket()
packet.version == '0.7'
packet.header.token = b'\x48\x1f\x93\xd7'
packet.header.num_chunks = 1
packet.header.ack = 638
packet.header.flags.control = False
packet.header.flags.compression = False
msg: MsgClCallVote = MsgClCallVote()
msg.header.flags.resend = False
msg.header.flags.vital = True
# This should be computed and overwritten by
# TwPacket.pack()
# msg.header.size = 16
msg.header.seq = 10
msg.type = 'option'
msg.value = 'test'
msg.reason = ''
msg.force = False
packet.messages.append(msg)
data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
b'\x40\x10\x0a' \
b'\x80\x01' \
b'\x6f\x70\x74\x69\x6f\x6e\x00' \
b'\x74\x65\x73\x74\x00' \
b'\x00' \
b'\x00'
assert packet.pack() == data
def test_serialize_7_real_call_vote_compute_defaults() -> None:
packet: TwPacket = TwPacket()
packet.version == '0.7'
packet.header.token = b'\x48\x1f\x93\xd7'
packet.header.num_chunks = None # <= compute this
packet.header.ack = 638
packet.header.flags.control = False
packet.header.flags.compression = False
msg: MsgClCallVote = MsgClCallVote()
msg.header.flags.resend = False
msg.header.flags.vital = True
msg.header.size = None # <= compute this
msg.header.seq = 10
msg.type = 'option'
msg.value = 'test'
msg.reason = ''
msg.force = False
packet.messages.append(msg)
data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
b'\x40\x10\x0a' \
b'\x80\x01' \
b'\x6f\x70\x74\x69\x6f\x6e\x00' \
b'\x74\x65\x73\x74\x00' \
b'\x00' \
b'\x00'
assert packet.pack() == data

View file

@ -1,3 +1,5 @@
from typing import Optional
from twnet_parser.pretty_print import PrettyPrint
class ChunkFlags(PrettyPrint):
@ -12,7 +14,7 @@ class ChunkFlags(PrettyPrint):
class ChunkHeader(PrettyPrint):
def __init__(self) -> None:
self.flags: ChunkFlags = ChunkFlags()
self.size: int = 0
self.size: Optional[int] = 0
# TODO: should seq be a optional?
# so it can be None for non vital packages
# this could turn downstream users logic errors into
@ -23,6 +25,10 @@ class ChunkHeader(PrettyPrint):
# Also the teeworlds code uses -1
# doing the same for someone who knows the codebase
# could also be nice
#
# update: self.size uses optional now
# so it can be computed automatically
# if unset
self.seq: int = -1
def pack(self) -> bytes:
@ -31,6 +37,8 @@ class ChunkHeader(PrettyPrint):
flags |= 2
if self.flags.vital:
flags |= 1
if self.size is None:
self.size = 0
data: bytearray = bytearray([ \
((flags & 0x03) << 6) | \
((self.size >> 6) & 0x3f),

View file

@ -1,7 +1,6 @@
#!/usr/bin/env python
from typing import Union
from typing import cast
from typing import Union, cast, Optional
from twnet_parser.packer import Unpacker, pack_int
from twnet_parser.pretty_print import PrettyPrint
@ -47,12 +46,17 @@ class PacketHeader(PrettyPrint):
flags: PacketFlags7 = PacketFlags7(),
ack: int = 0,
token: bytes = b'\xff\xff\xff\xff',
num_chunks: int = 0
num_chunks: Optional[int] = None
) -> None:
"""
If num_chunks is not set it will count
the messages it was given when
the pack() method is called
"""
self.flags: PacketFlags7 = flags
self.ack: int = ack % NET_MAX_SEQUENCE
self.token: bytes = token
self.num_chunks: int = num_chunks
self.num_chunks: Optional[int] = num_chunks
def pack(self) -> bytes:
"""
@ -83,6 +87,8 @@ class PacketHeader(PrettyPrint):
flags |= PACKETFLAG7_COMPRESSION
if self.flags.connless:
flags |= PACKETFLAG7_CONNLESS
if self.num_chunks is None:
self.num_chunks = 0
return bytes([ \
((flags << 2)&0xfc) | ((self.ack>>8)&0x03), \
self.ack&0xff, \
@ -98,7 +104,6 @@ class TwPacket(PrettyPrint):
self.messages: list[Union[CtrlMessage, NetMessage]] = []
def pack(self) -> bytes:
print(self.header)
messages: bytes = b''
msg: Union[CtrlMessage, NetMessage]
for msg in self.messages:
@ -110,11 +115,15 @@ class TwPacket(PrettyPrint):
if not hasattr(msg, 'system_message'):
raise ValueError('Packing control messages is not supported yet')
msg = cast(NetMessage, msg)
msg_payload: bytes = msg.pack()
msg.header.size = len(msg_payload)
msg_payload: bytes = pack_int((msg.message_id<<1)|(int)(msg.system_message))
msg_payload += msg.pack()
if msg.header.size is None:
msg.header.size = len(msg_payload)
messages += msg.header.pack()
messages += pack_int(msg.message_id)
messages += msg_payload
if self.header.num_chunks is None:
self.header.num_chunks = len(self.messages)
print(self.header.num_chunks)
return self.header.pack() + messages
class PacketHeaderParser7():
@ -181,6 +190,8 @@ class PacketParser():
i = 0
while i < len(data):
msg = self.get_message(data[i:])
if msg.header.size is None:
raise ValueError('header size is not set')
i += msg.header.size + 2 # header + msg id = 3
if msg.header.flags.vital:
i += 1