2023-03-25 17:18:33 +00:00
|
|
|
import os
|
|
|
|
import json
|
|
|
|
|
|
|
|
from typing import TypedDict, Literal
|
|
|
|
|
|
|
|
class ConstantJson(TypedDict):
    """One entry of the ``constants`` list in the spec JSON."""

    # Name split into its parts, e.g. ['max', 'clients'].
    name: list[str]
    # Type of the constant as spelled in the spec.
    type: str
    # Integer value of the constant.
    value: int
|
|
|
|
|
|
|
|
class GameEnumValuesJson(TypedDict):
    """One value of a game enumeration (an item of ``GameEnumJson['values']``)."""

    # The enum value; declared as a string here.
    value: str
    # Name of the value split into its parts.
    name: list[str]
|
|
|
|
|
|
|
|
class GameEnumJson(TypedDict):
    """One entry of the ``game_enumerations`` list in the spec JSON."""

    # Name of the enumeration split into its parts.
    name: list[str]
    # All values that belong to this enumeration.
    values: list[GameEnumValuesJson]
|
|
|
|
|
|
|
|
class GameMessageMemberTypeJson(TypedDict):
    """The ``type`` object of a game message member.

    Example taken from the spec: ``{'kind': 'string', 'disallow_cc': False}``
    """

    # Selects how the member is typed, unpacked and packed by the
    # generator ('string', 'raw', 'enum', 'int32', 'tick', 'boolean', ...).
    kind: str
    # NOTE(review): the sample data quoted in this script shows enum types
    # as {"kind": "enum", "enum": ["chat"]}, i.e. without `disallow_cc` and
    # with an extra `enum` key, so this declaration may be stricter than
    # the actual JSON - confirm against the spec.
    disallow_cc: bool
|
|
|
|
|
|
|
|
class GameMessageMemberJson(TypedDict):
    """One member (field) of a game message.

    Example taken from the spec:
    ``{'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}``
    """

    # Field name split into its parts; joined with '_' for generated code.
    name: list[str]
    # Describes how the field is encoded.
    type: GameMessageMemberTypeJson
|
|
|
|
|
|
|
|
class GameMessageJson(TypedDict):
    """One entry of the ``game_messages`` list in the spec JSON."""

    # Message id as given in the spec.
    id: int
    # Message name split into its parts; used to derive the generated
    # module name (snake case) and class name (camel case).
    name: list[str]
    # Fields of the message in the order they are unpacked and packed.
    members: list[GameMessageMemberJson]
    # Extra markers on the message; 'msg_encoding' is the only declared one.
    attributes: list[Literal['msg_encoding']]
|
|
|
|
|
|
|
|
class SpecJson(TypedDict):
    """Top-level layout of the spec JSON file that ``generate()`` loads."""

    constants: list[ConstantJson]
    game_enumerations: list[GameEnumJson]
    # The only section this script currently generates code from.
    game_messages: list[GameMessageJson]
|
|
|
|
|
2023-03-29 12:51:58 +00:00
|
|
|
def gen_match_game7(game_messages: list[GameMessageJson]) -> None:
    """Write the ``match_game7`` dispatcher module for the game messages.

    The emitted module imports one ``twnet_parser.messages7.game.<name>``
    module per message (aliased to ``game7_<name>``) and defines
    ``match_game7(msg_id, data)``: an ``if``/``elif`` chain comparing
    ``msg_id`` against the ``twnet_parser.msg7`` constants that creates the
    matching message object, calls ``unpack(data)`` on it and returns it.
    The emitted function raises ``ValueError`` when no id matches.

    The module is written to ``../twnet_parser/msg_matcher/game7.py``
    relative to this script. An existing file is overwritten.

    Args:
        game_messages: The ``game_messages`` entries of the spec.
    """
    # (snake_case, CamelCase) name pair per message, in spec order.
    names: list[tuple[str, str]] = [
        (name_to_snake(msg['name']), name_to_camel(msg['name']))
        for msg in game_messages
    ]

    chunks: list[str] = [
        '# generated by scripts/generate_messages.py\n'
        'from typing import Optional\n'
        '\n'
        'import twnet_parser.msg7\n'
        'from twnet_parser.net_message import NetMessage\n'
        '\n'
    ]

    # One aliased import per message, wrapped with a backslash so the
    # emitted lines stay short.
    for name_snake, _ in names:
        chunks.append(
            f'import twnet_parser.messages7.game.{name_snake} as \\\n'
            f'    game7_{name_snake}\n')

    chunks.append(
        '\n'
        'def match_game7(msg_id: int, data: bytes) -> NetMessage:\n'
        '    msg: Optional[NetMessage] = None\n')

    # The first branch is an `if`, every following one an `elif`.
    for index, (name_snake, name_camel) in enumerate(names):
        branch = 'if' if index == 0 else 'elif'
        chunks.append(
            f'\n    {branch} msg_id == twnet_parser.msg7.{name_snake.upper()}:'
            f'\n        msg = game7_{name_snake}.{name_camel}()')

    # These are plain strings on purpose: the f-string below belongs to the
    # emitted code and must not be interpolated here.
    # NOTE(review): the emitted message says "sys.id" although this is the
    # game message matcher - confirm the wording.
    chunks.append(
        '\n\n    if msg is None:\n'
        '        raise ValueError('
        'f"Error: unknown message sys.id={msg_id} data={data[0]}")\n'
        '\n'
        '    msg.unpack(data)\n'
        '    return msg\n')

    dirname = os.path.dirname(__file__)
    file_path = os.path.join(
        dirname,
        '../twnet_parser/msg_matcher/game7.py')
    with open(file_path, 'w', encoding='utf-8') as out_file:
        print(f"Generating {file_path} ...")
        out_file.write(''.join(chunks))
|
|
|
|
|
2023-03-25 17:43:45 +00:00
|
|
|
def fix_name_conflict(name: str) -> str:
    """Return ``name`` in a form that is usable as a Python identifier.

    Names that are reserved Python keywords (``pass``, ``class``,
    ``None``, ...) get a single trailing underscore appended, as
    recommended by PEP 8:
    https://peps.python.org/pep-0008/#descriptive-naming-styles

    Args:
        name: Candidate identifier derived from a spec name.

    Returns:
        ``name + '_'`` for keywords, otherwise ``name`` unchanged.
    """
    # Imported locally so this helper does not depend on the module header.
    import keyword

    if keyword.iskeyword(name):
        return f'{name}_'
    return name
|
|
|
|
|
2023-03-25 17:18:33 +00:00
|
|
|
def name_to_camel(name_list: list[str]) -> str:
    """Build a CamelCase identifier from the parts of a spec name.

    Every part is passed through ``str.capitalize`` (first character upper
    case, the rest lower case) and the parts are concatenated, e.g.
    ``['sv', 'chat']`` becomes ``'SvChat'``. The result is then passed
    through ``fix_name_conflict``.

    Args:
        name_list: The name split into its parts.

    Returns:
        The identifier used for generated class names.
    """
    camel = ''.join(part.capitalize() for part in name_list)
    return fix_name_conflict(camel)
|
2023-03-25 17:18:33 +00:00
|
|
|
|
|
|
|
def name_to_snake(name_list: list[str]) -> str:
    """Build a snake_case identifier from the parts of a spec name.

    The parts are joined with underscores, e.g. ``['sv', 'chat']`` becomes
    ``'sv_chat'``. The result is then passed through ``fix_name_conflict``.

    Args:
        name_list: The name split into its parts.

    Returns:
        The identifier used for generated module, field and argument names.
    """
    snake = '_'.join(name_list)
    return fix_name_conflict(snake)
|
2023-03-25 17:18:33 +00:00
|
|
|
|
|
|
|
# Maps the `kind` of a message member to the code emitted for it:
# (type annotation, default value literal, Unpacker getter expression).
# The getter expression is appended to `unpacker.get_`.
_MEMBER_KIND_CODE: dict[str, tuple[str, str, str]] = {
    'string': ('str', "'default'", 'str()'),  # TODO: sanitize cc
    'raw': ('bytes', "b'\\x00'", 'raw()'),
    # TODO: use ENUM_NAME_SOME_VALUE as default here
    'enum': ('int', '0', 'int() # TODO: this is a enum'),
    'int32': ('int', '0', 'int()'),
    'tick': ('int', '0', 'int()'),
    'boolean': ('bool', 'False', 'int() == 1'),
    # TODO: think about tune params
    'tune_param': ('int', '0', 'int() # TODO: this is a tune param'),
    # TODO: think about snapshot_object
    'snapshot_object': ('int', '0', 'int() # TODO: this is a snapshot object'),
    # TODO: think about array
    'array': ('int', '0', 'int() # TODO: this is an array'),
    # TODO: think about flags
    'flags': ('int', '0', 'int() # TODO: this is a flag'),
}


def _member_code(member: GameMessageMemberJson) -> tuple[str, str, str]:
    """Look up the (annotation, default, getter) triple for one member.

    Prints an error and exits with status 1 for an unknown kind.
    """
    # {'name': ['message'], 'type': {'kind': 'string', 'disallow_cc': False}}
    kind = member['type']['kind']
    if kind not in _MEMBER_KIND_CODE:
        print(f"Error: unknown type {member['type']}")
        raise SystemExit(1)
    return _MEMBER_KIND_CODE[kind]


def generate_msg(msg: GameMessageJson) -> None:
    """Write the Python module that implements one game message.

    The module is written to
    ``../twnet_parser/messages7/game/<snake_name>.py`` relative to this
    script. An existing file is overwritten. It contains a class named
    after the message (CamelCase, derived from ``PrettyPrint``) with

    * ``__init__`` taking one keyword argument per member,
    * ``unpack(data)`` reading every member from an ``Unpacker``,
    * ``pack()`` whose body comes from ``gen_pack_return``.

    Args:
        msg: The spec entry of the message.

    Raises:
        SystemExit: With status 1 when a member has an unknown kind.
    """
    name_snake = name_to_snake(msg['name'])
    name_camel = name_to_camel(msg['name'])

    # Everything that may abort the script runs before the output file is
    # opened, so a bad spec entry never leaves a truncated module behind.
    fields: list[tuple[str, str, str, str]] = [
        (name_to_snake(member['name']), *_member_code(member))
        for member in msg['members']
    ]
    dependencies = get_dependencies(msg)
    pack_body = gen_pack_return(msg)

    init_args = [
        f'            {name}: {ftype} = {default}'
        for name, ftype, default, _ in fields
    ]

    parts: list[str] = [
        '# generated by scripts/generate_messages.py\n',
        '\n',
        'from twnet_parser.pretty_print import PrettyPrint\n',
        'from twnet_parser.packer import Unpacker\n',
        'from twnet_parser.chunk_header import ChunkHeader\n',
        dependencies,
        '\n',
        f'class {name_camel}(PrettyPrint):\n',
        '    def __init__(\n',
        '            self,\n',
        ',\n'.join(init_args) + '\n',
        '    ) -> None:\n',
        f"        self.message_name = '{name_snake}'\n",
        "        self.system_message = False\n",
        "        self.header: ChunkHeader\n",
        '\n',
    ]
    parts.extend(
        f"        self.{name}: {ftype} = {name}\n"
        for name, ftype, _, _ in fields)
    parts.extend([
        '\n',
        '    # first byte of data\n',
        '    # has to be the first byte of the message payload\n',
        '    # NOT the chunk header and NOT the message id\n',
        '    def unpack(self, data: bytes) -> bool:\n',
        '        unpacker = Unpacker(data)\n',
    ])
    parts.extend(
        f'        self.{name} = unpacker.get_{getter}\n'
        for name, _, _, getter in fields)
    parts.extend([
        '        return True\n',
        '\n',
        '    def pack(self) -> bytes:\n',
        pack_body,
    ])

    dirname = os.path.dirname(__file__)
    file_path = os.path.join(
        dirname,
        '../twnet_parser/messages7/game/',
        f'{name_snake}.py')
    with open(file_path, 'w', encoding='utf-8') as out_file:
        print(f"Generating {file_path} ...")
        out_file.write(''.join(parts))
|
|
|
|
|
|
|
|
def get_dependencies(msg: GameMessageJson) -> str:
    """Return the ``twnet_parser.packer`` import line a message needs.

    String members need ``pack_str``; raw members need nothing because
    ``pack_field`` emits them without a packer call; every other known
    kind needs ``pack_int``. The imported names are sorted so that the
    generated file is identical on every run.

    Args:
        msg: The spec entry of the message.

    Returns:
        ``'from twnet_parser.packer import ...\\n'`` or ``''`` when no
        packer function is needed.

    Raises:
        SystemExit: With status 1 when a member has an unknown kind.
    """
    int_packed_kinds = (
        'enum',
        'int32',
        'tick',
        'boolean',
        'tune_param',  # TODO: think about tune params
        'snapshot_object',  # TODO: think about snapshot_object
        'array',  # TODO: think about array
        'flags',  # TODO: think about flags
    )
    packer_deps: set[str] = set()
    for member in msg['members']:
        kind = member['type']['kind']
        if kind == 'string':  # TODO: sanitize cc
            packer_deps.add('pack_str')
        elif kind == 'raw':
            continue
        elif kind in int_packed_kinds:
            # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
            packer_deps.add('pack_int')
        else:
            print(f"Error: unknown type {member['type']}")
            raise SystemExit(1)
    if not packer_deps:
        return ''
    return 'from twnet_parser.packer import ' + \
        ', '.join(sorted(packer_deps)) + '\n'
|
|
|
|
|
|
|
|
def pack_field(member: GameMessageMemberJson) -> str:
    """Return the expression that packs one member inside ``pack()``.

    Raw members are emitted as the attribute itself (``self.<name>``),
    string members as ``pack_str(self.<name>)`` and every other known
    kind as ``pack_int(self.<name>)``.

    Args:
        member: The spec entry of the member.

    Returns:
        The Python expression as source text.

    Raises:
        SystemExit: With status 1 when the member has an unknown kind.
    """
    name: str = name_to_snake(member["name"])
    kind = member['type']['kind']
    if kind == 'raw':
        return f'self.{name}'
    if kind == 'string':  # TODO: sanitize cc
        return f'pack_str(self.{name})'
    # {"name": ["mode"], "type": {"kind": "enum", "enum": ["chat"]}},
    if kind in (
            'enum',
            'int32',
            'tick',
            'boolean',
            'tune_param',  # TODO: think about tune params
            'snapshot_object',  # TODO: think about snapshot_object
            'array',  # TODO: think about array
            'flags'):  # TODO: think about flags
        return f'pack_int(self.{name})'
    print(f"Error: unknown type {member['type']}")
    raise SystemExit(1)
|
2023-03-25 17:18:33 +00:00
|
|
|
|
2023-03-26 11:03:04 +00:00
|
|
|
def gen_pack_return(msg: GameMessageJson) -> str:
    """Return the ``return`` statement that forms the body of ``pack()``.

    The statement is indented by 8 spaces so that it sits inside a method
    of the generated class. Members are concatenated with ``+``; with more
    than one member every further member goes on its own backslash
    continued line indented by 12 spaces. The text has no trailing newline.

    Args:
        msg: The spec entry of the message.

    Returns:
        ``"        return b''"`` for a message without members, otherwise
        the concatenation of ``pack_field`` of every member.
    """
    members: list[GameMessageMemberJson] = msg['members']
    if not members:
        return "        return b''"
    packed = [pack_field(member) for member in members]
    return '        return ' + ' + \\\n            '.join(packed)
|
2023-03-25 17:18:33 +00:00
|
|
|
|
|
|
|
def generate(spec: str) -> None:
    """Generate all code that is derived from one spec file.

    Loads the JSON spec, writes the game message matcher
    (``gen_match_game7``) and then one module per game message
    (``generate_msg``).

    Args:
        spec: Path to the spec JSON file.
    """
    print(f"generating classes from {spec} ...")
    with open(spec, encoding='utf-8') as spec_io:
        spec_data: SpecJson = json.load(spec_io)

    game_messages: list[GameMessageJson] = spec_data['game_messages']
    gen_match_game7(game_messages)
    # A leftover `exit(0)` used to stop the script at this point, which
    # left the per-message generation below unreachable.
    for msg in game_messages:
        generate_msg(msg)
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> None:
    """Run the generator against the teeworlds 0.7.5 spec from libtw2.

    The spec is expected in a ``libtw2`` checkout located two directories
    above this script. When the file is missing a hint on how to obtain it
    is printed and the script exits with status 1.

    Raises:
        SystemExit: With status 1 when the spec file does not exist.
    """
    dirname = os.path.dirname(__file__)
    spec_07 = os.path.join(
        dirname,
        '../../libtw2/gamenet/generate/spec/teeworlds-0.7.5.json')
    if os.path.exists(spec_07):
        generate(spec_07)
        return

    # Print the clone target as a real path so the hint works from any
    # working directory and creates exactly the directory read above.
    libtw2_dir = os.path.normpath(os.path.join(dirname, '../../libtw2'))
    print(f"Error: file not found {spec_07}")
    print(" try running these commands")
    print("")
    print(f" git clone git@github.com:heinrich5991/libtw2 {libtw2_dir}")
    raise SystemExit(1)
|
|
|
|
|
|
|
|
# Only run the generator when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
|
|
|
|