Add some types to packet test
This commit is contained in:
parent
1fd099c3de
commit
4f540fead1
|
@ -1,12 +1,13 @@
|
|||
from twnet_parser.packet import parse7, PacketHeaderParser7, PacketHeader
|
||||
from twnet_parser.packet import \
|
||||
parse7, PacketHeaderParser7, PacketHeader, TwPacket
|
||||
|
||||
def test_packet_header_unpack():
|
||||
def test_packet_header_unpack() -> None:
|
||||
# TODO: change api to
|
||||
# PacketHeader.pack()
|
||||
# PacketHeader.unpack(bytes)
|
||||
|
||||
parser = PacketHeaderParser7()
|
||||
header = parser.parse_header(b'\x04\x0a\x00\xcf\x2e\xde\x1d')
|
||||
header: PacketHeader = parser.parse_header(b'\x04\x0a\x00\xcf\x2e\xde\x1d')
|
||||
|
||||
assert header.ack == 10
|
||||
assert header.token == b'\xcf.\xde\x1d'
|
||||
|
@ -17,8 +18,8 @@ def test_packet_header_unpack():
|
|||
assert header.flags.compression is False
|
||||
assert header.flags.connless is False
|
||||
|
||||
def test_packet_header_pack_flags():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_pack_flags() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
header.ack = 0
|
||||
|
||||
header.flags.control = True
|
||||
|
@ -33,8 +34,8 @@ def test_packet_header_pack_flags():
|
|||
header.flags.connless = False
|
||||
assert header.pack()[0:1] == b'\x10'
|
||||
|
||||
def test_packet_header_pack_ack():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_pack_ack() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
header.flags.control = False
|
||||
header.flags.resend = False
|
||||
header.flags.compression = False
|
||||
|
@ -49,8 +50,8 @@ def test_packet_header_pack_ack():
|
|||
header.ack = 11
|
||||
assert header.pack()[1:2] == b'\x0b'
|
||||
|
||||
def test_packet_header_repack_overflowing_ack():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_repack_overflowing_ack() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
header.flags.control = False
|
||||
header.flags.resend = False
|
||||
header.flags.compression = False
|
||||
|
@ -71,8 +72,8 @@ def test_packet_header_repack_overflowing_ack():
|
|||
header = parser.parse_header(header.pack())
|
||||
assert header.ack == 976
|
||||
|
||||
def test_packet_header_repack_ack_overlapping_into_flags_byte():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_repack_ack_overlapping_into_flags_byte() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
header.flags.control = False
|
||||
header.flags.resend = False
|
||||
header.flags.compression = False
|
||||
|
@ -131,7 +132,7 @@ def test_packet_header_repack_ack_overlapping_into_flags_byte():
|
|||
header = parser.parse_header(b'\x05\xff\x00\xcf\x2e\xde\x1d')
|
||||
assert header.ack == 511
|
||||
|
||||
def test_packet_header_pack_num_chunks():
|
||||
def test_packet_header_pack_num_chunks() -> None:
|
||||
header = PacketHeader()
|
||||
|
||||
header.num_chunks = 0
|
||||
|
@ -143,7 +144,7 @@ def test_packet_header_pack_num_chunks():
|
|||
header.num_chunks = 6
|
||||
assert header.pack()[2:3] == b'\x06'
|
||||
|
||||
def test_packet_header_pack_token():
|
||||
def test_packet_header_pack_token() -> None:
|
||||
header = PacketHeader()
|
||||
header.token = b'\x11\x22\x33\xff'
|
||||
assert header.pack()[3:] == b'\x11\x22\x33\xff'
|
||||
|
@ -160,8 +161,8 @@ def test_packet_header_pack_token():
|
|||
header.token = b'tekn'
|
||||
assert header.pack()[3:] == b'tekn'
|
||||
|
||||
def test_packet_header_pack_full():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_pack_full() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
|
||||
header.ack = 10
|
||||
header.token = b'\xcf.\xde\x1d'
|
||||
|
@ -174,8 +175,8 @@ def test_packet_header_pack_full():
|
|||
|
||||
assert header.pack() == b'\x04\x0a\x00\xcf\x2e\xde\x1d'
|
||||
|
||||
def test_packet_header_repack_all_set():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_repack_all_set() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
|
||||
header.ack = 1023
|
||||
header.token = b'\xff\xff\xff\xff'
|
||||
|
@ -233,8 +234,8 @@ def test_packet_header_repack_all_set():
|
|||
assert header.flags.compression is True
|
||||
assert header.flags.connless is True
|
||||
|
||||
def test_packet_header_repack_none_set():
|
||||
header = PacketHeader()
|
||||
def test_packet_header_repack_none_set() -> None:
|
||||
header: PacketHeader = PacketHeader()
|
||||
|
||||
header.ack = 0
|
||||
header.token = b'\x00\x00\x00\x00'
|
||||
|
@ -259,8 +260,8 @@ def test_packet_header_repack_none_set():
|
|||
assert header.flags.compression is False
|
||||
assert header.flags.connless is False
|
||||
|
||||
def test_parse_7_close():
|
||||
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
def test_parse_7_close() -> None:
|
||||
packet: TwPacket = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
|
||||
|
@ -276,8 +277,8 @@ def test_parse_7_close():
|
|||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_parse_7_close_fake_resend():
|
||||
packet = parse7(b'\x0c\x0a\x00\xaa\xbb\xcc\xdd\04') # 0.7 close
|
||||
def test_parse_7_close_fake_resend() -> None:
|
||||
packet: TwPacket = parse7(b'\x0c\x0a\x00\xaa\xbb\xcc\xdd\04') # 0.7 close
|
||||
# ^
|
||||
# resending ctrl close
|
||||
# probably never happens
|
||||
|
@ -296,8 +297,8 @@ def test_parse_7_close_fake_resend():
|
|||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_parse_7_close_fake_num_chunks():
|
||||
packet = parse7(b'\x04\x0a\x01\xcf\xee\xde\x2d\04') # 0.7 close
|
||||
def test_parse_7_close_fake_num_chunks() -> None:
|
||||
packet: TwPacket = parse7(b'\x04\x0a\x01\xcf\xee\xde\x2d\04') # 0.7 close
|
||||
# ^
|
||||
# 1 chunk makes no sense
|
||||
# because control messages should
|
||||
|
|
Loading…
Reference in a new issue