86 lines
2.9 KiB
Python
86 lines
2.9 KiB
Python
import struct, asyncio
|
|
|
|
import log
|
|
|
|
##########
|
|
# Version value that write_Version() sends after the b"FWNP" magic.
# NOTE(review): the name presumably stands for "protocol version number" - confirm.
PVN = 0x00
|
|
##########
|
|
|
|
class p:
    """Packet ID constants for the wire protocol.

    IDs are written/read as a single byte. The two groups are used in
    opposite directions, so their values overlap (for example ``C_Quit``
    and ``S_Disconnect`` are both 0x00).
    """

    # C_*: IDs of packets sent by the client (serverbound);
    # read() dispatches on these.
    C_Quit = 0x00
    C_SetCoord = 0x01
    C_Chat = 0x02
    C_ChunkRQ = 0x03

    # S_*: IDs of packets sent by the server to the client (clientbound);
    # see the write_* helpers.
    S_Disconnect = 0x00
    S_Chunk = 0x01
    S_Chat = 0x02
    S_Spawn = 0x03
|
|
|
|
async def init() -> None:
    """Store the currently running event loop in the module-level ``loop``.

    The read/write helpers in this module all go through ``loop``, so this
    must be awaited (from inside a running loop) before any of them is used.
    """
    global loop
    running_loop = asyncio.get_running_loop()
    loop = running_loop
|
|
|
|
# this section writes various packets to the client. it doesn't check whether the specified fields are in range,
|
|
# if something goes wrong the underlying library(struct, etc.) should throw an exception. basically the calling
|
|
# code needs to verify correctness of the data, not these helper functions.
|
|
|
|
async def write_Version(sock) -> None:
    """Send the version header: the magic ``FWNP`` plus ``PVN`` as 4 bytes.

    ``PVN`` is encoded as an unsigned big-endian integer.
    """
    magic = b"FWNP"
    version = PVN.to_bytes(4, "big")
    await loop.sock_sendall(sock, magic + version)
|
|
|
|
async def write_Disconnect(sock, message: str) -> None:
    """Send a disconnect packet carrying ``message``.

    Wire layout: packet ID (1 byte), payload length (unsigned 16-bit,
    network byte order), then the UTF-8 encoded message. The length is not
    range-checked here; ``struct.pack`` raises if it does not fit.
    """
    payload = message.encode("utf-8")
    header = struct.pack("!BH", p.S_Disconnect, len(payload))
    await loop.sock_sendall(sock, header + payload)
|
|
|
|
async def write_Chunk(sock, chunk_x: int, chunk_z: int, chunk_data: bytes) -> None:
    """Send a chunk packet.

    Wire layout: packet ID (1 byte), ``chunk_x`` and ``chunk_z`` (signed
    16-bit each), data length (unsigned 16-bit), then ``chunk_data`` as-is.
    All integers use network byte order. Values are not range-checked here;
    ``struct.pack`` raises if one does not fit.
    """
    header = struct.pack("!BhhH", p.S_Chunk, chunk_x, chunk_z, len(chunk_data))
    await loop.sock_sendall(sock, header + chunk_data)
|
|
|
|
async def write_Chat(sock, message: str) -> None:
    """Send a chat packet carrying ``message``.

    Wire layout: packet ID (1 byte), payload length (unsigned 16-bit,
    network byte order), then the UTF-8 encoded message. The length is not
    range-checked here; ``struct.pack`` raises if it does not fit.
    """
    payload = message.encode("utf-8")
    header = struct.pack("!BH", p.S_Chat, len(payload))
    await loop.sock_sendall(sock, header + payload)
|
|
|
|
async def write_Spawn(sock, spawn_x: float, spawn_y: float, spawn_z: float) -> None:
    """Send a spawn packet.

    Wire layout: packet ID (1 byte) followed by ``spawn_x``, ``spawn_y`` and
    ``spawn_z`` as 32-bit floats in network byte order.
    """
    # "!Bfff" has four fields: the leading "B" is the packet ID and has to be
    # supplied explicitly, otherwise struct.pack raises for a missing argument.
    packet = struct.pack("!Bfff", p.S_Spawn, spawn_x, spawn_y, spawn_z)
    await loop.sock_sendall(sock, packet)
|
|
|
|
# these functions are the only read functions intended to be used
|
|
|
|
async def read(sock) -> tuple:
    """Read one packet from ``sock`` and return ``(packet_id, payload)``.

    The first byte is the packet ID; the matching ``_read_*`` helper then
    parses the body. For an unrecognised ID ``(None, None)`` is returned and
    nothing beyond the ID byte is consumed.

    If the connection closes before the ID byte arrives, ``_read_bytes``
    returns ``None`` and the integer conversion raises ``TypeError``.
    """
    packet_id: int = int.from_bytes(await _read_bytes(sock, 1), "big")

    # Packet ID -> coroutine function that parses that packet's body.
    body_readers = {
        p.C_Quit: _read_Quit,
        p.C_SetCoord: _read_SetCoord,
        p.C_Chat: _read_Chat,
        p.C_ChunkRQ: _read_ChunkRQ,
    }

    reader = body_readers.get(packet_id)
    if reader is None:
        return None, None
    return packet_id, await reader(sock)
|
|
|
|
async def read_Login(sock) -> str:
    """Read a login message and return the nickname.

    Wire layout: nickname length (1 byte) followed by that many bytes,
    decoded as ASCII.
    """
    length_byte = await _read_bytes(sock, 1)
    nick_len: int = int.from_bytes(length_byte, "big")
    raw_nick = await _read_bytes(sock, nick_len)
    return raw_nick.decode("ascii")
|
|
|
|
# all of these are read helper functions
|
|
|
|
async def _read_bytes(sock, n: int) -> bytes | None:
    """Read exactly ``n`` bytes from ``sock``.

    Keeps calling ``loop.sock_recv`` until ``n`` bytes have been collected.

    Returns:
        The ``n`` bytes read, or ``None`` if ``sock_recv`` returns no data
        (the peer closed the connection) before ``n`` bytes arrived. Any
        partial data received up to that point is discarded.
    """
    # bytearray grows in place; repeated ``bytes +=`` would copy every time.
    buffer = bytearray()
    while len(buffer) != n:
        data = await loop.sock_recv(sock, n - len(buffer))
        if not data:
            return None
        buffer += data
    return bytes(buffer)
|
|
|
|
async def _read_Quit(sock) -> int:
    """Read the body of a quit packet: one byte, returned as an int."""
    raw = await _read_bytes(sock, 1)
    return int.from_bytes(raw, "big")
|
|
|
|
async def _read_SetCoord(sock) -> tuple[float, float, float]:
    """Read the body of a set-coordinate packet.

    The body is 12 bytes: three 32-bit floats in network byte order,
    returned as a 3-tuple in wire order.
    NOTE(review): presumably (x, y, z) - confirm against the client.
    """
    payload = await _read_bytes(sock, 12)
    return struct.unpack("!fff", payload)
|
|
|
|
async def _read_Chat(sock) -> str:
    """Read the body of a chat packet and return the message text.

    Wire layout: message length (2 bytes, unsigned big-endian) followed by
    that many bytes, decoded as UTF-8.
    """
    # Deliberately done in separate steps (length first, then the text)
    # rather than as a one-liner, for readability.
    length_bytes = await _read_bytes(sock, 2)
    message_len: int = int.from_bytes(length_bytes, "big")
    raw_message = await _read_bytes(sock, message_len)
    return raw_message.decode("utf-8")
|
|
|
|
async def _read_ChunkRQ(sock) -> tuple[int, int]:
    """Read the body of a chunk-request packet.

    The body is 4 bytes: two signed 16-bit integers in network byte order,
    returned as a 2-tuple in wire order.
    NOTE(review): presumably (chunk_x, chunk_z), mirroring write_Chunk - confirm.
    """
    payload = await _read_bytes(sock, 4)
    return struct.unpack("!hh", payload)
|