diff --git a/call-control b/call-control index 444f78a..0194fc0 100644 --- a/call-control +++ b/call-control @@ -1,121 +1,118 @@ #!/usr/bin/env python -# Copyright (C) 2005-2009 AG Projects. See LICENSE for details. -# - """Implementation of a call controller for OpenSIPS.""" def send_command(command, **kwargs): import socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect('%s/socket' % process.runtime_directory) sock.sendall('%s\r\n' % command + '\r\n'.join(['%s: %s' % (key, value) for key, value in kwargs.items()]) + '\r\n\r\n') response = '' while True: data = sock.recv(4096) response += data if not data or data.endswith('\r\n\r\n'): break sock.close() for line in response.splitlines(): if line: print line if __name__ == '__main__': import sys from optparse import OptionParser from application.process import process, ProcessError from application import log import callcontrol name = 'call-control' fullname = 'SIP call-control' description = 'Implementation of call-control for SIP' config_directory = '/etc/callcontrol' runtime_directory = '/var/run/callcontrol' default_pid = "%s/%s.pid" % (runtime_directory, name) parser = OptionParser(version="%%prog %s" % callcontrol.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=True, help="run the process in the foreground") parser.add_option("--pid", dest="pid_file", default='/var/run/callcontrol/call-control.pid', help="pid file (/var/run/callcontrol/call-control.pid)", metavar="FILE") parser.add_option("--debug", dest="debug", default=None, help="get information about a currently running call-control daemon", metavar="COMMAND") parser.add_option("--terminate", dest="terminate", default=None, help="terminate an on-going session", metavar="CALLID") (options, args) = parser.parse_args() pid_file = options.pid_file process.system_config_directory = config_directory try: process.runtime_directory = runtime_directory except ProcessError, e: log.fatal(str(e)) sys.exit(1) if options.debug is not None and options.terminate is not None: log.error('cannot run with both --debug and --terminate options in the same time') sys.exit(1) if options.debug is not None: if options.debug == '': log.error('must specify debug command') sys.exit(1) try: send_command('debug', show=options.debug, **dict([arg.split('=',1) for arg in args if arg.find('=') >= 0])) except Exception, e: log.error('failed to complete debug command: %s' % e) sys.exit(1) else: sys.exit(0) if options.terminate is not None: if options.terminate == '': log.error('must specify callid to terminate') sys.exit(1) try: send_command('terminate', callid=options.terminate) except Exception, e: log.error('failed to terminate session: %s' % e) else: sys.exit(0) if options.fork: try: process.daemonize(pid_file) except ProcessError, e: log.fatal(str(e)) sys.exit(1) log.start_syslog(name) log.msg("Starting %s %s" % (fullname, callcontrol.__version__)) from callcontrol.controller import CallControlServer if not options.fork: from application.debug.memory import memory_dump try: cserver = CallControlServer() except Exception, e: log.error("failed to create SIP Call Control Server: %s" % e) log.err() sys.exit(1) try: cserver.run() except Exception, e: log.error("failed to start SIP Call Control Server: %s" % e) if not options.fork: memory_dump() diff --git a/callcontrol/__init__.py b/callcontrol/__init__.py index a635aff..cba349e 100644 --- a/callcontrol/__init__.py +++ b/callcontrol/__init__.py @@ -1,10 +1,8 @@ -# Copyright (C) 2005-2010 AG-Projects. See LICENSE for details. -# """SIP Callcontrol""" __version__ = "2.1.0" configuration_filename = 'config.ini' backup_calls_file = 'calls.dat' diff --git a/callcontrol/controller.py b/callcontrol/controller.py index 840d7ed..f7f7dcc 100644 --- a/callcontrol/controller.py +++ b/callcontrol/controller.py @@ -1,562 +1,560 @@ -# Copyright (C) 2005-2010 AG Projects. See LICENSE for details. -# """Implementation of a call control server for OpenSIPS.""" import os import grp import re import cPickle import time from application.configuration import ConfigSection, ConfigSetting from application.process import process from application import log from twisted.internet.protocol import Factory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet import reactor, defer from twisted.python import failure from callcontrol.scheduler import RecurrentCall, KeepRunning from callcontrol.raddb import RadiusDatabase, RadiusDatabaseError from callcontrol.sip import Call from callcontrol.rating import RatingEngineConnections from callcontrol import configuration_filename, backup_calls_file class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class TimeoutDetection(str): _values = ('dialog', 'radius') def __init__(self, value): value = value.lower() if value not in self._values: raise ValueError("invalid timeout detection value: %r" % value) if value == 'radius': self.use_radius = True else: self.use_radius = False def __new__(cls, value): return str.__new__(cls, value.lower()) class CallControlConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CallControl' socket = "%s/socket" % process.runtime_directory group = 'opensips' limit = ConfigSetting(type=TimeLimit, value=None) timeout = 24*60*60 ## timeout calls that are stale for more than 24 hours. setupTime = 90 ## timeout calls that take more than 1'30" to setup. checkInterval = 60 ## check for staled calls and calls that did timeout at every minute. timeout_detection = TimeoutDetection('dialog') ## whether or not to use the radius database to find out terminated calls ## Classes class CommandError(Exception): pass class InvalidRequestError(Exception): pass class CallsMonitor(object): """Check for staled calls and calls that did timeout and were closed by external means""" def __init__(self, period, application): self.application = application self.reccall = RecurrentCall(period, self.run) def run(self): if CallControlConfig.timeout_detection.use_radius: ## Find out terminated calls deferred1 = self.application.db.getTerminatedCalls(self.application.calls) deferred1.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_terminated]) deferred2 = self.application.db.getTimedoutCalls(self.application.calls) deferred2.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_timedout]) defer.DeferredList([deferred1, deferred2]).addCallback(self._finish_checks) else: self._finish_checks(None) return KeepRunning def shutdown(self): self.reccall.cancel() def _clean_calls(self, calls, clean_function): for callid, callinfo in calls.items(): call = self.application.calls.get(callid) if call: self.application.clean_call(callid) clean_function(call, callinfo) def _err_handle(self, fail): log.error("Couldn't query database for terminated/timedout calls: %s" % fail.value) def _handle_terminated(self, call, callinfo): call.end(calltime=callinfo['duration'], reason='calls monitor as terminated') def _handle_timedout(self, call, callinfo): call.end(reason='calls monitor as timedout', sendbye=True) def _finish_checks(self, value): ## Also do the rest of the checking now = time.time() staled = [] nosetup = [] for callid, call in self.application.calls.items(): if not call.complete and (now - call.created >= CallControlConfig.setupTime): self.application.clean_call(callid) nosetup.append(call) elif call.inprogress and call.timer is not None: continue ## this call will be expired by its own timer elif now - call.created >= CallControlConfig.timeout: self.application.clean_call(callid) staled.append(call) ## Terminate staled for call in staled: call.end(reason='calls monitor as staled', sendbye=True) ## Terminate calls that didn't setup in setupTime for call in nosetup: call.end(reason="calls monitor as it didn't setup in %d seconds" % CallControlConfig.setupTime) class CallControlProtocol(LineOnlyReceiver): def lineReceived(self, line): if line.strip() == "": if self.line_buf: self._process() self.line_buf = [] else: self.line_buf.append(line.strip()) def _process(self): try: req = Request(self.line_buf[0], self.line_buf[1:]) except InvalidRequestError, e: self._send_error_reply(failure.Failure(e)) else: # log.debug("Got request: %s" % str(req)) #DEBUG def _unknown_handler(req): req.deferred.errback(failure.Failure(CommandError(req))) try: getattr(self, '_CC_%s' % req.cmd, _unknown_handler)(req) except Exception, e: self._send_error_reply(failure.Failure(e)) else: req.deferred.addCallbacks(callback=self._send_reply, errback=self._send_error_reply) def connectionMade(self): self.line_buf = [] def _send_reply(self, msg): # log.debug('Sent reply: %s' % msg) #DEBUG self.sendLine(msg) def _send_error_reply(self, fail): log.error(fail.value) # log.debug("Sent 'Error' reply") #DEBUG self.sendLine('Error') def _CC_init(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: call = Call(req, self.factory.application) if call.billingParty is None: req.deferred.callback('Error') return self.factory.application.calls[req.callid] = call # log.debug("Call id %s added to list of controlled calls" % (call.callid)) #DEBUG else: if call.token != req.call_token: log.error("Call id %s is duplicated" % call.callid) req.deferred.callback('Duplicated callid') return # The call was previously setup which means it could be in the the users table try: user_calls = self.factory.application.users[call.billingParty] user_calls.remove(call.callid) if len(user_calls) == 0: del self.factory.application.users[call.billingParty] self.factory.application.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass deferred = call.setup(req) deferred.addCallbacks(callback=self._CC_finish_init, errback=self._CC_init_failed, callbackArgs=[req], errbackArgs=[req]) def _CC_finish_init(self, value, req): try: call = self.factory.application.calls[req.callid] except KeyError: log.error("Call id %s disappeared before we could finish initializing it" % req.callid) req.deferred.callback('Error') else: if req.call_limit is not None and len(self.factory.application.users.get(call.billingParty, ())) >= req.call_limit: self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Call limit reached') elif call.locked: ## prepaid account already locked by another call log.info("Call id %s of %s to %s forbidden because the account is locked" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Locked') elif call.timelimit == 0: ## prepaid account with no credit log.info("Call id %s of %s to %s forbidden because credit is too low" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No credit') elif req.call_limit is not None or call.timelimit is not None: ## call limited by credit value, a global time limit or number of calls log.info("User %s can make %s concurrent calls" % (call.billingParty, req.call_limit or "unlimited")) self.factory.application.users.setdefault(call.billingParty, []).append(call.callid) req.deferred.callback('Limited') else: ## no limit for call log.info("Call id %s of %s to %s is postpaid not limited" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No limit') def _CC_init_failed(self, fail, req): self._send_error_reply(fail) self.factory.application.clean_call(req.callid) def _CC_start(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: call.start(req) req.deferred.callback('Ok') def _CC_stop(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: self.factory.application.clean_call(req.callid) call.end(reason='user') req.deferred.callback('Ok') def _CC_debug(self, req): debuglines = [] if req.show == 'sessions': for callid, call in self.factory.application.calls.items(): if not req.user or call.user.startswith(req.user): debuglines.append('Call id %s of %s to %s: %s' % (callid, call.user, call.ruri, call.status)) elif req.show == 'session': try: call = self.factory.application.calls[req.callid] except KeyError: debuglines.append('Call id %s does not exist' % req.callid) else: for key, value in call.items(): debuglines.append('%s: %s' % (key, value)) req.deferred.callback('\r\n'.join(debuglines)+'\r\n') def _CC_terminate(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Call id %s does not exist\r\n' % req.callid) else: self.factory.application.clean_call(req.callid) call.end(reason='admin', sendbye=True) req.deferred.callback('Ok\r\n') class CallControlFactory(Factory): protocol = CallControlProtocol def __init__(self, application): self.application = application class CallControlServer(object): def __init__(self, path=None, group=None): self.path = path or CallControlConfig.socket self.group = group or CallControlConfig.group try: os.unlink(self.path) except OSError: pass self.listening = None self.engines = None self.monitor = None if CallControlConfig.timeout_detection.use_radius: self.db = RadiusDatabase() else: self.db = None self.calls = {} self.users = {} self._restore_calls() def clean_call(self, callid): try: call = self.calls[callid] except KeyError: return else: del self.calls[callid] user_calls = self.users.get(call.billingParty, []) try: user_calls.remove(callid) except ValueError: pass if not user_calls: self.users.pop(call.billingParty, None) self.engines.remove_user(call.billingParty) # log.debug("Call id %s removed from the list of controlled calls" % callid) #DEBUG def run(self): ## Add a callback for the startup/shutdown stuff reactor.addSystemEventTrigger('before', 'startup', self.on_startup) reactor.addSystemEventTrigger('before', 'shutdown', self.on_shutdown) ## And start reactor reactor.run() ## And do the shutdown def stop(self): reactor.stop() def on_startup(self): ## First set up listening on the unix socket try: gid = grp.getgrnam(self.group)[2] mode = 0660 except (KeyError, IndexError): gid = -1 mode = 0666 self.listening = reactor.listenUNIX(address=self.path, factory=CallControlFactory(self)) ## Make it writable only to the SIP proxy group members try: os.chown(self.path, -1, gid) os.chmod(self.path, mode) except OSError: log.warn("Couldn't set access rights for %s" % self.path) log.warn("OpenSIPS may not be able to communicate with us!") ## Then setup the CallsMonitor self.monitor = CallsMonitor(CallControlConfig.checkInterval, self) ## Open the connection to the rating engines self.engines = RatingEngineConnections() def on_shutdown(self): should_close = [] if self.listening is not None: self.listening.stopListening() if self.engines is not None: should_close.append(self.engines.shutdown()) if self.monitor is not None: self.monitor.shutdown() if self.db is not None: should_close.append(self.db.close()) d = defer.DeferredList(should_close) d.addBoth(self._save_calls) return d def _save_calls(self, result): if self.calls: log.info('Saving calls') calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) try: f = open(calls_file, 'w') except: pass else: for call in self.calls.values(): call.application = None ## we will mark timers with 'running' or 'idle', depending on their current state, ## to be able to correctly restore them later (Timer objects cannot be pickled) if call.timer is not None: if call.inprogress: call.timer.cancel() call.timer = 'running' ## temporary mark that this timer was running else: call.timer = 'idle' ## temporary mark that this timer was not running failed_dump = False try: try: cPickle.dump(self.calls, f) except Exception, why: log.warn("Failed to dump call list: %s" % why) failed_dump = True finally: f.close() if failed_dump: try: os.unlink(calls_file) except: pass else: log.info("Saved calls: %s" % str(self.calls.keys())) self.calls = {} def _restore_calls(self): calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) try: f = open(calls_file, 'r') except: pass else: try: self.calls = cPickle.load(f) except Exception, why: log.warn("Failed to load calls saved in the previous session: %s" % why) f.close() try: os.unlink(calls_file) except: pass if self.calls: log.info("Restoring calls saved previously: %s" % str(self.calls.keys())) ## the calls in the 2 sets below are never overlapping because closed and terminated ## calls have different database fingerprints. so the dictionary update below is safe try: db = self.db if self.db is not None else RadiusDatabase() try: terminated = db.query(RadiusDatabase.RadiusTask(None, 'terminated', calls=self.calls)) ## calls terminated by caller/called didtimeout = db.query(RadiusDatabase.RadiusTask(None, 'timedout', calls=self.calls)) ## calls closed by mediaproxy after a media timeout finally: if self.db is None: db.close() except RadiusDatabaseError, e: log.error("Could not query database: %s" % e) else: for callid, call in self.calls.items(): callinfo = terminated.get(callid) or didtimeout.get(callid) if callinfo: ## call already terminated or did timeout in mediaproxy del self.calls[callid] callinfo['call'] = call call.timer = None continue ## close all calls that were already terminated or did timeout count = 0 for callinfo in terminated.values(): call = callinfo.get('call') if call is not None: call.end(calltime=callinfo['duration']) count += 1 for callinfo in didtimeout.values(): call = callinfo.get('call') if call is not None: call.end(sendbye=True) count += 1 if count > 0: log.info("Removed %d already terminated call%s" % (count, 's'*(count!=1))) for callid, call in self.calls.items(): call.application = self if call.timer == 'running': now = time.time() remain = call.starttime + call.timelimit - now if remain < 0: call.timelimit = int(round(now - call.starttime)) remain = 0 call._setup_timer(remain) call.timer.start() elif call.timer == 'idle': call._setup_timer() # also restore users table self.users.setdefault(call.billingParty, []).append(callid) class Request(object): """A request parsed into a structure based on request type""" __methods = {'init': ('callid', 'diverter', 'ruri', 'sourceip', 'from'), 'start': ('callid', 'dialogid'), 'stop': ('callid',), 'debug': ('show',), 'terminate': ('callid',)} def __init__(self, cmd, params): if cmd not in self.__methods.keys(): raise InvalidRequestError("Unknown request: %s" % cmd) try: parameters = dict([re.split(r':\s+', l, 1) for l in params]) except ValueError: raise InvalidRequestError("Badly formatted request") for p in self.__methods[cmd]: try: parameters[p] except KeyError: raise InvalidRequestError("Missing %s from request" % p) self.cmd = cmd self.deferred = defer.Deferred() self.__dict__.update(parameters) try: getattr(self, '_RE_%s' % self.cmd)() except AttributeError: pass def _RE_init(self): self.from_ = self.__dict__['from'] if self.cmd=='init' and self.diverter.lower()=='none': self.diverter = None try: self.prepaid except AttributeError: self.prepaid = None else: if self.prepaid.lower() == 'true': self.prepaid = True elif self.prepaid.lower() == 'false': self.prepaid = False else: self.prepaid = None try: self.call_limit = int(self.call_limit) except (AttributeError, ValueError): self.call_limit = None else: if self.call_limit <= 0: self.call_limit = None try: self.call_token except AttributeError: self.call_token = None else: if not self.call_token or self.call_token.lower() == 'none': self.call_token = None try: self.sip_application except AttributeError: self.sip_application = '' def _RE_debug(self): if self.show == 'session': try: if not self.callid: raise InvalidRequestError("Missing callid from request") except AttributeError: raise InvalidRequestError("Missing callid from request") elif self.show == 'sessions': try: self.user except AttributeError: self.user = None else: raise InvalidRequestError("Illegal value for 'show' attribute in request") def __str__(self): if self.cmd == 'init': return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s prepaid=%(prepaid)s call_limit=%(call_limit)s" % self.__dict__ elif self.cmd == 'start': return "%(cmd)s: callid=%(callid)s dialogid=%(dialogid)s" % self.__dict__ elif self.cmd == 'stop': return "%(cmd)s: callid=%(callid)s" % self.__dict__ elif self.cmd == 'debug': return "%(cmd)s: show=%(show)s" % self.__dict__ elif self.cmd == 'terminate': return "%(cmd)s: callid=%(callid)s" % self.__dict__ else: return object.__str__(self) diff --git a/callcontrol/opensips.py b/callcontrol/opensips.py index 3edc8c8..1543210 100644 --- a/callcontrol/opensips.py +++ b/callcontrol/opensips.py @@ -1,181 +1,179 @@ -# Copyright (C) 2006-2009 AG Projects. -# """The OpenSIPS Management Interface""" import socket from collections import deque from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.failure import Failure from application.configuration import ConfigSection from application.python.types import Singleton from application.process import process from application.system import unlink from application import log from callcontrol import configuration_filename class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'OpenSIPS' socket_path = '/var/run/opensips/socket' max_connections = 10 class Error(Exception): pass class CommandError(Error): pass class TimeoutError(Error): pass class NegativeReplyError(Error): pass class DialogID(str): def __new__(cls, did): if did is None: return None try: h_entry, h_id = did.split(':') except: log.error("invalid dialog_id value: `%s'" % did) return None instance = str.__new__(cls, did) instance.h_entry = h_entry instance.h_id = h_id return instance class Request(object): def __init__(self, command): self.command = command self.deferred = defer.Deferred() class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): deferred = self.transport.deferred if deferred is None or deferred.called: return # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan if not data: failure = Failure(CommandError("Empty reply from OpenSIPS")) deferred.errback(failure) return try: status, msg = data.split('\n', 1) except ValueError: failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")) deferred.errback(failure) return if status.upper() == '200 OK': deferred.callback(msg) else: deferred.errback(Failure(NegativeReplyError(status))) class UNIXSocketConnection(object): timeout = 3 def __init__(self, socket_path): self._initialized = False self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) reactor.addSystemEventTrigger('during', 'shutdown', self.close) self.transport.deferred = None ## placeholder for the deferred used by a request self._initialized = True def close(self): if self._initialized: self.transport.stopListening() unlink(self.path) def _get_deferred(self): return self.transport.deferred def _set_deferred(self, d): self.transport.deferred = d deferred = property(_get_deferred, _set_deferred) def _did_timeout(self, deferred): if deferred.called: return deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) def send(self, request): self.deferred = request.deferred try: self.transport.write(request.command, OpenSIPSConfig.socket_path) except socket.error, why: log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, why[1])) self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS"))) else: reactor.callLater(self.timeout, self._did_timeout, self.deferred) class UNIXSocketConnectionPool(object): """Pool of UNIX socket connection to OpenSIPS""" def __init__(self, max_connections=10, pool_id=''): assert max_connections > 0, 'maximum should be > 0' self.max = max_connections self.id = pool_id self.workers = 0 self.waiters = deque() self.connections = deque() def _create_connections_as_needed(self): while self.workers < self.max and len(self.waiters) > len(self.connections): socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1) socket_path = process.runtime_file(socket_name) unlink(socket_path) try: conn = UNIXSocketConnection(socket_path) except CannotListenError, why: log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why)) break self.connections.append(conn) self.workers += 1 def _release_connection(self, result, conn): self.connections.append(conn) self._process_waiters() return result def _process_waiters(self): while self.waiters: try: conn = self.connections.popleft() except IndexError: return request = self.waiters.popleft() request.deferred.addBoth(self._release_connection, conn) conn.send(request) def defer_to_connection(self, command): request = Request(command) self.waiters.append(request) self._create_connections_as_needed() self._process_waiters() return request.deferred class ManagementInterface(object): __metaclass__ = Singleton def __init__(self): self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections) ## Reply handlers __RH_xxx def __RH_end_dialog(self, result): if isinstance(result, Failure): log.error("failed to end dialog: %s" % result.value) return False return True def end_dialog(self, dialog_id): cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id) return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog) diff --git a/callcontrol/raddb.py b/callcontrol/raddb.py index 9e06e05..28f387c 100644 --- a/callcontrol/raddb.py +++ b/callcontrol/raddb.py @@ -1,156 +1,154 @@ -# Copyright (C) 2005-2009 AG Projects. See LICENSE for details. -# """RadiusDatabase related API""" import time import sqlobject from application.configuration import ConfigSection from application.python.queue import EventQueue from application import log from twisted.internet import defer, reactor, threads from twisted.python import failure from callcontrol import configuration_filename class RadacctTable(str): """A radacct table name""" @property def normalized(self): return time.strftime(self) class RadiusDatabaseConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'RadiusDatabase' user = '' password = '' host = 'localhost' database = 'radius' table = RadacctTable('radacct%Y%m') sessionIdField = 'AcctSessionId' durationField = 'AcctSessionTime' stopTimeField = 'AcctStopTime' stopInfoField = 'ConnectInfo_stop' mediaInfoField = 'MediaInfo' fromTagField = 'SipFromTag' toTagField = 'SipToTag' class RadiusDatabaseError(Exception): pass class RadiusDatabase(object): """Interface with the Radius database""" class RadiusTask(object): def __init__(self, deferred, tasktype, **kwargs): self.deferred = deferred self.tasktype = tasktype self.args = kwargs def __init__(self): self.queue = EventQueue(handler=self._handle_task, name='RadiusQueue') self.queue.start() credentials = RadiusDatabaseConfig.user and ( "%s%s@" % (RadiusDatabaseConfig.user, RadiusDatabaseConfig.password and ":%s" % (RadiusDatabaseConfig.password) or '') ) or '' self.conn = sqlobject.connectionForURI("mysql://%s%s/%s" % (credentials, RadiusDatabaseConfig.host, RadiusDatabaseConfig.database)) def close(self): return threads.deferToThread(self._close) def _close(self): self.queue.stop() self.queue.join() self.conn.close() def getTerminatedCalls(self, calls): """ Retrieve those calls from the ones in progress that were already terminated by caller/called. Returns a Deferred. Callback will be called with list of call ids. """ deferred = defer.Deferred() self.queue.put(RadiusDatabase.RadiusTask(deferred, 'terminated', calls=calls)) return deferred def getTimedoutCalls(self, calls): """ Retrieve those calls from the ones in progress that did timeout and were closed by mediaproxy. Returns a Deferred. Callback will be called with list of call ids. """ deferred = defer.Deferred() self.queue.put(RadiusDatabase.RadiusTask(deferred, 'timedout', calls=calls)) return deferred def query(self, task): def _unknown_task(task): raise RadiusDatabaseError("Got unknown task to handle: %s" % task.tasktype) return getattr(self, '_RD_%s' % task.tasktype, _unknown_task)(task) def _handle_task(self, task): try: reactor.callFromThread(task.deferred.callback, self.query(task)) except Exception, e: reactor.callFromThread(task.deferred.errback, failure.Failure(e)) def _RD_terminated(self, task): try: calls = dict([(call.callid, call) for call in task.args['calls'].values() if call.inprogress]) if not calls: return {} ids = "(%s)" % ','.join(["'" + key + "'" for key in calls.keys()]) query = """SELECT %(session_id_field)s AS callid, %(duration_field)s AS duration, %(from_tag_field)s AS fromtag, %(to_tag_field)s AS totag FROM %(table)s WHERE %(session_id_field)s IN %(ids)s AND (%(stop_info_field)s IS NOT NULL OR %(stop_time_field)s IS NOT NULL)""" % {'session_id_field': RadiusDatabaseConfig.sessionIdField, 'duration_field': RadiusDatabaseConfig.durationField, 'from_tag_field': RadiusDatabaseConfig.fromTagField, 'to_tag_field': RadiusDatabaseConfig.toTagField, 'stop_info_field': RadiusDatabaseConfig.stopInfoField, 'stop_time_field': RadiusDatabaseConfig.stopTimeField, 'table': RadiusDatabaseConfig.table.normalized, 'ids': ids} rows = self.conn.queryAll(query) def find(row, calls): try: call = calls[row[0]] except KeyError: return False return call.fromtag==row[2] and call.totag==row[3] return dict([(row[0], {'callid': row[0], 'duration': row[1], 'fromtag': row[2], 'totag': row[3]}) for row in rows if find(row, calls)]) except Exception, e: log.error("Query failed: %s" % query) raise RadiusDatabaseError("Exception while querying for terminated calls %s." % e) def _RD_timedout(self, task): try: calls = dict([(call.callid, call) for call in task.args['calls'].values() if call.inprogress]) if not calls: return {} ids = "(%s)" % ','.join(["'" + key + "'" for key in calls.keys()]) query = '''SELECT %(session_id_field)s AS callid, %(duration_field)s AS duration, %(from_tag_field)s AS fromtag, %(to_tag_field)s AS totag FROM %(table)s WHERE %(session_id_field)s IN %(ids)s AND %(media_info_field)s = 'timeout' AND %(stop_info_field)s IS NULL''' % {'session_id_field': RadiusDatabaseConfig.sessionIdField, 'duration_field': RadiusDatabaseConfig.durationField, 'from_tag_field': RadiusDatabaseConfig.fromTagField, 'to_tag_field': RadiusDatabaseConfig.toTagField, 'media_info_field': RadiusDatabaseConfig.mediaInfoField, 'stop_info_field': RadiusDatabaseConfig.stopInfoField, 'table': RadiusDatabaseConfig.table.normalized, 'ids': ids} rows = self.conn.queryAll(query) def find(row, calls): try: call = calls[row[0]] except KeyError: return False return call.fromtag==row[2] and call.totag==row[3] return dict([(row[0], {'callid': row[0], 'duration': row[1], 'fromtag': row[2], 'totag': row[3]}) for row in rows if find(row, calls)]) except Exception, e: log.error("Query failed: %s" % query) raise RadiusDatabaseError("Exception while querying for timedout calls %s." % e) diff --git a/callcontrol/rating/__init__.py b/callcontrol/rating/__init__.py index e2757fe..ed3ff21 100644 --- a/callcontrol/rating/__init__.py +++ b/callcontrol/rating/__init__.py @@ -1,349 +1,347 @@ -# Copyright (C) 2005-2014 AG Projects. See LICENSE for details. -# """Rating engine interface implementation.""" import random from collections import deque from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import EndpointAddress from application import log from application.python.types import Singleton from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.error import TimeoutError from twisted.internet import reactor, defer from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from callcontrol import configuration_filename ## ## Rating engine configuration ## class ThorNodeConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' enabled = False class RatingConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CDRTool' timeout = 500 class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class CallControlConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CallControl' prepaid_limit = ConfigSetting(type=TimeLimit, value=None) limit = ConfigSetting(type=TimeLimit, value=None) class RatingError(Exception): pass class RatingEngineError(RatingError): pass class RatingEngineTimeoutError(TimeoutError): pass class RatingRequest(str): def __init__(self, command, reliable=True, **kwargs): self.command = command self.reliable = reliable self.kwargs = kwargs self.deferred = defer.Deferred() def __new__(cls, command, reliable=True, **kwargs): reqstr = command + (kwargs and (' ' + ' '.join("%s=%s" % (name, value) for name, value in kwargs.items())) or '') obj = str.__new__(cls, reqstr) return obj class RatingEngineProtocol(LineOnlyReceiver): delimiter = '\n\n' def __init__(self): self.connected = False self.__request = None self.__timeout_call = None self._request_queue = deque() def connectionMade(self): log.info("Connected to Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = True self.factory.application.connectionMade(self.transport.connector) if self._request_queue: self._send_next_request() def connectionLost(self, reason=None): log.info("Disconnected from Rating Engine at %s:%d" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.connected = False if self.__request is not None: if self.__request.reliable: self._request_queue.appendleft(self.__request) self.__request = None else: self._respond("Connection with the Rating Engine is down: %s" % reason, success=False) self.factory.application.connectionLost(self.transport.connector, reason, self) def timeoutConnection(self): log.info("Connection to Rating Engine at %s:%d timedout" % (self.transport.getPeer().host, self.transport.getPeer().port)) self.transport.loseConnection(RatingEngineTimeoutError()) def lineReceived(self, line): # log.debug("Got reply from rating engine: %s" % line) #DEBUG if not line: return if self.__timeout_call is not None: self.__timeout_call.cancel() if self.__request is None: log.warn("Got reply for unexisting request: %s" % line) return try: self._respond(getattr(self, '_PE_%s' % self.__request.command.lower())(line)) except AttributeError: self._respond("Unknown command in request. Cannot handle reply. Reply is: %s" % line, success=False) except Exception, e: self._respond(str(e), success=False) def _PE_maxsessiontime(self, line): lines = line.splitlines() try: limit = lines[0].strip().capitalize() except IndexError: raise ValueError("Empty reply from rating engine") try: limit = int(limit) except: if limit == 'None': limit = None elif limit == 'Locked': pass else: raise ValueError("limit must be a non-negative number, None or Locked: %s" % limit) else: if limit < 0: raise ValueError("limit must be a non-negative number, None or Locked: %s" % limit) info = dict(line.split('=', 1) for line in lines[1:]) if 'type' in info: type = info['type'].lower() if type == 'prepaid': prepaid = True elif type == 'postpaid': prepaid = False else: raise ValueError("prepaid must be either True or False: %s" % prepaid) else: prepaid = limit is not None return limit, prepaid def _PE_debitbalance(self, line): valid_answers = ('Ok', 'Failed', 'Not prepaid') lines = line.splitlines() try: result = lines[0].strip().capitalize() except IndexError: raise ValueError("Empty reply from rating engine") if result not in valid_answers: log.error("Invalid reply from rating engine: `%s'" % lines[0].strip()) log.warn("Rating engine possible failed query: %s" % self.__request) raise RatingEngineError('Invalid rating engine response') elif result == 'Failed': log.warn("Rating engine failed query: %s" % self.__request) raise RatingEngineError('Rating engine failed query') else: try: timelimit = int(lines[1].split('=', 1)[1].strip()) totalcost = lines[2].strip() except: log.error("Invalid reply from rating engine for DebitBalance on lines 2, 3: `%s'" % ("', `".join(lines[1:3]))) timelimit = None totalcost = 0 return timelimit, totalcost def _send_next_request(self): if self.connected: self.__request = self._request_queue.popleft() self.sendLine(self.__request) self._set_timeout() # log.debug("Sent request to rating engine: %s" % self.__request) #DEBUG else: self.__request = None def _respond(self, result, success=True): if self.__request is not None: req = self.__request self.__request = None try: if success: req.deferred.callback(result) else: req.deferred.errback(failure.Failure(RatingEngineError(result))) except defer.AlreadyCalledError: log.debug("Request %s was already responded to" % str(req)) if self._request_queue: self._send_next_request() def _set_timeout(self, timeout=None): if timeout is None: timeout = self.factory.timeout self.__timeout_call = reactor.callLater(timeout/1000.0, self.timeoutConnection) def send_request(self, request): if not request.reliable and not self.connected: request.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) return request self._request_queue.append(request) if self.__request is None: self._send_next_request() return request class RatingEngineFactory(ReconnectingClientFactory): protocol = RatingEngineProtocol timeout = RatingConfig.timeout # reconnect parameters maxDelay = 15 factor = maxDelay initialDelay = 1.0/factor delay = initialDelay def __init__(self, application): self.application = application def buildProtocol(self, addr): self.resetDelay() return ReconnectingClientFactory.buildProtocol(self, addr) def clientConnectionFailed(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) def clientConnectionLost(self, connector, reason): if self.application.disconnecting: return ReconnectingClientFactory.clientConnectionLost(self, connector, reason) class DummyRatingEngine(object): def getCallLimit(self, call, max_duration=CallControlConfig.prepaid_limit, reliable=True): return defer.fail(failure.Failure(RatingEngineError("Connection with the Rating Engine not yet established"))) def debitBalance(self, call, reliable=True): return defer.fail(failure.Failure(RatingEngineError("Connection with the Rating Engine not yet established"))) class RatingEngine(object): def __init__(self, address): self.address = address self.disconnecting = False self.connector = reactor.connectTCP(self.address[0], self.address[1], factory=RatingEngineFactory(self)) self.connection = None self.__unsent_req = deque() def shutdown(self): self.disconnecting = True self.connector.disconnect() def connectionMade(self, connector): self.connection = connector.transport self.connection.protocol._request_queue.extend(self.__unsent_req) for req in self.__unsent_req: log.debug("Requeueing request for the rating engine: %s" % (req,)) self.__unsent_req.clear() def connectionLost(self, connector, reason, protocol): while protocol._request_queue: req = protocol._request_queue.pop() if not req.reliable: log.debug("Request is considered failed: %s" % (req,)) req.deferred.errback(failure.Failure(RatingEngineError("Connection with the Rating Engine is down"))) else: log.debug("Saving request to be requeued later: %s" % (req,)) self.__unsent_req.appendleft(req) self.connection = None def getCallLimit(self, call, max_duration=CallControlConfig.prepaid_limit, reliable=True): max_duration = max_duration or CallControlConfig.limit or 36000 args = {} if call.inprogress: args['State'] = 'Connected' req = RatingRequest('MaxSessionTime', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=max_duration, Application=call.sip_application, **args) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred def debitBalance(self, call, reliable=True): req = RatingRequest('DebitBalance', reliable=reliable, CallId=call.callid, From=call.billingParty, To=call.ruri, Gateway=call.sourceip, Duration=call.duration, Application=call.sip_application) if self.connection is not None: return self.connection.protocol.send_request(req).deferred else: self.__unsent_req.append(req) return req.deferred class RatingEngineAddress(EndpointAddress): default_port = 9024 name = 'rating engine address' class RatingEngineConnections(object): __metaclass__ = Singleton def __init__(self): self.user_connections = {} if not ThorNodeConfig.enabled: from callcontrol.rating.backends.opensips import OpensipsBackend self.backend = OpensipsBackend() else: from callcontrol.rating.backends.sipthor import SipthorBackend self.backend = SipthorBackend() @staticmethod def getConnection(call=None): engines = RatingEngineConnections() try: conn = random.choice(engines.backend.connections) except IndexError: return DummyRatingEngine() if call is None: return conn return engines.user_connections.setdefault(call.billingParty, conn) def remove_user(self, user): try: del self.user_connections[user] except KeyError: pass def shutdown(self): return defer.maybeDeferred(self.backend.shutdown) diff --git a/callcontrol/rating/backends/__init__.py b/callcontrol/rating/backends/__init__.py index 9c201e1..e69de29 100644 --- a/callcontrol/rating/backends/__init__.py +++ b/callcontrol/rating/backends/__init__.py @@ -1,2 +0,0 @@ -# Copyright (C) 2014 AG Projects. See LICENSE for details. -# diff --git a/callcontrol/rating/backends/opensips.py b/callcontrol/rating/backends/opensips.py index 003bb5f..575664e 100644 --- a/callcontrol/rating/backends/opensips.py +++ b/callcontrol/rating/backends/opensips.py @@ -1,48 +1,47 @@ -# Copyright (C) 2014 AG Projects. See LICENSE for details. -# + import socket from application.configuration import ConfigSection, ConfigSetting from application.system import host from application import log from callcontrol import configuration_filename from callcontrol.rating import RatingEngine, RatingEngineAddress ## ## Rating engine configuration ## class RatingEngineAddresses(list): def __new__(cls, engines): engines = engines.split() engines = [RatingEngineAddress(engine) for engine in engines] return engines class RatingConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'CDRTool' address = ConfigSetting(type=RatingEngineAddresses, value=[]) timeout = 500 class OpensipsBackend(object): def __init__(self): self.connections = [] if not RatingConfig.address: try: RatingConfig.address = RatingEngineAddresses('cdrtool.' + socket.gethostbyaddr(host.default_ip)[0].split('.', 1)[1]) except Exception: log.fatal('Cannot resolve hostname %s' % ('cdrtool.' + socket.gethostbyaddr(host.default_ip)[0].split('.', 1)[1])) for engine in RatingConfig.address: self.connections.append(RatingEngine(engine)) def shutdown(self): for connection in self.connections: connection.shutdown() diff --git a/callcontrol/rating/backends/sipthor.py b/callcontrol/rating/backends/sipthor.py index 83cbae5..4cdc791 100644 --- a/callcontrol/rating/backends/sipthor.py +++ b/callcontrol/rating/backends/sipthor.py @@ -1,134 +1,130 @@ -# Copyright (C) 2014 AG-Projects. -# -# This module is proprietary to AG Projects. Use of this module by third -# parties is not supported. from application.version import Version from application.configuration import ConfigSection, ConfigSetting from application.system import host from application import log from application.python.types import Singleton from gnutls.interfaces.twisted import X509Credentials from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL from thor import __version__ as thor_version from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from callcontrol.tls import Certificate, PrivateKey from callcontrol.rating import RatingEngine, RatingEngineAddress from callcontrol import configuration_filename, __version__ from twisted.internet import defer, reactor if Version.parse(thor_version) < '1.1.21': raise RuntimeError('Thor version is smaller the 1.1.21 (%s)' % thor_version) class ThorNodeConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' enabled = False domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class CallcontrolNode(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): self.node = ThorEntity(host.default_ip, ['call_control'], version=__version__) self.networks = {} self.rating_connections = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials) def publish(self, event): self._publish(event) def stop(self): return self._shutdown() def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True def _disconnect_all(self, result): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) d.addCallback(self._disconnect_all) return d def handle_event(self, event): reactor.callFromThread(self._handle_event, event) def _handle_event(self, event): networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role role = 'rating_server' try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if added_nodes: for node in added_nodes: network.add_node(node) address = RatingEngineAddress(node) self.rating_connections[address] = RatingEngine(address) plural = 's' if len(added_nodes) != 1 else '' log.msg("Added rating node%s: %s" % (plural, ', '.join(added_nodes))) if removed_nodes: for node in removed_nodes: network.remove_node(node) address = RatingEngineAddress(node) self.rating_connections[address].shutdown() del self.rating_connections[address] plural = 's' if len(removed_nodes) != 1 else '' log.msg("Removed rating node%s: %s" % (plural, ', '.join(removed_nodes))) class SipthorBackend(object): def __init__(self): self.node = CallcontrolNode() @property def connections(self): return self.node.rating_connections.values() def shutdown(self): for connection in self.connections: connection.shutdown() return self.node.stop() diff --git a/callcontrol/scheduler.py b/callcontrol/scheduler.py index eea8f52..991ebd7 100644 --- a/callcontrol/scheduler.py +++ b/callcontrol/scheduler.py @@ -1,48 +1,46 @@ -# Copyright (C) 2007-2008 Dan Pascu . See LICENSE for details. -# """Schedule calls on a twisted reactor""" __all__ = ['RecurrentCall', 'KeepRunning'] from time import time class KeepRunning: """Return this class from a recurrent function to indicate that it should keep running""" pass class RecurrentCall(object): """Execute a function repeatedly at the given interval, until signaled to stop""" def __init__(self, period, func, *args, **kwargs): from twisted.internet import reactor self.func = func self.args = args self.kwargs = kwargs self.period = period self.now = None self.next = None self.callid = reactor.callLater(period, self) def __call__(self): from twisted.internet import reactor self.callid = None if self.now is None: self.now = time() self.next = self.now + self.period else: self.now, self.next = self.next, self.next + self.period result = self.func(*self.args, **self.kwargs) if result is KeepRunning: delay = max(self.next-time(), 0) self.callid = reactor.callLater(delay, self) def cancel(self): if self.callid is not None: try: self.callid.cancel() except ValueError: pass self.callid = None diff --git a/callcontrol/sip.py b/callcontrol/sip.py index 12f2cd9..eb1d175 100644 --- a/callcontrol/sip.py +++ b/callcontrol/sip.py @@ -1,326 +1,324 @@ -# Copyright (C) 2005-2010 AG Projects. See LICENSE for details. -# """ Implementation of Call objects used to store call information and manage a call. """ import time import re from application import log from twisted.internet.error import AlreadyCalled from twisted.internet import reactor, defer from callcontrol.rating import RatingEngineConnections from callcontrol.opensips import DialogID, ManagementInterface class CallError(Exception): pass ## ## Call data types ## class ReactorTimer(object): def __init__(self, delay, function, args=[], kwargs={}): self.calldelay = delay self.function = function self.args = args self.kwargs = kwargs self.dcall = None def start(self): if self.dcall is None: self.dcall = reactor.callLater(self.calldelay, self.function, *self.args, **self.kwargs) def cancel(self): if self.dcall is not None: try: self.dcall.cancel() except AlreadyCalled: self.dcall = None def delay(self, seconds): if self.dcall is not None: try: self.dcall.delay(seconds) except AlreadyCalled: self.dcall = None def reset(self, seconds): if self.dcall is not None: try: self.dcall.reset(seconds) except AlreadyCalled: self.dcall = None class Structure(dict): def __init__(self): dict.__init__(self) def __getitem__(self, key): elements = key.split('.') obj = self ## start with ourselves for e in elements: if not isinstance(obj, dict): raise TypeError("unsubscriptable object") obj = dict.__getitem__(obj, e) return obj def __setitem__(self, key, value): self.__dict__[key] = value dict.__setitem__(self, key, value) def __delitem__(self, key): dict.__delitem__(self, key) del self.__dict__[key] __setattr__ = __setitem__ def __delattr__(self, name): try: del self.__dict__[name] except KeyError: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) else: dict.__delitem__(self, name) def update(self, other): dict.update(self, other) for key, value in other.items(): self.__dict__[key] = value class Call(Structure): """Defines a call""" def __init__(self, request, application): Structure.__init__(self) self.prepaid = request.prepaid self.locked = False ## if the account is locked because another call is in progress self.expired = False ## if call did consume its timelimit before being terminated self.created = time.time() self.timer = None self.starttime = None self.endtime = None self.timelimit = None self.duration = 0 self.callid = request.callid self.dialogid = None self.diverter = request.diverter self.ruri = request.ruri self.sourceip = request.sourceip self.token = request.call_token self.sip_application = request.sip_application self['from'] = request.from_ ## from is a python keyword ## Determine who will pay for the call if self.diverter is not None: self.billingParty = 'sip:%s' % self.diverter self.user = self.diverter else: match = re.search(r'(?P
sip:(?P([^@]+@)?[^\s:;>]+))', request.from_) if match is not None: self.billingParty = match.groupdict()['address'] self.user = match.groupdict()['user'] else: self.billingParty = None self.user = None self.__initialized = False self.application = application def __str__(self): return ("callid=%(callid)s from=%(from)s ruri=%(ruri)s " "diverter=%(diverter)s sourceip=%(sourceip)s " "timelimit=%(timelimit)s status=%%s" % self % self.status) def __expire(self): self.expired = True self.application.clean_call(self.callid) self.end(reason='call control', sendbye=True) def setup(self, request): """ Perform call setup when first called (determine time limit and add timer). If call was previously setup but did not start yet, and the new request changes call parameters (ruri, diverter, ...), then update the call parameters and redo the setup to update the timer and time limit. """ deferred = defer.Deferred() if not self.__initialized: ## setup called for the first time if self.prepaid: rating = RatingEngineConnections.getConnection(self) rating.getCallLimit(self, reliable=False).addCallbacks(callback=self._setup_finish_calllimit, errback=self._setup_error, callbackArgs=[deferred], errbackArgs=[deferred]) else: deferred.addCallback(self._setup_finish_timelimit) deferred.callback(None) return deferred elif self.__initialized and self.starttime is None: if self.diverter != request.diverter or self.ruri != request.ruri: ## call parameters have changed. ## unlock previous rating request self.prepaid = request.prepaid if self.prepaid: rating = RatingEngineConnections.getConnection(self) if not self.locked: rating.debitBalance(self).addCallbacks(callback=self._setup_finish_debitbalance, errback=self._setup_error, callbackArgs=[request, deferred], errbackArgs=[deferred]) else: rating.getCallLimit(self, reliable=False).addCallbacks(callback=self._setup_finish_calllimit, errback=self._setup_error, callbackArgs=[deferred], errbackArgs=[deferred]) else: deferred.addCallback(self._setup_finish_timelimit) deferred.callback(None) return deferred else: deferred.callback(None) return deferred def _setup_finish_timelimit(self, result): from callcontrol.controller import CallControlConfig self.timelimit = CallControlConfig.limit if self.timelimit is not None and self.timelimit > 0: self._setup_timer() self.__initialized = True def _setup_finish_calllimit(self, (limit, prepaid), deferred): if limit == 'Locked': self.timelimit = 0 self.locked = True elif limit is not None: self.timelimit = limit else: from callcontrol.controller import CallControlConfig self.timelimit = CallControlConfig.limit if self.prepaid and not prepaid: self.timelimit = 0 deferred.errback(CallError("Caller %s is regarded as postpaid by the rating engine and prepaid by OpenSIPS" % self.user)) return else: self.prepaid = prepaid and limit is not None if self.timelimit is not None and self.timelimit > 0: self._setup_timer() self.__initialized = True deferred.callback(None) def _setup_finish_debitbalance(self, value, request, deferred): ## update call paramaters self.diverter = request.diverter self.ruri = request.ruri if self.diverter is not None: self.billingParty = 'sip:%s' % self.diverter ## update time limit and timer rating = RatingEngineConnections.getConnection(self) rating.getCallLimit(self, reliable=False).addCallbacks(callback=self._setup_finish_calllimit, errback=self._setup_error, callbackArgs=[deferred], errbackArgs=[deferred]) def _setup_timer(self, timeout=None): if timeout is None: timeout = self.timelimit self.timer = ReactorTimer(timeout, self.__expire) def _setup_error(self, fail, deferred): deferred.errback(fail) def start(self, request): assert self.__initialized, "Trying to start an unitialized call" if self.starttime is None: self.dialogid = DialogID(request.dialogid) self.starttime = time.time() if self.timer is not None: log.info("Call id %s of %s to %s started for maximum %d seconds" % (self.callid, self.user, self.ruri, self.timelimit)) self.timer.start() # also reset all calls of user to this call's timelimit # no reason to alter other calls if this call is not prepaid if self.prepaid: rating = RatingEngineConnections.getConnection(self) rating.getCallLimit(self).addCallbacks(callback=self._start_finish_calllimit, errback=self._start_error) for callid in self.application.users[self.billingParty]: if callid == self.callid: continue call = self.application.calls[callid] if not call.prepaid: continue # only alter prepaid calls if call.inprogress: call.timelimit = self.starttime - call.starttime + self.timelimit if call.timer: call.timer.reset(self.timelimit) log.info("Call id %s of %s to %s also set to %d seconds" % (callid, call.user, call.ruri, self.timelimit)) elif not call.complete: call.timelimit = self.timelimit call._setup_timer() def _start_finish_calllimit(self, (limit, prepaid)): if limit not in (None, 'Locked'): delay = limit - self.timelimit for callid in self.application.users[self.billingParty]: call = self.application.calls[callid] if not call.prepaid: continue # only alter prepaid calls if call.inprogress: call.timelimit += delay if call.timer: call.timer.delay(delay) log.info("Call id %s of %s to %s %s maximum %d seconds" % (callid, call.user, call.ruri, (call is self) and 'connected for' or 'previously connected set to', limit)) elif not call.complete: call.timelimit = self.timelimit call._setup_timer() def _start_error(self, fail): log.info("Could not get call limit for call id %s of %s to %s" % (self.callid, self.user, self.ruri)) def end(self, calltime=None, reason=None, sendbye=False): if sendbye and self.dialogid is not None: ManagementInterface().end_dialog(self.dialogid) if self.timer: self.timer.cancel() self.timer = None fullreason = '%s%s' % (self.inprogress and 'disconnected' or 'canceled', reason and (' by %s' % reason) or '') if self.inprogress: self.endtime = time.time() duration = self.endtime - self.starttime if calltime: ## call did timeout and was ended by external means (like mediaproxy). ## we were notified of this and we have the actual call duration in `calltime' #self.endtime = self.starttime + calltime self.duration = calltime log.info("Call id %s of %s to %s was already disconnected (ended or did timeout) after %s seconds" % (self.callid, self.user, self.ruri, self.duration)) elif self.expired: self.duration = self.timelimit if duration > self.timelimit + 10: log.warn("Time difference between sending BYEs and actual closing is > 10 seconds") else: self.duration = duration if self.prepaid and not self.locked and self.timelimit > 0: ## even if call was not started we debit 0 seconds anyway to unlock the account rating = RatingEngineConnections.getConnection(self) rating.debitBalance(self).addCallbacks(callback=self._end_finish, errback=self._end_error, callbackArgs=[reason and fullreason or None]) elif reason is not None: log.info("Call id %s of %s to %s %s%s" % (self.callid, self.user, self.ruri, fullreason, self.duration and (' after %d seconds' % self.duration) or '')) def _end_finish(self, (timelimit, value), reason): if timelimit is not None and timelimit > 0: now = time.time() for callid in self.application.users.get(self.billingParty, ()): call = self.application.calls[callid] if not call.prepaid: continue # only alter prepaid calls if call.inprogress: call.timelimit = now - call.starttime + timelimit if call.timer: log.info("Call id %s of %s to %s previously connected set to %d seconds" % (callid, call.user, call.ruri, timelimit)) call.timer.reset(timelimit) elif not call.complete: call.timelimit = timelimit call._setup_timer() # log ended call if self.duration > 0: log.info("Call id %s of %s to %s %s after %d seconds, call price is %s" % (self.callid, self.user, self.ruri, reason, self.duration, value)) elif reason is not None: log.info("Call id %s of %s to %s %s" % (self.callid, self.user, self.ruri, reason)) def _end_error(self, fail): log.info("Could not debit balance for call id %s of %s to %s" % (self.callid, self.user, self.ruri)) status = property(lambda self: self.inprogress and 'in-progress' or 'pending') complete = property(lambda self: self.dialogid is not None) inprogress = property(lambda self: self.starttime is not None and self.endtime is None) # # End Call data types # diff --git a/callcontrol/tls.py b/callcontrol/tls.py index 6e06b73..213c778 100644 --- a/callcontrol/tls.py +++ b/callcontrol/tls.py @@ -1,52 +1,50 @@ -# Copyright (C) 2007-2011 AG Projects. -# """TLS helper classes""" __all__ = ['Certificate', 'PrivateKey'] from gnutls.crypto import X509Certificate, X509PrivateKey from application import log from application.process import process def file_content(file): path = process.config_file(file) if path is None: raise Exception("File '%s' does not exist" % file) try: f = open(path, 'rt') except Exception: raise Exception("File '%s' could not be open" % file) try: return f.read() finally: f.close() class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(typ, value): if isinstance(value, basestring): try: return X509Certificate(file_content(value)) except Exception, e: log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError, 'value should be a string' class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(typ, value): if isinstance(value, basestring): try: return X509PrivateKey(file_content(value)) except Exception, e: log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError, 'value should be a string'