call_manager.py =========== """ Discord Audio Bridge - Call Manager Manages active calls, call state, and integration between dialplan and profiles """ import asyncio import logging import time import uuid from typing import Dict, List, Optional, Set, Any, Callable from dataclasses import dataclass, field from enum import Enum from datetime import datetime, timedelta from .dialplan import get_dialplan, DialplanResult, RoutingTarget from .profiles import get_profile_manager, SIPProfile from .config import get_config logger = logging.getLogger(__name__) class CallState(Enum): """Call state enumeration""" INITIATING = "initiating" CONNECTING = "connecting" RINGING = "ringing" CONNECTED = "connected" TRANSFERRING = "transferring" HOLDING = "holding" ENDING = "ending" ENDED = "ended" FAILED = "failed" class CallDirection(Enum): """Call direction enumeration""" INBOUND = "inbound" # SIP -> Discord OUTBOUND = "outbound" # Discord -> SIP class CallType(Enum): """Call type enumeration""" PRIVATE = "private" # 1:1 Discord user to SIP CHANNEL = "channel" # Discord channel to SIP CONFERENCE = "conference" # Multi-party @dataclass class CallParticipant: """Represents a participant in a call""" id: str type: str # 'discord_user', 'discord_channel', 'sip' name: str connected: bool = False muted: bool = False joined_at: Optional[datetime] = None @dataclass class ActiveCall: """Represents an active call""" call_id: str direction: CallDirection call_type: CallType state: CallState # Participants participants: List[CallParticipant] = field(default_factory=list) # Routing information dialplan_result: Optional[DialplanResult] = None profile: Optional[SIPProfile] = None # Call details original_target: str = "" sip_uri: str = "" discord_target: str = "" # Timing created_at: datetime = field(default_factory=datetime.now) connected_at: Optional[datetime] = None ended_at: Optional[datetime] = None # Audio audio_bridge_id: Optional[str] = None recording_enabled: bool = False # Metadata headers: Dict[str, str] = field(default_factory=dict) caller_id: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) @property def duration(self) -> Optional[timedelta]: """Get call duration""" if not self.connected_at: return None end_time = self.ended_at or datetime.now() return end_time - self.connected_at @property def is_active(self) -> bool: """Check if call is in active state""" return self.state in [CallState.CONNECTING, CallState.RINGING, CallState.CONNECTED, CallState.HOLDING] class CallManager: """ Manages all active calls and call state Integrates dialplan routing with profile management """ def __init__(self): self.config = get_config() self.dialplan = get_dialplan() self.profile_manager = get_profile_manager() # Active calls self.active_calls: Dict[str, ActiveCall] = {} # Call event callbacks self.event_callbacks: Dict[str, List[Callable]] = { 'call_initiated': [], 'call_connected': [], 'call_ended': [], 'call_failed': [], 'participant_joined': [], 'participant_left': [] } logger.info("Call manager initialized") def register_callback(self, event: str, callback: Callable): """Register callback for call events""" if event in self.event_callbacks: self.event_callbacks[event].append(callback) def _fire_event(self, event: str, call: ActiveCall, **kwargs): """Fire call event to registered callbacks""" for callback in self.event_callbacks.get(event, []): try: callback(call, **kwargs) except Exception as e: logger.error(f"Error in event callback {event}: {e}") async def initiate_inbound_call(self, sip_uri: str, call_data: Dict[str, Any]) -> ActiveCall: """ Initiate an inbound call (SIP -> Discord) Args: sip_uri: The SIP URI being called call_data: Call data from SIP/SignalWire Returns: ActiveCall object """ call_id = str(uuid.uuid4()) logger.info(f"Initiating inbound call {call_id}: {sip_uri}") # Route the call using dialplan dialplan_result = self.dialplan.route_sip_uri(sip_uri) if not dialplan_result.success: logger.error(f"Dialplan routing failed for {sip_uri}: {dialplan_result.error_message}") raise ValueError(f"No routing found for {sip_uri}") # Create call object call = ActiveCall( call_id=call_id, direction=CallDirection.INBOUND, call_type=CallType.CHANNEL, # Most inbound calls go to channels state=CallState.INITIATING, dialplan_result=dialplan_result, original_target=sip_uri, sip_uri=sip_uri, discord_target=dialplan_result.target.discord_target, caller_id=call_data.get('from', 'Unknown'), headers=call_data.get('headers', {}), metadata=call_data ) # Add SIP participant sip_participant = CallParticipant( id=call_data.get('from', 'sip_caller'), type='sip', name=call_data.get('from', 'SIP Caller') ) call.participants.append(sip_participant) # Store call self.active_calls[call_id] = call # Fire event self._fire_event('call_initiated', call) logger.info(f"Inbound call {call_id} initiated: {sip_uri} -> {call.discord_target}") return call async def initiate_outbound_call(self, discord_user_id: str, channel_name: str, target: str, profile_name: Optional[str] = None, call_type: CallType = CallType.PRIVATE, user_roles: Optional[Set[str]] = None) -> ActiveCall: """ Initiate an outbound call (Discord -> SIP) Args: discord_user_id: Discord user initiating the call channel_name: Discord channel name target: SIP target (phone number, SIP URI, etc.) profile_name: SIP profile to use (optional) call_type: Type of call (private, channel, conference) user_roles: User's Discord roles Returns: ActiveCall object """ call_id = str(uuid.uuid4()) user_roles = user_roles or set() logger.info(f"Initiating outbound call {call_id}: {discord_user_id} -> {target}") # Get available profiles for user/channel available_profiles = self.profile_manager.get_available_profiles(channel_name, user_roles) if not available_profiles: raise ValueError(f"No SIP profiles available for user in channel {channel_name}") # Select profile if profile_name: profile = self.profile_manager.get_profile(profile_name) if not profile or not profile.can_access(channel_name, user_roles): raise ValueError(f"Profile {profile_name} not accessible") else: profile = self.profile_manager.get_default_profile(channel_name, user_roles) if not profile: raise ValueError("No suitable SIP profile found") # Validate target if not self.profile_manager.validate_call_target(profile, target): raise ValueError(f"Call target {target} not allowed for profile {profile.name}") # Generate SIP URI sip_uri = profile.get_sip_uri(target) # Create call object call = ActiveCall( call_id=call_id, direction=CallDirection.OUTBOUND, call_type=call_type, state=CallState.INITIATING, profile=profile, original_target=target, sip_uri=sip_uri, discord_target=channel_name if call_type == CallType.CHANNEL else f"@{discord_user_id}", metadata={"initiated_by": discord_user_id} ) # Add Discord participant discord_participant = CallParticipant( id=discord_user_id, type='discord_user', name=discord_user_id ) call.participants.append(discord_participant) # Store call self.active_calls[call_id] = call # Fire event self._fire_event('call_initiated', call) logger.info(f"Outbound call {call_id} initiated: {discord_user_id} -> {sip_uri} via {profile.name}") return call async def connect_call(self, call_id: str, audio_bridge_id: Optional[str] = None): """Mark call as connected and start audio bridge""" call = self.active_calls.get(call_id) if not call: raise ValueError(f"Call {call_id} not found") call.state = CallState.CONNECTED call.connected_at = datetime.now() call.audio_bridge_id = audio_bridge_id # Mark participants as connected for participant in call.participants: participant.connected = True participant.joined_at = datetime.now() self._fire_event('call_connected', call) logger.info(f"Call {call_id} connected") async def add_participant(self, call_id: str, participant: CallParticipant): """Add participant to active call""" call = self.active_calls.get(call_id) if not call: raise ValueError(f"Call {call_id} not found") call.participants.append(participant) participant.joined_at = datetime.now() # Update call type if needed if len(call.participants) > 2: call.call_type = CallType.CONFERENCE self._fire_event('participant_joined', call, participant=participant) logger.info(f"Participant {participant.name} added to call {call_id}") async def remove_participant(self, call_id: str, participant_id: str): """Remove participant from active call""" call = self.active_calls.get(call_id) if not call: raise ValueError(f"Call {call_id} not found") participant = None for p in call.participants: if p.id == participant_id: participant = p break if participant: call.participants.remove(participant) self._fire_event('participant_left', call, participant=participant) logger.info(f"Participant {participant.name} removed from call {call_id}") # End call if no participants left if len(call.participants) <= 1: await self.end_call(call_id, "No participants remaining") async def transfer_call(self, call_id: str, new_target: str): """Transfer call to new target""" call = self.active_calls.get(call_id) if not call: raise ValueError(f"Call {call_id} not found") call.state = CallState.TRANSFERRING # Route new target if call.direction == CallDirection.INBOUND: # Transfer to different Discord target dialplan_result = self.dialplan.route_sip_uri(f"sip:{new_target}@{self.config.default_sip_domain}") if dialplan_result.success: call.discord_target = dialplan_result.target.discord_target call.dialplan_result = dialplan_result else: # Transfer to different SIP target if call.profile: call.sip_uri = call.profile.get_sip_uri(new_target) call.original_target = new_target logger.info(f"Call {call_id} transferred to {new_target}") async def end_call(self, call_id: str, reason: str = "User request"): """End an active call""" call = self.active_calls.get(call_id) if not call: return # Call already ended call.state = CallState.ENDING call.ended_at = datetime.now() # Mark participants as disconnected for participant in call.participants: participant.connected = False # Remove from active calls del self.active_calls[call_id] self._fire_event('call_ended', call, reason=reason) duration = call.duration logger.info(f"Call {call_id} ended: {reason} (Duration: {duration})") async def fail_call(self, call_id: str, error: str): """Mark call as failed""" call = self.active_calls.get(call_id) if not call: return call.state = CallState.FAILED call.ended_at = datetime.now() call.metadata['error'] = error # Remove from active calls del self.active_calls[call_id] self._fire_event('call_failed', call, error=error) logger.error(f"Call {call_id} failed: {error}") def get_call(self, call_id: str) -> Optional[ActiveCall]: """Get call by ID""" return self.active_calls.get(call_id) def get_calls_by_discord_user(self, user_id: str) -> List[ActiveCall]: """Get all active calls for Discord user""" calls = [] for call in self.active_calls.values(): for participant in call.participants: if participant.type == 'discord_user' and participant.id == user_id: calls.append(call) break return calls def get_calls_by_channel(self, channel_name: str) -> List[ActiveCall]: """Get all active calls for Discord channel""" calls = [] for call in self.active_calls.values(): if call.discord_target == channel_name: calls.append(call) return calls def get_active_calls(self) -> List[ActiveCall]: """Get all active calls""" return list(self.active_calls.values()) def get_call_statistics(self) -> Dict[str, Any]: """Get call statistics""" active_calls = self.get_active_calls() return { "total_active": len(active_calls), "inbound_active": len([c for c in active_calls if c.direction == CallDirection.INBOUND]), "outbound_active": len([c for c in active_calls if c.direction == CallDirection.OUTBOUND]), "private_calls": len([c for c in active_calls if c.call_type == CallType.PRIVATE]), "channel_calls": len([c for c in active_calls if c.call_type == CallType.CHANNEL]), "conference_calls": len([c for c in active_calls if c.call_type == CallType.CONFERENCE]), "oldest_call": min([c.created_at for c in active_calls], default=None), "total_participants": sum([len(c.participants) for c in active_calls]) } # Global call manager instance _call_manager_instance = None def get_call_manager() -> CallManager: """Get the global call manager instance""" global _call_manager_instance if _call_manager_instance is None: _call_manager_instance = CallManager() return _call_manager_instance config.py =========== """ Discord Audio Bridge - Configuration Management Supports both JSON configuration files and environment variables """ import os import json import logging from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass from pathlib import Path logger = logging.getLogger(__name__) @dataclass class RoutingRule: """Represents a routing rule for call handling""" name: str pattern: str type: str # 'user', 'channel', 'channel_pattern' target_channel: Optional[str] = None target_user: Optional[str] = None sip_target: str = "" auth_required: bool = False headers: Optional[Dict[str, str]] = None class Config: """Main configuration class for Discord Audio Bridge""" def __init__(self, config_file: Optional[str] = None): self.config_data = {} # Try to load from JSON file first, then environment variables if config_file and Path(config_file).exists(): self._load_from_json(config_file) else: # Try default config file locations for config_path in ['config.json', './config.json', '/app/config.json']: if Path(config_path).exists(): self._load_from_json(config_path) break # Load/override with environment variables self._load_from_env() # Initialize configuration values self._init_config() logger.info(f"Configuration loaded: {len(self.routing_rules)} routing rules") def _load_from_json(self, config_file: str): """Load configuration from JSON file""" try: with open(config_file, 'r') as f: self.config_data = json.load(f) logger.info(f"Loaded configuration from {config_file}") except Exception as e: logger.error(f"Failed to load config from {config_file}: {e}") self.config_data = {} def _load_from_env(self): """Load configuration from environment variables (overrides JSON)""" env_mapping = { # Discord settings 'DISCORD_TOKEN': ['discord', 'token'], 'DISCORD_GUILD_ID': ['discord', 'guild_id'], 'DISCORD_COMMAND_PREFIX': ['discord', 'command_prefix'], # SIP settings 'SIP_DOMAIN': ['sip', 'domain'], 'SIP_USERNAME': ['sip', 'username'], 'SIP_PASSWORD': ['sip', 'password'], 'SIP_PORT': ['sip', 'port'], 'SIP_TRANSPORT': ['sip', 'transport'], # SignalWire settings 'SIGNALWIRE_PROJECT_ID': ['signalwire', 'project_id'], 'SIGNALWIRE_TOKEN': ['signalwire', 'token'], 'SIGNALWIRE_SPACE': ['signalwire', 'space'], # Audio settings 'AUDIO_BUFFER_SIZE': ['audio', 'buffer_size'], 'AUDIO_SAMPLE_RATE': ['audio', 'sample_rate'], 'AUDIO_CHANNELS': ['audio', 'channels'], # Logging settings 'LOG_LEVEL': ['logging', 'level'], 'LOG_FILE': ['logging', 'file'], # Legacy environment variables for backward compatibility 'DEFAULT_SIP_DOMAIN': ['sip', 'domain'], 'BRIDGE_AUTH_USER': ['sip', 'username'], 'BRIDGE_AUTH_PASS': ['sip', 'password'], 'WEBHOOK_TOKEN': ['security', 'webhook', 'auth_token'], } for env_var, config_path in env_mapping.items(): value = os.getenv(env_var) if value: self._set_nested_value(self.config_data, config_path, value) def _set_nested_value(self, data: dict, path: List[str], value: Any): """Set a nested dictionary value using a path""" current = data for key in path[:-1]: if key not in current: current[key] = {} current = current[key] # Convert string values to appropriate types if isinstance(value, str): if value.lower() in ('true', 'false'): value = value.lower() == 'true' elif value.isdigit(): value = int(value) current[path[-1]] = value def _init_config(self): """Initialize configuration with defaults and loaded values""" # Discord configuration discord_config = self.config_data.get('discord', {}) self.discord_token = discord_config.get('token', '') self.discord_guild_id = discord_config.get('guild_id', '') self.discord_command_prefix = discord_config.get('command_prefix', '!bridge ') # SIP configuration sip_config = self.config_data.get('sip', {}) self.default_sip_domain = sip_config.get('domain', 'discord-bridge.com') self.bridge_auth_user = sip_config.get('username', 'bridge_user') self.bridge_auth_pass = sip_config.get('password', 'bridge_pass') self.sip_port = sip_config.get('port', 5060) self.sip_transport = sip_config.get('transport', 'udp') # SWML service configuration swml_config = self.config_data.get('swml', {}) self.swml_host = swml_config.get('host', '0.0.0.0') self.swml_port = swml_config.get('port', 9090) # Audio configuration audio_config = self.config_data.get('audio', {}) self.audio_buffer_size = audio_config.get('buffer_size', 16384) self.audio_sample_rate = audio_config.get('sample_rate', 48000) self.audio_channels = audio_config.get('channels', 2) self.audio_bit_depth = audio_config.get('bit_depth', 16) # Security configuration security_config = self.config_data.get('security', {}) webhook_config = security_config.get('webhook', {}) self.webhook_token = webhook_config.get('auth_token', 'your-webhook-token') # Logging configuration logging_config = self.config_data.get('logging', {}) self.log_level = logging_config.get('level', 'INFO') self.log_file = logging_config.get('file', 'discord_bridge.log') # Load routing rules self.routing_rules: List[RoutingRule] = [] self.load_routing_rules() def get(self, key: str, default: Any = None) -> Any: """Get a configuration value using dot notation (e.g., 'discord.token')""" keys = key.split('.') current = self.config_data for k in keys: if isinstance(current, dict) and k in current: current = current[k] else: return default return current def load_routing_rules(self): """Load routing rules from configuration""" routing_config = self.config_data.get('routing', {}) rules_data = routing_config.get('rules', []) # If no rules in config, use defaults if not rules_data: default_rules = [ { 'name': 'Josh StarWars User', 'pattern': r'josh_starwars', 'type': 'user', 'target_user': '@josh_discord_user', 'sip_target': f'sip:josh@{self.default_sip_domain}', 'auth_required': True }, { 'name': 'General Channel', 'pattern': r'general', 'type': 'channel', 'target_channel': '#general-voice', 'sip_target': f'sip:general@{self.default_sip_domain}', 'auth_required': True }, { 'name': 'Support Channels', 'pattern': r'support.*', 'type': 'channel_pattern', 'target_channel': '#support-voice', 'sip_target': f'sip:support@{self.default_sip_domain}', 'auth_required': True }, { 'name': 'Fallback Rule', 'pattern': r'.*', 'type': 'channel', 'target_channel': '#general-voice', 'sip_target': f'sip:default@{self.default_sip_domain}', 'auth_required': True } ] rules_data = default_rules # Convert to RoutingRule objects for rule_data in rules_data: try: rule = RoutingRule( name=rule_data.get('name', 'Unnamed Rule'), pattern=rule_data.get('pattern', '.*'), type=rule_data.get('type', 'channel'), target_channel=rule_data.get('target_channel'), target_user=rule_data.get('target_user'), sip_target=rule_data.get('sip_target', ''), auth_required=rule_data.get('auth_required', False), headers=rule_data.get('headers') ) self.routing_rules.append(rule) except Exception as e: logger.error(f"Failed to load routing rule: {e}") def validate(self) -> bool: """Validate configuration""" errors = [] # Required Discord settings if not self.discord_token: errors.append("Discord token is required") # SIP settings validation if not self.default_sip_domain: errors.append("SIP domain is required") # Audio settings validation if self.audio_buffer_size <= 0: errors.append("Audio buffer size must be positive") if self.audio_sample_rate not in [8000, 16000, 22050, 44100, 48000]: errors.append("Invalid audio sample rate") # Routing rules validation if not self.routing_rules: errors.append("No routing rules configured") # Log errors for error in errors: logger.error(f"Configuration validation error: {error}") return len(errors) == 0 def to_dict(self) -> Dict[str, Any]: """Convert configuration to dictionary""" return { 'discord': { 'token': '***' if self.discord_token else '', # Hide sensitive data 'guild_id': self.discord_guild_id, 'command_prefix': self.discord_command_prefix }, 'sip': { 'domain': self.default_sip_domain, 'username': self.bridge_auth_user, 'password': '***' if self.bridge_auth_pass else '', # Hide sensitive data 'port': self.sip_port, 'transport': self.sip_transport }, 'signalwire': { 'project_id': self.signalwire_project_id, 'token': '***' if self.signalwire_token else '', # Hide sensitive data 'space': self.signalwire_space }, 'audio': { 'buffer_size': self.audio_buffer_size, 'sample_rate': self.audio_sample_rate, 'channels': self.audio_channels, 'bit_depth': self.audio_bit_depth }, 'routing_rules_count': len(self.routing_rules), 'log_level': self.log_level } # Global config instance _config_instance = None def load_config(config_file: Optional[str] = None) -> Config: """Load and return global configuration instance""" global _config_instance if _config_instance is None: _config_instance = Config(config_file) if not _config_instance.validate(): logger.warning("Configuration validation failed - some features may not work") return _config_instance def get_config() -> Config: """Get the current configuration instance""" if _config_instance is None: return load_config() return _config_instance def reload_config(config_file: Optional[str] = None): """Reload configuration from file""" global _config_instance _config_instance = None return load_config(config_file) dialplan.py =========== """ Discord Audio Bridge - Dialplan Engine Complete dialplan routing system with address mapping, user mapping, and pattern matching """ import re import logging from typing import Dict, List, Optional, Tuple, Any, Union from dataclasses import dataclass from enum import Enum from .config import get_config logger = logging.getLogger(__name__) class RoutingType(Enum): """Types of routing targets""" CHANNEL = "channel" USER = "user" CHANNEL_PATTERN = "channel_pattern" EXTENSION = "extension" DEFAULT = "default" @dataclass class RoutingTarget: """Represents a routing destination""" type: RoutingType discord_target: str # Discord channel (#general-voice) or user (@username) sip_target: str # SIP URI for the target auth_required: bool = True headers: Optional[Dict[str, str]] = None description: Optional[str] = None @dataclass class DialplanResult: """Result of dialplan lookup""" target: RoutingTarget matched_pattern: str rule_name: str success: bool = True error_message: Optional[str] = None class DialplanEngine: """ Core dialplan engine that handles routing logic Maps incoming SIP URIs to Discord channels/users """ def __init__(self): self.config = get_config() self.domain = self.config.default_sip_domain # Load routing configurations self.channel_mapping = {} self.user_mapping = {} self.extension_mapping = {} self.pattern_rules = [] self.default_target = None self._load_routing_rules() logger.info(f"Dialplan engine initialized with {len(self.channel_mapping)} channels, " f"{len(self.user_mapping)} users, {len(self.extension_mapping)} extensions, " f"{len(self.pattern_rules)} patterns") def _load_routing_rules(self): """Load routing rules from configuration""" # Load from config routing rules for rule in self.config.routing_rules: target = RoutingTarget( type=RoutingType(rule.type), discord_target=rule.target_channel or rule.target_user or "#general-voice", sip_target=rule.sip_target, auth_required=rule.auth_required, headers=rule.headers, description=rule.name ) if rule.type == "channel": # Direct channel mapping self.channel_mapping[rule.pattern] = target elif rule.type == "user": # User mapping self.user_mapping[rule.pattern] = target elif rule.type == "extension": # Extension-based routing self.extension_mapping[rule.pattern] = target elif rule.type == "channel_pattern": # Regex pattern rules self.pattern_rules.append((rule.pattern, target, rule.name)) elif rule.type == "default": # Default fallback self.default_target = target # Add default mappings if not configured if not self.channel_mapping and not self.user_mapping: self._add_default_mappings() def _add_default_mappings(self): """Add default routing mappings""" logger.info("Adding default routing mappings") # Default channel mappings default_channels = { "general": RoutingTarget( type=RoutingType.CHANNEL, discord_target="#general-voice", sip_target=f"sip:general@{self.domain}", description="General voice channel" ), "support": RoutingTarget( type=RoutingType.CHANNEL, discord_target="#support-voice", sip_target=f"sip:support@{self.domain}", description="Support voice channel" ), "sales": RoutingTarget( type=RoutingType.CHANNEL, discord_target="#sales-voice", sip_target=f"sip:sales@{self.domain}", description="Sales voice channel" ) } self.channel_mapping.update(default_channels) # Default extension mappings default_extensions = { "1001": RoutingTarget( type=RoutingType.EXTENSION, discord_target="#general-voice", sip_target=f"sip:1001@{self.domain}", description="Extension 1001 - General" ), "1002": RoutingTarget( type=RoutingType.EXTENSION, discord_target="#support-voice", sip_target=f"sip:1002@{self.domain}", description="Extension 1002 - Support" ) } self.extension_mapping.update(default_extensions) # Default pattern rules self.pattern_rules.extend([ (r"support-.*", RoutingTarget( type=RoutingType.CHANNEL_PATTERN, discord_target="#support-voice", sip_target=f"sip:support@{self.domain}", description="Support pattern routing" ), "Support Pattern"), (r"sales-.*", RoutingTarget( type=RoutingType.CHANNEL_PATTERN, discord_target="#sales-voice", sip_target=f"sip:sales@{self.domain}", description="Sales pattern routing" ), "Sales Pattern") ]) # Default fallback if not self.default_target: self.default_target = RoutingTarget( type=RoutingType.DEFAULT, discord_target="#general-voice", sip_target=f"sip:default@{self.domain}", description="Default fallback routing" ) def route_sip_uri(self, sip_uri: str) -> DialplanResult: """ Route a SIP URI to Discord target Args: sip_uri: Full SIP URI (e.g., "sip:josh_starwars@discord-bridge.com") Returns: DialplanResult with routing information """ try: # Parse SIP URI username, domain = self._parse_sip_uri(sip_uri) if not username: return DialplanResult( target=self.default_target, matched_pattern="default", rule_name="Default Fallback", success=False, error_message="Invalid SIP URI format" ) logger.debug(f"Routing SIP URI: {sip_uri} -> username: {username}, domain: {domain}") # Try routing methods in order of preference # 1. Direct user mapping if username in self.user_mapping: target = self.user_mapping[username] return DialplanResult( target=target, matched_pattern=username, rule_name=f"User: {username}" ) # 2. Direct channel mapping if username in self.channel_mapping: target = self.channel_mapping[username] return DialplanResult( target=target, matched_pattern=username, rule_name=f"Channel: {username}" ) # 3. Extension mapping if username in self.extension_mapping: target = self.extension_mapping[username] return DialplanResult( target=target, matched_pattern=username, rule_name=f"Extension: {username}" ) # 4. Pattern matching for pattern, target, rule_name in self.pattern_rules: if re.match(pattern, username): return DialplanResult( target=target, matched_pattern=pattern, rule_name=rule_name ) # 5. Default fallback if self.default_target: return DialplanResult( target=self.default_target, matched_pattern=".*", rule_name="Default Fallback" ) # No routing found return DialplanResult( target=None, matched_pattern="", rule_name="No Match", success=False, error_message=f"No routing rule found for {username}" ) except Exception as e: logger.error(f"Error routing SIP URI {sip_uri}: {e}") return DialplanResult( target=self.default_target, matched_pattern="error", rule_name="Error Fallback", success=False, error_message=str(e) ) def route_webrtc_call(self, user_variables: Dict[str, Any]) -> DialplanResult: """ Route a WebRTC call based on user variables Args: user_variables: WebRTC call variables from SignalWire Returns: DialplanResult with routing information """ try: # Extract target from user variables target = None # Try different extraction methods if "to" in user_variables: target = user_variables["to"] elif "callOriginHref" in user_variables: # Parse from URL parameters href = user_variables["callOriginHref"] target = self._extract_target_from_href(href) elif "ext" in user_variables: target = user_variables["ext"] if not target: return DialplanResult( target=self.default_target, matched_pattern="default", rule_name="WebRTC Default", success=False, error_message="No target found in WebRTC variables" ) logger.debug(f"Routing WebRTC call to target: {target}") # Route as if it were a SIP username fake_sip_uri = f"sip:{target}@webrtc.local" return self.route_sip_uri(fake_sip_uri) except Exception as e: logger.error(f"Error routing WebRTC call: {e}") return DialplanResult( target=self.default_target, matched_pattern="error", rule_name="WebRTC Error", success=False, error_message=str(e) ) def _parse_sip_uri(self, sip_uri: str) -> Tuple[Optional[str], Optional[str]]: """Parse SIP URI into username and domain parts""" try: # Remove sip: prefix if present if sip_uri.startswith("sip:"): sip_uri = sip_uri[4:] # Split username@domain if "@" in sip_uri: username, domain = sip_uri.split("@", 1) # Remove port and parameters domain = domain.split(":")[0].split(";")[0] return username.strip(), domain.strip() else: # No domain, assume local return sip_uri.strip(), self.domain except Exception as e: logger.error(f"Error parsing SIP URI {sip_uri}: {e}") return None, None def _extract_target_from_href(self, href: str) -> Optional[str]: """Extract target from callOriginHref URL""" try: # Look for ext= parameter if "ext=" in href: # Extract ext parameter ext_match = re.search(r"ext=([^&]+)", href) if ext_match: return ext_match.group(1) # Look for other target indicators if "to=" in href: to_match = re.search(r"to=([^&]+)", href) if to_match: return to_match.group(1) return None except Exception as e: logger.error(f"Error extracting target from href {href}: {e}") return None def list_routes(self) -> Dict[str, List[Dict[str, Any]]]: """List all configured routes""" routes = { "channels": [], "users": [], "extensions": [], "patterns": [], "default": None } # Add channel routes for pattern, target in self.channel_mapping.items(): routes["channels"].append({ "pattern": pattern, "discord_target": target.discord_target, "sip_target": target.sip_target, "description": target.description }) # Add user routes for pattern, target in self.user_mapping.items(): routes["users"].append({ "pattern": pattern, "discord_target": target.discord_target, "sip_target": target.sip_target, "description": target.description }) # Add extension routes for pattern, target in self.extension_mapping.items(): routes["extensions"].append({ "pattern": pattern, "discord_target": target.discord_target, "sip_target": target.sip_target, "description": target.description }) # Add pattern routes for pattern, target, rule_name in self.pattern_rules: routes["patterns"].append({ "pattern": pattern, "discord_target": target.discord_target, "sip_target": target.sip_target, "rule_name": rule_name, "description": target.description }) # Add default route if self.default_target: routes["default"] = { "discord_target": self.default_target.discord_target, "sip_target": self.default_target.sip_target, "description": self.default_target.description } return routes def test_routing(self, test_uri: str) -> DialplanResult: """Test routing for a given URI (for debugging)""" logger.info(f"Testing routing for: {test_uri}") if test_uri.startswith("sip:"): result = self.route_sip_uri(test_uri) else: # Treat as WebRTC-style target result = self.route_webrtc_call({"to": test_uri}) logger.info(f"Routing result: {result.rule_name} -> {result.target.discord_target if result.target else 'None'}") return result # Global dialplan instance _dialplan_instance = None def get_dialplan() -> DialplanEngine: """Get the global dialplan engine instance""" global _dialplan_instance if _dialplan_instance is None: _dialplan_instance = DialplanEngine() return _dialplan_instance def reload_dialplan(): """Reload the dialplan engine""" global _dialplan_instance _dialplan_instance = None return get_dialplan() discord_bot.py =========== """ Discord Audio Bridge - Discord Bot Voice-enabled Discord bot for SIP integration """ import asyncio import logging import os import signal import sys from typing import Optional, Dict, Any import discord from discord.ext import commands from discord import VoiceChannel, VoiceClient from .config import get_config from .sip_bridge import SIPBridge # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DiscordAudioBot(commands.Bot): """Discord bot with voice capabilities for SIP bridging""" def __init__(self): # Bot configuration intents = discord.Intents.default() intents.message_content = True intents.voice_states = True intents.guilds = True super().__init__( command_prefix='!bridge ', intents=intents, description='Discord Audio Bridge - Connect Discord voice to SIP calls' ) self.config = get_config() self.sip_bridge: Optional[SIPBridge] = None self.voice_connections: Dict[int, VoiceClient] = {} self.active_calls: Dict[str, Dict[str, Any]] = {} async def setup_hook(self): """Initialize the bot when it starts""" logger.info("Setting up Discord Audio Bridge Bot...") # Initialize SIP bridge try: self.sip_bridge = SIPBridge(self) await self.sip_bridge.initialize() logger.info("SIP bridge initialized successfully") except Exception as e: logger.error(f"Failed to initialize SIP bridge: {e}") # Sync commands try: synced = await self.tree.sync() logger.info(f"Synced {len(synced)} command(s)") except Exception as e: logger.error(f"Failed to sync commands: {e}") async def on_ready(self): """Called when bot is ready""" logger.info(f'{self.user} has connected to Discord!') logger.info(f'Bot is in {len(self.guilds)} guilds') # Set bot status activity = discord.Activity( type=discord.ActivityType.listening, name="SIP calls | !bridge help" ) await self.change_presence(activity=activity) async def on_voice_state_update(self, member, before, after): """Handle voice state changes""" if member == self.user: # Bot's voice state changed if before.channel and not after.channel: # Bot was disconnected logger.info(f"Bot disconnected from voice channel: {before.channel}") await self._handle_voice_disconnect(before.channel) elif not before.channel and after.channel: # Bot connected to voice channel logger.info(f"Bot connected to voice channel: {after.channel}") await self._handle_voice_connect(after.channel) async def _handle_voice_connect(self, channel: VoiceChannel): """Handle bot connecting to voice channel""" try: if self.sip_bridge: await self.sip_bridge.start_audio_bridge(channel) logger.info(f"Audio bridge started for channel: {channel.name}") except Exception as e: logger.error(f"Failed to start audio bridge: {e}") async def _handle_voice_disconnect(self, channel: VoiceChannel): """Handle bot disconnecting from voice channel""" try: if self.sip_bridge: await self.sip_bridge.stop_audio_bridge(channel) logger.info(f"Audio bridge stopped for channel: {channel.name}") except Exception as e: logger.error(f"Failed to stop audio bridge: {e}") async def join_voice_channel(self, channel: VoiceChannel) -> Optional[VoiceClient]: """Join a voice channel""" try: if channel.guild.id in self.voice_connections: # Already connected, move to new channel voice_client = self.voice_connections[channel.guild.id] await voice_client.move_to(channel) logger.info(f"Moved to voice channel: {channel.name}") else: # Connect to channel voice_client = await channel.connect() self.voice_connections[channel.guild.id] = voice_client logger.info(f"Connected to voice channel: {channel.name}") return self.voice_connections.get(channel.guild.id) except Exception as e: logger.error(f"Failed to join voice channel {channel.name}: {e}") return None async def leave_voice_channel(self, guild_id: int) -> bool: """Leave voice channel in a guild""" try: if guild_id in self.voice_connections: voice_client = self.voice_connections[guild_id] await voice_client.disconnect() del self.voice_connections[guild_id] logger.info(f"Left voice channel in guild {guild_id}") return True return False except Exception as e: logger.error(f"Failed to leave voice channel: {e}") return False # Import necessary modules for advanced commands from .call_manager import get_call_manager, CallType from .profiles import get_profile_manager from .dialplan import get_dialplan # Command definitions @commands.command(name='join') async def join_channel(ctx, *, channel_name: str = None): """Join a voice channel""" if not channel_name: if ctx.author.voice and ctx.author.voice.channel: channel = ctx.author.voice.channel else: await ctx.send("Please specify a channel name or join a voice channel first.") return else: # Find channel by name channel = discord.utils.get(ctx.guild.voice_channels, name=channel_name) if not channel: await ctx.send(f"Voice channel '{channel_name}' not found.") return voice_client = await ctx.bot.join_voice_channel(channel) if voice_client: await ctx.send(f"✅ Joined voice channel: **{channel.name}**") else: await ctx.send(f"❌ Failed to join voice channel: **{channel.name}**") @commands.command(name='leave') async def leave_channel(ctx): """Leave the current voice channel""" success = await ctx.bot.leave_voice_channel(ctx.guild.id) if success: await ctx.send("✅ Left voice channel") else: await ctx.send("❌ Not connected to a voice channel") @commands.command(name='call') async def make_call(ctx, target: str, profile: str = None): """Make an outbound SIP call Usage: !bridge call 555-1234 [profile] !bridge call john@company-pbx !bridge call emergency """ try: call_manager = get_call_manager() channel_name = f"#{ctx.channel.name}" if hasattr(ctx.channel, 'name') else "#unknown" user_roles = set([role.name for role in ctx.author.roles]) call = await call_manager.initiate_outbound_call( discord_user_id=str(ctx.author.id), channel_name=channel_name, target=target, profile_name=profile, user_roles=user_roles ) await ctx.send(f"✅ Calling **{target}** via `{call.profile.display_name}`...") await ctx.send(f"📞 Call ID: `{call.call_id}`") except Exception as e: await ctx.send(f"❌ Failed to make call: {str(e)}") @commands.command(name='invite-caller') async def invite_caller(ctx, target: str, profile: str = None): """Add external caller to current channel Usage: !bridge invite-caller 555-1234 [profile] """ try: if not ctx.author.voice or not ctx.author.voice.channel: await ctx.send("❌ You must be in a voice channel to invite callers") return call_manager = get_call_manager() channel_name = f"#{ctx.author.voice.channel.name}" user_roles = set([role.name for role in ctx.author.roles]) call = await call_manager.initiate_outbound_call( discord_user_id=str(ctx.author.id), channel_name=channel_name, target=target, profile_name=profile, call_type=CallType.CHANNEL, user_roles=user_roles ) await ctx.send(f"✅ Inviting **{target}** to {channel_name}...") await ctx.send(f"📞 Call ID: `{call.call_id}`") except Exception as e: await ctx.send(f"❌ Failed to invite caller: {str(e)}") @commands.command(name='conference-dial') async def conference_dial(ctx, target: str, profile: str = None): """Dial into current channel conversation Usage: !bridge conference-dial 555-1234 [profile] """ try: if not ctx.author.voice or not ctx.author.voice.channel: await ctx.send("❌ You must be in a voice channel for conference calls") return call_manager = get_call_manager() channel_name = f"#{ctx.author.voice.channel.name}" user_roles = set([role.name for role in ctx.author.roles]) call = await call_manager.initiate_outbound_call( discord_user_id=str(ctx.author.id), channel_name=channel_name, target=target, profile_name=profile, call_type=CallType.CONFERENCE, user_roles=user_roles ) await ctx.send(f"✅ Adding **{target}** to conference in {channel_name}...") await ctx.send(f"📞 Call ID: `{call.call_id}`") except Exception as e: await ctx.send(f"❌ Failed to start conference call: {str(e)}") @commands.command(name='hangup') async def hangup_call(ctx): """Hang up active SIP call""" try: if ctx.guild.id not in ctx.bot.voice_connections: await ctx.send("❌ No active voice connection") return voice_client = ctx.bot.voice_connections[ctx.guild.id] channel = voice_client.channel # Check if there's an active call handler if channel.id in ctx.bot.sip_bridge.channel_handlers: handler = ctx.bot.sip_bridge.channel_handlers[channel.id] # Stop the audio bridge handler.stop_audio_bridge() # Hangup the SIP call call = ctx.bot.sip_bridge.lib.get_call_by_id(handler.call_info.id) if call and call.is_valid(): call.hangup() # Clean up handlers del ctx.bot.sip_bridge.channel_handlers[channel.id] if handler.call_info.id in ctx.bot.sip_bridge.call_handlers: del ctx.bot.sip_bridge.call_handlers[handler.call_info.id] await ctx.send("✅ Call ended and audio bridge stopped") else: await ctx.send("❌ No active SIP call in this channel") except Exception as e: await ctx.send(f"❌ Error ending call: {str(e)}") @commands.command(name='status') async def bot_status(ctx): """Show bot status and active connections""" embed = discord.Embed(title="Discord Audio Bridge Status", color=0x00ff00) # Voice connections voice_count = len(ctx.bot.voice_connections) embed.add_field(name="Voice Connections", value=f"{voice_count} active", inline=True) # SIP bridge status sip_status = "🟢 Active" if ctx.bot.sip_bridge and ctx.bot.sip_bridge.is_initialized else "🔴 Inactive" embed.add_field(name="SIP Bridge", value=sip_status, inline=True) # Active calls call_count = len(ctx.bot.sip_bridge.call_handlers) if ctx.bot.sip_bridge else 0 embed.add_field(name="Active SIP Calls", value=f"{call_count} calls", inline=True) # SIP account status if ctx.bot.sip_bridge and ctx.bot.sip_bridge.account: reg_status = ctx.bot.sip_bridge.account.info().reg_status if reg_status == 200: sip_account = "🟢 Registered" else: sip_account = f"🟡 Status: {reg_status}" else: sip_account = "🔴 Not registered" embed.add_field(name="SIP Account", value=sip_account, inline=True) # List active calls if ctx.bot.sip_bridge and ctx.bot.sip_bridge.call_handlers: call_list = [] for call_id, handler in ctx.bot.sip_bridge.call_handlers.items(): call_info = handler.call_info call_list.append(f"• Call {call_id}: {call_info.remote_uri}") embed.add_field( name="Active Calls", value="\n".join(call_list) if call_list else "None", inline=False ) await ctx.send(embed=embed) @commands.command(name='test-audio') async def test_audio(ctx): """Test audio streaming by making a test call""" try: # Check if bot is in a voice channel if ctx.guild.id not in ctx.bot.voice_connections: await ctx.send("❌ Bot must be in a voice channel to test audio. Use `!bridge join` first.") return # Make a test call to echo server or test number test_uri = "sip:echo@sipserver.com" # Replace with actual test server await ctx.send(f"🧪 Starting audio test call to: `{test_uri}`...") voice_client = ctx.bot.voice_connections[ctx.guild.id] channel = voice_client.channel call_handler = await ctx.bot.sip_bridge.make_outbound_call(test_uri, channel) if call_handler: await ctx.send( f"✅ Test call established!\n" f"🎙️ Speak into your microphone to test Discord → SIP audio\n" f"🔊 SIP audio should play back through Discord\n" f"Use `!bridge hangup` to end the test" ) else: await ctx.send("❌ Failed to establish test call") except Exception as e: await ctx.send(f"❌ Error in audio test: {str(e)}") @commands.command(name='help') async def bot_help(ctx): """Show help for bot commands""" embed = discord.Embed(title="Discord Audio Bridge Commands", color=0x0099ff) commands_help = [ ("!bridge join [channel]", "Join a voice channel"), ("!bridge leave", "Leave current voice channel"), ("!bridge call ", "Make outbound SIP call (e.g., !bridge call user@domain.com)"), ("!bridge hangup", "End active SIP call"), ("!bridge status", "Show bot status and active calls"), ("!bridge test-audio", "Test audio streaming with echo server"), ("!bridge help", "Show this help message"), ] for cmd, desc in commands_help: embed.add_field(name=cmd, value=desc, inline=False) embed.add_field( name="📋 How it works:", value=( "1. Join a voice channel with `!bridge join`\n" "2. Make a SIP call with `!bridge call user@domain.com`\n" "3. Talk in Discord - your voice streams to SIP\n" "4. SIP caller's voice streams to Discord\n" "5. End call with `!bridge hangup`" ), inline=False ) await ctx.send(embed=embed) # Add commands to bot def setup_commands(bot: DiscordAudioBot): """Add commands to the bot""" bot.add_command(join_channel) bot.add_command(leave_channel) bot.add_command(make_call) bot.add_command(hangup_call) bot.add_command(bot_status) bot.add_command(test_audio) bot.add_command(bot_help) async def run_discord_bot(): """Main function to run the Discord bot""" config = get_config() # Check for Discord token discord_token = os.getenv('DISCORD_TOKEN') or config.discord_token if not discord_token: logger.error("DISCORD_TOKEN environment variable not set") sys.exit(1) # Create and setup bot bot = DiscordAudioBot() setup_commands(bot) # Handle shutdown gracefully def signal_handler(signum, frame): logger.info("Received shutdown signal") asyncio.create_task(bot.close()) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: logger.info("Starting Discord Audio Bridge Bot...") await bot.start(discord_token) except Exception as e: logger.error(f"Failed to start Discord bot: {e}") sys.exit(1) finally: if not bot.is_closed(): await bot.close() if __name__ == "__main__": asyncio.run(run_discord_bot()) __init__.py =========== """ Discord Audio Bridge - SIP to Discord Voice Bridge A comprehensive system for bridging Discord voice channels with SIP infrastructure, enabling bidirectional communication between Discord users and external SIP endpoints. Components: - SIP Bridge: Core PJSIP integration for SIP call handling - Discord Bot: Discord slash commands and voice channel management - SWML Handler: SignalWire webhook endpoint for call routing - Main: Orchestrates all components together """ __version__ = "0.1.0" __author__ = "Discord Audio Bridge Team" main.py =========== """ Discord Audio Bridge - Main Entry Point """ import asyncio import logging import os import sys from typing import Optional # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('discord_audio_bridge.log') if os.getenv('LOG_TO_FILE', 'false').lower() == 'true' else logging.NullHandler() ] ) logger = logging.getLogger(__name__) def run_swml_handler(): """Run the SWML handler service""" try: from .swml_handler import app import uvicorn port = int(os.getenv('PORT', '8080')) host = os.getenv('HOST', '0.0.0.0') logger.info(f"Starting SWML Handler on {host}:{port}") uvicorn.run(app, host=host, port=port) except Exception as e: logger.error(f"Failed to start SWML handler: {e}") sys.exit(1) def run_sip_bridge(): """Run the SIP bridge service""" try: from . import test_sip test_sip.main() except Exception as e: logger.error(f"Failed to start SIP bridge: {e}") sys.exit(1) def run_discord_bot(): """Run the Discord bot service""" try: from .discord_bot import run_discord_bot asyncio.run(run_discord_bot()) except Exception as e: logger.error(f"Failed to start Discord bot: {e}") sys.exit(1) def main(): """Main entry point""" service = os.getenv('SERVICE', 'swml').lower() logger.info(f"Discord Audio Bridge starting - Service: {service}") if service == 'swml': run_swml_handler() elif service == 'sip': run_sip_bridge() elif service == 'discord': run_discord_bot() else: logger.error(f"Unknown service: {service}") logger.info("Available services: swml, sip, discord") sys.exit(1) if __name__ == "__main__": main() profiles.py =========== """ Discord Audio Bridge - Profile Management System Profile-based outbound calling with SIP provider configurations and access controls """ import logging from typing import Dict, List, Optional, Set, Any from dataclasses import dataclass, field from enum import Enum from .config import get_config logger = logging.getLogger(__name__) class AuthType(Enum): """Types of SIP authentication""" DIGEST = "digest" IP_BASED = "ip_based" CERTIFICATE = "certificate" NONE = "none" @dataclass class SIPCredentials: """SIP authentication credentials""" username: Optional[str] = None password: Optional[str] = None auth_type: AuthType = AuthType.DIGEST certificate_path: Optional[str] = None @dataclass class SIPProfile: """ SIP provider profile configuration Contains all settings needed to connect to a specific SIP provider """ name: str display_name: str server: str port: int = 5060 transport: str = "udp" # udp, tcp, tls credentials: Optional[SIPCredentials] = None headers: Dict[str, str] = field(default_factory=dict) params: str = "" # Additional SIP parameters description: str = "" enabled: bool = True # Access control allowed_channels: Set[str] = field(default_factory=set) # Empty = all channels required_roles: Set[str] = field(default_factory=set) # Required Discord roles # Connection settings registration_required: bool = True keepalive_interval: int = 60 timeout: int = 30 # Cost and limits cost_per_minute: float = 0.0 monthly_limit: int = 0 # 0 = unlimited daily_limit: int = 0 # 0 = unlimited def get_sip_uri(self, target: str) -> str: """Generate SIP URI for target using this profile""" # Remove sip: prefix if already present to avoid double prefix if target.startswith("sip:"): target = target[4:] if "@" in target: # Target already has domain base_uri = f"sip:{target}" else: # Add our server domain base_uri = f"sip:{target}@{self.server}" # Add port if not default if self.port != 5060: base_uri += f":{self.port}" # Add transport parameter if self.transport.lower() != "udp": separator = ";" if not self.params else "&" base_uri += f"{separator}transport={self.transport}" # Add custom parameters if self.params: base_uri += self.params return base_uri def can_access(self, channel_name: str, user_roles: Set[str]) -> bool: """Check if profile can be accessed from given channel by user with roles""" # Check if profile is enabled if not self.enabled: return False # Check channel restrictions if self.allowed_channels and channel_name not in self.allowed_channels: return False # Check role requirements if self.required_roles and not self.required_roles.intersection(user_roles): return False return True class ProfileManager: """ Manages SIP profiles and access control Handles profile loading, validation, and permission checking """ def __init__(self): self.config = get_config() self.profiles: Dict[str, SIPProfile] = {} self.default_profiles: List[str] = [] self.channel_overrides: Dict[str, Dict[str, Any]] = {} self._load_profiles() logger.info(f"Profile manager initialized with {len(self.profiles)} profiles") def _load_profiles(self): """Load profiles from configuration""" # Load from config if available profiles_config = self.config.get('profiles', {}) if not profiles_config: # Create default profiles self._create_default_profiles() else: # Load configured profiles for profile_name, profile_data in profiles_config.items(): self._load_profile(profile_name, profile_data) # Load default profiles list self.default_profiles = self.config.get('default_profiles', ['internal', 'emergency']) # Load channel overrides self.channel_overrides = self.config.get('channel_overrides', {}) def _create_default_profiles(self): """Create default SIP profiles""" logger.info("Creating default SIP profiles") # Internal company profile internal_profile = SIPProfile( name="internal", display_name="Internal Company", server=self.config.default_sip_domain, credentials=SIPCredentials( username=self.config.bridge_auth_user, password=self.config.bridge_auth_pass ), description="Internal company SIP system", headers={"X-Source": "discord-bridge"} ) self.profiles["internal"] = internal_profile # Emergency services (usually unrestricted) emergency_profile = SIPProfile( name="emergency", display_name="Emergency Services", server=self.config.default_sip_domain, credentials=SIPCredentials( username=self.config.bridge_auth_user, password=self.config.bridge_auth_pass ), description="Emergency calling services", headers={"X-Priority": "Emergency"} ) self.profiles["emergency"] = emergency_profile # SignalWire production (if configured) if self.config.signalwire_project_id: signalwire_profile = SIPProfile( name="signalwire-prod", display_name="SignalWire Production", server=f"{self.config.signalwire_space}", port=5060, transport="tls", credentials=SIPCredentials( username=self.config.signalwire_project_id, password=self.config.signalwire_token ), description="SignalWire production calling", headers={"X-Company": "Discord-Bridge"}, params="?transport=tls", cost_per_minute=0.01 # Example cost ) self.profiles["signalwire-prod"] = signalwire_profile # Restricted external calling profile external_profile = SIPProfile( name="external-calling", display_name="External Calling", server=self.config.default_sip_domain, credentials=SIPCredentials( username=self.config.bridge_auth_user, password=self.config.bridge_auth_pass ), description="External phone number calling", headers={"X-Type": "External"}, allowed_channels={"#executive", "#sales"}, # Restricted channels required_roles={"Manager", "Sales"}, # Required roles cost_per_minute=0.05, daily_limit=100 # Limit calls per day ) self.profiles["external-calling"] = external_profile def _load_profile(self, profile_name: str, profile_data: Dict[str, Any]): """Load a single profile from configuration data""" try: # Parse credentials credentials = None if "auth" in profile_data and profile_data["auth"]: auth_data = profile_data["auth"] credentials = SIPCredentials( username=auth_data.get("username"), password=auth_data.get("password"), auth_type=AuthType(auth_data.get("type", "digest")) ) # Create profile profile = SIPProfile( name=profile_name, display_name=profile_data.get("display_name", profile_name.title()), server=profile_data["server"], port=profile_data.get("port", 5060), transport=profile_data.get("transport", "udp"), credentials=credentials, headers=profile_data.get("headers", {}), params=profile_data.get("params", ""), description=profile_data.get("description", ""), enabled=profile_data.get("enabled", True), allowed_channels=set(profile_data.get("allowed_channels", [])), required_roles=set(profile_data.get("required_roles", [])), registration_required=profile_data.get("registration_required", True), cost_per_minute=profile_data.get("cost_per_minute", 0.0), monthly_limit=profile_data.get("monthly_limit", 0), daily_limit=profile_data.get("daily_limit", 0) ) self.profiles[profile_name] = profile logger.debug(f"Loaded profile: {profile_name}") except Exception as e: logger.error(f"Error loading profile {profile_name}: {e}") def get_profile(self, profile_name: str) -> Optional[SIPProfile]: """Get a profile by name""" return self.profiles.get(profile_name) def get_available_profiles(self, channel_name: str, user_roles: Set[str]) -> List[SIPProfile]: """ Get profiles available to user in specific channel Args: channel_name: Discord channel name (e.g., "#general") user_roles: Set of user's Discord role names Returns: List of accessible SIP profiles """ available = [] # Start with default profiles profile_names = set(self.default_profiles) # Add channel-specific additional profiles if channel_name in self.channel_overrides: additional = self.channel_overrides[channel_name].get("additional_profiles", []) profile_names.update(additional) # Filter by accessibility for profile_name in profile_names: profile = self.profiles.get(profile_name) if profile and profile.can_access(channel_name, user_roles): available.append(profile) return available def get_default_profile(self, channel_name: str, user_roles: Set[str]) -> Optional[SIPProfile]: """Get the default profile for a channel/user combination""" available = self.get_available_profiles(channel_name, user_roles) if not available: return None # Return first available profile from defaults list for default_name in self.default_profiles: for profile in available: if profile.name == default_name: return profile # Fallback to first available return available[0] def validate_call_target(self, profile: SIPProfile, target: str) -> bool: """Validate if target is allowed for profile""" # Basic validation - can be extended with more rules if not target: return False # Check for emergency numbers (always allow) emergency_patterns = ["911", "112", "999"] if any(pattern in target for pattern in emergency_patterns): return True # Profile-specific validation can be added here # e.g., regex patterns, blacklists, etc. return True def list_profiles(self, include_restricted: bool = False) -> Dict[str, Dict[str, Any]]: """List all profiles with their information""" result = {} for name, profile in self.profiles.items(): if not include_restricted and (profile.allowed_channels or profile.required_roles): continue result[name] = { "display_name": profile.display_name, "server": profile.server, "transport": profile.transport, "description": profile.description, "enabled": profile.enabled, "has_restrictions": bool(profile.allowed_channels or profile.required_roles), "cost_per_minute": profile.cost_per_minute } return result def get_profile_usage_info(self, profile_name: str) -> Dict[str, Any]: """Get usage information for a profile (for monitoring/billing)""" profile = self.get_profile(profile_name) if not profile: return {} # This would connect to usage tracking system # For now, return basic info return { "profile_name": profile_name, "display_name": profile.display_name, "cost_per_minute": profile.cost_per_minute, "daily_limit": profile.daily_limit, "monthly_limit": profile.monthly_limit, # "current_usage": self._get_current_usage(profile_name), # "remaining_daily": self._get_remaining_daily(profile_name), # "remaining_monthly": self._get_remaining_monthly(profile_name) } def reload_profiles(self): """Reload profiles from configuration""" self.profiles.clear() self._load_profiles() logger.info("Profiles reloaded") # Global profile manager instance _profile_manager_instance = None def get_profile_manager() -> ProfileManager: """Get the global profile manager instance""" global _profile_manager_instance if _profile_manager_instance is None: _profile_manager_instance = ProfileManager() return _profile_manager_instance def reload_profiles(): """Reload the profile manager""" global _profile_manager_instance _profile_manager_instance = None return get_profile_manager() __pycache__ =========== sip_bridge.py =========== """ Discord Audio Bridge - SIP Integration Real-time audio bridging between Discord voice channels and SIP calls """ import asyncio import logging import threading import time from typing import Optional, Dict, Any, Callable import queue import wave import struct import pjsua as pj import discord from discord import VoiceChannel, VoiceClient from discord.sinks import WaveSink from .config import get_config # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AudioBuffer: """Thread-safe circular buffer for audio data""" def __init__(self, max_size: int = 16384): # Increased buffer size self.buffer = queue.Queue(maxsize=max_size) self.lock = threading.Lock() def write(self, data: bytes): """Write audio data to buffer""" try: if not self.buffer.full(): self.buffer.put(data, block=False) except queue.Full: # Buffer full, drop oldest data try: self.buffer.get(block=False) self.buffer.put(data, block=False) except queue.Empty: pass def read(self, size: int) -> bytes: """Read audio data from buffer""" data = b'' while len(data) < size: try: chunk = self.buffer.get(block=False) data += chunk if len(data) >= size: # Return exact size and put back remainder if len(data) > size: remainder = data[size:] self.buffer.put(remainder, block=False) return data[:size] except queue.Empty: # Not enough data, return what we have break return data def clear(self): """Clear the buffer""" with self.lock: while not self.buffer.empty(): try: self.buffer.get(block=False) except queue.Empty: break class SIPCallHandler: """Handles audio bridging for individual SIP calls""" def __init__(self, call_info: pj.CallInfo, discord_channel: VoiceChannel, sip_bridge: "SIPBridge"): self.call_info = call_info self.discord_channel = discord_channel self.sip_bridge = sip_bridge # Audio buffers for bidirectional streaming self.discord_to_sip = AudioBuffer() self.sip_to_discord = AudioBuffer() # Control flags self.is_active = False self.stop_event = threading.Event() self.audio_thread: Optional[threading.Thread] = None logger.info(f"Created SIP call handler for call {call_info.id}") def start_audio_bridge(self): """Start bidirectional audio bridging""" if self.is_active: return self.is_active = True self.stop_event.clear() # Start audio processing thread self.audio_thread = threading.Thread(target=self._audio_loop, daemon=True) self.audio_thread.start() logger.info(f"Started audio bridge for SIP call {self.call_info.id}") def stop_audio_bridge(self): """Stop audio bridging""" if not self.is_active: return self.is_active = False self.stop_event.set() if self.audio_thread and self.audio_thread.is_alive(): self.audio_thread.join(timeout=1.0) # Clear buffers self.discord_to_sip.clear() self.sip_to_discord.clear() logger.info(f"Stopped audio bridge for SIP call {self.call_info.id}") def _audio_loop(self): """Main audio processing loop""" logger.info("Audio bridge loop started") while not self.stop_event.is_set(): try: # Process Discord → SIP audio discord_audio = self.discord_to_sip.read(320) # 20ms at 16kHz if discord_audio: self._send_to_sip(discord_audio) # Process SIP → Discord audio sip_audio = self.sip_to_discord.read(320) if sip_audio: self._send_to_discord(sip_audio) # Small sleep to prevent busy waiting time.sleep(0.01) # 10ms except Exception as e: logger.error(f"Error in audio loop: {e}") break logger.info("Audio bridge loop ended") def _send_to_sip(self, audio_data: bytes): """Send audio data to SIP call""" try: # Convert Discord audio (48kHz stereo) to SIP audio (16kHz mono) converted_audio = self._convert_discord_to_sip_audio(audio_data) # Get the call object and send audio call = self.sip_bridge.lib.get_call_by_id(self.call_info.id) if call and call.is_valid(): # Use PJSIP's media streaming to send audio call_media = call.media if call_media: # This sends PCM audio data to the SIP call call_media.put_frame(converted_audio) except Exception as e: logger.error(f"Failed to send audio to SIP: {e}") def _send_to_discord(self, audio_data: bytes): """Send audio data to Discord channel""" try: # Convert SIP audio (16kHz mono) to Discord audio (48kHz stereo) converted_audio = self._convert_sip_to_discord_audio(audio_data) # Send to Discord voice channel via the bot's voice client voice_client = self.sip_bridge.discord_bot.voice_connections.get(self.discord_channel.guild.id) if voice_client and voice_client.is_connected(): # Use Discord's voice client to send audio voice_client.send_audio_packet(converted_audio) except Exception as e: logger.error(f"Failed to send audio to Discord: {e}") def _convert_discord_to_sip_audio(self, audio_data: bytes) -> bytes: """Convert Discord audio format to SIP audio format""" try: # Discord: 48kHz stereo 16-bit PCM # SIP: 16kHz mono 16-bit PCM # Convert bytes to samples samples = struct.unpack(f'<{len(audio_data)//2}h', audio_data) # Convert stereo to mono (take left channel or average) if len(samples) % 2 == 0: mono_samples = [samples[i] for i in range(0, len(samples), 2)] else: mono_samples = list(samples) # Downsample from 48kHz to 16kHz (3:1 ratio) downsampled = [mono_samples[i] for i in range(0, len(mono_samples), 3)] # Convert back to bytes return struct.pack(f'<{len(downsampled)}h', *downsampled) except Exception as e: logger.error(f"Error converting Discord to SIP audio: {e}") return b'' def _convert_sip_to_discord_audio(self, audio_data: bytes) -> bytes: """Convert SIP audio format to Discord audio format""" try: # SIP: 16kHz mono 16-bit PCM # Discord: 48kHz stereo 16-bit PCM # Convert bytes to samples samples = struct.unpack(f'<{len(audio_data)//2}h', audio_data) # Upsample from 16kHz to 48kHz (1:3 ratio) upsampled = [] for sample in samples: upsampled.extend([sample, sample, sample]) # Convert mono to stereo (duplicate channel) stereo_samples = [] for sample in upsampled: stereo_samples.extend([sample, sample]) # Convert back to bytes return struct.pack(f'<{len(stereo_samples)}h', *stereo_samples) except Exception as e: logger.error(f"Error converting SIP to Discord audio: {e}") return b'' def handle_discord_audio(self, user_id: int, audio_data: bytes): """Handle incoming audio from Discord user""" # Write to Discord → SIP buffer self.discord_to_sip.write(audio_data) def handle_sip_audio(self, audio_data: bytes): """Handle incoming audio from SIP call""" # Write to SIP → Discord buffer self.sip_to_discord.write(audio_data) class SIPMediaCallback(pj.CallCallback): """Callback for handling SIP call media events""" def __init__(self, call_handler: SIPCallHandler): pj.CallCallback.__init__(self) self.call_handler = call_handler def on_media_state(self): """Called when media state changes""" if self.call.info().media_state == pj.MediaState.ACTIVE: logger.info("SIP call media is now active") # Start audio bridge when media becomes active self.call_handler.start_audio_bridge() def on_state(self): """Called when call state changes""" call_info = self.call.info() logger.info(f"SIP call state changed: {call_info.state_text}") if call_info.state == pj.CallState.DISCONNECTED: logger.info("SIP call disconnected") # Stop audio bridge when call ends self.call_handler.stop_audio_bridge() class SIPBridge: """Main SIP bridge that manages PJSIP and Discord integration""" def __init__(self, discord_bot: "DiscordAudioBot"): self.discord_bot = discord_bot self.lib: Optional[pj.Lib] = None self.account: Optional[pj.Account] = None self.is_initialized = False # Active call handlers self.call_handlers: Dict[int, SIPCallHandler] = {} # call_id -> handler self.channel_handlers: Dict[int, SIPCallHandler] = {} # channel_id -> handler logger.info("SIP Bridge initialized") async def initialize(self): """Initialize PJSIP library and account""" if self.is_initialized: return try: # Run PJSIP initialization in thread to avoid blocking async loop await asyncio.get_event_loop().run_in_executor( None, self._init_pjsip ) self.is_initialized = True logger.info("SIP Bridge initialization completed") except Exception as e: logger.error(f"Failed to initialize SIP bridge: {e}") raise def _init_pjsip(self): """Initialize PJSIP (runs in thread)""" # Create library instance self.lib = pj.Lib() try: # Initialize library self.lib.init(log_cfg=pj.LogConfig(level=3, callback=self._pjsip_log)) # Create UDP transport transport = self.lib.create_transport(pj.TransportType.UDP, pj.TransportConfig(5060)) logger.info(f"SIP transport created on port {transport.info().port}") # Start library self.lib.start() # Create a default account for incoming calls self._create_account() except pj.Error as e: logger.error(f"PJSIP initialization error: {e}") raise def _create_account(self): """Create SIP account for handling calls""" try: config = get_config() # Create account configuration acc_config = pj.AccountConfig() acc_config.id = f"sip:bridge@{config.default_sip_domain}" # Only set registration if we have credentials if config.bridge_auth_user and config.bridge_auth_pass: acc_config.reg_uri = f"sip:{config.default_sip_domain}" acc_config.auth_cred = [ pj.AuthCred("*", config.bridge_auth_user, config.bridge_auth_pass) ] logger.info(f"SIP account will register with credentials") else: # No registration - just listen for calls logger.info(f"SIP account created without registration (no credentials)") acc_config.auth_cred = [] # Create account self.account = self.lib.create_account(acc_config, cb=SIPAccountCallback(self)) logger.info(f"SIP account created: {acc_config.id}") except Exception as e: logger.error(f"Failed to create SIP account: {e}") def _pjsip_log(self, level, str_log, len_log): """PJSIP log callback""" if level <= 3: # Only log errors and warnings logger.info(f"PJSIP: {str_log.strip()}") async def start_audio_bridge(self, channel: VoiceChannel): """Start audio bridge for a Discord voice channel""" try: # Check if we're already bridging this channel if channel.id in self.channel_handlers: logger.warning(f"Audio bridge already active for channel {channel.name}") return logger.info(f"Audio bridge started for Discord channel: {channel.name}") # Start recording from Discord channel await self._start_discord_recording(channel) # Wait for incoming SIP calls to bridge # This will be handled by the SIP account callback except Exception as e: logger.error(f"Failed to start audio bridge for channel {channel.name}: {e}") async def stop_audio_bridge(self, channel: VoiceChannel): """Stop audio bridge for a Discord voice channel""" try: if channel.id in self.channel_handlers: handler = self.channel_handlers[channel.id] handler.stop_audio_bridge() del self.channel_handlers[channel.id] logger.info(f"Stopped audio bridge for channel {channel.name}") # Stop Discord recording await self._stop_discord_recording(channel) except Exception as e: logger.error(f"Failed to stop audio bridge for channel {channel.name}: {e}") async def _start_discord_recording(self, channel: VoiceChannel): """Start recording audio from Discord channel""" try: # Get voice client for the channel's guild voice_client = self.discord_bot.voice_connections.get(channel.guild.id) if not voice_client: logger.warning(f"No voice client found for guild {channel.guild.id}") return # Create a custom sink that forwards audio to SIP sink = SIPAudioSink(channel.id, self) # Start recording voice_client.start_recording(sink, self._recording_finished, channel) logger.info(f"Started Discord audio recording for channel {channel.name}") except Exception as e: logger.error(f"Failed to start Discord recording: {e}") async def _stop_discord_recording(self, channel: VoiceChannel): """Stop recording audio from Discord channel""" try: voice_client = self.discord_bot.voice_connections.get(channel.guild.id) if voice_client and voice_client.recording: voice_client.stop_recording() logger.info(f"Stopped Discord audio recording for channel {channel.name}") except Exception as e: logger.error(f"Failed to stop Discord recording: {e}") def _recording_finished(self, sink, channel: VoiceChannel, *args): """Called when Discord recording finishes""" logger.info(f"Discord recording finished for channel {channel.name}") def handle_incoming_sip_call(self, call_info: pj.CallInfo, channel: VoiceChannel): """Handle incoming SIP call and bridge to Discord channel""" try: logger.info(f"Handling incoming SIP call from {call_info.remote_uri}") # Create call handler call_handler = SIPCallHandler(call_info, channel, self) self.call_handlers[call_info.id] = call_handler self.channel_handlers[channel.id] = call_handler # Answer the call call = self.lib.get_call_by_id(call_info.id) if call: call.answer(200) logger.info(f"Answered SIP call {call_info.id}") except Exception as e: logger.error(f"Failed to handle incoming SIP call: {e}") async def make_outbound_call(self, target_uri: str, channel: VoiceChannel): """Make outbound SIP call from Discord channel""" try: if not self.account or not self.account.info().reg_status == 200: logger.error("SIP account not registered") return None # Create call call = self.account.make_call(target_uri, cb=None) call_info = call.info() # Create call handler call_handler = SIPCallHandler(call_info, channel, self) self.call_handlers[call_info.id] = call_handler self.channel_handlers[channel.id] = call_handler # Set up call callback call.set_callback(SIPMediaCallback(call_handler)) logger.info(f"Made outbound SIP call to {target_uri}") return call_handler except Exception as e: logger.error(f"Failed to make outbound SIP call: {e}") return None async def shutdown(self): """Shutdown SIP bridge""" try: # Stop all active call handlers for handler in list(self.call_handlers.values()): handler.stop_audio_bridge() self.call_handlers.clear() self.channel_handlers.clear() # Shutdown PJSIP in thread if self.lib: await asyncio.get_event_loop().run_in_executor( None, self._shutdown_pjsip ) logger.info("SIP Bridge shutdown completed") except Exception as e: logger.error(f"Error during SIP bridge shutdown: {e}") def _shutdown_pjsip(self): """Shutdown PJSIP (runs in thread)""" try: if self.account: self.account = None if self.lib: self.lib.destroy() self.lib = None except Exception as e: logger.error(f"Error shutting down PJSIP: {e}") class SIPAccountCallback(pj.AccountCallback): """Callback for SIP account events""" def __init__(self, sip_bridge: SIPBridge): pj.AccountCallback.__init__(self) self.sip_bridge = sip_bridge def on_incoming_call(self, call): """Handle incoming SIP call""" call_info = call.info() logger.info(f"Incoming SIP call from {call_info.remote_uri}") # For now, route to the first available Discord channel # In production, this would use routing rules if self.sip_bridge.discord_bot.voice_connections: guild_id = next(iter(self.sip_bridge.discord_bot.voice_connections.keys())) voice_client = self.sip_bridge.discord_bot.voice_connections[guild_id] channel = voice_client.channel # Handle the call asyncio.create_task( asyncio.get_event_loop().run_in_executor( None, self.sip_bridge.handle_incoming_sip_call, call_info, channel ) ) class SIPAudioSink(WaveSink): """Custom audio sink that forwards Discord audio to SIP bridge""" def __init__(self, channel_id: int, sip_bridge: SIPBridge): # Don't actually write to file, just use WaveSink as base super().__init__(file=None) self.channel_id = channel_id self.sip_bridge = sip_bridge def write(self, data, user): """Handle audio data from Discord users""" try: # Forward audio to SIP bridge if self.channel_id in self.sip_bridge.channel_handlers: handler = self.sip_bridge.channel_handlers[self.channel_id] handler.handle_discord_audio(user.id, data) except Exception as e: logger.error(f"Error in SIP audio sink: {e}") swml_core.py =========== #!/usr/bin/env python3 """ SWML Core Logic - No Web Dependencies Core routing and SWML generation logic without FastAPI dependencies. This can be tested independently and then used by the web handler. """ import os import re import json from typing import Dict, Any, Optional, List, Union from urllib.parse import urlparse, parse_qs # Core Data Classes (without Pydantic) class CallInfo: def __init__(self, call_id: str, direction: str, type: str, from_addr: str = "", to_addr: str = "", project_id: str = ""): self.call_id = call_id self.direction = direction self.type = type self.from_addr = from_addr self.to_addr = to_addr self.project_id = project_id class CallRequest: def __init__(self, call: Dict[str, Any], vars: Optional[Dict[str, Any]] = None): call_data = call self.call = CallInfo( call_id=call_data.get('call_id', ''), direction=call_data.get('direction', ''), type=call_data.get('type', ''), from_addr=call_data.get('from', ''), to_addr=call_data.get('to', ''), project_id=call_data.get('project_id', '') ) self.vars = vars or {} class RoutingRule: def __init__(self, pattern: str, type: str, discord_target: str, sip_target: str, auth_required: bool = False, headers: Optional[Dict[str, str]] = None): self.pattern = pattern self.type = type self.discord_target = discord_target self.sip_target = sip_target self.auth_required = auth_required self.headers = headers or {} # Configuration class Config: def __init__(self): self.routing_rules: List[RoutingRule] = [] self.default_sip_domain = os.getenv('SIP_DOMAIN', 'discord-bridge.com') self.bridge_auth_user = os.getenv('BRIDGE_AUTH_USER', 'bridge_user') self.bridge_auth_pass = os.getenv('BRIDGE_AUTH_PASS', 'bridge_pass') self.load_routing_rules() def load_routing_rules(self): """Load routing rules from configuration""" # Default routing rules default_rules = [ { 'pattern': r'josh_starwars', 'type': 'user', 'discord_target': '@josh_discord_user', 'sip_target': f'sip:josh@{self.default_sip_domain}', 'auth_required': True }, { 'pattern': r'general', 'type': 'channel', 'discord_target': '#general-voice', 'sip_target': f'sip:general@{self.default_sip_domain}', 'auth_required': True }, { 'pattern': r'support-.*', 'type': 'channel_pattern', 'discord_target': '#support-voice', 'sip_target': f'sip:support@{self.default_sip_domain}', 'auth_required': True }, { 'pattern': r'.*', # Fallback rule 'type': 'channel', 'discord_target': '#general-voice', 'sip_target': f'sip:default@{self.default_sip_domain}', 'auth_required': True } ] self.routing_rules = [RoutingRule(**rule) for rule in default_rules] # Core Functions def extract_target_from_call(call_request: CallRequest) -> str: """Extract the target destination from call data""" if call_request.call.type == 'sip': # For SIP calls, extract from 'to' field to_addr = call_request.call.to_addr if to_addr.startswith('sip:'): # Extract username from sip:username@domain sip_uri = to_addr[4:] # Remove 'sip:' prefix username = sip_uri.split('@')[0] return username elif call_request.call.type == 'webrtc': # For WebRTC calls, check user variables user_vars = call_request.vars.get('userVariables', {}) # Check explicit 'to' variable first if 'to' in user_vars: target = user_vars['to'] return target # Check callOriginHref for query parameters if 'callOriginHref' in user_vars: href = user_vars['callOriginHref'] parsed_url = urlparse(href) query_params = parse_qs(parsed_url.query) # Look for 'ext' parameter if 'ext' in query_params: target = query_params['ext'][0] return target return "default" def find_matching_rule(target: str, config: Config) -> RoutingRule: """Find the first routing rule that matches the target""" for rule in config.routing_rules: if re.match(rule.pattern, target): return rule # This shouldn't happen since we have a catch-all rule return config.routing_rules[-1] def generate_swml_response(rule: RoutingRule, call_request: CallRequest, config: Config) -> Dict[str, Any]: """Generate SWML response for SignalWire""" # Build SIP headers for context headers = [ {"name": "X-Discord-Target", "value": rule.discord_target}, {"name": "X-Call-Type", "value": call_request.call.type}, {"name": "X-Original-Caller", "value": call_request.call.from_addr}, {"name": "X-Call-ID", "value": call_request.call.call_id} ] # Add any custom headers from rule if rule.headers: for name, value in rule.headers.items(): headers.append({"name": name, "value": value}) # Build connect instruction connect_params = { "to": rule.sip_target, "headers": headers } # Add authentication if required if rule.auth_required: connect_params["username"] = config.bridge_auth_user connect_params["password"] = config.bridge_auth_pass # Generate SWML swml_response = { "version": "1.0.0", "sections": { "main": [ { "connect": connect_params } ] } } return swml_response class SWMLRouter: """Main router class for handling SWML routing logic""" def __init__(self, config): self.config = config def route_call(self, call_data: Dict[str, Any]) -> Dict[str, Any]: """Route a call and generate SWML response""" # Parse call request call_request = CallRequest(**call_data) # Extract target from call data target = extract_target_from_call(call_request) # Find matching routing rule rule = find_matching_rule(target, self.config) # Generate SWML response return generate_swml_response(rule, call_request, self.config) def route_discord_call(self, call_data: Dict[str, Any]) -> Dict[str, Any]: """Route a Discord-initiated call""" # For Discord calls, we might handle differently # For now, use the same logic return self.route_call(call_data) def process_webhook(call_data: Dict[str, Any]) -> Dict[str, Any]: """ Main function to process a SignalWire webhook and return SWML Args: call_data: Raw webhook data from SignalWire Returns: SWML response dictionary """ config = Config() router = SWMLRouter(config) return router.route_call(call_data) # Generate SWML response swml_response = generate_swml_response(rule, call_request, config) return swml_response # Test function if __name__ == "__main__": # Test with sample data test_sip_data = { "call": { "call_id": "test-123", "direction": "inbound", "type": "sip", "from": "sip:+1234@test.com", "to": "sip:josh_starwars@test.com", "project_id": "test-project" }, "vars": {} } print("🧪 Testing SWML Core Logic") print("=" * 30) try: result = process_webhook(test_sip_data) print("✅ SWML generated successfully!") print(json.dumps(result, indent=2)) except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() swml_handler.py =========== """ Discord Audio Bridge - SWML Handler FastAPI-based web server for SignalWire SWML call routing """ import os import json import logging import ssl from typing import Dict, Any, Optional from fastapi import FastAPI, HTTPException, Request, Depends from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel import uvicorn from .config import get_config from .swml_core import SWMLRouter # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Security security = HTTPBasic() # FastAPI app app = FastAPI( title="Discord Audio Bridge - SWML Handler", description="SignalWire webhook endpoint for call routing", version="1.0.0" ) # Data Models class CallInfo(BaseModel): call_id: str direction: str type: str # 'sip' or 'webrtc' from_addr: str = "" to_addr: str = "" project_id: str = "" class CallRequest(BaseModel): call: CallInfo vars: Optional[Dict[str, Any]] = {} # Global configuration and router config = get_config() router = None def get_router(): """Get or create SWML router instance""" global router if router is None: from .swml_core import SWMLRouter router = SWMLRouter(config) return router # Security Functions async def verify_auth(request: Request, credentials: HTTPBasicCredentials = Depends(security)): """Verify basic auth credentials if configured""" swml_config = config.get('swml', {}) auth_config = swml_config.get('auth', {}) # Check if auth is configured username = auth_config.get('username') password = auth_config.get('password') if not username: return "anonymous" # No auth required # Validate credentials if credentials.username == username and credentials.password == password: return credentials.username logger.warning(f"Authentication failed for user: {credentials.username}") raise HTTPException( status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"}, ) def get_client_ip(request: Request) -> str: """Get client IP, handling reverse proxy scenarios""" swml_config = config.get('swml', {}) trust_proxy = swml_config.get('trust_proxy', True) if trust_proxy: # Check for common proxy headers forwarded_for = request.headers.get('X-Forwarded-For') if forwarded_for: # Take the first IP in the chain return forwarded_for.split(',')[0].strip() real_ip = request.headers.get('X-Real-IP') if real_ip: return real_ip # Fallback to direct connection return request.client.host if request.client else "unknown" # API Endpoints @app.get("/") async def health_check(): """Health check endpoint""" swml_router = get_router() return { "status": "healthy", "service": "Discord Audio Bridge - SWML Handler", "version": "1.0.0", "routing_rules": len(config.routing_rules) if hasattr(config, 'routing_rules') else 0, "auth_enabled": bool(config.get('swml', {}).get('auth', {}).get('username')) } @app.post("/swml/webhook") async def handle_swml_webhook( call_request: CallRequest, request: Request, user: str = Depends(verify_auth) ): """ Main webhook endpoint for SignalWire call routing Receives POST requests from SignalWire when calls are initiated and returns SWML instructions for routing the call. """ client_ip = get_client_ip(request) logger.info(f"Received SWML webhook for call {call_request.call.call_id} from {client_ip}") logger.debug(f"Call data: {call_request.dict()}") try: swml_router = get_router() # Convert to dict format expected by swml_core call_data = call_request.dict() # Route the call and generate SWML response swml_response = swml_router.route_call(call_data) logger.info(f"Generated SWML response for call {call_request.call.call_id}") logger.debug(f"SWML response: {json.dumps(swml_response, indent=2)}") return swml_response except Exception as e: logger.error(f"Error processing webhook: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @app.post("/call/discord/{channel_id}") async def initiate_discord_call( channel_id: str, call_data: Optional[Dict[str, Any]] = None, user: str = Depends(verify_auth) ): """Initiate a call to a Discord channel""" try: if call_data is None: call_data = {} call_data['discord_channel_id'] = channel_id logger.info(f"Initiating Discord call to channel {channel_id}") swml_router = get_router() # Generate SWML for Discord call swml_response = swml_router.route_discord_call(call_data) return swml_response except Exception as e: logger.error(f"Error initiating Discord call: {e}") raise HTTPException(status_code=500, detail=f"Failed to initiate Discord call: {str(e)}") @app.get("/rules") async def get_routing_rules(user: str = Depends(verify_auth)): """Get current routing rules (for debugging)""" try: from .dialplan import get_dialplan dialplan = get_dialplan() routes = dialplan.list_routes() return { "routing_rules": routes, "config": { "sip_domain": config.default_sip_domain, "auth_enabled": bool(config.get('swml', {}).get('auth', {}).get('username')) } } except Exception as e: logger.error(f"Error getting routing rules: {e}") raise HTTPException(status_code=500, detail=str(e)) def create_ssl_context(): """Create SSL context from certificate file if it exists""" # Standard certificate paths to check cert_paths = [ "/app/certs/server.pem", "/app/ssl/server.pem", "./certs/server.pem", "./ssl/server.pem" ] for cert_file in cert_paths: if os.path.exists(cert_file): try: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain(cert_file) logger.info(f"SSL auto-enabled with certificate: {cert_file}") return ssl_context except Exception as e: logger.warning(f"Found certificate at {cert_file} but failed to load: {e}") continue logger.info("No SSL certificate found - running HTTP only") return None # Main entry point def run_swml_server(): """Run the SWML web server""" swml_config = config.get('swml', {}) host = swml_config.get('host', '0.0.0.0') port = swml_config.get('port', 9090) # SSL configuration ssl_context = create_ssl_context() protocol = "https" if ssl_context else "http" logger.info(f"Starting SWML Handler on {protocol}://{host}:{port}") auth_config = swml_config.get('auth', {}) if auth_config.get('username'): logger.info("Basic authentication enabled") else: logger.info("No authentication configured") uvicorn.run( app, host=host, port=port, ssl_context=ssl_context ) if __name__ == "__main__": run_swml_server()