Implemet connless packing
This commit is contained in:
parent
04945c770a
commit
ad8511d551
|
@ -113,14 +113,24 @@ def test_lis2_connless7():
|
|||
|
||||
server = msg.servers[0]
|
||||
assert server.port == 8298
|
||||
assert server.ipaddr == "5.78.73.17" # hetzner
|
||||
server = msg.servers[1]
|
||||
assert server.port == 8300
|
||||
assert server.ipaddr == "5.78.73.17"
|
||||
server = msg.servers[2]
|
||||
assert server.port == 8303
|
||||
assert server.ipaddr == "5.78.73.17"
|
||||
server = msg.servers[3]
|
||||
assert server.port == 8304
|
||||
assert server.ipaddr == "5.78.73.17"
|
||||
server = msg.servers[4]
|
||||
assert server.port == 8305
|
||||
assert server.ipaddr == "5.78.73.17"
|
||||
# ..
|
||||
server = msg.servers[74]
|
||||
assert server.port == 8346
|
||||
assert server.ipaddr == "37.230.210.231" # rus4.ddnet.org
|
||||
|
||||
repack: bytes = packet.pack()
|
||||
|
||||
assert repack == data
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
from typing import Annotated
|
||||
from typing import Literal
|
||||
|
||||
class MastersrvAddr():
|
||||
def __init__(
|
||||
self,
|
||||
ipaddr: Annotated[bytes, 16] = bytes(16),
|
||||
port: int = 0
|
||||
family: Literal[4, 6] = 4,
|
||||
ipaddr: str = '127.0.0.1',
|
||||
port: int = 8303
|
||||
) -> None:
|
||||
self.ipaddr: Annotated[bytes, 16] = ipaddr
|
||||
self.family: Literal[4, 6] = family
|
||||
self.ipaddr: str = ipaddr
|
||||
self.port: int = port
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from typing import Final
|
||||
from typing import Final, Literal
|
||||
|
||||
from twnet_parser.master_server import MastersrvAddr
|
||||
|
||||
|
@ -46,8 +46,17 @@ class Unpacker():
|
|||
servers: list[MastersrvAddr] = []
|
||||
while len(self.data()) >= 18:
|
||||
ipaddr = self.get_raw(16)
|
||||
ipv4_mapping = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'
|
||||
family: Literal[4, 6] = 4
|
||||
addr_str: str = ''
|
||||
if ipaddr[0:12] == ipv4_mapping:
|
||||
addr_str = f"{ipaddr[12]}.{ipaddr[13]}.{ipaddr[14]}.{ipaddr[15]}"
|
||||
else:
|
||||
# TODO: make ipv6 nicer
|
||||
addr_str = str(ipaddr)
|
||||
family = 6
|
||||
port = self.get_be_uint16() # TODO: i randomly assumed this would work
|
||||
servers.append(MastersrvAddr(ipaddr, int(port)))
|
||||
servers.append(MastersrvAddr(family, addr_str, int(port)))
|
||||
return servers
|
||||
|
||||
def get_uint8(self) -> int:
|
||||
|
@ -154,7 +163,13 @@ def pack_uint8(num: int) -> bytes:
|
|||
def pack_packed_addresses(servers: list[MastersrvAddr]) -> bytes:
|
||||
res: bytes = b''
|
||||
for server in servers:
|
||||
res += server.ipaddr
|
||||
if server.family == 4:
|
||||
ipv4_mapping = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'
|
||||
res += ipv4_mapping
|
||||
res += bytes([int(x) for x in server.ipaddr.split('.')])
|
||||
else:
|
||||
# TODO: ipv6 is wrong
|
||||
res += server.ipaddr.encode('utf-8')
|
||||
res += pack_be_uint16(server.port)
|
||||
return res
|
||||
|
||||
|
|
|
@ -98,6 +98,10 @@ class PacketHeader(PrettyPrint):
|
|||
flags |= PACKETFLAG7_CONNLESS
|
||||
if self.num_chunks is None:
|
||||
self.num_chunks = 0
|
||||
if self.flags.connless:
|
||||
return bytes([ \
|
||||
((PACKETFLAG7_CONNLESS<<2)&0xfc) | (self.connless_version&0x03)
|
||||
]) + self.token + self.response_token
|
||||
return bytes([ \
|
||||
((flags << 2)&0xfc) | ((self.ack>>8)&0x03), \
|
||||
self.ack&0xff, \
|
||||
|
@ -116,18 +120,19 @@ class TwPacket(PrettyPrint):
|
|||
payload: bytes = b''
|
||||
msg: Union[CtrlMessage, NetMessage, ConnlessMessage]
|
||||
is_control: bool = False
|
||||
is_connless: bool = False
|
||||
for msg in self.messages:
|
||||
# TODO: this is super ugly
|
||||
# revist https://gitlab.com/teeworlds-network/twnet_parser/-/issues/1
|
||||
# we can not check isinstance() not sure why
|
||||
# maybe because CtrlMessage and NetMessage are no actual classes
|
||||
# but just ducktyping helpers
|
||||
if not hasattr(msg, 'system_message'):
|
||||
if msg.message_type == 'connless':
|
||||
is_connless = True
|
||||
msg = cast(ConnlessMessage, msg)
|
||||
payload += bytes(msg.message_id)
|
||||
payload += msg.pack()
|
||||
elif msg.message_type == 'control':
|
||||
is_control = True
|
||||
msg = cast(CtrlMessage, msg)
|
||||
payload += pack_int(msg.message_id)
|
||||
payload += msg.pack(we_are_a_client)
|
||||
else:
|
||||
else: # game or system message
|
||||
msg = cast(NetMessage, msg)
|
||||
msg_payload: bytes = pack_int(
|
||||
(msg.message_id<<1) |
|
||||
|
@ -146,6 +151,9 @@ class TwPacket(PrettyPrint):
|
|||
if is_control:
|
||||
if self.header.flags.control is None:
|
||||
self.header.flags.control = True
|
||||
if is_connless:
|
||||
if self.header.flags.connless is None:
|
||||
self.header.flags.connless = True
|
||||
return self.header.pack() + payload
|
||||
|
||||
class PacketHeaderParser7():
|
||||
|
|
Loading…
Reference in a new issue