Start working on packet and chunk header packing
This commit is contained in:
parent
18dd35f4fa
commit
f0fd825879
43
tests/chunk_header7_test.py
Normal file
43
tests/chunk_header7_test.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
from twnet_parser.packet import \
|
||||
ChunkHeaderParser
|
||||
from twnet_parser.chunk_header import \
|
||||
ChunkHeader
|
||||
|
||||
def test_chunk_header_unpack_vital1() -> None:
|
||||
parser = ChunkHeaderParser()
|
||||
header: ChunkHeader = parser.parse_header7(b'\x40\x10\x0a')
|
||||
|
||||
assert header.flags.resend is False
|
||||
assert header.flags.vital is True
|
||||
|
||||
assert header.size == 16
|
||||
assert header.seq == 10
|
||||
|
||||
def cmp_bytes_as_bin(got: bytes, expected: bytes) -> None:
|
||||
g_bin = [format(int(x), '#010b') for x in got]
|
||||
e_bin = [format(int(x), '#010b') for x in expected]
|
||||
assert g_bin == e_bin
|
||||
|
||||
# TODO: uncomment when working
|
||||
# def test_chunk_header_pack_vital1() -> None:
|
||||
# header: ChunkHeader = ChunkHeader()
|
||||
#
|
||||
# header.flags.resend = False
|
||||
# header.flags.vital = True
|
||||
# header.size = 16
|
||||
# header.seq = 10
|
||||
#
|
||||
# assert bytes([0b01000000, 0b00010000, 0b00001010]) == b'\x40\x10\x0a'
|
||||
#
|
||||
# cmp_bytes_as_bin(header.pack(), b'\x40\x10\x0a')
|
||||
# assert header.pack() == b'\x40\x10\x0a'
|
||||
|
||||
def test_chunk_header_unpack_vital2() -> None:
|
||||
parser = ChunkHeaderParser()
|
||||
header: ChunkHeader = parser.parse_header7(b'\x40\x10\x09')
|
||||
|
||||
assert header.flags.resend is False
|
||||
assert header.flags.vital is True
|
||||
|
||||
assert header.size == 16
|
||||
assert header.seq == 9
|
|
@ -31,3 +31,70 @@ def test_parse_7_real_call_vote() -> None:
|
|||
assert msg.value == 'test'
|
||||
assert msg.reason == ''
|
||||
assert msg.force is False
|
||||
|
||||
# TODO: uncomment when done
|
||||
# def test_serialize_7_real_call_vote_all_fields_set() -> None:
|
||||
# packet: TwPacket = TwPacket()
|
||||
# packet.version == '0.7'
|
||||
#
|
||||
# packet.header.token = b'\x48\x1f\x93\xd7'
|
||||
# packet.header.num_chunks = 1
|
||||
# packet.header.ack = 638
|
||||
#
|
||||
# packet.header.flags.control = False
|
||||
# packet.header.flags.compression = False
|
||||
#
|
||||
# msg: MsgClCallVote = MsgClCallVote()
|
||||
#
|
||||
# msg.type = 'option'
|
||||
# msg.value = 'test'
|
||||
# msg.reason = ''
|
||||
# msg.force = False
|
||||
#
|
||||
# packet.messages.append(msg)
|
||||
#
|
||||
# data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
|
||||
# b'\x40\x10\x0a' \
|
||||
# b'\x80\x01' \
|
||||
# b'\x6f\x70\x74\x69\x6f\x6e\x00' \
|
||||
# b'\x74\x65\x73\x74\x00' \
|
||||
# b'\x00' \
|
||||
# b'\x00'
|
||||
#
|
||||
# print(packet.header)
|
||||
# assert packet.pack() == data
|
||||
|
||||
# def test_serialize_7_real_call_vote_should_allow_custom_num_chunks() -> None:
|
||||
# packet: TwPacket = TwPacket()
|
||||
# packet.version == '0.7'
|
||||
#
|
||||
# packet.header.token == b'\x48\x1f\x93\xd7'
|
||||
#
|
||||
# # this is technically wrong
|
||||
# # but the library should allow sending
|
||||
# # wrong values
|
||||
# packet.header.num_chunks == 6
|
||||
# packet.header.ack == 638
|
||||
#
|
||||
# packet.header.flags.control is False
|
||||
# packet.header.flags.compression is False
|
||||
#
|
||||
# msg: MsgClCallVote = MsgClCallVote()
|
||||
#
|
||||
# msg.message_name = 'cl_call_vote'
|
||||
#
|
||||
# msg.type = 'option'
|
||||
# msg.value = 'test'
|
||||
# msg.reason = ''
|
||||
# msg.force = False
|
||||
#
|
||||
# data: bytes = b'\x02\x7e\x01\x48\x1f\x93\xd7' \
|
||||
# b'\x40\x10\x0a' \
|
||||
# b'\x80\x06' \
|
||||
# b'\x6f\x70\x74\x69\x6f\x6e\x00' \
|
||||
# b'\x74\x65\x73\x74\x00' \
|
||||
# b'\x00' \
|
||||
# b'\x00'
|
||||
#
|
||||
# assert packet.pack() == data
|
||||
|
||||
|
|
|
@ -25,3 +25,18 @@ class ChunkHeader(PrettyPrint):
|
|||
# could also be nice
|
||||
self.seq: int = -1
|
||||
|
||||
def pack(self) -> bytes:
|
||||
flags: int = 0
|
||||
if self.flags.resend:
|
||||
flags |= 1
|
||||
if self.flags.vital:
|
||||
flags |= 2
|
||||
data: bytearray = bytearray([ \
|
||||
((flags & 0x03) << 6) | \
|
||||
((self.size >> 6) & 0x3f),
|
||||
self.size & 0x3f
|
||||
])
|
||||
if self.flags.vital:
|
||||
data[1] |= (self.seq >> 2) & 0xc0
|
||||
data += bytes([self.seq & 0xff])
|
||||
return bytes(data)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
from typing import Union
|
||||
from typing import cast
|
||||
|
||||
from twnet_parser.packer import Unpacker
|
||||
from twnet_parser.packer import Unpacker, pack_int
|
||||
from twnet_parser.pretty_print import PrettyPrint
|
||||
from twnet_parser.message_parser import MessageParser
|
||||
from twnet_parser.net_message import NetMessage
|
||||
|
@ -97,6 +97,26 @@ class TwPacket(PrettyPrint):
|
|||
self.header: PacketHeader = PacketHeader()
|
||||
self.messages: list[Union[CtrlMessage, NetMessage]] = []
|
||||
|
||||
def pack(self) -> bytes:
|
||||
print(self.header)
|
||||
messages: bytes = b''
|
||||
msg: Union[CtrlMessage, NetMessage]
|
||||
for msg in self.messages:
|
||||
# TODO: this is super ugly
|
||||
# revist https://gitlab.com/teeworlds-network/twnet_parser/-/issues/1
|
||||
# we can not check isinstance() not sure why
|
||||
# maybe because CtrlMessage and NetMessage are no actual classes
|
||||
# but just ducktyping helpers
|
||||
if not hasattr(msg, 'system_message'):
|
||||
raise ValueError('Packing control messages is not supported yet')
|
||||
msg = cast(NetMessage, msg)
|
||||
msg_payload: bytes = msg.pack()
|
||||
msg.header.size = len(msg_payload)
|
||||
messages += msg.header.pack()
|
||||
messages += pack_int(msg.message_id)
|
||||
messages += msg_payload
|
||||
return self.header.pack() + messages
|
||||
|
||||
class PacketHeaderParser7():
|
||||
def parse_flags7(self, data: bytes) -> PacketFlags7:
|
||||
# FFFF FFaa
|
||||
|
|
Loading…
Reference in a new issue