twnet_parser/tests/packet_header7_test.py

320 lines
9.6 KiB
Python

from twnet_parser.packet import \
parse7, PacketHeaderParser7, PacketHeader7, TwPacket
def test_packet_header_unpack() -> None:
# TODO: change api to
# PacketHeader.pack()
# PacketHeader.unpack(bytes)
parser = PacketHeaderParser7()
header: PacketHeader7 = parser.parse_header(b'\x04\x0a\x00\xcf\x2e\xde\x1d')
assert header.ack == 10
assert header.token == b'\xcf.\xde\x1d'
assert header.num_chunks == 0
assert header.flags.control is True
assert header.flags.resend is False
assert header.flags.compression is False
assert header.flags.connless is False
def test_packet_header_pack_flags() -> None:
header: PacketHeader7 = PacketHeader7()
header.ack = 0
header.flags.control = True
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
assert header.pack()[0:1] == b'\x04'
header.flags.control = False
header.flags.resend = False
header.flags.compression = True
header.flags.connless = False
assert header.pack()[0:1] == b'\x10'
def test_packet_header_pack_ack() -> None:
header: PacketHeader7 = PacketHeader7()
header.flags.control = False
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
header.ack = 8
assert header.pack()[1:2] == b'\x08'
header.ack = 9
assert header.pack()[1:2] == b'\x09'
header.ack = 10
assert header.pack()[1:2] == b'\x0a'
header.ack = 11
assert header.pack()[1:2] == b'\x0b'
def test_packet_header_repack_overflowing_ack() -> None:
header: PacketHeader7 = PacketHeader7()
header.flags.control = False
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
header.ack = 1024
parser = PacketHeaderParser7()
header = parser.parse_header(header.pack())
assert header.ack == 0
header.ack = 1025
parser = PacketHeaderParser7()
header = parser.parse_header(header.pack())
assert header.ack == 1
header.ack = 2000
parser = PacketHeaderParser7()
header = parser.parse_header(header.pack())
assert header.ack == 976
def test_packet_header_repack_ack_overlapping_into_flags_byte() -> None:
header: PacketHeader7 = PacketHeader7()
header.flags.control = False
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
parser = PacketHeaderParser7()
header.ack = 8
assert header.pack()[0:2] == b'\x00\x08'
data = header.pack()
header = parser.parse_header(data)
assert header.ack == 8
# https://github.com/teeworlds/teeworlds/blob/26d24ec061d44e6084b2d77a9b8a0a48e354eba6/src/engine/shared/network.h#L112
# NET_MAX_SEQUENCE = 1<<10
# which is 1024
# so ack is being clamped with ack%NET_MAX_SEQUENCE
# in teeworlds code base
# meaning 1023 is the highest ack ever sent
# by official client and server
header.ack = 1023
assert header.pack()[0:2] == b'\x03\xff'
data = header.pack()
header = parser.parse_header(data)
assert header.ack == 1023
# note the first byte being 0x07
# which is 00000111 in binary
# |____/|/
# | |
# flags higher bits of ack
#
# usually one sees something like 0x04
# which is 00000100 where the last two bits are zero
#
# so here we do the 2 set bits from 0x07
# plus all set bits of 0xff
header = parser.parse_header(b'\x07\xff\x00\xcf\x2e\xde\x1d')
assert header.ack == 1023
header = parser.parse_header(b'\x07\xfe\x00\xcf\x2e\xde\x1d')
assert header.ack == 1022
header = parser.parse_header(b'\x07\xfd\x00\xcf\x2e\xde\x1d')
assert header.ack == 1021
header = parser.parse_header(b'\x07\xfc\x00\xcf\x2e\xde\x1d')
assert header.ack == 1020
header = parser.parse_header(b'\x07\xfb\x00\xcf\x2e\xde\x1d')
assert header.ack == 1019
# note the first byte being 0x05
# which is 00000101 in binary
# |____/|/
# | |
# flags higher bits of ack
header = parser.parse_header(b'\x05\xff\x00\xcf\x2e\xde\x1d')
assert header.ack == 511
def test_packet_header_pack_num_chunks() -> None:
header = PacketHeader7()
header.num_chunks = 0
assert header.pack()[2:3] == b'\x00'
header.num_chunks = 1
assert header.pack()[2:3] == b'\x01'
header.num_chunks = 6
assert header.pack()[2:3] == b'\x06'
def test_packet_header_pack_token() -> None:
header = PacketHeader7()
header.token = b'\x11\x22\x33\xff'
assert header.pack()[3:] == b'\x11\x22\x33\xff'
header.token = b'\x22\xff\xaa\xff'
assert header.pack()[3:] == b'\x22\xff\xaa\xff'
header.token = b'\x00\x00\x00\x00'
assert header.pack()[3:] == b'\x00\x00\x00\x00'
header.token = b'helo'
assert header.pack()[3:] == b'helo'
header.token = b'tekn'
assert header.pack()[3:] == b'tekn'
def test_packet_header_pack_full() -> None:
header: PacketHeader7 = PacketHeader7()
header.ack = 10
header.token = b'\xcf.\xde\x1d'
header.num_chunks = 0
header.flags.control = True
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
assert header.pack() == b'\x04\x0a\x00\xcf\x2e\xde\x1d'
def test_packet_header_repack_all_set() -> None:
header: PacketHeader7 = PacketHeader7()
header.ack = 1023
header.token = b'\xff\xff\xff\xff'
header.num_chunks = 255
header.flags.control = True
header.flags.resend = True
header.flags.compression = True
header.flags.connless = False
# Note that even if we set everything
# we still end up with two leading zeros
# because those bits are unused
# and we init the flags with 0
assert header.pack() == bytes([ \
0b00011111, \
0b11111111, \
0b11111111, \
0b11111111, \
0b11111111, \
0b11111111, \
0b11111111
])
parser = PacketHeaderParser7()
header = parser.parse_header(b'\x1f\xff\xff\xff\xff\xff\xff')
assert header.ack == 1023
assert header.token == b'\xff\xff\xff\xff'
assert header.num_chunks == 255
assert header.flags.control is True
assert header.flags.resend is True
assert header.flags.compression is True
assert header.flags.connless is False
# Note that is doesn matter wether we parse
#
# b'\x1f\xff\xff\xff\xff\xff\xff'
#
# or
#
# b'\xdf\xff\xff\xff\xff\xff\xff'
#
# because the first two bytes are ignored anyways
parser = PacketHeaderParser7()
header = parser.parse_header(b'\xdf\xff\xff\xff\xff\xff\xff')
assert header.ack == 1023
assert header.token == b'\xff\xff\xff\xff'
assert header.num_chunks == 255
assert header.flags.control is True
assert header.flags.resend is True
assert header.flags.compression is True
assert header.flags.connless is False
def test_packet_header_repack_none_set() -> None:
header: PacketHeader7 = PacketHeader7()
header.ack = 0
header.token = b'\x00\x00\x00\x00'
header.num_chunks = 0
header.flags.control = False
header.flags.resend = False
header.flags.compression = False
header.flags.connless = False
assert header.pack() == b'\x00\x00\x00\x00\x00\x00\x00'
parser = PacketHeaderParser7()
header = parser.parse_header(b'\x00\x00\x00\x00\x00\x00\x00')
assert header.ack == 0
assert header.token == b'\x00\x00\x00\x00'
assert header.num_chunks == 0
assert header.flags.control is False
assert header.flags.resend is False
assert header.flags.compression is False
assert header.flags.connless is False
def test_parse_7_close() -> None:
packet: TwPacket = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
assert packet.version == '0.7'
assert packet.header.ack == 10
assert packet.header.token == b'\xcf.\xde\x1d'
assert packet.header.num_chunks == 0
assert packet.header.flags.control is True
assert packet.header.flags.resend is False
assert packet.header.flags.compression is False
assert packet.header.flags.connless is False
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
def test_parse_7_close_fake_resend() -> None:
packet: TwPacket = parse7(b'\x0c\x0a\x00\xaa\xbb\xcc\xdd\04') # 0.7 close
# ^
# resending ctrl close
# probably never happens
assert packet.version == '0.7'
assert packet.header.ack == 10
assert packet.header.token == b'\xaa\xbb\xcc\xdd'
assert packet.header.num_chunks == 0
assert packet.header.flags.control is True
assert packet.header.flags.resend is True
assert packet.header.flags.compression is False
assert packet.header.flags.connless is False
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1
def test_parse_7_close_fake_num_chunks() -> None:
packet: TwPacket = parse7(b'\x04\x0a\x01\xcf\xee\xde\x2d\04') # 0.7 close
# ^
# 1 chunk makes no sense
# because control messages should
# always have 0 chunks
assert packet.version == '0.7'
assert packet.header.ack == 10
assert packet.header.token == b'\xcf\xee\xde\x2d'
assert packet.header.num_chunks == 1
assert packet.header.flags.control is True
assert packet.header.flags.resend is False
assert packet.header.flags.compression is False
assert packet.header.flags.connless is False
assert packet.messages[0].message_name == 'close'
assert len(packet.messages) == 1