Add support for control messages
This commit is contained in:
parent
7ba6e18ced
commit
639edda489
211
tests/ctrl_packets_test.py
Normal file
211
tests/ctrl_packets_test.py
Normal file
|
@ -0,0 +1,211 @@
|
|||
from twnet_parser.packet import parse7
|
||||
from twnet_parser.messages7.control.keep_alive import CtrlKeepAlive
|
||||
from twnet_parser.messages7.control.connect import CtrlConnect
|
||||
from twnet_parser.messages7.control.accept import CtrlAccept
|
||||
from twnet_parser.messages7.control.close import CtrlClose
|
||||
from twnet_parser.messages7.control.token import CtrlToken
|
||||
|
||||
def test_parse_7_close():
|
||||
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control is True
|
||||
|
||||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_pack_7_close():
|
||||
close = CtrlClose()
|
||||
data = close.pack()
|
||||
assert data == b''
|
||||
|
||||
def test_parse_7_close_with_reason():
|
||||
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04shutdown\x00') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control is True
|
||||
|
||||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
assert packet.messages[0].reason == 'shutdown'
|
||||
|
||||
def test_pack_7_close_with_reason():
|
||||
close = CtrlClose(reason='foo')
|
||||
data = close.pack()
|
||||
assert data == b'foo\x00'
|
||||
close.unpack(b'hello world\x00')
|
||||
assert close.reason == 'hello world'
|
||||
|
||||
def test_parse_keep_alive7():
|
||||
packet = parse7(b'\x04\x04\x00\x5d\x95\xd6\x80\x00')
|
||||
# < packet header ><ka>
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.num_chunks == 0
|
||||
assert packet.header.flags.control is True
|
||||
assert packet.header.token == b'\x5d\x95\xd6\x80'
|
||||
|
||||
assert packet.messages[0].message_name == 'keep_alive'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_pack_keep_alive7():
|
||||
keep_alive = CtrlKeepAlive()
|
||||
data = keep_alive.pack()
|
||||
assert data == b''
|
||||
|
||||
def test_parse_7_ctrl_connect():
|
||||
packet = parse7(b'\x04\x00\x00\xcf\x2e\xde\x1d' \
|
||||
b'\x01' \
|
||||
b'\x58\xeb\x9a\xf4' \
|
||||
b'\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00')
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.num_chunks == 0
|
||||
assert packet.header.flags.control is True
|
||||
assert packet.header.token == b'\xcf\x2e\xde\x1d'
|
||||
|
||||
assert packet.messages[0].response_token == b'\x58\xeb\x9a\xf4'
|
||||
|
||||
def test_pack_ctrl_connect7():
|
||||
connect = CtrlConnect()
|
||||
data = connect.pack()
|
||||
|
||||
assert 512 == len(data)
|
||||
|
||||
assert connect.unpack(data) is True
|
||||
|
||||
assert connect.response_token == b'\xff\xff\xff\xff'
|
||||
|
||||
# drop too short anti reflection attack payloads
|
||||
assert connect.unpack(b'\xff\xff\xff\xff\x00\x00\x00') is False
|
||||
|
||||
# allow too long anti reflection attack payloads
|
||||
assert connect.unpack(b'\xaa\xbb\xcc\xff' + bytes(900)) is True
|
||||
|
||||
assert connect.response_token == b'\xaa\xbb\xcc\xff'
|
||||
|
||||
def test_parse_7_accept():
|
||||
packet = parse7(b'\x04\x00\x00\x58\xeb\x9a\xf4\x02')
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.num_chunks == 0
|
||||
assert packet.header.flags.control is True
|
||||
assert packet.header.token == b'\x58\xeb\x9a\xf4'
|
||||
|
||||
assert packet.messages[0].message_name == 'accept'
|
||||
|
||||
def test_pack_ctrl_accept7():
|
||||
accept = CtrlAccept()
|
||||
assert accept.pack() == b''
|
||||
|
||||
def test_parse_7_ctrl_token_sent_by_client():
|
||||
data = b'\x04\x00\x00\xff\xff\xff\xff' \
|
||||
b'\x05' \
|
||||
b'\x58\xeb\x9a\xf4' \
|
||||
b'\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00'
|
||||
packet = parse7(data=data, we_are_a_client = False)
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.num_chunks == 0
|
||||
assert packet.header.flags.control is True
|
||||
assert packet.header.token == b'\xff\xff\xff\xff'
|
||||
|
||||
assert packet.messages[0].message_name == 'token'
|
||||
assert packet.messages[0].response_token == b'\x58\xeb\x9a\xf4'
|
||||
|
||||
def test_pack_token_sent_by_client7():
|
||||
# TODO: implement
|
||||
pass
|
||||
|
||||
def test_parse_7_ctrl_token_sent_by_server():
|
||||
data = b'\x04\x00\x00\x58\xeb\x9a\xf4' \
|
||||
b'\x05' \
|
||||
b'\xcf\x2e\xde\x1d'
|
||||
|
||||
# TODO: uncomment this when this issue is solved
|
||||
# https://gitlab.com/teeworlds-network/twnet_parser/-/issues/2
|
||||
# should we insert some ErrMsg object into messages?
|
||||
# should we raise an error?
|
||||
# should the messages array be empty?
|
||||
# packet = parse7(data=data, we_are_a_client = False)
|
||||
# assert len(packet.messages) == 0
|
||||
|
||||
packet = parse7(data=data, we_are_a_client = True)
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.num_chunks == 0
|
||||
assert packet.header.flags.control is True
|
||||
assert packet.header.token == b'\x58\xeb\x9a\xf4'
|
||||
|
||||
assert packet.messages[0].response_token == b'\xcf\x2e\xde\x1d'
|
||||
|
||||
def test_pack_token_sent_by_server7():
|
||||
token = CtrlToken()
|
||||
data = token.pack(we_are_a_client = False)
|
||||
|
||||
assert data == b'\xff\xff\xff\xff'
|
|
@ -1,22 +0,0 @@
|
|||
from twnet_parser.packet import *
|
||||
|
||||
def test_parse_7_close():
|
||||
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control is True
|
||||
|
||||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
def test_parse_7_close_with_reason():
|
||||
packet = parse7(b'\x04\x0a\x00\xcf\x2e\xde\x1d\04shutdown\x00') # 0.7 close
|
||||
|
||||
assert packet.version == '0.7'
|
||||
assert packet.header.flags.control is True
|
||||
|
||||
assert packet.messages[0].message_name == 'close'
|
||||
assert len(packet.messages) == 1
|
||||
|
||||
# TODO: uncomment when implemented
|
||||
# assert packet.messages[0].reason == 'shutdown'
|
|
@ -2,7 +2,7 @@ from typing import Protocol
|
|||
|
||||
class CtrlMessage(Protocol):
|
||||
message_name: str
|
||||
def unpack(self, data: bytes) -> bool:
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
...
|
||||
def pack(self) -> bytes:
|
||||
...
|
||||
|
|
11
twnet_parser/messages7/control/accept.py
Normal file
11
twnet_parser/messages7/control/accept.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from twnet_parser.pretty_print import PrettyPrint
|
||||
|
||||
class CtrlAccept(PrettyPrint):
|
||||
def __init__(self) -> None:
|
||||
self.message_name = 'accept'
|
||||
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
return False
|
||||
|
||||
def pack(self, client: bool = True) -> bytes:
|
||||
return b''
|
|
@ -15,7 +15,7 @@ class CtrlClose(PrettyPrint):
|
|||
# first byte of data
|
||||
# has to be the first byte of the message payload
|
||||
# NOT the chunk header and NOT the message id
|
||||
def unpack(self, data: bytes) -> bool:
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
unpacker = Unpacker(data)
|
||||
self.reason = unpacker.get_str() # TODO: this is an optional field
|
||||
return True
|
||||
|
|
19
twnet_parser/messages7/control/connect.py
Normal file
19
twnet_parser/messages7/control/connect.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
from twnet_parser.pretty_print import PrettyPrint
|
||||
|
||||
class CtrlConnect(PrettyPrint):
|
||||
def __init__(
|
||||
self,
|
||||
response_token: bytes = b'\xff\xff\xff\xff'
|
||||
) -> None:
|
||||
self.message_name = 'connect'
|
||||
self.response_token: bytes = response_token
|
||||
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
# anti reflection attack
|
||||
if len(data) < 512:
|
||||
return False
|
||||
self.response_token = data[0:4]
|
||||
return True
|
||||
|
||||
def pack(self) -> bytes:
|
||||
return self.response_token + bytes(508)
|
11
twnet_parser/messages7/control/keep_alive.py
Normal file
11
twnet_parser/messages7/control/keep_alive.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from twnet_parser.pretty_print import PrettyPrint
|
||||
|
||||
class CtrlKeepAlive(PrettyPrint):
|
||||
def __init__(self) -> None:
|
||||
self.message_name = 'keep_alive'
|
||||
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
return False
|
||||
|
||||
def pack(self) -> bytes:
|
||||
return b''
|
22
twnet_parser/messages7/control/token.py
Normal file
22
twnet_parser/messages7/control/token.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
from twnet_parser.pretty_print import PrettyPrint
|
||||
|
||||
class CtrlToken(PrettyPrint):
|
||||
def __init__(
|
||||
self,
|
||||
response_token: bytes = b'\xff\xff\xff\xff'
|
||||
) -> None:
|
||||
self.message_name = 'token'
|
||||
self.response_token: bytes = response_token
|
||||
|
||||
def unpack(self, data: bytes, we_are_a_client: bool = False) -> bool:
|
||||
if not we_are_a_client:
|
||||
# anti reflection attack
|
||||
if len(data) < 512:
|
||||
return False
|
||||
self.response_token = data[0:4]
|
||||
return True
|
||||
|
||||
def pack(self, we_are_a_client: bool = True) -> bytes:
|
||||
if we_are_a_client:
|
||||
return self.response_token + bytes(508)
|
||||
return self.response_token
|
|
@ -3,16 +3,28 @@ from typing import Optional
|
|||
import twnet_parser.msg7
|
||||
from twnet_parser.ctrl_message import CtrlMessage
|
||||
|
||||
import twnet_parser.messages7.control.keep_alive as keep_alive7
|
||||
import twnet_parser.messages7.control.connect as connect7
|
||||
import twnet_parser.messages7.control.accept as accept7
|
||||
import twnet_parser.messages7.control.close as close7
|
||||
import twnet_parser.messages7.control.token as token7
|
||||
|
||||
def match_control7(msg_id: int, data: bytes) -> CtrlMessage:
|
||||
def match_control7(msg_id: int, data: bytes, client: bool) -> CtrlMessage:
|
||||
msg: Optional[CtrlMessage] = None
|
||||
|
||||
if msg_id == twnet_parser.msg7.CTRL_CLOSE:
|
||||
if msg_id == twnet_parser.msg7.CTRL_KEEPALIVE:
|
||||
msg = keep_alive7.CtrlKeepAlive()
|
||||
elif msg_id == twnet_parser.msg7.CTRL_CONNECT:
|
||||
msg = connect7.CtrlConnect()
|
||||
elif msg_id == twnet_parser.msg7.CTRL_ACCEPT:
|
||||
msg = accept7.CtrlAccept()
|
||||
elif msg_id == twnet_parser.msg7.CTRL_CLOSE:
|
||||
msg = close7.CtrlClose()
|
||||
elif msg_id == twnet_parser.msg7.CTRL_TOKEN:
|
||||
msg = token7.CtrlToken()
|
||||
|
||||
if msg is None:
|
||||
raise ValueError(f"Error: unknown control message id={msg_id} data={data[0]}")
|
||||
|
||||
msg.unpack(data)
|
||||
msg.unpack(data, client)
|
||||
return msg
|
||||
|
|
|
@ -146,7 +146,7 @@ class PacketParser():
|
|||
msg.header = chunk_header
|
||||
return msg
|
||||
|
||||
def parse7(self, data: bytes) -> TwPacket:
|
||||
def parse7(self, data: bytes, client: bool) -> TwPacket:
|
||||
pck = TwPacket()
|
||||
pck.version = '0.7'
|
||||
# TODO: what is the most performant way in python to do this?
|
||||
|
@ -157,7 +157,7 @@ class PacketParser():
|
|||
pck.payload_raw = data[PACKET_HEADER7_SIZE:]
|
||||
pck.payload_decompressed = pck.payload_raw
|
||||
if pck.header.flags.control:
|
||||
ctrl_msg: CtrlMessage = match_control7(data[7], data[8:])
|
||||
ctrl_msg: CtrlMessage = match_control7(data[7], data[8:], client)
|
||||
pck.messages.append(ctrl_msg)
|
||||
return pck
|
||||
if pck.header.flags.compression:
|
||||
|
@ -171,5 +171,5 @@ class PacketParser():
|
|||
def parse6(data: bytes) -> TwPacket:
|
||||
raise NotImplementedError()
|
||||
|
||||
def parse7(data: bytes) -> TwPacket:
|
||||
return PacketParser().parse7(data)
|
||||
def parse7(data: bytes, we_are_a_client: bool = False) -> TwPacket:
|
||||
return PacketParser().parse7(data, we_are_a_client)
|
||||
|
|
Loading…
Reference in a new issue