Source code for awstin.apigateway.websocket

import boto3


[docs]class Websocket: """ Serverless-to-client push via websocket """ def __init__(self, domain_name, stage=None): """ Parameters ---------- connection_id : str Callback URL of the websocket """ endpoint_url = f"https://{domain_name}" if stage: endpoint_url += f"/{stage}" self.api_client = boto3.client( "apigatewaymanagementapi", endpoint_url=endpoint_url, )
[docs] def send(self, connection_id, message): """ Send a message to the user """ self.api_client.post_to_connection( Data=message, ConnectionId=connection_id, )