228 lines
6.8 KiB
Python
228 lines
6.8 KiB
Python
import connexion
|
|
from typing import Dict
|
|
from typing import Tuple
|
|
from typing import Union
|
|
|
|
from openapi_server.models.claim_ice_candidates_response import ClaimIceCandidatesResponse # noqa: E501
|
|
from openapi_server.models.ice_candidate import IceCandidate # noqa: E501
|
|
from openapi_server.models.knock import Knock # noqa: E501
|
|
from openapi_server.models.list_knocks_response import ListKnocksResponse # noqa: E501
|
|
from openapi_server.models.room import Room # noqa: E501
|
|
from openapi_server.models.server import Server # noqa: E501
|
|
from openapi_server.models.service import Service # noqa: E501
|
|
from openapi_server.models.status import Status # noqa: E501
|
|
from openapi_server import util
|
|
|
|
# In-memory state shared by every handler in this module. These are plain
# module-level dicts, so their contents exist only for the life of the process.
# NOTE(review): key types are annotated as str on the assumption that model
# ``name`` fields and path ids are strings -- confirm against the models.

# Server name -> registered server.
servers: "Dict[str, Server]" = {}

# Room -> {server name -> server} for every room a registered server listed.
rooms: "Dict[str, Dict[str, Server]]" = {}

# Server name -> service name -> knock name -> knock.
knocks: "Dict[str, Dict[str, Dict[str, Knock]]]" = {}

# Session id -> {candidate name -> ICE candidate} waiting to be claimed.
sessions: "Dict[str, Dict[str, IceCandidate]]" = {}
|
|
|
|
def peer_net_service_claim_ice_candidates(session):  # noqa: E501
    """peer_net_service_claim_ice_candidates

    Return every ICE candidate currently queued for ``session`` and replace
    the queue with an empty one in the same call. # noqa: E501

    :param session: The session id.
    :type session: str

    :return: the claimed candidates, or ``('not found', 404)`` when the
        session id is unknown.
    :rtype: Union[ClaimIceCandidatesResponse, Tuple[ClaimIceCandidatesResponse, int], Tuple[ClaimIceCandidatesResponse, int, Dict[str, str]]]
    """
    try:
        pending = sessions[session]
    except KeyError:
        return 'not found', 404

    # Snapshot what is queued, then install a fresh empty queue for the
    # session so the same candidates are not returned by the next claim.
    claimed = list(pending.values())
    sessions[session] = {}
    return ClaimIceCandidatesResponse(ice_candidates=claimed)
|
|
|
|
|
|
def peer_net_service_create_ice_candidate(session, ice_candidate=None, target_session=None):  # noqa: E501
    """peer_net_service_create_ice_candidate

    Queue an ICE candidate on ``session``. # noqa: E501

    :param session: The session id.
    :type session: str
    :param ice_candidate: Candidate to queue; replaced by the parsed request
        body when the request is JSON.
    :type ice_candidate: dict | bytes
    :param target_session: Accepted but not read by this handler.
    :type target_session: str

    :return: the stored candidate, or ``('not found', 404)`` when ``session``
        is unknown.
    :rtype: Union[IceCandidate, Tuple[IceCandidate, int], Tuple[IceCandidate, int, Dict[str, str]]]
    """
    if connexion.request.is_json:
        ice_candidate = IceCandidate.from_dict(connexion.request.get_json())  # noqa: E501

    # An unknown session used to surface as an unhandled KeyError (HTTP 500).
    # Report it as 404, matching the other lookups in this module.
    if session not in sessions:
        return 'not found', 404

    # Candidates are keyed by name, so posting the same name again replaces
    # the earlier entry.
    # NOTE(review): the candidate is filed under ``session`` and
    # ``target_session`` is ignored -- confirm that is the intended routing.
    sessions[session][ice_candidate.name] = ice_candidate
    return ice_candidate
|
|
|
|
|
|
def peer_net_service_create_knock(server, service, knock=None):  # noqa: E501
    """peer_net_service_create_knock

    Creates a knock that will be answered by the peer; the signaler may delete the knock, regardless of the state, when it ages. # noqa: E501

    The knock is stored under ``server``/``service`` keyed by its own name.
    When the knock carries an offer, an empty candidate queue is opened under
    the offer's name.

    :param server: The server id.
    :type server: str
    :param service: The service id.
    :type service: str
    :param knock: Knock to store; replaced by the parsed request body when the
        request is JSON.
    :type knock: dict | bytes

    :return: the stored knock, or ``('not found', 404)`` when the server is
        unknown or does not list ``service``.
    :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]]]
    """
    # Function-scope import so this handler does not depend on a module-level
    # ``logging`` import being present.
    import logging

    if connexion.request.is_json:
        knock = Knock.from_dict(connexion.request.get_json())  # noqa: E501

    # Replaces a bare print(): the payload stays available for debugging
    # without writing to stdout on every request.
    logging.getLogger(__name__).debug("create_knock payload: %s", knock)

    if server not in servers:
        return 'not found', 404
    # ``services`` is treated as empty when it is None so the membership test
    # cannot raise TypeError.
    offered = servers[server].services or []
    if not any(entry.name == service for entry in offered):
        return 'not found', 404

    knocks[server][service][knock.name] = knock

    # Open the offer's candidate queue only when an offer is present; a knock
    # without one previously crashed here after already being stored.
    # NOTE(review): an existing queue under the same name is reset to empty.
    if knock.offer is not None:
        sessions[knock.offer.name] = {}
    return knock
|
|
|
|
|
|
def peer_net_service_create_server(server=None):  # noqa: E501
    """peer_net_service_create_server

    Register a server: index it by name, add it to each room it lists, and
    create an empty knock table for each of its services. # noqa: E501

    :param server: Server to register; replaced by the parsed request body
        when the request is JSON.
    :type server: dict | bytes

    :return: the registered server.
    :rtype: Union[Server, Tuple[Server, int], Tuple[Server, int, Dict[str, str]]]
    """
    if connexion.request.is_json:
        server = Server.from_dict(connexion.request.get_json())  # noqa: E501

    servers[server.name] = server

    # ``rooms`` / ``services`` are treated as empty when they are None, so a
    # server registered without them no longer fails with TypeError after it
    # has already been added to ``servers``.
    # NOTE(review): presumably the generated model leaves omitted list fields
    # as None -- confirm against the Server model.
    for room in server.rooms or []:
        rooms.setdefault(room, {})[server.name] = server

    # Registering a name again starts from empty knock tables, dropping any
    # knocks stored under the previous registration.
    knocks[server.name] = {entry.name: {} for entry in server.services or []}
    return server
|
|
|
|
|
|
def peer_net_service_create_service(server, service):  # noqa: E501
    """peer_net_service_create_service

    Placeholder handler: parses a JSON body when one is sent, stores nothing,
    and returns a fixed string. # noqa: E501

    :param server: The server id.
    :type server: str
    :param service: Service payload; rebound to the parsed request body when
        the request is JSON.
    :type service: dict | bytes

    :rtype: Union[Service, Tuple[Service, int], Tuple[Service, int, Dict[str, str]]]
    """
    request = connexion.request
    if request.is_json:
        # The parsed value is not read again below.
        service = Service.from_dict(request.get_json())  # noqa: E501
    return 'do some magic!'
|
|
|
|
|
|
def peer_net_service_delete_server(name=None):  # noqa: E501
    """peer_net_service_delete_server

    Placeholder handler: nothing is deleted and a fixed string is
    returned. # noqa: E501

    :param name: Not read by this handler.
    :type name: str

    :rtype: Union[None, Tuple[None, int], Tuple[None, int, Dict[str, str]]]
    """
    placeholder = 'do some magic!'
    return placeholder
|
|
|
|
|
|
def peer_net_service_delete_service(server, service):  # noqa: E501
    """peer_net_service_delete_service

    Placeholder handler: nothing is deleted and a fixed string is
    returned. # noqa: E501

    :param server: The server id (not read by this handler).
    :type server: str
    :param service: The service id (not read by this handler).
    :type service: str

    :rtype: Union[None, Tuple[None, int], Tuple[None, int, Dict[str, str]]]
    """
    placeholder = 'do some magic!'
    return placeholder
|
|
|
|
|
|
def peer_net_service_get_knock(server, service, knock):  # noqa: E501
    """peer_net_service_get_knock

    Look up one stored knock by server, service and knock id. # noqa: E501

    :param server: The server id.
    :type server: str
    :param service: The service id.
    :type service: str
    :param knock: The knock id.
    :type knock: str

    :return: the stored knock, or ``('not found', 404)`` when any of the
        three ids is unknown.
    :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]]]
    """
    try:
        # A missing key at any of the three levels means the knock is absent.
        return knocks[server][service][knock]
    except KeyError:
        return 'not found', 404
|
|
|
|
def peer_net_service_get_room(room):  # noqa: E501
    """peer_net_service_get_room

    Build the room view: the room name plus every server registered in
    it. # noqa: E501

    :param room: The room id.
    :type room: str

    :return: the room, or ``('not found', 404)`` when no server has
        registered that room.
    :rtype: Union[Room, Tuple[Room, int], Tuple[Room, int, Dict[str, str]]]
    """
    if room not in rooms:
        return 'not found', 404

    # The room index is keyed by server name; each name is resolved through
    # ``servers`` rather than using the value stored in the room index.
    members = [servers[server_name] for server_name in rooms[room]]
    return Room(name=room, servers=members)
|
|
|
|
|
|
def peer_net_service_list_knocks(server, service):  # noqa: E501
    """peer_net_service_list_knocks

    List every knock stored for one service of one server. # noqa: E501

    :param server: The server id.
    :type server: str
    :param service: The service id.
    :type service: str

    :return: the knocks, or ``('not found', 404)`` when the server or the
        service is unknown.
    :rtype: Union[ListKnocksResponse, Tuple[ListKnocksResponse, int], Tuple[ListKnocksResponse, int, Dict[str, str]]]
    """
    # Values in ``knocks`` are always dicts, so None here means the server
    # key is absent.
    per_service = knocks.get(server)
    if per_service is None or service not in per_service:
        return 'not found', 404

    stored = list(per_service[service].values())
    return ListKnocksResponse(knocks=stored)
|
|
|
|
|
|
def peer_net_service_update_knock(server, service, knock, knock2=None):  # noqa: E501
    """peer_net_service_update_knock

    Replace an existing knock with the submitted one. When the submitted
    knock carries an answer, an empty candidate queue is opened under the
    answer's name. # noqa: E501

    :param server: The server id.
    :type server: str
    :param service: The service id.
    :type service: str
    :param knock: The knock id.
    :type knock: str
    :param knock2: Replacement knock; rebound to the parsed request body when
        the request is JSON.
    :type knock2: dict | bytes

    :return: the stored knock, or ``('not found', 404)`` when the server is
        unknown, does not list ``service``, or has no knock with that id.
    :rtype: Union[Knock, Tuple[Knock, int], Tuple[Knock, int, Dict[str, str]]]
    """
    if connexion.request.is_json:
        knock2 = Knock.from_dict(connexion.request.get_json())  # noqa: E501

    if server not in servers:
        return 'not found', 404
    # ``services`` is treated as empty when it is None so the membership test
    # cannot raise TypeError.
    offered = servers[server].services or []
    if not any(entry.name == service for entry in offered):
        return 'not found', 404

    # An update must not create a knock: an unknown id is reported as 404,
    # the same answer the knock lookup handler gives for it.
    if knock not in knocks[server][service]:
        return 'not found', 404

    # Stored under the path id, whatever name the body carries.
    knocks[server][service][knock] = knock2

    # Open the answer's candidate queue only when an answer is present; an
    # update without one previously crashed here after already being stored.
    # NOTE(review): an existing queue under the same name is reset to empty.
    if knock2.answer is not None:
        sessions[knock2.answer.name] = {}
    return knock2
|