From 822074ea47e11be9744b0631febeef2e094221ce Mon Sep 17 00:00:00 2001 From: ChillerDragon Date: Sat, 8 Apr 2023 14:24:19 +0200 Subject: [PATCH] Add support for generating array chunk fields --- scripts/generate_messages.py | 205 +++++++++++++++++++++++++---------- 1 file changed, 149 insertions(+), 56 deletions(-) diff --git a/scripts/generate_messages.py b/scripts/generate_messages.py index 5594dec..daafc12 100755 --- a/scripts/generate_messages.py +++ b/scripts/generate_messages.py @@ -38,11 +38,21 @@ class InnerNetMessageMemberTypeJson(TypedDict): kind: KIND disallow_cc: bool +class ArrayMemberTypeJson(TypedDict): + kind: KIND + # strings + disallow_cc: bool + class NetMessageMemberTypeJson(TypedDict): kind: KIND inner: InnerNetMessageMemberTypeJson + # strings disallow_cc: bool + # arrays + count: int + member_type: ArrayMemberTypeJson + class NetMessageMemberJson(TypedDict): name: list[str] type: NetMessageMemberTypeJson @@ -183,9 +193,29 @@ def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None: # TODO: think about snapshot_object ftype = 'int' default = '0' - elif member['type']['kind'] == 'array': # TODO: think about array - ftype = 'int' - default = '0' + elif member['type']['kind'] == 'array': + size: int = member['type']['count'] + if size is None: + print("Error: size is none for the following message") + print(msg) + exit(1) + arr_member: ArrayMemberTypeJson = member['type']['member_type'] + if arr_member['kind'] == 'string': + ftype = f'Annotated[list[str], {size}]' + default = '[' + ', '.join(["''"] * size) + ']' + elif arr_member['kind'] == 'boolean': + ftype = f'Annotated[list[bool], {size}]' + default = '[' + ', '.join(["False"] * size) + ']' + elif arr_member['kind'] in ('int32', 'tick', 'enum'): + ftype = f'Annotated[list[int], {size}]' + default = '[' + ', '.join(["0"] * size) + ']' + else: + raise ValueError( \ + f"Error: unknown array member type {member['type']}") + # Initializing lists with defaults + # And type annotation can get quite long + # So split it in two lines + default = f'\\\n {default}' elif member['type']['kind'] == 'flags': # TODO: think about flags ftype = 'int' default = '0' @@ -232,8 +262,12 @@ def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None: elif member['type']['kind'] == 'snapshot_object': # TODO: think about snapshot_object ftype = 'int' - elif member['type']['kind'] == 'array': # TODO: think about array - ftype = 'int' + elif member['type']['kind'] == 'array': + # Array type annotations are so annoyingly long + # also there is a planned refactor + # https://gitlab.com/teeworlds-network/twnet_parser/-/issues/4 + # so inherit type from constructor arguments + ftype = '' elif member['type']['kind'] == 'flags': # TODO: think about flags ftype = 'int' elif member['type']['kind'] == 'optional': @@ -246,7 +280,9 @@ def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None: else: raise ValueError(f"Error: unknown type {member['type']}") name = name_to_snake(member["name"]) - out_file.write(f" self.{name}: {ftype} = {name}\n") + if ftype != '': + ftype = f': {ftype}' + out_file.write(f" self.{name}{ftype} = {name}\n") out_file.write('\n') out_file.write(' # first byte of data\n') out_file.write(' # has to be the first byte of the message payload\n') @@ -254,55 +290,83 @@ def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None: out_file.write(' def unpack(self, data: bytes) -> bool:\n') if len(msg['members']) > 0: out_file.write(' unpacker = Unpacker(data)\n') - for member in msg['members']: - # {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}} - unpacker = 'int()' - if member['type']['kind'] == 'string': - if member['type']['disallow_cc']: - unpacker = 'str(SANITIZE_CC)' - else: - unpacker = 'str()' - elif member['type']['kind'] in \ - ('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field - unpacker = 'raw()' - # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, - elif member['type']['kind'] == 'enum': - unpacker = 'int() # TODO: this is a enum' - elif member['type']['kind'] in ('int32', 'tick'): - unpacker = 'int()' - elif member['type']['kind'] == 'boolean': - unpacker = 'int() == 1' - elif member['type']['kind'] == 'tune_param': - unpacker = 'int() / 100.0' - elif member['type']['kind'] == 'snapshot_object': - # TODO: think about snapshot_object - unpacker = 'int() # TODO: this is a snapshot object' - elif member['type']['kind'] == 'array': # TODO: think about array - unpacker = 'int() # TODO: this is an array' - elif member['type']['kind'] == 'flags': # TODO: think about flags - unpacker = 'int() # TODO: this is a flag' - elif member['type']['kind'] == 'optional': - # TODO: unpacker should not crash on missing optional fields - # check how tw code does it and be smart here - if member['type']['inner']['kind'] == 'string': - if member['type']['inner']['disallow_cc']: - unpacker = 'str(SANITIZE_CC)' - unpacker += ' # TODO: optionals' - else: - unpacker = 'str() # TODO: optionals' - elif member['type']['inner']['kind'] in ('int32', 'tick'): - unpacker = 'int() # TODO: optionals' - else: - raise ValueError(f"Error: unknown type {member['type']}") - name = name_to_snake(member["name"]) - out_file.write(f' self.{name} = unpacker.get_{unpacker}\n') + out_file.write(gen_unpack_members(msg)) out_file.write(' return True\n') out_file.write('\n') out_file.write(' def pack(self) -> bytes:\n') out_file.write(gen_pack_return(msg)) +def gen_unpack_members(msg: NetMessageJson) -> str: + res: str = '' + for member in msg['members']: + # {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}} + unpacker = 'int()' + if member['type']['kind'] == 'string': + if member['type']['disallow_cc']: + unpacker = 'str(SANITIZE_CC)' + else: + unpacker = 'str()' + elif member['type']['kind'] in \ + ('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field + unpacker = 'raw()' + # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, + elif member['type']['kind'] == 'enum': + unpacker = 'int() # TODO: this is a enum' + elif member['type']['kind'] in ('int32', 'tick'): + unpacker = 'int()' + elif member['type']['kind'] == 'boolean': + unpacker = 'int() == 1' + elif member['type']['kind'] == 'tune_param': + unpacker = 'int() / 100.0' + elif member['type']['kind'] == 'snapshot_object': + # TODO: think about snapshot_object + unpacker = 'int() # TODO: this is a snapshot object' + elif member['type']['kind'] == 'array': + size: int = member['type']['count'] + if size is None: + print("Error: size is none for the following message") + print(msg) + exit(1) + arr_member: ArrayMemberTypeJson = member['type']['member_type'] + if arr_member['kind'] == 'string': + if arr_member['disallow_cc']: + unpacker = 'str(SANITIZE_CC)' + else: + unpacker = 'str()' + elif arr_member['kind'] == 'enum': + unpacker = 'int() # TODO: this is a enum' + elif arr_member['kind'] == 'boolean': + unpacker = 'int() == 1' + elif arr_member['kind'] in ('int32', 'tick'): + unpacker = 'int()' + else: + raise ValueError(f"Error: unknown array member type {member['type']}") + name = name_to_snake(member["name"]) + res += f' for i in range(0, {size}):\n' + res += f' self.{name}[i] = unpacker.get_{unpacker}\n' + continue + elif member['type']['kind'] == 'flags': # TODO: think about flags + unpacker = 'int() # TODO: this is a flag' + elif member['type']['kind'] == 'optional': + # TODO: unpacker should not crash on missing optional fields + # check how tw code does it and be smart here + if member['type']['inner']['kind'] == 'string': + if member['type']['inner']['disallow_cc']: + unpacker = 'str(SANITIZE_CC)' + unpacker += ' # TODO: optionals' + else: + unpacker = 'str() # TODO: optionals' + elif member['type']['inner']['kind'] in ('int32', 'tick'): + unpacker = 'int() # TODO: optionals' + else: + raise ValueError(f"Error: unknown type {member['type']}") + name = name_to_snake(member["name"]) + res += f' self.{name} = unpacker.get_{unpacker}\n' + return res + def get_dependencies(msg: NetMessageJson) -> str: packer_deps: list[str] = [] + typing_deps: list[str] = [] for member in msg['members']: if member['type']['kind'] == 'string': packer_deps.append('pack_str') @@ -323,8 +387,22 @@ def get_dependencies(msg: NetMessageJson) -> str: elif member['type']['kind'] == 'snapshot_object': # TODO: think about snapshot_object packer_deps.append('pack_int') - elif member['type']['kind'] == 'array': # TODO: think about array + elif member['type']['kind'] == 'array': packer_deps.append('pack_int') + typing_deps.append('Annotated') + arr_member: ArrayMemberTypeJson = member['type']['member_type'] + if arr_member['kind'] == 'string': + packer_deps.append('pack_str') + if arr_member['disallow_cc']: + packer_deps.append('SANITIZE_CC') + elif arr_member['kind'] == 'enum': + packer_deps.append('pack_int') + elif arr_member['kind'] == 'boolean': + packer_deps.append('pack_int') + elif arr_member['kind'] in ('int32', 'tick'): + packer_deps.append('pack_int') + else: + raise ValueError(f"Error: unknown array member type {member['type']}") elif member['type']['kind'] == 'flags': # TODO: think about flags packer_deps.append('pack_int') elif member['type']['kind'] == 'optional': @@ -336,10 +414,14 @@ def get_dependencies(msg: NetMessageJson) -> str: packer_deps.append('pack_int') else: raise ValueError(f"Error: unknown type {member['type']}") - if len(packer_deps) == 0: - return '' - return 'from twnet_parser.packer import ' + \ - ', '.join(sorted(set(packer_deps))) + '\n' + res: str = '' + if len(packer_deps) > 0: + res += 'from twnet_parser.packer import ' + \ + ', '.join(sorted(set(packer_deps))) + '\n' + if len(typing_deps) > 0: + res += 'from typing import ' + \ + ', '.join(sorted(set(typing_deps))) + '\n' + return res def pack_field(member: NetMessageMemberJson) -> str: name: str = name_to_snake(member["name"]) @@ -355,7 +437,7 @@ def pack_field(member: NetMessageMemberJson) -> str: packer = 'int' elif member['type']['kind'] in ('int32', 'tick'): packer = 'int' - elif member['type']['kind'] == 'boolean': + elif member['type']['kind'] == 'boolean': # TODO: can we back True and False as int? packer = 'int' elif member['type']['kind'] == 'tune_param': packer = 'int' @@ -363,8 +445,19 @@ def pack_field(member: NetMessageMemberJson) -> str: elif member['type']['kind'] == 'snapshot_object': # TODO: think about snapshot_object packer = 'int' - elif member['type']['kind'] == 'array': # TODO: think about array - packer = 'int' + elif member['type']['kind'] == 'array': + arr_member: ArrayMemberTypeJson = member['type']['member_type'] + if arr_member['kind'] == 'string': + packer = 'str' + elif arr_member['kind'] == 'enum': + packer = 'int' + elif arr_member['kind'] == 'boolean': + packer = 'int' + elif arr_member['kind'] in ('int32', 'tick'): + packer = 'int' + else: + raise ValueError(f"Error: unknown array member type {member['type']}") + return f"b''.join([pack_{packer}(x) for x in {field}])" elif member['type']['kind'] == 'flags': # TODO: think about flags packer = 'int' elif member['type']['kind'] == 'optional':