Some thoughts on edge cases and performance
This commit is contained in:
parent
8c78cdc759
commit
6f2a9eda51
22
tests/packet_ctrl_close7_test.py
Normal file
22
tests/packet_ctrl_close7_test.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
from twnet_parser.packet import *
|
||||
|
||||
def test_parse_7_close():
|
||||
packet = parse(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control == True
|
||||
|
||||
assert packet.messages[0].name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_parse_7_close_with_reason():
|
||||
packet = parse(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04shutdown\x00') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control == True
|
||||
|
||||
assert packet.messages[0].name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
# TODO: uncomment when implemented
|
||||
# assert packet.messages[0].reason == 'shutdown'
|
|
@ -1,6 +1,5 @@
|
|||
from twnet_parser.packet import *
|
||||
|
||||
# # packet = parse(b'\x10\x10\x00\x04\x9a\xcb9\xc9') # 0.6 close
|
||||
def test_parse_7_close():
|
||||
packet = parse(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
|
||||
|
|
27
tests/packet_invalid_test.py
Normal file
27
tests/packet_invalid_test.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
from twnet_parser.packet import *
|
||||
|
||||
def test_parse_7_close():
|
||||
# ctrl close 0.7 with last two bytes cut off
|
||||
# the last byte of the token and the message id
|
||||
# is missing
|
||||
try:
|
||||
packet = parse(b'\x04\x0a\x00\xcf\x2e\xde')
|
||||
except IndexError:
|
||||
# def parse7(self, data: bytes) -> TwPacket:
|
||||
# pck = TwPacket()
|
||||
# pck.version = '0.7'
|
||||
# pck.header = PacketHeaderParser().parse_header(data)
|
||||
# if pck.header.flags.control:
|
||||
# > if data[7] == 0x04: # close
|
||||
# E IndexError: index out of range
|
||||
pass
|
||||
|
||||
# TODO: think of what we want to do here
|
||||
# crash?
|
||||
# silent skip?
|
||||
# detailed error message?
|
||||
# set error field on the packet?
|
||||
# would the error checking be on by default or opt in?
|
||||
# it can affect performance
|
||||
# and a python crash is also pretty informative already
|
||||
# assert packet.version == 'unknown'
|
|
@ -57,8 +57,7 @@ class TwPacket(PrettyPrint):
|
|||
self.header: PacketHeader = PacketHeader()
|
||||
self.messages: list[Union[CtrlMessage, GameMessage, SysMessage]] = []
|
||||
|
||||
class PacketParser():
|
||||
# TODO: move this to another class?
|
||||
class PacketHeaderParser():
|
||||
def parse_flags7(self, data: bytes) -> PacketFlags7:
|
||||
# FFFF FFaa
|
||||
flag_bits = (data[0] & 0xfc) >> 2
|
||||
|
@ -69,12 +68,10 @@ class PacketParser():
|
|||
flags.connless = (flag_bits & PACKETFLAG7_CONNLESS) != 0
|
||||
return flags
|
||||
|
||||
# TODO: move this to another class?
|
||||
def parse_ack(self, header_bytes: bytes) -> int:
|
||||
# ffAA AAAA AAAA
|
||||
return ((header_bytes[0] & 0x3) << 8) | header_bytes[1]
|
||||
|
||||
# TODO: move this to another class?
|
||||
def parse_num_chunks(self, header_bytes: bytes) -> int:
|
||||
# TODO: not sure if this is correct
|
||||
return header_bytes[2]
|
||||
|
@ -82,7 +79,6 @@ class PacketParser():
|
|||
def parse_token(self, header_bytes: bytes) -> bytes:
|
||||
return header_bytes[3:7]
|
||||
|
||||
# TODO: move this to another class?
|
||||
def parse_header(self, data: bytes) -> PacketHeader:
|
||||
header = PacketHeader()
|
||||
# bits 2..5
|
||||
|
@ -95,10 +91,15 @@ class PacketParser():
|
|||
header.token = self.parse_token(data)
|
||||
return header
|
||||
|
||||
class PacketParser():
|
||||
def parse7(self, data: bytes) -> TwPacket:
|
||||
pck = TwPacket()
|
||||
pck.version = '0.7'
|
||||
pck.header = self.parse_header(data)
|
||||
# TODO: what is the most performant way in python to do this?
|
||||
# heap allocating a PacketHeaderParser just to bundle a bunch of
|
||||
# methods that do not share state seems like a waste of performance
|
||||
# would this be nicer with class methods?
|
||||
pck.header = PacketHeaderParser().parse_header(data)
|
||||
if pck.header.flags.control:
|
||||
if data[7] == 0x04: # close
|
||||
msg_dc = CtrlMessage('close')
|
||||
|
|
Loading…
Reference in a new issue