ChillerDragon
a19df04b80
Seems like the set has different orders when running the generate script multiple times this causes all game message class files to change
351 lines
14 KiB
Python
Executable file
351 lines
14 KiB
Python
Executable file
import os
|
|
import json
|
|
|
|
from typing import TypedDict, Literal
|
|
|
|
class ConstantJson(TypedDict):
|
|
name: list[str]
|
|
type: str
|
|
value: int
|
|
|
|
class GameEnumValuesJson(TypedDict):
|
|
value: str
|
|
name: list[str]
|
|
|
|
class GameEnumJson(TypedDict):
|
|
name: list[str]
|
|
values: list[GameEnumValuesJson]
|
|
|
|
class GameMessageMemberTypeJson(TypedDict):
|
|
kind: str
|
|
disallow_cc: bool
|
|
|
|
class NetMessageMemberJson(TypedDict):
|
|
name: list[str]
|
|
type: GameMessageMemberTypeJson
|
|
|
|
class NetMessageJson(TypedDict):
|
|
id: int
|
|
name: list[str]
|
|
members: list[NetMessageMemberJson]
|
|
attributes: list[Literal['msg_encoding']]
|
|
|
|
class SpecJson(TypedDict):
|
|
constants: list[ConstantJson]
|
|
game_enumerations: list[GameEnumJson]
|
|
game_messages: list[NetMessageJson]
|
|
system_messages: list[NetMessageJson]
|
|
|
|
def gen_match_game7(game_messages: list[NetMessageJson]):
|
|
match_game7: str = """# generated by scripts/generate_messages.py
|
|
from typing import Optional
|
|
|
|
import twnet_parser.msg7
|
|
from twnet_parser.net_message import NetMessage
|
|
|
|
"""
|
|
|
|
msg: NetMessageJson
|
|
for msg in game_messages:
|
|
name_snake = name_to_snake(msg['name'])
|
|
match_game7 += "import twnet_parser.messages7.game" \
|
|
f".{name_snake}" \
|
|
" as \\\n" \
|
|
f" game7_{name_snake}\n"
|
|
|
|
match_game7 += """
|
|
def match_game7(msg_id: int, data: bytes) -> NetMessage:
|
|
msg: Optional[NetMessage] = None
|
|
"""
|
|
|
|
if_ = 'if'
|
|
for msg in game_messages:
|
|
name_snake = name_to_snake(msg['name'])
|
|
name_camel = name_to_camel(msg['name'])
|
|
match_game7 += \
|
|
f"""
|
|
{if_} msg_id == twnet_parser.msg7.{name_snake.upper()}:
|
|
msg = game7_{name_snake}.Msg{name_camel}()"""
|
|
if_ = 'elif'
|
|
|
|
match_game7 += '\n\n if msg is None:\n'
|
|
match_game7 += ' '
|
|
match_game7 += 'raise ValueError('
|
|
match_game7 += 'f"Error: unknown message sys.id={msg_id} data={data[0]}")\n'
|
|
match_game7 += '\n'
|
|
match_game7 += ' msg.unpack(data)\n'
|
|
match_game7 += ' return msg\n'
|
|
|
|
dirname = os.path.dirname(__file__)
|
|
file_path= os.path.join(
|
|
dirname,
|
|
'../twnet_parser/msg_matcher/game7.py')
|
|
# if os.path.exists(file_path):
|
|
# print(f"Warning: file already exists! {file_path}")
|
|
# return
|
|
with open(file_path, 'w') as out_file:
|
|
print(f"Generating {file_path} ...")
|
|
out_file.write(match_game7)
|
|
|
|
def fix_name_conflict(name: str) -> str:
|
|
# https://peps.python.org/pep-0008/#descriptive-naming-styles
|
|
if name == 'pass':
|
|
return 'pass_'
|
|
return name
|
|
|
|
def name_to_camel(name_list: list[str]) -> str:
|
|
name = ''.join([part.capitalize() for part in name_list])
|
|
return fix_name_conflict(name)
|
|
|
|
def name_to_snake(name_list: list[str]) -> str:
|
|
name = '_'.join(name_list)
|
|
return fix_name_conflict(name)
|
|
|
|
def generate_msg(msg: NetMessageJson, game: str) -> None:
|
|
name_snake = name_to_snake(msg['name'])
|
|
name_camel = name_to_camel(msg['name'])
|
|
dirname = os.path.dirname(__file__)
|
|
file_path= os.path.join(
|
|
dirname,
|
|
f'../twnet_parser/messages7/{game}/',
|
|
f'{name_snake}.py')
|
|
# if os.path.exists(file_path):
|
|
# print(f"Warning: file already exists! {file_path}")
|
|
# return
|
|
with open(file_path, 'w') as out_file:
|
|
print(f"Generating {file_path} ...")
|
|
out_file.write('# generated by scripts/generate_messages.py\n')
|
|
out_file.write('\n')
|
|
out_file.write('from twnet_parser.pretty_print import PrettyPrint\n')
|
|
out_file.write('from twnet_parser.packer import Unpacker\n')
|
|
out_file.write('from twnet_parser.chunk_header import ChunkHeader\n')
|
|
out_file.write(get_dependencies(msg))
|
|
out_file.write('\n')
|
|
out_file.write(f'class Msg{name_camel}(PrettyPrint):\n')
|
|
out_file.write(' def __init__(\n')
|
|
out_file.write(' self,\n')
|
|
args: list[str] = []
|
|
for member in msg['members']:
|
|
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
|
|
ftype = 'int'
|
|
default = '-1'
|
|
if member['type']['kind'] == 'string': # TODO: sanitize cc
|
|
ftype = 'str'
|
|
default = "'default'"
|
|
elif member['type']['kind'] in \
|
|
('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field
|
|
ftype = 'bytes'
|
|
default = "b'\\x00'"
|
|
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
|
|
elif member['type']['kind'] == 'enum':
|
|
ftype = 'int'
|
|
default = '0'
|
|
# TODO: use ENUM_NAME_SOME_VALUE as default here
|
|
elif member['type']['kind'] in ('int32', 'tick'):
|
|
ftype = 'int'
|
|
default = '0'
|
|
elif member['type']['kind'] == 'boolean':
|
|
ftype = 'bool'
|
|
default = 'False'
|
|
elif member['type']['kind'] == 'tune_param': # TODO: think about tune params
|
|
ftype = 'int'
|
|
default = '0'
|
|
elif member['type']['kind'] == 'snapshot_object':
|
|
# TODO: think about snapshot_object
|
|
ftype = 'int'
|
|
default = '0'
|
|
elif member['type']['kind'] == 'array': # TODO: think about array
|
|
ftype = 'int'
|
|
default = '0'
|
|
elif member['type']['kind'] == 'flags': # TODO: think about flags
|
|
ftype = 'int'
|
|
default = '0'
|
|
elif member['type']['kind'] == 'optional': # TODO: think about optionals
|
|
ftype = 'int'
|
|
default = '0'
|
|
else:
|
|
raise ValueError(f"Error: unknown type {member['type']}")
|
|
name = name_to_snake(member["name"])
|
|
args.append(f' {name}: {ftype} = {default}')
|
|
out_file.write(',\n'.join(args) + '\n')
|
|
out_file.write(' ) -> None:\n')
|
|
out_file.write(f" self.message_name = '{name_snake}'\n")
|
|
sys: str = 'True' if game == 'system' else 'False'
|
|
out_file.write(f" self.system_message = {sys}\n")
|
|
out_file.write(" self.header: ChunkHeader\n")
|
|
out_file.write('\n')
|
|
for member in msg['members']:
|
|
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
|
|
ftype = 'int'
|
|
if member['type']['kind'] == 'string':
|
|
ftype = 'str'
|
|
elif member['type']['kind'] in \
|
|
('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field
|
|
ftype = 'bytes'
|
|
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
|
|
elif member['type']['kind'] == 'enum':
|
|
ftype = 'int'
|
|
elif member['type']['kind'] in ('int32', 'tick'):
|
|
ftype = 'int'
|
|
elif member['type']['kind'] == 'boolean':
|
|
ftype = 'bool'
|
|
elif member['type']['kind'] == 'tune_param': # TODO: think about tune params
|
|
ftype = 'int'
|
|
elif member['type']['kind'] == 'snapshot_object':
|
|
# TODO: think about snapshot_object
|
|
ftype = 'int'
|
|
elif member['type']['kind'] == 'array': # TODO: think about array
|
|
ftype = 'int'
|
|
elif member['type']['kind'] == 'flags': # TODO: think about flags
|
|
ftype = 'int'
|
|
elif member['type']['kind'] == 'optional': # TODO: think about optionals
|
|
ftype = 'int'
|
|
else:
|
|
raise ValueError(f"Error: unknown type {member['type']}")
|
|
name = name_to_snake(member["name"])
|
|
out_file.write(f" self.{name}: {ftype} = {name}\n")
|
|
out_file.write('\n')
|
|
out_file.write(' # first byte of data\n')
|
|
out_file.write(' # has to be the first byte of the message payload\n')
|
|
out_file.write(' # NOT the chunk header and NOT the message id\n')
|
|
out_file.write(' def unpack(self, data: bytes) -> bool:\n')
|
|
out_file.write(' unpacker = Unpacker(data)\n')
|
|
for member in msg['members']:
|
|
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
|
|
unpacker = 'int()'
|
|
if member['type']['kind'] == 'string': # TODO: sanitize cc
|
|
unpacker = 'str()'
|
|
elif member['type']['kind'] in \
|
|
('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field
|
|
unpacker = 'raw()'
|
|
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
|
|
elif member['type']['kind'] == 'enum':
|
|
unpacker = 'int() # TODO: this is a enum'
|
|
elif member['type']['kind'] in ('int32', 'tick'):
|
|
unpacker = 'int()'
|
|
elif member['type']['kind'] == 'boolean':
|
|
unpacker = 'int() == 1'
|
|
elif member['type']['kind'] == 'tune_param': # TODO: think about tune params
|
|
unpacker = 'int() # TODO: this is a tune param'
|
|
elif member['type']['kind'] == 'snapshot_object':
|
|
# TODO: think about snapshot_object
|
|
unpacker = 'int() # TODO: this is a snapshot object'
|
|
elif member['type']['kind'] == 'array': # TODO: think about array
|
|
unpacker = 'int() # TODO: this is an array'
|
|
elif member['type']['kind'] == 'flags': # TODO: think about flags
|
|
unpacker = 'int() # TODO: this is a flag'
|
|
elif member['type']['kind'] == 'optional': # TODO: think about optional
|
|
unpacker = 'int() # TODO: this is a optional of type any'
|
|
else:
|
|
raise ValueError(f"Error: unknown type {member['type']}")
|
|
name = name_to_snake(member["name"])
|
|
out_file.write(f' self.{name} = unpacker.get_{unpacker}\n')
|
|
out_file.write(' return True\n')
|
|
out_file.write('\n')
|
|
out_file.write(' def pack(self) -> bytes:\n')
|
|
out_file.write(gen_pack_return(msg))
|
|
|
|
def get_dependencies(msg: NetMessageJson) -> str:
|
|
packer_deps: list[str] = []
|
|
for member in msg['members']:
|
|
if member['type']['kind'] == 'string': # TODO: sanitize cc
|
|
packer_deps.append('pack_str')
|
|
elif member['type']['kind'] in \
|
|
('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field
|
|
pass
|
|
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
|
|
elif member['type']['kind'] == 'enum':
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] in ('int32', 'tick'):
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'boolean':
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'tune_param': # TODO: think about tune params
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'snapshot_object':
|
|
# TODO: think about snapshot_object
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'array': # TODO: think about array
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'flags': # TODO: think about flags
|
|
packer_deps.append('pack_int')
|
|
elif member['type']['kind'] == 'optional': # TODO: think about optional
|
|
packer_deps.append('pack_int')
|
|
else:
|
|
raise ValueError(f"Error: unknown type {member['type']}")
|
|
if len(packer_deps) == 0:
|
|
return ''
|
|
return 'from twnet_parser.packer import ' + \
|
|
', '.join(sorted(set(packer_deps))) + '\n'
|
|
|
|
def pack_field(member: NetMessageMemberJson) -> str:
|
|
name: str = name_to_snake(member["name"])
|
|
packer = 'int'
|
|
if member['type']['kind'] == 'string': # TODO: sanitize cc
|
|
packer = 'str'
|
|
elif member['type']['kind'] in \
|
|
('raw', 'sha256', 'rest', 'data'): # TODO: data has a size field
|
|
return f'self.{name}'
|
|
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
|
|
elif member['type']['kind'] == 'enum':
|
|
packer = 'int'
|
|
elif member['type']['kind'] in ('int32', 'tick'):
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'boolean':
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'tune_param': # TODO: think about tune params
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'snapshot_object':
|
|
# TODO: think about snapshot_object
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'array': # TODO: think about array
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'flags': # TODO: think about flags
|
|
packer = 'int'
|
|
elif member['type']['kind'] == 'optional': # TODO: think about optional
|
|
packer = 'int'
|
|
else:
|
|
raise ValueError(f"Error: unknown type {member['type']}")
|
|
return f'pack_{packer}(self.{name})'
|
|
|
|
def gen_pack_return(msg: NetMessageJson) -> str:
|
|
members: list[NetMessageMemberJson] = msg['members']
|
|
if len(members) == 0:
|
|
return " return b''"
|
|
if len(members) == 1:
|
|
return f' return {pack_field(members[0])}'
|
|
mem_strs: list[str] = [
|
|
f' {pack_field(member)}' for member in members[1:]]
|
|
return f" return {pack_field(members[0])} + \\\n" + \
|
|
' + \\\n'.join(mem_strs)
|
|
|
|
def generate(spec: str) -> None:
|
|
print(f"generating classes from {spec} ...")
|
|
with open(spec) as spec_io:
|
|
spec_data: SpecJson = json.load(spec_io)
|
|
# for msg in [spec_data['game_messages'][1]]:
|
|
game_messages: list[NetMessageJson] = spec_data['game_messages']
|
|
system_messages: list[NetMessageJson] = spec_data['system_messages']
|
|
gen_match_game7(game_messages)
|
|
for msg in game_messages:
|
|
generate_msg(msg, 'game')
|
|
for msg in system_messages:
|
|
generate_msg(msg, 'system')
|
|
|
|
def main() -> None:
|
|
dirname = os.path.dirname(__file__)
|
|
spec_07 = os.path.join(
|
|
dirname,
|
|
'../../libtw2/gamenet/generate/spec/teeworlds-0.7.5.json')
|
|
if os.path.exists(spec_07):
|
|
generate(spec_07)
|
|
else:
|
|
print(f"Error: file not found {spec_07}")
|
|
print(" try running these commands")
|
|
print("")
|
|
print(" git clone git@github.com:heinrich5991/libtw2 ..")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|