#!/usr/bin/env python3 import os import json from typing import \ TypedDict, Literal, Optional, Dict, Union, TextIO, Annotated KINDCONNLESS = Literal[ \ 'packed_addresses', \ 'be_uint16', \ 'uint8'] KIND = Literal[ \ 'int32', \ 'tick', \ 'string', \ 'raw', \ 'sha256', \ 'data', \ 'rest', \ 'enum', \ 'boolean', \ 'tune_param', \ 'snapshot_object', \ 'array', \ 'flags', \ 'optional'] class GameEnumValueJson(TypedDict): value: int name: list[str] class GameEnumJson(TypedDict): name: list[str] values: list[GameEnumValueJson] class ConstantJson(TypedDict): name: list[str] type: str value: int class InnerNetMessageMemberTypeJson(TypedDict): kind: KIND disallow_cc: bool class ArrayMemberTypeJson(TypedDict): kind: KIND # strings disallow_cc: bool # array in array only for de client info # type forward declaration or self reference is not supported # so hack it with a string member_type: 'ArrayMemberTypeJson' count: int class NetConnlessMemberTypeJson(TypedDict): kind: KINDCONNLESS class NetMessageMemberTypeJson(TypedDict): kind: KIND inner: InnerNetMessageMemberTypeJson # snapshot items name: list[str] # enums enum: list[str] # strings disallow_cc: bool # arrays count: int member_type: ArrayMemberTypeJson # data size: Literal['specified_before'] class NetMessageMemberJson(TypedDict): name: list[str] type: NetMessageMemberTypeJson class NetMessageJson(TypedDict): id: int name: list[str] members: list[NetMessageMemberJson] attributes: list[Literal['msg_encoding']] class NetConnlessJson(TypedDict): id: Annotated[list[int], 8] name: list[str] members: list[NetMessageMemberJson] class SpecJson(TypedDict): constants: list[ConstantJson] game_enumerations: list[GameEnumJson] game_messages: list[NetMessageJson] system_messages: list[NetMessageJson] connless_messages: list[NetConnlessJson] snapshot_objects: list[NetMessageJson] def fix_name_conflict(name: str) -> str: # https://peps.python.org/pep-0008/#descriptive-naming-styles if name in ('pass', 'self'): return f'{name}_' return name def name_to_camel(name_list: list[str]) -> str: name = ''.join([part.capitalize() for part in name_list]) return fix_name_conflict(name) def name_to_snake(name_list: list[str]) -> str: name = '_'.join(name_list) return fix_name_conflict(name) def gen_yield(messages: list[NetMessageMemberJson]) -> list[str]: lines = [] if len(messages) > 0: lines.append("") for msg in messages: name = name_to_snake(msg['name']) lines.append(f" yield '{name}', self.{name}") return lines def gen_iter(messages: list[NetMessageMemberJson]) -> str: lines: list[str] = [] lines.append(' def __iter__(self):') lines.append(" yield 'message_type', self.message_type") lines.append(" yield 'message_name', self.message_name") lines.append(" yield 'system_message', self.system_message") lines.append(" yield 'message_id', self.message_id") lines.append(" yield 'header', dict(self.header)") lines += gen_yield(messages) return '\n'.join(lines) def gen_iter_connless(messages: list[NetMessageMemberJson]) -> str: lines: list[str] = [] lines.append(' def __iter__(self):') lines.append(" yield 'message_type', self.message_type") lines.append(" yield 'message_name', self.message_name") lines.append(" yield 'message_id', self.message_id") lines += gen_yield(messages) return '\n'.join(lines) def gen_unpack_members_connless7(msg: NetConnlessJson) -> str: res: str = '' for member in msg['members']: unpacker = 'int()' name = name_to_snake(member["name"]) if member['type']['kind'] == 'be_uint16': unpacker = 'be_uint16()' elif member['type']['kind'] == 'uint8': unpacker = 'uint8()' elif member['type']['kind'] == 'int32': unpacker = 'int()' elif member['type']['kind'] == 'int32_string': res += f' self.{name} = int(unpacker.get_str())\n' continue elif member['type']['kind'] == 'string': if member['type']['disallow_cc']: unpacker = 'str(SANITIZE_CC)' else: unpacker = 'str()' elif member['type']['kind'] == 'serverinfo_client': # TODO: serverinfo_client unpacker = 'raw()' elif member['type']['kind'] == 'packed_addresses': unpacker = 'packed_addresses()' else: raise ValueError(f"Error: unknown type {member['type']}") res += f' self.{name} = unpacker.get_{unpacker}\n' return res def gen_unpack_members(msg: NetMessageJson) -> str: res: str = '' for member in msg['members']: # {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}} unpacker = 'int()' if member['type']['kind'] == 'string': if member['type']['disallow_cc']: unpacker = 'str(SANITIZE_CC)' else: unpacker = 'str()' elif member['type']['kind'] == 'rest': unpacker = 'raw()' elif member['type']['kind'] == 'sha256': unpacker = 'raw(32)' elif member['type']['kind'] == 'data': if member['type']['size'] == 'specified_before': res += ' self.data_size = unpacker.get_int()\n' unpacker = 'raw(self.data_size)' else: raise ValueError(f"Error: unknown data size {member['type']}") # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, elif member['type']['kind'] == 'enum': enum_name: str = name_to_camel(member['type']['enum']).upper() unpacker = f"int() # enum {enum_name}" elif member['type']['kind'] in ('int32', 'tick'): unpacker = 'int()' elif member['type']['kind'] == 'boolean': unpacker = 'int() == 1' elif member['type']['kind'] == 'tune_param': unpacker = 'int() / 100.0' elif member['type']['kind'] == 'snapshot_object': name = name_to_snake(member["name"]) res += f' self.{name}.unpack(unpacker.get_raw())\n' continue elif member['type']['kind'] == 'array': size: int = member['type']['count'] if size is None: print("Error: size is none for the following message") print(msg) exit(1) arr_member: ArrayMemberTypeJson = member['type']['member_type'] if arr_member['kind'] == 'string': if arr_member['disallow_cc']: unpacker = 'str(SANITIZE_CC)' else: unpacker = 'str()' elif arr_member['kind'] == 'enum': # We intentionally do not do anything fancy here # no enums for example because it comes with too many # disadvantages see the related issue here # https://gitlab.com/teeworlds-network/twnet_parser/-/issues/7 unpacker = 'int()' elif arr_member['kind'] == 'boolean': unpacker = 'int() == 1' elif arr_member['kind'] in ('int32', 'tick'): unpacker = 'int()' elif arr_member['kind'] == 'array': sub_size: int = arr_member['count'] sub_arr_member = arr_member['member_type'] unpacker = 'int()' if sub_arr_member['kind'] == 'int32': name = name_to_snake(member["name"]) res += f' for i in range(0, {size}):\n' res += ' sub: list[int] = []\n' res += f' for k in range(0, {sub_size}):\n' res += f' sub[k] = unpacker.get_{unpacker}\n' res += f' self.{name}[i] = sub\n' continue else: raise ValueError( f"Error: unknown sub array member type {member['type']}" ) else: raise ValueError(f"Error: unknown array member type {member['type']}") name = name_to_snake(member["name"]) res += f' for i in range(0, {size}):\n' res += f' self.{name}[i] = unpacker.get_{unpacker}\n' continue elif member['type']['kind'] == 'flags': # TODO: think about flags unpacker = 'int() # TODO: this is a flag' elif member['type']['kind'] == 'uuid': unpacker = 'raw(16)' elif member['type']['kind'] == 'optional': if member['type']['inner']['kind'] == 'string': if member['type']['inner']['disallow_cc']: unpacker = 'optional_str(SANITIZE_CC)' else: unpacker = 'optional_str()' elif member['type']['inner']['kind'] in ('int32', 'tick'): unpacker = 'optional_int()' else: raise ValueError(f"Error: unknown type {member['type']}") name = name_to_snake(member["name"]) res += f' self.{name} = unpacker.get_{unpacker}\n' return res def pack_field(member: NetMessageMemberJson) -> str: name: str = name_to_snake(member["name"]) field: str = f'self.{name}' packer = 'int' if member['type']['kind'] == 'string': packer = 'str' elif member['type']['kind'] in ('sha256', 'rest'): return f'self.{name}' elif member['type']['kind'] == 'data': if member['type']['size'] == 'specified_before': return f'pack_int(self.data_size) + \\\n' \ f' self.{name}' else: raise ValueError(f"Error: unknown data size {member['type']}") # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, elif member['type']['kind'] == 'enum': packer = 'int' elif member['type']['kind'] in ('int32', 'tick'): packer = 'int' elif member['type']['kind'] == 'boolean': packer = 'int' elif member['type']['kind'] == 'tune_param': packer = 'int' field = f'int({field} * 100.0)' elif member['type']['kind'] == 'snapshot_object': name = name_to_snake(member['name']) return f'self.{name}.pack()' elif member['type']['kind'] == 'array': arr_member: ArrayMemberTypeJson = member['type']['member_type'] if arr_member['kind'] == 'string': packer = 'str' elif arr_member['kind'] == 'enum': packer = 'int' elif arr_member['kind'] == 'boolean': packer = 'int' elif arr_member['kind'] in ('int32', 'tick'): packer = 'int' elif arr_member['kind'] == 'array': sub_arr_member = arr_member['member_type'] if sub_arr_member['kind'] == 'int32': return f"b''.join([b''.join([pack_{packer}(x) for x in sub]) for sub in {field}])" else: raise ValueError(f"Error: unknown sub array member type {member['type']}") else: raise ValueError(f"Error: unknown array member type {member['type']}") return f"b''.join([pack_{packer}(x) for x in {field}])" elif member['type']['kind'] == 'flags': # TODO: think about flags packer = 'int' elif member['type']['kind'] == 'uuid': return f'self.{name}' elif member['type']['kind'] == 'optional': packer = 'int' if member['type']['inner']['kind'] == 'string': packer = 'str' elif member['type']['inner']['kind'] in ('int32', 'tick'): packer = 'int' return f"(pack_{packer}({field}) if {field} is not None else b'')" else: raise ValueError(f"Error: unknown type {member['type']}") return f'pack_{packer}({field})' def gen_pack_return(msg: NetMessageJson) -> str: members: list[NetMessageMemberJson] = msg['members'] if len(members) == 0: return " return b''" if len(members) == 1: return f' return {pack_field(members[0])}' mem_strs: list[str] = [ f' {pack_field(member)}' for member in members[1:]] return f" return {pack_field(members[0])} + \\\n" + \ ' + \\\n'.join(mem_strs) def pack_field_connless7(member: NetMessageMemberJson) -> str: name: str = name_to_snake(member["name"]) field: str = f'self.{name}' packer = 'int' if member['type']['kind'] == 'packed_addresses': packer = 'packed_addresses' elif member['type']['kind'] == 'be_uint16': packer = 'be_uint16' elif member['type']['kind'] == 'uint8': packer = 'uint8' elif member['type']['kind'] == 'string': packer = 'str' elif member['type']['kind'] == 'int32': packer = 'int' elif member['type']['kind'] == 'int32_string': return f'pack_str(str({field}))' elif member['type']['kind'] == 'serverinfo_client': # TODO: serverinfo_client return f'self.{name}' else: raise ValueError(f"Error: unknown type {member['type']}") return f'pack_{packer}({field})' def gen_pack_return_connless7(msg: NetConnlessJson) -> str: members: list[NetMessageMemberJson] = msg['members'] if len(members) == 0: return " return b''" if len(members) == 1: return f' return {pack_field_connless7(members[0])}' mem_strs: list[str] = [ f' {pack_field_connless7(member)}' for member in members[1:]] return f" return {pack_field_connless7(members[0])} + \\\n" + \ ' + \\\n'.join(mem_strs) def get_default(field_path: str) -> Optional[str]: """ field_path has the following format: game.msg_name.field_name example: game.sv_tune_params.ground_control_speed """ # COULDDO: make this faster # but then who cares about # code gen speed def_file: str = './data/messages7_defaults.json' if not os.path.exists(def_file): print(f"Failed to open defaults file '{def_file}'") exit(1) with open(def_file) as def_io: def_json: Dict[str, Union[int, float, bool, str]] = json.load(def_io) if field_path not in def_json: return None default = def_json[field_path] # also covers bool cuz python drunk # but this is actually exactly what we want if isinstance(default, int): return str(default) elif isinstance(default, float): return str(default) elif isinstance(default, str): return f"'{default}'" else: print(f"Error: invalid default type for field {field_path}") print(f" please check {def_file} for errors") exit(1) class CodeGenerator(): def __init__(self, protocol_version: str) -> None: self.protocol_version = protocol_version self.game_enums: list[GameEnumJson] = [] def get_dependencies_connless(self, msg: NetConnlessJson) -> str: packer_deps: list[str] = [] typing_deps: list[str] = ['Literal'] res: str = '' for member in msg['members']: if member['type']['kind'] == 'packed_addresses': packer_deps.append('pack_packed_addresses') res += 'from twnet_parser.master_server import MastersrvAddr\n' elif member['type']['kind'] == 'uint8': packer_deps.append('pack_uint8') elif member['type']['kind'] == 'be_uint16': packer_deps.append('pack_be_uint16') elif member['type']['kind'] == 'int32': packer_deps.append('pack_int') elif member['type']['kind'] == 'int32_string': packer_deps.append('pack_str') elif member['type']['kind'] == 'serverinfo_client': # TODO: serverinfo_client pass # use pack raw elif member['type']['kind'] == 'string': packer_deps.append('pack_str') if member['type']['disallow_cc']: packer_deps.append('SANITIZE_CC') else: raise ValueError(f"Error: unknown type {member['type']}") if len(packer_deps) > 0: res += 'from twnet_parser.packer import ' + \ ', '.join(sorted(set(packer_deps))) + '\n' if len(typing_deps) > 0: res += 'from typing import ' + \ ', '.join(sorted(set(typing_deps))) + '\n' return res def get_dependencies( self, msg: NetMessageJson, game_or_sys: bool, typing_dep: Optional[str] = 'Literal' ) -> str: packer_deps: list[str] = [] typing_deps: list[str] = [] custom_deps: list[str] = [] # the ChunkHeader is Optional if game_or_sys: typing_deps.append('Optional') if typing_dep: typing_deps.append(typing_dep) need_enums: bool = False for member in msg['members']: if member['type']['kind'] == 'string': packer_deps.append('pack_str') if member['type']['disallow_cc']: packer_deps.append('SANITIZE_CC') elif member['type']['kind'] == 'rest': pass elif member['type']['kind'] == 'sha256': typing_deps.append('Annotated') elif member['type']['kind'] == 'data': if member['type']['size'] == 'specified_before': typing_deps.append('Optional') else: raise ValueError(f"Error: unknown data size {member['type']}") # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, elif member['type']['kind'] == 'enum': need_enums = True packer_deps.append('pack_int') elif member['type']['kind'] in ('int32', 'tick'): packer_deps.append('pack_int') elif member['type']['kind'] == 'boolean': packer_deps.append('pack_int') elif member['type']['kind'] == 'tune_param': packer_deps.append('pack_int') elif member['type']['kind'] == 'snapshot_object': obj_name: str = 'Obj' + name_to_camel(member['type']['name']) file_name: str = name_to_snake(member['type']['name']) typing_deps.append('Optional') custom_deps.append( f'from twnet_parser.snap{self.protocol_version}.{file_name} import {obj_name}' ) elif member['type']['kind'] == 'array': packer_deps.append('pack_int') typing_deps.append('Annotated') arr_member: ArrayMemberTypeJson = member['type']['member_type'] if arr_member['kind'] == 'string': packer_deps.append('pack_str') if arr_member['disallow_cc']: packer_deps.append('SANITIZE_CC') elif arr_member['kind'] == 'enum': need_enums = True packer_deps.append('pack_int') elif arr_member['kind'] == 'boolean': packer_deps.append('pack_int') elif arr_member['kind'] in ('int32', 'tick'): packer_deps.append('pack_int') elif arr_member['kind'] == 'array': if arr_member['member_type']['kind'] == 'int32': packer_deps.append('pack_int') else: raise ValueError( f"Error: unknown sub array member type {member['type']}" ) else: raise ValueError(f"Error: unknown array member type {member['type']}") elif member['type']['kind'] == 'flags': # TODO: think about flags packer_deps.append('pack_int') elif member['type']['kind'] == 'optional': typing_deps.append('Optional') if member['type']['inner']['kind'] == 'string': packer_deps.append('pack_str') if member['type']['inner']['disallow_cc']: packer_deps.append('SANITIZE_CC') elif member['type']['inner']['kind'] in ('int32', 'tick'): packer_deps.append('pack_int') elif member['type']['kind'] == 'uuid': pass else: raise ValueError(f"Error: unknown type {member['type']}") res: str = '' if len(packer_deps) > 0: res += 'from twnet_parser.packer import ' + \ ', '.join(sorted(set(packer_deps))) + '\n' if len(typing_deps) > 0: res += 'from typing import ' + \ ', '.join(sorted(set(typing_deps))) + '\n' if len(custom_deps) > 0: res += '\n'.join(sorted(set(custom_deps))) + '\n' if need_enums: res += f'import twnet_parser.enum{self.protocol_version} as enum{self.protocol_version}\n' return res def gen_match_file( self, msg_type: Literal['system', 'game'], messages: list[NetMessageJson] ): match_code: str = f"""# generated by scripts/generate_messages.py from typing import Optional import twnet_parser.msg{self.protocol_version} from twnet_parser.net_message import NetMessage from twnet_parser.ddnet_uuid import MsgDDNetUuid """ msg: NetMessageJson for msg in messages: name_snake = name_to_snake(msg['name']) match_code += f"import twnet_parser.messages{self.protocol_version}.{msg_type}" \ f".{name_snake}" \ " as \\\n" \ f" {msg_type}{self.protocol_version}_{name_snake}\n" match_code += f""" def match_{msg_type}{self.protocol_version}(msg_id: int, data: bytes) -> NetMessage: msg: Optional[NetMessage] = None """ if_ = 'if' for msg in messages: name_snake = name_to_snake(msg['name']) name_camel = name_to_camel(msg['name']) match_code += f""" {if_} msg_id == twnet_parser.msg{self.protocol_version}.{name_snake.upper()}: msg = {msg_type}{self.protocol_version}_{name_snake}.Msg{name_camel}()""" if_ = 'elif' match_code += '\n' match_code += '\n # msg_id 0 is used by ddnet uuid messages' match_code += '\n # do not crash on those' match_code += '\n ' match_code += 'if msg is None:\n' match_code += ' if msg_id != 0:\n' match_code += ' ' match_code += 'raise ValueError(' match_code += 'f"Error: unknown ' \ + msg_type + \ ' message id={msg_id} data={data[0]}")\n' match_code += ' msg = MsgDDNetUuid()\n' match_code += '\n' match_code += ' msg.unpack(data)\n' match_code += ' return msg\n' dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/msg_matcher/{msg_type}{self.protocol_version}.py') # if os.path.exists(file_path): # print(f"Warning: file already exists! {file_path}") # return with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write(match_code) def gen_match_file_connless( self, messages: list[NetConnlessJson] ): match_code: str = f"""# generated by scripts/generate_messages.py from typing import Optional import twnet_parser.msg{self.protocol_version} from twnet_parser.connless_message import ConnlessMessage """ msg: NetConnlessJson for msg in messages: name_snake = name_to_snake(msg['name']) match_code += f"import twnet_parser.messages{self.protocol_version}.connless" \ f".{name_snake}" \ " as \\\n" \ f" connless_{name_snake}\n" match_code += f""" def match_connless{self.protocol_version}(msg_id: bytes, data: bytes) -> ConnlessMessage: msg: Optional[ConnlessMessage] = None """ if_ = 'if' for msg in messages: name_snake = name_to_snake(msg['name']) name_camel = name_to_camel(msg['name']) match_code += \ f""" {if_} msg_id == twnet_parser.msg{self.protocol_version}.CONNLESS_{name_snake.upper()}: msg = connless_{name_snake}.Msg{name_camel}()""" if_ = 'elif' match_code += '\n\n if msg is None:\n' match_code += ' ' match_code += 'raise ValueError(\n' match_code += ' ' match_code += 'f"Error: unknown connless ' \ 'message id={msg_id!r} data={data[0]}"\n' match_code += ' )\n' match_code += '\n' match_code += ' msg.unpack(data)\n' match_code += ' return msg\n' dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/msg_matcher/connless{self.protocol_version}.py') with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write(match_code) def gen_init_member_header_def( self, member: NetMessageMemberJson, name_snake: str, message_type: Literal['game', 'system', 'connless', 'snap'] ) -> list[str]: """ get_init_member_header_def given a member field it returns an array of strings that represent the python code for the content in the init method parameter list def __init__(self, [THIS IS GENERATED]) -> None: it might return two fields for members like data that also introduce a size field the returned assignements do not include indentation """ args: list[str] = [] # { # 'name': ['message'], # 'type': { # 'kind': 'string', # 'disallow_cc': False # } # } ftype = 'int' default = '-1' if member['type']['kind'] == 'string': ftype = 'str' default = "'default'" elif member['type']['kind'] == 'packed_addresses': # TODO: packed_addreses default value ftype = 'list[MastersrvAddr]' default = '[]' elif member['type']['kind'] == 'serverinfo_client': # TODO: serverinfo_client ftype = 'bytes' default = "b''" elif member['type']['kind'] == 'be_uint16': ftype = 'int' default = '0' elif member['type']['kind'] == 'uint8': ftype = 'int' default = '0' elif member['type']['kind'] == 'rest': ftype = 'bytes' default = "b'\\x00'" elif member['type']['kind'] == 'sha256': ftype = 'Annotated[bytes, 32]' default = "bytes(32)" elif member['type']['kind'] == 'data': ftype = 'bytes' default = "b'\\x00'" if member['type']['size'] == 'specified_before': args.append('data_size: Optional[int] = None') else: raise ValueError(f"Error: unknown data size {member['type']}") # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, elif member['type']['kind'] == 'enum': enum_name: str = name_to_camel(member['type']['enum']) ftype = 'int' default = self.get_default_enum(enum_name) default = f"enum{self.protocol_version}.{default}.value" elif member['type']['kind'] in ('int32', 'tick', 'int32_string'): ftype = 'int' default = '0' elif member['type']['kind'] == 'boolean': ftype = 'bool' default = 'False' elif member['type']['kind'] == 'tune_param': ftype = 'float' default = '0.0' elif member['type']['kind'] == 'snapshot_object': obj_name: str = 'Obj' + name_to_camel(member['type']['name']) ftype = f'Optional[{obj_name}]' default = 'None' elif member['type']['kind'] == 'array': size: int = member['type']['count'] if size is None: print("Error: size is none for the following member") print(member) exit(1) arr_member: ArrayMemberTypeJson = member['type']['member_type'] if arr_member['kind'] == 'string': ftype = f'Annotated[list[str], {size}]' default = '[' + ', '.join(["''"] * size) + ']' elif arr_member['kind'] == 'boolean': ftype = f'Annotated[list[bool], {size}]' default = '[' + ', '.join(["False"] * size) + ']' elif arr_member['kind'] in ('int32', 'tick', 'enum'): ftype = f'Annotated[list[int], {size}]' default = '[' + ', '.join(["0"] * size) + ']' elif arr_member['kind'] == 'array': # snap de client info has an array of int32 arrays as field # should probably do some kind of recursion here # but it breaks my brain sub_size: int = arr_member['count'] if sub_size is None: print("Error: size is none for the following sub member") print(arr_member) exit(1) sub_arr_member: ArrayMemberTypeJson = arr_member['member_type'] if sub_arr_member['kind'] == 'int32': ftype = f'Annotated[list[list[int]], ({size},{sub_size})]' inner_default = '[' + ', '.join(["0"] * sub_size) + ']' default = '[' + ',\n '.join([inner_default] * size) + ']' else: raise ValueError( \ f"Error: msg {name_to_snake(member['name'])} " \ f"has unknown array sub member type {member['type']}") else: raise ValueError( \ f"Error: msg {name_to_snake(member['name'])} " \ f"has unknown array member type {member['type']}") # Initializing lists with defaults # And type annotation can get quite long # So split it in two lines default = f'\\\n {default}' elif member['type']['kind'] == 'flags': # TODO: think about flags ftype = 'int' default = '0' elif member['type']['kind'] == 'uuid': ftype = 'bytes' default = "b'\xbc\xb4\x3b\xf5\x42\x7c\x36\xd8\xb5\xb8\x79\x75\xc8\xc0\x6a\xa1'" elif member['type']['kind'] == 'optional': if member['type']['inner']['kind'] == 'string': ftype = 'str' default = "''" elif member['type']['inner']['kind'] in ('int32', 'tick'): ftype = 'int' default = '0' else: raise \ ValueError( \ f"Error: unknown optional type {member['type']}") else: raise ValueError(f"Error: unknown type {member['type']}") name = name_to_snake(member["name"]) manual_default = get_default(f"{message_type}.{name_snake}.{name}") if manual_default: default = manual_default args.append(f'{name}: {ftype} = {default}') return args def write_init_method_header_connless( self, out_file: TextIO, msg: NetConnlessJson, name_snake: str ) -> None: comma: str = '' if len(msg['members']) > 0: comma = ',\n' out_file.write( \ ' def __init__(\n' \ f' self{comma}') args: list[str] = [] for member in msg['members']: mem_defs: list[str] = self.gen_init_member_header_def(member, name_snake, 'connless') for mem_def in mem_defs: args.append(f' {mem_def}') out_file.write(',\n'.join(args) + '\n') out_file.write(' ) -> None:\n') def write_init_method_header( self, out_file: TextIO, msg: NetMessageJson, game: Literal['system', 'game'], name_snake: str ) -> None: comma: str = '' if len(msg['members']) > 0: comma = ',\n' out_file.write( \ ' def __init__(\n' \ ' self,\n' \ f' chunk_header: Optional[ChunkHeader] = None{comma}') args: list[str] = [] for member in msg['members']: mem_defs: list[str] = self.gen_init_member_header_def(member, name_snake, game) for mem_def in mem_defs: args.append(f' {mem_def}') out_file.write(',\n'.join(args) + '\n') out_file.write(' ) -> None:\n') def generate_snap_obj7(self, obj: NetMessageJson) -> None: name_snake = name_to_snake(obj['name']) name_camel = name_to_camel(obj['name']) dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/snap{self.protocol_version}/', f'{name_snake}.py') with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write('# generated by scripts/generate_messages.py\n') out_file.write('\n') out_file.write('from twnet_parser.pretty_print import PrettyPrint\n') if len(obj['members']) > 0: out_file.write('from twnet_parser.packer import Unpacker\n') out_file.write(self.get_dependencies(obj, False, None)) out_file.write('\n') out_file.write(f'class Obj{name_camel}(PrettyPrint):\n') comma: str = '' if len(obj['members']) > 0: comma = ',\n' out_file.write( \ ' def __init__(\n' \ f' self{comma}') args: list[str] = [] for member in obj['members']: mem_defs: list[str] = self.gen_init_member_header_def(member, name_snake, 'snap') for mem_def in mem_defs: args.append(f' {mem_def}') out_file.write(',\n'.join(args) + '\n') out_file.write(' ) -> None:\n') out_file.write(f" self.item_name: str = 'connless.{name_snake}'\n") out_file.write(f" self.type_id: int = {obj['id']}\n") out_file.write( " self.id: int = 0\n") out_file.write('\n') self.generate_field_assignments_in_initialize(obj, out_file) out_file.write('\n') out_file.write(' # first byte of data\n') out_file.write(' # has to be the first byte of the message payload\n') out_file.write(' # NOT the chunk header and NOT the message id\n') out_file.write(' def unpack(self, data: bytes) -> bool:\n') if len(obj['members']) > 0: out_file.write(' unpacker = Unpacker(data)\n') out_file.write(gen_unpack_members(obj)) out_file.write(' return True\n') out_file.write('\n') out_file.write(' def pack(self) -> bytes:\n') out_file.write(gen_pack_return(obj)) def generate_msg_connless( self, msg: NetConnlessJson ) -> None: name_snake = name_to_snake(msg['name']) name_camel = name_to_camel(msg['name']) dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/messages{self.protocol_version}/connless/', f'{name_snake}.py') with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write('# generated by scripts/generate_messages.py\n') out_file.write('\n') out_file.write('from twnet_parser.pretty_print import PrettyPrint\n') if len(msg['members']) > 0: out_file.write('from twnet_parser.packer import Unpacker\n') out_file.write(self.get_dependencies_connless(msg)) out_file.write('\n') out_file.write(f'class Msg{name_camel}(PrettyPrint):\n') self.write_init_method_header_connless(out_file, msg, name_snake) out_file.write(" self.message_type: Literal['connless'] = 'connless'\n") out_file.write(f" self.message_name: str = 'connless.{name_snake}'\n") out_file.write(f" self.message_id: list[int] = {msg['id']}\n") out_file.write('\n') for member in msg['members']: ftype = 'int' if member['type']['kind'] == 'packed_addresses': # TODO: packed_addreses default value ftype = 'list[MastersrvAddr]' elif member['type']['kind'] == 'be_uint16': # TODO: be_uint16 ftype = 'int' elif member['type']['kind'] == 'uint8': # TODO: uint8 ftype = 'int' elif member['type']['kind'] in ('int32', 'int32_string'): ftype = 'int' elif member['type']['kind'] == 'string': ftype = 'str' elif member['type']['kind'] == 'serverinfo_client': # TODO: serverinfo_client ftype = 'bytes' else: raise ValueError(f"Error: unknown connless type {member['type']}") name = name_to_snake(member["name"]) if ftype != '': ftype = f': {ftype}' if member['type']['kind'] == 'enum': out_file.write(f" self.{name}{ftype} = {name}\n") else: out_file.write(f" self.{name}{ftype} = {name}\n") out_file.write('\n') out_file.write(gen_iter_connless(msg['members'])) out_file.write('\n') out_file.write('\n') out_file.write(' # first byte of data\n') out_file.write(' # has to be the first byte of the message payload\n') out_file.write(' # NOT the chunk header and NOT the message id\n') out_file.write(' def unpack(self, data: bytes) -> bool:\n') if len(msg['members']) > 0: out_file.write(' unpacker = Unpacker(data)\n') out_file.write(gen_unpack_members_connless7(msg)) out_file.write(' return True\n') out_file.write('\n') out_file.write(' def pack(self) -> bytes:\n') out_file.write(gen_pack_return_connless7(msg)) def generate_field_assignments_in_initialize( self, msg: NetMessageJson, out_file ) -> None: for member in msg['members']: # { # 'name': ['message'], # 'type': { # 'kind': 'string', # 'disallow_cc': False # } # } ftype = 'int' if member['type']['kind'] == 'string': ftype = 'str' elif member['type']['kind'] == 'rest': ftype = 'bytes' elif member['type']['kind'] == 'sha256': ftype = 'Annotated[bytes, 32]' elif member['type']['kind'] == 'data': ftype = 'bytes' if member['type']['size'] == 'specified_before': out_file.write(" " \ "self.data_size: int =" \ " data_size if data_size else len(data)\n") else: raise ValueError(f"Error: unknown data size {member['type']}") # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}}, elif member['type']['kind'] == 'enum': ftype = 'int' elif member['type']['kind'] in ('int32', 'tick'): ftype = 'int' elif member['type']['kind'] == 'boolean': ftype = 'bool' elif member['type']['kind'] == 'tune_param': ftype = 'float' elif member['type']['kind'] == 'snapshot_object': obj_name: str = 'Obj' + name_to_camel(member['type']['name']) ftype = obj_name name = name_to_snake(member["name"]) out_file.write(f" if not {name}:\n") out_file.write(f" {name} = {obj_name}()\n") out_file.write(f" self.{name}: {ftype} = {name}\n") continue elif member['type']['kind'] == 'array': # Array type annotations are so annoyingly long # also there is a planned refactor # https://gitlab.com/teeworlds-network/twnet_parser/-/issues/4 # so inherit type from constructor arguments ftype = '' elif member['type']['kind'] == 'flags': # TODO: think about flags ftype = 'int' elif member['type']['kind'] == 'uuid': ftype = 'bytes' elif member['type']['kind'] == 'optional': if member['type']['inner']['kind'] == 'string': ftype = 'Optional[str]' elif member['type']['inner']['kind'] in ('int32', 'tick'): ftype = 'Optional[int]' else: raise \ ValueError( \ f"Error: unknown optional type {member['type']}") else: raise ValueError(f"Error: unknown type {member['type']}") name = name_to_snake(member["name"]) if ftype != '': ftype = f': {ftype}' # TODO: what is this if statement doing? if member['type']['kind'] == 'enum': out_file.write(f" self.{name}{ftype} = {name}\n") else: out_file.write(f" self.{name}{ftype} = {name}\n") def generate_msg( self, msg: NetMessageJson, game: Literal['system', 'game'] ) -> None: name_snake = name_to_snake(msg['name']) name_camel = name_to_camel(msg['name']) dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/messages{self.protocol_version}/{game}/', f'{name_snake}.py') # if os.path.exists(file_path): # print(f"Warning: file already exists! {file_path}") # return with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write('# generated by scripts/generate_messages.py\n') out_file.write('\n') out_file.write('from twnet_parser.pretty_print import PrettyPrint\n') if len(msg['members']) > 0: out_file.write('from twnet_parser.packer import Unpacker\n') out_file.write('from twnet_parser.chunk_header import ChunkHeader\n') out_file.write(self.get_dependencies(msg, True)) out_file.write('\n') out_file.write(f'class Msg{name_camel}(PrettyPrint):\n') self.write_init_method_header(out_file, msg, game, name_snake) out_file.write(f" self.message_type: Literal['system', 'game'] = '{game}'\n") out_file.write(f" self.message_name: str = '{name_snake}'\n") sys: str = 'True' if game == 'system' else 'False' out_file.write(f" self.system_message: bool = {sys}\n") out_file.write(f" self.message_id: int = {msg['id']}\n") out_file.write(" if not chunk_header:\n") out_file.write(f" chunk_header = ChunkHeader(version = '0.{self.protocol_version}')\n") out_file.write(" self.header: ChunkHeader = chunk_header\n") out_file.write('\n') self.generate_field_assignments_in_initialize(msg, out_file) out_file.write('\n') out_file.write(gen_iter(msg['members'])) out_file.write('\n') out_file.write('\n') out_file.write(' # first byte of data\n') out_file.write(' # has to be the first byte of the message payload\n') out_file.write(' # NOT the chunk header and NOT the message id\n') out_file.write(' def unpack(self, data: bytes) -> bool:\n') if len(msg['members']) > 0: out_file.write(' unpacker = Unpacker(data)\n') out_file.write(gen_unpack_members(msg)) out_file.write(' return True\n') out_file.write('\n') out_file.write(' def pack(self) -> bytes:\n') out_file.write(gen_pack_return(msg)) def get_default_enum(self, enum_name: str) -> str: """ enum_name has to be camel case If for example enum_name 'chat' is given it returns 'CHAT_NONE' """ enum: GameEnumJson for enum in self.game_enums: base: str = name_to_camel(enum['name']) if base != enum_name: continue val: GameEnumValueJson for val in enum['values']: sub: str = name_to_snake(val['name']).upper() return f"{base}.{sub}" raise ValueError(f"Enum not found '{enum_name}'") def gen_enum_file(self) -> None: enum_code = '# pylint: disable=duplicate-code\n' # TODO: remove enum_code += 'from enum import Enum\n\n' enum: GameEnumJson for enum in self.game_enums: base: str = name_to_camel(enum['name']) enum_code += f'class {base}(Enum):\n' val: GameEnumValueJson for val in enum['values']: sub: str = name_to_snake(val['name']).upper() enum_code += \ ' ' \ f"{sub}: int = {val['value']}\n" enum_code += "\n" # cut off last doubled newline # because we do not split a section anymore enum_code = enum_code[:-1] dirname = os.path.dirname(__file__) file_path= os.path.join( dirname, f'../twnet_parser/enum{self.protocol_version}.py') # if os.path.exists(file_path): # print(f"Warning: file already exists! {file_path}") # return with open(file_path, 'w') as out_file: print(f"Generating {file_path} ...") out_file.write(enum_code) def generate(self, spec: str) -> None: print(f"generating classes from {spec} ...") with open(spec) as spec_io: spec_data: SpecJson = json.load(spec_io) # for msg in [spec_data['game_messages'][1]]: self.game_enums = spec_data['game_enumerations'] game_messages: list[NetMessageJson] = spec_data['game_messages'] system_messages: list[NetMessageJson] = spec_data['system_messages'] connless_messages: list[NetConnlessJson] = spec_data['connless_messages'] snapshot_objects: list[NetMessageJson] = spec_data['snapshot_objects'] self.gen_enum_file() self.gen_match_file('game', game_messages) self.gen_match_file('system', system_messages) self.gen_match_file_connless(connless_messages) for msg in game_messages: self.generate_msg(msg, 'game') for msg in system_messages: if msg['name'] == ['snap']: continue self.generate_msg(msg, 'system') for connless_msg in connless_messages: self.generate_msg_connless(connless_msg) for obj in snapshot_objects: self.generate_snap_obj7(obj) class SpecInfo: def __init__( self, json_path: str, version_name: str ) -> None: self.json_path = json_path self.version_name = version_name def main() -> None: dirname = os.path.dirname(__file__) spec_infos: list[SpecInfo] = [ SpecInfo( '../../libtw2/gamenet/generate/spec/teeworlds-0.7.5.json', '7' ), SpecInfo( '../../libtw2/gamenet/generate/spec/teeworlds-0.6.json', '6' ) # SpecInfo( # '../../libtw2/gamenet/generate/spec/ddnet-17.2.1.json', # '_ddnet_17_2_1' # ) ] for spec_info in spec_infos: spec_file = os.path.join( dirname, spec_info.json_path) if os.path.exists(spec_file): generator = CodeGenerator(spec_info.version_name) generator.generate(spec_file) else: print(f"Error: file not found {spec_file}") print(" try running these commands") print("") print(" git clone git@github.com:heinrich5991/libtw2 ..") if __name__ == '__main__': main()