#!/usr/bin/env python3
"""Generate the 0.7 message classes and message matchers of twnet_parser.

The script reads a libtw2 protocol spec (json) and writes, relative to the
directory this script lives in:

* ``../twnet_parser/messages7/<game|system>/<message>.py``
  one class per net message
* ``../twnet_parser/msg_matcher/<game|system>7.py``
  a function mapping a message id to an instance of the matching class

Per field default values can be overridden in
``./data/messages7_defaults.json`` (resolved against the current working
directory).
"""

import functools
import json
import os
from typing import Dict, Literal, NamedTuple, Optional, TypedDict, Union

# All member kinds this generator knows about.
MemberKind = Literal[
    'int32',
    'tick',
    'string',
    'raw',
    'sha256',
    'data',
    'rest',
    'enum',
    'boolean',
    'tune_param',
    'snapshot_object',
    'array',
    'flags',
    'optional']


class ConstantJson(TypedDict):
    name: list[str]
    type: str
    value: int


class NetEnumValuesJson(TypedDict):
    value: str
    name: list[str]


class NetEnumJson(TypedDict):
    name: list[str]
    values: list[NetEnumValuesJson]


class InnerNetMessageMemberTypeJson(TypedDict):
    kind: MemberKind
    disallow_cc: bool


class NetMessageMemberTypeJson(TypedDict):
    kind: MemberKind
    # only read by this script when kind is 'optional'
    inner: InnerNetMessageMemberTypeJson
    disallow_cc: bool


class NetMessageMemberJson(TypedDict):
    # name parts, joined with '_' (snake) or capitalized (camel)
    name: list[str]
    type: NetMessageMemberTypeJson


class NetMessageJson(TypedDict):
    id: int
    name: list[str]
    members: list[NetMessageMemberJson]
    attributes: list[Literal['msg_encoding']]


class SpecJson(TypedDict):
    constants: list[ConstantJson]
    game_enumerations: list[NetEnumJson]
    game_messages: list[NetMessageJson]
    system_messages: list[NetMessageJson]


class _KindSpec(NamedTuple):
    """How one member kind is represented in the generated code."""
    # python type annotation of the generated attribute
    ftype: str
    # source text of the default value in the generated __init__
    default: str
    # suffix appended to 'unpacker.get_' in the generated unpack()
    unpacker: str
    # 'str' / 'int' selects pack_str / pack_int,
    # None means the (bytes) attribute is emitted unpacked
    packer: Optional[str]


# TODO: data has a size field
_BYTES_SPEC = _KindSpec('bytes', "b'\\x00'", 'raw()', None)
_INT_SPEC = _KindSpec('int', '0', 'int()', 'int')

# Single source of truth for every non optional member kind.
_KIND_SPECS: Dict[str, _KindSpec] = {
    # TODO: sanitize cc
    'string': _KindSpec('str', "'default'", 'str()', 'str'),
    'raw': _BYTES_SPEC,
    'sha256': _BYTES_SPEC,
    'rest': _BYTES_SPEC,
    'data': _BYTES_SPEC,
    # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
    # TODO: use ENUM_NAME_SOME_VALUE as default here
    'enum': _KindSpec('int', '0', 'int() # TODO: this is a enum', 'int'),
    'int32': _INT_SPEC,
    'tick': _INT_SPEC,
    'boolean': _KindSpec('bool', 'False', 'int() == 1', 'int'),
    'tune_param': _KindSpec('float', '0.0', 'int() / 100.0', 'int'),
    # TODO: think about snapshot_object
    'snapshot_object': _KindSpec(
        'int', '0', 'int() # TODO: this is a snapshot object', 'int'),
    # TODO: think about array
    'array': _KindSpec('int', '0', 'int() # TODO: this is an array', 'int'),
    # TODO: think about flags
    'flags': _KindSpec('int', '0', 'int() # TODO: this is a flag', 'int'),
}

# TODO: unpacker should not crash on missing optional fields
#       check how tw code does it and be smart here
_OPTIONAL_TODO = ' # TODO: warning this can fail because it is an optional field'

# Supported inner kinds of 'optional' members.
_OPTIONAL_SPECS: Dict[str, _KindSpec] = {
    # TODO: sanitize cc
    'string': _KindSpec('str', "''", 'str()' + _OPTIONAL_TODO, 'str'),
    'int32': _KindSpec('int', '0', 'int()' + _OPTIONAL_TODO, 'int'),
    'tick': _KindSpec('int', '0', 'int()' + _OPTIONAL_TODO, 'int'),
}

DEFAULTS_FILE: str = './data/messages7_defaults.json'


def _member_spec(member: NetMessageMemberJson) -> _KindSpec:
    """Look up how the given member is typed, unpacked and packed.

    Raises ValueError for an unknown kind and for an 'optional'
    member whose inner kind is not supported.
    """
    member_type = member['type']
    if member_type['kind'] == 'optional':
        inner_spec = _OPTIONAL_SPECS.get(member_type['inner']['kind'])
        if inner_spec is None:
            raise ValueError(f"Error: unknown optional type {member_type}")
        return inner_spec
    spec = _KIND_SPECS.get(member_type['kind'])
    if spec is None:
        raise ValueError(f"Error: unknown type {member_type}")
    return spec


def _render_match_code(
        msg_type: Literal['system', 'game'],
        messages: list[NetMessageJson]
) -> str:
    """Build the source code of the msg_matcher module for msg_type."""
    names = [
        (name_to_snake(msg['name']), name_to_camel(msg['name']))
        for msg in messages]
    parts: list[str] = [
        "# generated by scripts/generate_messages.py\n"
        "from typing import Optional\n"
        "\n"
        "import twnet_parser.msg7\n"
        "from twnet_parser.net_message import NetMessage\n"
        "\n"
    ]
    for name_snake, _ in names:
        parts.append(
            f"import twnet_parser.messages7.{msg_type}"
            f".{name_snake}"
            " as \\\n"
            f"        {msg_type}7_{name_snake}\n")
    parts.append(
        f"\ndef match_{msg_type}7(msg_id: int, data: bytes) -> NetMessage:\n"
        "    msg: Optional[NetMessage] = None\n")
    for index, (name_snake, name_camel) in enumerate(names):
        # the first branch opens the chain, all others continue it
        keyword = 'if' if index == 0 else 'elif'
        parts.append(
            f"\n    {keyword} msg_id == twnet_parser.msg7.{name_snake.upper()}:\n"
            f"        msg = {msg_type}7_{name_snake}.Msg{name_camel}()")
    # not an f-string on purpose: the braces have to end up
    # verbatim in the generated f-string
    parts.append(
        '\n\n    if msg is None:\n'
        '        raise ValueError('
        'f"Error: unknown '
        + msg_type +
        ' message id={msg_id} data={data[0]}")\n'
        '\n'
        '    msg.unpack(data)\n'
        '    return msg\n')
    return ''.join(parts)


def gen_match_file7(
        msg_type: Literal['system', 'game'],
        messages: list[NetMessageJson]
):
    """Write ../twnet_parser/msg_matcher/{msg_type}7.py

    The generated module imports every message module and defines
    match_{msg_type}7(msg_id, data) which instantiates the message
    class matching msg_id and unpacks data into it.
    An already existing file is overwritten.
    """
    match_code = _render_match_code(msg_type, messages)
    dirname = os.path.dirname(__file__)
    file_path = os.path.join(
        dirname,
        f'../twnet_parser/msg_matcher/{msg_type}7.py')
    with open(file_path, 'w', encoding='utf-8') as out_file:
        print(f"Generating {file_path} ...")
        out_file.write(match_code)


def fix_name_conflict(name: str) -> str:
    """Append an underscore to names that clash with a python keyword.

    Only 'pass' is handled.
    """
    # https://peps.python.org/pep-0008/#descriptive-naming-styles
    if name == 'pass':
        return 'pass_'
    return name


def name_to_camel(name_list: list[str]) -> str:
    """['sv', 'chat'] -> 'SvChat'"""
    name = ''.join(part.capitalize() for part in name_list)
    return fix_name_conflict(name)


def name_to_snake(name_list: list[str]) -> str:
    """['sv', 'chat'] -> 'sv_chat'"""
    name = '_'.join(name_list)
    return fix_name_conflict(name)


def _render_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> str:
    """Build the source code of the message class for msg.

    Raises ValueError if a member has an unsupported type.
    """
    name_snake = name_to_snake(msg['name'])
    name_camel = name_to_camel(msg['name'])
    members = msg['members']

    init_args: list[str] = []
    attr_lines: list[str] = []
    unpack_lines: list[str] = []
    for member in members:
        # {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
        spec = _member_spec(member)
        name = name_to_snake(member['name'])
        # a default from the defaults file wins over the one of the kind
        default = get_default(f"{game}.{name_snake}.{name}") or spec.default
        init_args.append(f'            {name}: {spec.ftype} = {default}')
        attr_lines.append(f"        self.{name}: {spec.ftype} = {name}\n")
        unpack_lines.append(
            f'        self.{name} = unpacker.get_{spec.unpacker}\n')

    is_system = 'True' if game == 'system' else 'False'
    lines: list[str] = [
        '# generated by scripts/generate_messages.py\n',
        '\n',
        'from twnet_parser.pretty_print import PrettyPrint\n',
        'from twnet_parser.packer import Unpacker\n',
        'from twnet_parser.chunk_header import ChunkHeader\n',
        get_dependencies(msg),
        '\n',
        f'class Msg{name_camel}(PrettyPrint):\n',
        '    def __init__(\n',
        '            self,\n',
        ',\n'.join(init_args) + '\n',
        '    ) -> None:\n',
        f"        self.message_name = '{name_snake}'\n",
        f"        self.system_message = {is_system}\n",
        "        self.header: ChunkHeader\n",
        '\n',
        *attr_lines,
        '\n',
        '    # first byte of data\n',
        '    # has to be the first byte of the message payload\n',
        '    # NOT the chunk header and NOT the message id\n',
        '    def unpack(self, data: bytes) -> bool:\n',
    ]
    # messages without members never touch the unpacker
    if members:
        lines.append('        unpacker = Unpacker(data)\n')
    lines.extend(unpack_lines)
    lines.extend([
        '        return True\n',
        '\n',
        '    def pack(self) -> bytes:\n',
        gen_pack_return(msg),
    ])
    return ''.join(lines)


def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None:
    """Write ../twnet_parser/messages7/{game}/{message name}.py

    The generated class has one attribute per message member
    and an unpack() and a pack() method.
    An already existing file is overwritten.

    Raises ValueError if a member has an unsupported type.
    """
    # render before opening the file
    # so a bad spec does not leave a truncated file behind
    code = _render_msg(msg, game)
    name_snake = name_to_snake(msg['name'])
    dirname = os.path.dirname(__file__)
    file_path = os.path.join(
        dirname,
        f'../twnet_parser/messages7/{game}/',
        f'{name_snake}.py')
    with open(file_path, 'w', encoding='utf-8') as out_file:
        print(f"Generating {file_path} ...")
        out_file.write(code)


def get_dependencies(msg: NetMessageJson) -> str:
    """Return the packer import line the generated pack() needs.

    Returns an empty string if no member needs pack_int or pack_str.
    Raises ValueError if a member has an unsupported type.
    """
    packer_deps = {
        f'pack_{spec.packer}'
        for spec in map(_member_spec, msg['members'])
        if spec.packer is not None}
    if not packer_deps:
        return ''
    return 'from twnet_parser.packer import ' + \
        ', '.join(sorted(packer_deps)) + '\n'


def pack_field(member: NetMessageMemberJson) -> str:
    """Return the expression that packs one member in the generated pack().

    Raises ValueError if the member has an unsupported type.
    """
    name: str = name_to_snake(member["name"])
    field: str = f'self.{name}'
    spec = _member_spec(member)
    if spec.packer is None:
        # bytes are appended as is
        return field
    if member['type']['kind'] == 'tune_param':
        # counterpart of the 'int() / 100.0' in the generated unpack()
        field = f'int({field} * 100.0)'
    return f'pack_{spec.packer}({field})'


def gen_pack_return(msg: NetMessageJson) -> str:
    """Return the body of the generated pack() method.

    It is a single return statement concatenating all packed members.
    The returned code has no trailing newline.
    """
    members: list[NetMessageMemberJson] = msg['members']
    if not members:
        return "        return b''"
    if len(members) == 1:
        return f'        return {pack_field(members[0])}'
    mem_strs: list[str] = [
        f'            {pack_field(member)}' for member in members[1:]]
    return f"        return {pack_field(members[0])} + \\\n" + \
        ' + \\\n'.join(mem_strs)


@functools.lru_cache(maxsize=None)
def _load_defaults(def_file: str) -> Dict[str, Union[int, float, bool, str]]:
    """Parse the defaults file once and keep the result in memory.

    Prints an error and exits with status 1 if the file does not exist.
    """
    if not os.path.exists(def_file):
        print(f"Failed to open defaults file '{def_file}'")
        raise SystemExit(1)
    with open(def_file, encoding='utf-8') as def_io:
        def_json: Dict[str, Union[int, float, bool, str]] = json.load(def_io)
    return def_json


def get_default(field_path: str) -> Optional[str]:
    """
    field_path has the following format:

        game.msg_name.field_name

    example:

        game.sv_tune_params.ground_control_speed

    Returns the python source of the default value
    or None if the defaults file has no entry for field_path.
    Prints an error and exits with status 1 if the defaults file
    is missing or holds a value that is not a number, bool or string.
    """
    def_json = _load_defaults(DEFAULTS_FILE)
    if field_path not in def_json:
        return None
    default = def_json[field_path]
    # also covers bool cuz python drunk
    # but this is actually exactly what we want
    if isinstance(default, (int, float)):
        return str(default)
    if isinstance(default, str):
        # repr() escapes quotes, backslashes and newlines
        # so the generated code stays valid python
        return repr(default)
    print(f"Error: invalid default type for field {field_path}")
    print(f" please check {DEFAULTS_FILE} for errors")
    raise SystemExit(1)


def generate(spec: str) -> None:
    """Generate both matchers and all message classes from the spec file."""
    print(f"generating classes from {spec} ...")
    with open(spec, encoding='utf-8') as spec_io:
        spec_data: SpecJson = json.load(spec_io)
    game_messages: list[NetMessageJson] = spec_data['game_messages']
    system_messages: list[NetMessageJson] = spec_data['system_messages']
    gen_match_file7('game', game_messages)
    gen_match_file7('system', system_messages)
    for msg in game_messages:
        generate_msg(msg, 'game')
    for msg in system_messages:
        generate_msg(msg, 'system')


def main() -> None:
    """Generate from the libtw2 checkout expected next to this repository."""
    dirname = os.path.dirname(__file__)
    spec_07 = os.path.join(
        dirname,
        '../../libtw2/gamenet/generate/spec/teeworlds-0.7.5.json')
    if os.path.exists(spec_07):
        generate(spec_07)
    else:
        print(f"Error: file not found {spec_07}")
        print(" try running these commands")
        print("")
        print(" git clone git@github.com:heinrich5991/libtw2 ..")


if __name__ == '__main__':
    main()