twnet_parser/scripts/generate_messages.py
ChillerDragon 3b93a6bba2 Packing python bools as tw net bools just works
python handles bools more like the ints 0 and 1
and those are exactly the values the
teeworlds network protocol expects

I can totally see this breaking in python4
or a new mypy version

maybe one day a ``get_bool()`` could make sense
also for readability

but for now this should be stable as long as the
tests pass
2023-04-09 12:14:13 +02:00

585 lines
23 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import json
from typing import TypedDict, Literal, Optional, Dict, Union
KIND = Literal[ \
'int32', \
'tick', \
'string', \
'raw', \
'sha256', \
'data', \
'rest', \
'enum', \
'boolean', \
'tune_param', \
'snapshot_object', \
'array', \
'flags', \
'optional']
class ConstantJson(TypedDict):
name: list[str]
type: str
value: int
class NetEnumValuesJson(TypedDict):
value: str
name: list[str]
class NetEnumJson(TypedDict):
name: list[str]
values: list[NetEnumValuesJson]
class InnerNetMessageMemberTypeJson(TypedDict):
kind: KIND
disallow_cc: bool
class ArrayMemberTypeJson(TypedDict):
kind: KIND
# strings
disallow_cc: bool
class NetMessageMemberTypeJson(TypedDict):
kind: KIND
inner: InnerNetMessageMemberTypeJson
# strings
disallow_cc: bool
# arrays
count: int
member_type: ArrayMemberTypeJson
# data
size: Literal['specified_before']
class NetMessageMemberJson(TypedDict):
name: list[str]
type: NetMessageMemberTypeJson
class NetMessageJson(TypedDict):
id: int
name: list[str]
members: list[NetMessageMemberJson]
attributes: list[Literal['msg_encoding']]
class SpecJson(TypedDict):
constants: list[ConstantJson]
game_enumerations: list[NetEnumJson]
game_messages: list[NetMessageJson]
system_messages: list[NetMessageJson]
def gen_match_file7(
msg_type: Literal['system', 'game'],
messages: list[NetMessageJson]
):
match_code: str = """# generated by scripts/generate_messages.py
from typing import Optional
import twnet_parser.msg7
from twnet_parser.net_message import NetMessage
"""
msg: NetMessageJson
for msg in messages:
name_snake = name_to_snake(msg['name'])
match_code += f"import twnet_parser.messages7.{msg_type}" \
f".{name_snake}" \
" as \\\n" \
f" {msg_type}7_{name_snake}\n"
match_code += f"""
def match_{msg_type}7(msg_id: int, data: bytes) -> NetMessage:
msg: Optional[NetMessage] = None
"""
if_ = 'if'
for msg in messages:
name_snake = name_to_snake(msg['name'])
name_camel = name_to_camel(msg['name'])
match_code += \
f"""
{if_} msg_id == twnet_parser.msg7.{name_snake.upper()}:
msg = {msg_type}7_{name_snake}.Msg{name_camel}()"""
if_ = 'elif'
match_code += '\n\n if msg is None:\n'
match_code += ' '
match_code += 'raise ValueError('
match_code += 'f"Error: unknown ' \
+ msg_type + \
' message id={msg_id} data={data[0]}")\n'
match_code += '\n'
match_code += ' msg.unpack(data)\n'
match_code += ' return msg\n'
dirname = os.path.dirname(__file__)
file_path= os.path.join(
dirname,
f'../twnet_parser/msg_matcher/{msg_type}7.py')
# if os.path.exists(file_path):
# print(f"Warning: file already exists! {file_path}")
# return
with open(file_path, 'w') as out_file:
print(f"Generating {file_path} ...")
out_file.write(match_code)
def fix_name_conflict(name: str) -> str:
# https://peps.python.org/pep-0008/#descriptive-naming-styles
if name == 'pass':
return 'pass_'
return name
def name_to_camel(name_list: list[str]) -> str:
name = ''.join([part.capitalize() for part in name_list])
return fix_name_conflict(name)
def name_to_snake(name_list: list[str]) -> str:
name = '_'.join(name_list)
return fix_name_conflict(name)
def generate_msg(msg: NetMessageJson, game: Literal['game', 'system']) -> None:
name_snake = name_to_snake(msg['name'])
name_camel = name_to_camel(msg['name'])
dirname = os.path.dirname(__file__)
file_path= os.path.join(
dirname,
f'../twnet_parser/messages7/{game}/',
f'{name_snake}.py')
# if os.path.exists(file_path):
# print(f"Warning: file already exists! {file_path}")
# return
with open(file_path, 'w') as out_file:
print(f"Generating {file_path} ...")
out_file.write('# generated by scripts/generate_messages.py\n')
out_file.write('\n')
out_file.write('from twnet_parser.pretty_print import PrettyPrint\n')
if len(msg['members']) > 0:
out_file.write('from twnet_parser.packer import Unpacker\n')
out_file.write('from twnet_parser.chunk_header import ChunkHeader\n')
out_file.write(get_dependencies(msg))
out_file.write('\n')
out_file.write(f'class Msg{name_camel}(PrettyPrint):\n')
out_file.write(' def __init__(\n')
out_file.write(' self,\n')
args: list[str] = []
for member in msg['members']:
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
ftype = 'int'
default = '-1'
if member['type']['kind'] == 'string':
ftype = 'str'
default = "'default'"
elif member['type']['kind'] in \
('raw', 'sha256', 'rest'): # TODO: rest sha256 and raw
ftype = 'bytes'
default = "b'\\x00'"
elif member['type']['kind'] == 'data':
ftype = 'bytes'
default = "b'\\x00'"
if member['type']['size'] == 'specified_before':
args.append(' data_size: Optional[int] = None')
else:
raise ValueError(f"Error: unknown data size {member['type']}")
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
elif member['type']['kind'] == 'enum':
ftype = 'int'
default = '0'
# TODO: use ENUM_NAME_SOME_VALUE as default here
elif member['type']['kind'] in ('int32', 'tick'):
ftype = 'int'
default = '0'
elif member['type']['kind'] == 'boolean':
ftype = 'bool'
default = 'False'
elif member['type']['kind'] == 'tune_param':
ftype = 'float'
default = '0.0'
elif member['type']['kind'] == 'snapshot_object':
# TODO: think about snapshot_object
ftype = 'int'
default = '0'
elif member['type']['kind'] == 'array':
size: int = member['type']['count']
if size is None:
print("Error: size is none for the following message")
print(msg)
exit(1)
arr_member: ArrayMemberTypeJson = member['type']['member_type']
if arr_member['kind'] == 'string':
ftype = f'Annotated[list[str], {size}]'
default = '[' + ', '.join(["''"] * size) + ']'
elif arr_member['kind'] == 'boolean':
ftype = f'Annotated[list[bool], {size}]'
default = '[' + ', '.join(["False"] * size) + ']'
elif arr_member['kind'] in ('int32', 'tick', 'enum'):
ftype = f'Annotated[list[int], {size}]'
default = '[' + ', '.join(["0"] * size) + ']'
else:
raise ValueError( \
f"Error: unknown array member type {member['type']}")
# Initializing lists with defaults
# And type annotation can get quite long
# So split it in two lines
default = f'\\\n {default}'
elif member['type']['kind'] == 'flags': # TODO: think about flags
ftype = 'int'
default = '0'
elif member['type']['kind'] == 'optional':
if member['type']['inner']['kind'] == 'string':
ftype = 'str'
default = "''"
elif member['type']['inner']['kind'] in ('int32', 'tick'):
ftype = 'int'
default = '0'
else:
raise ValueError(f"Error: unknown optional type {member['type']}")
else:
raise ValueError(f"Error: unknown type {member['type']}")
name = name_to_snake(member["name"])
manual_default = get_default(f"{game}.{name_snake}.{name}")
if manual_default:
default = manual_default
args.append(f' {name}: {ftype} = {default}')
out_file.write(',\n'.join(args) + '\n')
out_file.write(' ) -> None:\n')
out_file.write(f" self.message_name = '{name_snake}'\n")
sys: str = 'True' if game == 'system' else 'False'
out_file.write(f" self.system_message = {sys}\n")
out_file.write(" self.header: ChunkHeader\n")
out_file.write('\n')
for member in msg['members']:
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
ftype = 'int'
if member['type']['kind'] == 'string':
ftype = 'str'
elif member['type']['kind'] in \
('raw', 'sha256', 'rest'): # TODO: sha256 and raw
ftype = 'bytes'
elif member['type']['kind'] == 'data':
ftype = 'bytes'
if member['type']['size'] == 'specified_before':
out_file.write(" " \
"self.data_size: int = data_size if data_size else len(data)\n")
else:
raise ValueError(f"Error: unknown data size {member['type']}")
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
elif member['type']['kind'] == 'enum':
ftype = 'int'
elif member['type']['kind'] in ('int32', 'tick'):
ftype = 'int'
elif member['type']['kind'] == 'boolean':
ftype = 'bool'
elif member['type']['kind'] == 'tune_param':
ftype = 'float'
elif member['type']['kind'] == 'snapshot_object':
# TODO: think about snapshot_object
ftype = 'int'
elif member['type']['kind'] == 'array':
# Array type annotations are so annoyingly long
# also there is a planned refactor
# https://gitlab.com/teeworlds-network/twnet_parser/-/issues/4
# so inherit type from constructor arguments
ftype = ''
elif member['type']['kind'] == 'flags': # TODO: think about flags
ftype = 'int'
elif member['type']['kind'] == 'optional':
if member['type']['inner']['kind'] == 'string':
ftype = 'str'
elif member['type']['inner']['kind'] in ('int32', 'tick'):
ftype = 'int'
else:
raise ValueError(f"Error: unknown optional type {member['type']}")
else:
raise ValueError(f"Error: unknown type {member['type']}")
name = name_to_snake(member["name"])
if ftype != '':
ftype = f': {ftype}'
out_file.write(f" self.{name}{ftype} = {name}\n")
out_file.write('\n')
out_file.write(' # first byte of data\n')
out_file.write(' # has to be the first byte of the message payload\n')
out_file.write(' # NOT the chunk header and NOT the message id\n')
out_file.write(' def unpack(self, data: bytes) -> bool:\n')
if len(msg['members']) > 0:
out_file.write(' unpacker = Unpacker(data)\n')
out_file.write(gen_unpack_members(msg))
out_file.write(' return True\n')
out_file.write('\n')
out_file.write(' def pack(self) -> bytes:\n')
out_file.write(gen_pack_return(msg))
def gen_unpack_members(msg: NetMessageJson) -> str:
res: str = ''
for member in msg['members']:
# {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
unpacker = 'int()'
if member['type']['kind'] == 'string':
if member['type']['disallow_cc']:
unpacker = 'str(SANITIZE_CC)'
else:
unpacker = 'str()'
elif member['type']['kind'] in \
('raw', 'sha256', 'rest'): # TODO: do we need to fix size for sha256?
unpacker = 'raw()'
elif member['type']['kind'] == 'data':
if member['type']['size'] == 'specified_before':
res += ' self.data_size = unpacker.get_int()\n'
unpacker = 'raw(self.data_size)'
else:
raise ValueError(f"Error: unknown data size {member['type']}")
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
elif member['type']['kind'] == 'enum':
unpacker = 'int() # TODO: this is a enum'
elif member['type']['kind'] in ('int32', 'tick'):
unpacker = 'int()'
elif member['type']['kind'] == 'boolean':
unpacker = 'int() == 1'
elif member['type']['kind'] == 'tune_param':
unpacker = 'int() / 100.0'
elif member['type']['kind'] == 'snapshot_object':
# TODO: think about snapshot_object
unpacker = 'int() # TODO: this is a snapshot object'
elif member['type']['kind'] == 'array':
size: int = member['type']['count']
if size is None:
print("Error: size is none for the following message")
print(msg)
exit(1)
arr_member: ArrayMemberTypeJson = member['type']['member_type']
if arr_member['kind'] == 'string':
if arr_member['disallow_cc']:
unpacker = 'str(SANITIZE_CC)'
else:
unpacker = 'str()'
elif arr_member['kind'] == 'enum':
unpacker = 'int() # TODO: this is a enum'
elif arr_member['kind'] == 'boolean':
unpacker = 'int() == 1'
elif arr_member['kind'] in ('int32', 'tick'):
unpacker = 'int()'
else:
raise ValueError(f"Error: unknown array member type {member['type']}")
name = name_to_snake(member["name"])
res += f' for i in range(0, {size}):\n'
res += f' self.{name}[i] = unpacker.get_{unpacker}\n'
continue
elif member['type']['kind'] == 'flags': # TODO: think about flags
unpacker = 'int() # TODO: this is a flag'
elif member['type']['kind'] == 'optional':
# TODO: unpacker should not crash on missing optional fields
# check how tw code does it and be smart here
if member['type']['inner']['kind'] == 'string':
if member['type']['inner']['disallow_cc']:
unpacker = 'str(SANITIZE_CC)'
unpacker += ' # TODO: optionals'
else:
unpacker = 'str() # TODO: optionals'
elif member['type']['inner']['kind'] in ('int32', 'tick'):
unpacker = 'int() # TODO: optionals'
else:
raise ValueError(f"Error: unknown type {member['type']}")
name = name_to_snake(member["name"])
res += f' self.{name} = unpacker.get_{unpacker}\n'
return res
def get_dependencies(msg: NetMessageJson) -> str:
packer_deps: list[str] = []
typing_deps: list[str] = []
for member in msg['members']:
if member['type']['kind'] == 'string':
packer_deps.append('pack_str')
if member['type']['disallow_cc']:
packer_deps.append('SANITIZE_CC')
elif member['type']['kind'] in \
('raw', 'sha256', 'rest'):
pass
elif member['type']['kind'] == 'data':
if member['type']['size'] == 'specified_before':
typing_deps.append('Optional')
else:
raise ValueError(f"Error: unknown data size {member['type']}")
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
elif member['type']['kind'] == 'enum':
packer_deps.append('pack_int')
elif member['type']['kind'] in ('int32', 'tick'):
packer_deps.append('pack_int')
elif member['type']['kind'] == 'boolean':
packer_deps.append('pack_int')
elif member['type']['kind'] == 'tune_param':
packer_deps.append('pack_int')
elif member['type']['kind'] == 'snapshot_object':
# TODO: think about snapshot_object
packer_deps.append('pack_int')
elif member['type']['kind'] == 'array':
packer_deps.append('pack_int')
typing_deps.append('Annotated')
arr_member: ArrayMemberTypeJson = member['type']['member_type']
if arr_member['kind'] == 'string':
packer_deps.append('pack_str')
if arr_member['disallow_cc']:
packer_deps.append('SANITIZE_CC')
elif arr_member['kind'] == 'enum':
packer_deps.append('pack_int')
elif arr_member['kind'] == 'boolean':
packer_deps.append('pack_int')
elif arr_member['kind'] in ('int32', 'tick'):
packer_deps.append('pack_int')
else:
raise ValueError(f"Error: unknown array member type {member['type']}")
elif member['type']['kind'] == 'flags': # TODO: think about flags
packer_deps.append('pack_int')
elif member['type']['kind'] == 'optional':
if member['type']['inner']['kind'] == 'string':
packer_deps.append('pack_str')
if member['type']['inner']['disallow_cc']:
packer_deps.append('SANITIZE_CC')
elif member['type']['inner']['kind'] in ('int32', 'tick'):
packer_deps.append('pack_int')
else:
raise ValueError(f"Error: unknown type {member['type']}")
res: str = ''
if len(packer_deps) > 0:
res += 'from twnet_parser.packer import ' + \
', '.join(sorted(set(packer_deps))) + '\n'
if len(typing_deps) > 0:
res += 'from typing import ' + \
', '.join(sorted(set(typing_deps))) + '\n'
return res
def pack_field(member: NetMessageMemberJson) -> str:
name: str = name_to_snake(member["name"])
field: str = f'self.{name}'
packer = 'int'
if member['type']['kind'] == 'string':
packer = 'str'
elif member['type']['kind'] in \
('raw', 'sha256', 'rest'): # TODO: raw sha256 rest
return f'self.{name}'
elif member['type']['kind'] == 'data':
if member['type']['size'] == 'specified_before':
return f'pack_int(self.data_size) + \\\n' \
f' self.{name}'
else:
raise ValueError(f"Error: unknown data size {member['type']}")
# {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
elif member['type']['kind'] == 'enum':
packer = 'int'
elif member['type']['kind'] in ('int32', 'tick'):
packer = 'int'
elif member['type']['kind'] == 'boolean':
packer = 'int'
elif member['type']['kind'] == 'tune_param':
packer = 'int'
field = f'int({field} * 100.0)'
elif member['type']['kind'] == 'snapshot_object':
# TODO: think about snapshot_object
packer = 'int'
elif member['type']['kind'] == 'array':
arr_member: ArrayMemberTypeJson = member['type']['member_type']
if arr_member['kind'] == 'string':
packer = 'str'
elif arr_member['kind'] == 'enum':
packer = 'int'
elif arr_member['kind'] == 'boolean':
packer = 'int'
elif arr_member['kind'] in ('int32', 'tick'):
packer = 'int'
else:
raise ValueError(f"Error: unknown array member type {member['type']}")
return f"b''.join([pack_{packer}(x) for x in {field}])"
elif member['type']['kind'] == 'flags': # TODO: think about flags
packer = 'int'
elif member['type']['kind'] == 'optional':
packer = 'int'
# TODO: unpacker should allow not packing optional fields
# check how tw code does it and be smart here
if member['type']['inner']['kind'] == 'string':
packer = 'str'
elif member['type']['inner']['kind'] in ('int32', 'tick'):
packer = 'int'
else:
raise ValueError(f"Error: unknown type {member['type']}")
return f'pack_{packer}({field})'
def gen_pack_return(msg: NetMessageJson) -> str:
members: list[NetMessageMemberJson] = msg['members']
if len(members) == 0:
return " return b''"
if len(members) == 1:
return f' return {pack_field(members[0])}'
mem_strs: list[str] = [
f' {pack_field(member)}' for member in members[1:]]
return f" return {pack_field(members[0])} + \\\n" + \
' + \\\n'.join(mem_strs)
def get_default(field_path: str) -> Optional[str]:
"""
field_path has the following format:
game.msg_name.field_name
example:
game.sv_tune_params.ground_control_speed
"""
# COULDDO: make this faster
# but then who cares about
# code gen speed
def_file: str = './data/messages7_defaults.json'
if not os.path.exists(def_file):
print(f"Failed to open defaults file '{def_file}'")
exit(1)
with open(def_file) as def_io:
def_json: Dict[str, Union[int, float, bool, str]] = json.load(def_io)
if field_path not in def_json:
return None
default = def_json[field_path]
# also covers bool cuz python drunk
# but this is actually exactly what we want
if isinstance(default, int):
return str(default)
elif isinstance(default, float):
return str(default)
elif isinstance(default, str):
return f"'{default}'"
else:
print(f"Error: invalid default type for field {field_path}")
print(f" please check {def_file} for errors")
exit(1)
def generate(spec: str) -> None:
print(f"generating classes from {spec} ...")
with open(spec) as spec_io:
spec_data: SpecJson = json.load(spec_io)
# for msg in [spec_data['game_messages'][1]]:
game_messages: list[NetMessageJson] = spec_data['game_messages']
system_messages: list[NetMessageJson] = spec_data['system_messages']
gen_match_file7('game', game_messages)
gen_match_file7('system', system_messages)
for msg in game_messages:
generate_msg(msg, 'game')
for msg in system_messages:
generate_msg(msg, 'system')
def main() -> None:
dirname = os.path.dirname(__file__)
spec_07 = os.path.join(
dirname,
'../../libtw2/gamenet/generate/spec/teeworlds-0.7.5.json')
if os.path.exists(spec_07):
generate(spec_07)
else:
print(f"Error: file not found {spec_07}")
print(" try running these commands")
print("")
print(" git clone git@github.com:heinrich5991/libtw2 ..")
if __name__ == '__main__':
main()