import connexion from typing import Dict from typing import Tuple from typing import Union from openapi_server.models.claim_ice_candidates_response import ClaimIceCandidatesResponse # noqa: E501 from openapi_server.models.ice_candidate import IceCandidate # noqa: E501 from openapi_server.models.knock import Knock # noqa: E501 from openapi_server.models.list_knocks_response import ListKnocksResponse # noqa: E501 from openapi_server.models.room import Room # noqa: E501 from openapi_server.models.server import Server # noqa: E501 from openapi_server.models.service import Service # noqa: E501 from openapi_server.models.status import Status # noqa: E501 from openapi_server import util servers = {} rooms = {} knocks = {} sessions = {} def peer_net_service_claim_ice_candidates(session): # noqa: E501 """peer_net_service_claim_ice_candidates Acts as both List and Delete atomically. # noqa: E501 :param session: The session id. :type session: str :param name: :type name: str :rtype: Union[ClaimIceCandidatesResponse, Tuple[ClaimIceCandidatesResponse, int], Tuple[ClaimIceCandidatesResponse, int, Dict[str, str]] """ if session not in sessions: return 'not found', 404 candidates = [sessions[session][k] for k in sessions[session]] sessions[session] = {} return ClaimIceCandidatesResponse(ice_candidates=candidates) def peer_net_service_create_ice_candidate(session, ice_candidate=None, target_session=None): # noqa: E501 """peer_net_service_create_ice_candidate # noqa: E501 :param session: The session id. :type session: str :param ice_candidate: :type ice_candidate: dict | bytes :param target_session: :type target_session: str :rtype: Union[IceCandidate, Tuple[IceCandidate, int], Tuple[IceCandidate, int, Dict[str, str]] """ if connexion.request.is_json: ice_candidate = IceCandidate.from_dict(connexion.request.get_json()) # noqa: E501 sessions[session][ice_candidate.name] = ice_candidate return ice_candidate def peer_net_service_create_knock(server, service, knock=None): # noqa: E501 """peer_net_service_create_knock Creates a knock that will be answered by the peer; the signaler may delete the knock, regardless of the state, when it ages. # noqa: E501 :param server: The server id. :type server: str :param service: The service id. :type service: str :param knock: :type knock: dict | bytes :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]] """ if connexion.request.is_json: knock = Knock.from_dict(connexion.request.get_json()) # noqa: E501 print(knock) if server not in servers or service not in [s.name for s in servers[server].services]: return 'not found', 404 knocks[server][service][knock.name] = knock sessions[knock.offer.name] = {} return knock def peer_net_service_create_server(server=None): # noqa: E501 """peer_net_service_create_server # noqa: E501 :param server: :type server: dict | bytes :rtype: Union[Server, Tuple[Server, int], Tuple[Server, int, Dict[str, str]] """ if connexion.request.is_json: server = Server.from_dict(connexion.request.get_json()) # noqa: E501 servers[server.name] = server for room in server.rooms: if room not in rooms: rooms[room] = {} rooms[room][server.name] = server knocks[server.name] = {} for service in server.services: knocks[server.name][service.name] = {} return server def peer_net_service_create_service(server, service): # noqa: E501 """peer_net_service_create_service # noqa: E501 :param server: The server id. :type server: str :param service: :type service: dict | bytes :rtype: Union[Service, Tuple[Service, int], Tuple[Service, int, Dict[str, str]] """ if connexion.request.is_json: service = Service.from_dict(connexion.request.get_json()) # noqa: E501 return 'do some magic!' def peer_net_service_delete_server(name=None): # noqa: E501 """peer_net_service_delete_server # noqa: E501 :param name: :type name: str :rtype: Union[None, Tuple[None, int], Tuple[None, int, Dict[str, str]] """ return 'do some magic!' def peer_net_service_delete_service(server, service): # noqa: E501 """peer_net_service_delete_service # noqa: E501 :param server: The server id. :type server: str :param service: The service id. :type service: str :rtype: Union[None, Tuple[None, int], Tuple[None, int, Dict[str, str]] """ return 'do some magic!' def peer_net_service_get_knock(server, service, knock): # noqa: E501 """peer_net_service_get_knock # noqa: E501 :param server: The server id. :type server: str :param service: The service id. :type service: str :param knock: The knock id. :type knock: str :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]] """ if server not in knocks or service not in knocks[server] or knock not in knocks[server][service]: return 'not found', 404 return knocks[server][service][knock] def peer_net_service_get_room(room): # noqa: E501 """peer_net_service_get_room # noqa: E501 :param room: The room id. :type room: str :rtype: Union[Room, Tuple[Room, int], Tuple[Room, int, Dict[str, str]] """ if room not in rooms: return 'not found', 404 s = [] for server in rooms[room]: s.append(servers[server]) return Room(name=room, servers=s) def peer_net_service_list_knocks(server, service): # noqa: E501 """peer_net_service_list_knocks # noqa: E501 :param server: The server id. :type server: str :param service: The service id. :type service: str :rtype: Union[ListKnocksResponse, Tuple[ListKnocksResponse, int], Tuple[ListKnocksResponse, int, Dict[str, str]] """ if server not in knocks or service not in knocks[server]: return 'not found', 404 return ListKnocksResponse(knocks=[knocks[server][service][k] for k in knocks[server][service]]) def peer_net_service_update_knock(server, service, knock, knock2=None): # noqa: E501 """peer_net_service_update_knock # noqa: E501 :param server: The server id. :type server: str :param service: The service id. :type service: str :param knock: The knock id. :type knock: str :param knock2: :type knock2: dict | bytes :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]] """ if connexion.request.is_json: knock2 = Knock.from_dict(connexion.request.get_json()) # noqa: E501 if server not in servers or service not in [s.name for s in servers[server].services]: return 'not found', 404 knocks[server][service][knock] = knock2 sessions[knock2.answer.name] = {} return knock2