import socket, asyncio, string import log, packet _ALLOWED_NICK_CHARACTERS: str = string.ascii_letters + string.digits + "_" class Network: def __init__(self, host: str, port: int, max_players: int, world): self.host = host self.port = port self.max_players = max_players self.world = world self.players: dict = {} self.player_coros = set() log.info(f"Starting server at [{self.host}]:{self.port}") self.server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) # not sure if this will work with link-local adresses self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.host, self.port)) self.server.setblocking(False) async def mainloop(self): loop = asyncio.get_running_loop() await packet.init() self.server.listen(4) # if your accept buffer is larger than 4 you should consider how big your server is(or ur pc is too old lol) log.debug("Listening for connections.") while True: client_sock, client_addr = await loop.sock_accept(self.server) log.debug("Got connection from [%s]:%s" % client_addr[:2]) loop.create_task(self._auth_player(client_sock, client_addr)) async def _auth_player(self, client_sock: socket.socket, client_addr: tuple): await packet.write_Version(client_sock) nick: str = await packet.read_Login(client_sock) if not (3 <= len(nick) <= 15): log.info("Got invalid nick length %s from [%s]:%s" % (len(nick), *client_addr[:2])) client_sock.close() return for char in nick: if char not in _ALLOWED_NICK_CHARACTERS: log.info("Got invalid nick character %s from [%s]:%s" % (ord(char), *client_addr[:2])) client_sock.close() return log.info("[%s]:%s authenticated with nick %s" % (*client_addr[:2], nick)) if nick in self.players: log.info("Disconnecting player because another player with this nickname is logged in.") await packet.write_Disconnect(client_sock, "Already logged in with the same username.") client_sock.close() return if len(self.players) > self.max_players: log.warning("Disconnecting player because the server is full.") await packet.write_Disconnect(client_sock, "The server is full.") client_sock.close() return self.players[nick] = [client_sock, client_addr, nick] # launch new coroutine for the just auth'ed player coro = asyncio.create_task(self._player_coro(nick)) self.player_coros.add(coro) coro.add_done_callback(self.player_coros.discard) async def _player_coro(self, nick: str): sock = self.players[nick][0] while True: packet_type, packet_data = await packet.read(sock) if packet_type is None: log.info(f"{nick} sent an unknown packet, disconnecting.") await packet.write_Disconnect(sock, "Wrong packet received.") sock.close() del self.players[nick] return match packet_type: case packet.p.C_Quit: log.info(f"Player {nick} quit with reason {packet_data}.") sock.close() del self.players[nick] return case packet.p.C_SetCoord: log.error(f"{nick} sent SetCoord, but it isn't implemented, disconnecting.") await packet.write_Disconnect(sock, "SetCoord not implemented.") sock.close() del self.players[nick] return case packet.p.C_Chat: log.info(f"<{nick}> {packet_data}") for player in self.players: await packet.write_Chat(self.players[player][0], packet_data) case packet.p.C_ChunkRQ: await packet.write_Chunk(sock, *packet_data, self.world.get_chunk_data(*packet_data))