FoxWorld-server/packet.py
2025-04-11 11:22:16 +03:00

86 lines
2.9 KiB
Python

import struct, asyncio
import log
##########
PVN = 0x00
##########
class p:
# clientbound packets
C_Quit = 0x00
C_SetCoord = 0x01
C_Chat = 0x02
C_ChunkRQ = 0x03
# serverbound packets
S_Disconnect = 0x00
S_Chunk = 0x01
S_Chat = 0x02
S_Spawn = 0x03
async def init() -> None:
global loop
loop = asyncio.get_running_loop()
# this section writes various packets to the client. it doesn't check whether the specified fields are in range,
# if something goes wrong the underlying library(struct, etc.) should throw an exception. basically the calling
# code needs to verify correctness of the data, not these helper functions.
async def write_Version(sock) -> None:
await loop.sock_sendall(sock, b"FWNP" + PVN.to_bytes(4))
async def write_Disconnect(sock, message: str) -> None:
enc_msg: bytes = message.encode("utf-8")
await loop.sock_sendall(sock, struct.pack("!BH", p.S_Disconnect, len(enc_msg)) + enc_msg)
async def write_Chunk(sock, chunk_x: int, chunk_z: int, chunk_data: bytes) -> None:
await loop.sock_sendall(sock, struct.pack("!BhhH", p.S_Chunk, chunk_x, chunk_z, len(chunk_data)) + chunk_data)
async def write_Chat(sock, message: str) -> None:
enc_msg: bytes = message.encode("utf-8")
await loop.sock_sendall(sock, struct.pack("!BH", p.S_Chat, len(enc_msg)) + enc_msg)
async def write_Spawn(sock, spawn_x: float, spawn_y: float, spawn_z: float) -> None:
await loop.sock_sendall(sock, struct.pack("!Bfff", spawn_x, spawn_y, spawn_z))
# this functions are the only read functions intended to be used
async def read(sock) -> tuple:
packet_id: int = int.from_bytes(await _read_bytes(sock, 1))
match packet_id:
case p.C_Quit: return packet_id, await _read_Quit (sock)
case p.C_SetCoord: return packet_id, await _read_SetCoord(sock)
case p.C_Chat: return packet_id, await _read_Chat (sock)
case p.C_ChunkRQ: return packet_id, await _read_ChunkRQ (sock)
case _: return None, None
async def read_Login(sock) -> str:
nick_len: int = int.from_bytes(await _read_bytes(sock, 1))
return (await _read_bytes(sock, nick_len)).decode("ascii")
# all of these are read helper functions
async def _read_bytes(sock, n: int) -> bytes or None:
buffer: bytes = b""
while len(buffer) != n:
data = await loop.sock_recv(sock, n - len(buffer))
if not data:
return
buffer += data
return buffer
async def _read_Quit(sock) -> int:
return int.from_bytes(await _read_bytes(sock, 1))
async def _read_SetCoord(sock) -> tuple[float]:
return struct.unpack("!fff", await _read_bytes(sock, 12))
async def _read_Chat(sock) -> str:
# yes, i know i could've done this in 1 line, but it's more readable this way - 2o, 22aug2024
message_len: int = int.from_bytes(await _read_bytes(sock, 2))
return (await _read_bytes(sock, message_len)).decode("utf-8")
async def _read_ChunkRQ(sock) -> tuple[int]:
return struct.unpack("!hh", await _read_bytes(sock, 4))