diff --git a/INSTALL b/INSTALL index 6089d59..f2462b5 100644 --- a/INSTALL +++ b/INSTALL @@ -1,180 +1,175 @@ Installation ------------ For Debian testing or unstable there is an official public repository provided by AG Projects. Install the AG Projects debian software signing key: wget http://download.ag-projects.com/agp-debian-gpg.key apt-key add agp-debian-gpg.key Add these lines in etc/apt/sources.list # AG Projects software deb http://ag-projects.com/debian unstable main deb-src http://ag-projects.com/debian unstable main After that, run: apt-get update apt-get install callcontrol For non Debian installations, you must install the following dependencies: python-application (>= 1.2.8) python-twisted-core python-sqlobject Call Control software is available as a tar archive at: http://download.ag-projects.com/CallControl/ Extract it using tar xzvf callcontrol-version.tar.gz and change directory to the newly created callcontrol directory. The source code is managed using darcs version control tool. The darcs repository can be fetched with: darcs get http://devel.ag-projects.com/repositories/callcontrol To obtain the incremental changes after the initial get: cd callcontrol darcs pull -a Install the software: cd callcontrol python setup.py install You may run the software from its own directory or install it in a directory anywhere in the system. Configuration ------------- You must setup the following components: 1. OpenSIPS configuration 2. Call Control (this application) 3. CDRTool rating engine 1. OpenSIPS configuration loadmodule "mi_fifo.so" loadmodule "mi_datagram.so" loadmodule "sl.so" loadmodule "tm.so" loadmodule "dialog.so" loadmodule "call_control.so" modparam("call_control", "disable", 0) route { ... if ((method=="INVITE" && !has_totag())) { # you need to call this function at the first INVITE call_control(); switch ($retcode) { case 2: # Call with no limit case 1: # Call with a limit under callcontrol management (either prepaid # or postpaid) break; case -1: # Not enough credit (prepaid call) xlog("L_INFO", "Call control: not enough credit for prepaid call\n"); acc_rad_request("402"); sl_send_reply("402", "Not enough credit"); exit; break; case -2: # Locked by call in progress (prepaid call) xlog("L_INFO", "Call control: prepaid call locked by another call in progress\n"); acc_rad_request("403"); sl_send_reply("403", "Call locked by another call in progress"); exit; break; case -3: # Duplicated CallID xlog("L_INFO", "Call control: duplicated CallID\n"); acc_rad_request("400"); sl_send_reply("400", "Duplicated CallID"); exit; break; default: # Internal error (message parsing, communication, ...) xlog("L_INFO", "Call control: internal server error\n"); acc_rad_request("500"); sl_send_reply("500", "Internal server error"); exit; } } ... } For more information see the documentation that comes with the OpenSIPS callcontrol module. 2. Call Control configuration (this application) The application is searching for its configuration file config.ini in its current directory and in /etc/callcontrol/config.ini -; detect sessions that have media timeout without BYE -timeout_detection=dialog - [CDRTool] ; connection to cdrtool rating engine for MaxSessionTime() and DebitBalance() address = cdrtool.hostname:9024 [RadiusDatabase] -; connection to Radius database if timeout_detection=radius, you must enable -; MediaProxy Radius accounting for this to work ; user = radius ; password = password ; host = db ; database = radius -; table = radacct%Y%m +; table = radacct%%Y%%m [OpenSIPS] ; Connection to OpenSIPS' MI ; socket_path = /run/opensips/socket A more detailed sample configuration file is available in config.ini.sample. 3. CDRTool rating engine Please see the documentation of CDRTool project to setup the rating engine. Logging ------- Call Control logs all activity to syslog. You may grep for call-control in syslog. The requests can be correlated by call-id with the syslog entries generated by CDRTool rating engine. Control commands ---------------- Monitoring active sessions: /etc/init.d/callcontrol sessions Displaying information about an active session: /etc/init.d/callcontrol session id Terminating a session: /etc/init.d/callcontrol terminate id diff --git a/callcontrol/controller.py b/callcontrol/controller.py index e69a595..68b3542 100644 --- a/callcontrol/controller.py +++ b/callcontrol/controller.py @@ -1,555 +1,506 @@ """Implementation of a call control server for OpenSIPS.""" import os import grp import re import cPickle import time from application import log from application.configuration import ConfigSection, ConfigSetting from application.process import process from application.system import unlink from twisted.internet.protocol import Factory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet import reactor, defer from twisted.python import failure from callcontrol.scheduler import RecurrentCall, KeepRunning from callcontrol.raddb import RadiusDatabase, RadiusDatabaseError from callcontrol.sip import Call from callcontrol.rating import RatingEngineConnections from callcontrol import configuration_file, backup_calls_file class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit -class TimeoutDetection(str): - _values = ('dialog', 'radius') - - def __new__(cls, value): - value = value.lower() - if value not in cls._values: - raise ValueError('invalid timeout detection value: %r' % value) - instance = super(TimeoutDetection, cls).__new__(cls, value) - instance.use_radius = value == 'radius' - return instance - - class CallControlConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'CallControl' socket = process.runtime.file('socket') group = 'opensips' limit = ConfigSetting(type=TimeLimit, value=None) timeout = 24*60*60 # timeout calls that are stale for more than 24 hours. setupTime = 90 # timeout calls that take more than 1'30" to setup. checkInterval = 60 # check for staled calls and calls that did timeout at every minute. - timeout_detection = TimeoutDetection('dialog') # whether or not to use the radius database to find out terminated calls class CommandError(Exception): pass class InvalidRequestError(Exception): pass class CallsMonitor(object): - """Check for staled calls and calls that did timeout and were closed by external means""" + """Check for staled calls""" def __init__(self, period, application): self.application = application self.reccall = RecurrentCall(period, self.run) def run(self): - if CallControlConfig.timeout_detection.use_radius: - # Find out terminated calls - deferred1 = self.application.db.getTerminatedCalls(self.application.calls) - deferred1.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_terminated]) - deferred2 = self.application.db.getTimedoutCalls(self.application.calls) - deferred2.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_timedout]) - defer.DeferredList([deferred1, deferred2]).addCallback(self._finish_checks) - else: - self._finish_checks(None) - return KeepRunning - - def shutdown(self): - self.reccall.cancel() - - def _clean_calls(self, calls, clean_function): - for callid, callinfo in calls.items(): - call = self.application.calls.get(callid) - if call: - self.application.clean_call(callid) - clean_function(call, callinfo) - - def _err_handle(self, fail): - log.error("Couldn't query database for terminated/timedout calls: %s" % fail.value) - - def _handle_terminated(self, call, callinfo): - call.end(calltime=callinfo['duration'], reason='calls monitor as terminated') - - def _handle_timedout(self, call, callinfo): - call.end(reason='calls monitor as timedout', sendbye=True) - - def _finish_checks(self, value): - # Also do the rest of the checking now = time.time() staled = [] nosetup = [] for callid, call in self.application.calls.items(): if not call.complete and (now - call.created >= CallControlConfig.setupTime): self.application.clean_call(callid) nosetup.append(call) elif call.inprogress and call.timer is not None: continue # this call will be expired by its own timer elif now - call.created >= CallControlConfig.timeout: self.application.clean_call(callid) staled.append(call) # Terminate staled for call in staled: call.end(reason='calls monitor as staled', sendbye=True) # Terminate calls that didn't setup in setupTime for call in nosetup: call.end(reason="calls monitor as it didn't setup in %d seconds" % CallControlConfig.setupTime) + return KeepRunning + + def shutdown(self): + self.reccall.cancel() class CallControlProtocol(LineOnlyReceiver): def lineReceived(self, line): if line.strip() == "": if self.line_buf: self._process() self.line_buf = [] else: self.line_buf.append(line.strip()) def _process(self): try: req = Request(self.line_buf[0], self.line_buf[1:]) except InvalidRequestError, e: self._send_error_reply(failure.Failure(e)) else: # log.debug('Got request: %s', req) def _unknown_handler(req): req.deferred.errback(failure.Failure(CommandError(req))) try: getattr(self, '_CC_%s' % req.cmd, _unknown_handler)(req) except Exception, e: self._send_error_reply(failure.Failure(e)) else: req.deferred.addCallbacks(callback=self._send_reply, errback=self._send_error_reply) def connectionMade(self): self.line_buf = [] def _send_reply(self, msg): # log.debug('Sent reply: %s', msg) self.sendLine(msg) def _send_error_reply(self, fail): log.error(fail.value) # log.debug("Sent 'Error' reply") self.sendLine('Error') def _CC_init(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: call = Call(req, self.factory.application) if call.billingParty is None: req.deferred.callback('Error') return self.factory.application.calls[req.callid] = call # log.debug('Call id %s added to list of controlled calls', call.callid) else: if call.token != req.call_token: log.error("Call id %s is duplicated" % call.callid) req.deferred.callback('Duplicated callid') return # The call was previously setup which means it could be in the the users table try: user_calls = self.factory.application.users[call.billingParty] user_calls.remove(call.callid) if len(user_calls) == 0: del self.factory.application.users[call.billingParty] self.factory.application.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass deferred = call.setup(req) deferred.addCallbacks(callback=self._CC_finish_init, errback=self._CC_init_failed, callbackArgs=[req], errbackArgs=[req]) def _CC_finish_init(self, value, req): try: call = self.factory.application.calls[req.callid] except KeyError: log.error("Call id %s disappeared before we could finish initializing it" % req.callid) req.deferred.callback('Error') else: if req.call_limit is not None and len(self.factory.application.users.get(call.billingParty, ())) >= req.call_limit: self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Call limit reached') elif call.locked: # prepaid account already locked by another call log.info("Call id %s of %s to %s forbidden because the account is locked" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Locked') elif call.timelimit == 0: # prepaid account with no credit log.info("Call id %s of %s to %s forbidden because credit is too low" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No credit') elif req.call_limit is not None or call.timelimit is not None: # call limited by credit value, a global time limit or number of calls log.info("User %s can make %s concurrent calls" % (call.billingParty, req.call_limit or "unlimited")) self.factory.application.users.setdefault(call.billingParty, []).append(call.callid) req.deferred.callback('Limited') else: # no limit for call log.info("Call id %s of %s to %s is postpaid not limited" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No limit') def _CC_init_failed(self, fail, req): self._send_error_reply(fail) self.factory.application.clean_call(req.callid) def _CC_start(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: call.start(req) req.deferred.callback('Ok') def _CC_stop(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: self.factory.application.clean_call(req.callid) call.end(reason='user') req.deferred.callback('Ok') def _CC_debug(self, req): debuglines = [] if req.show == 'sessions': for callid, call in self.factory.application.calls.items(): if not req.user or call.user.startswith(req.user): debuglines.append('Call id %s of %s to %s: %s' % (callid, call.user, call.ruri, call.status)) elif req.show == 'session': try: call = self.factory.application.calls[req.callid] except KeyError: debuglines.append('Call id %s does not exist' % req.callid) else: for key, value in call.items(): debuglines.append('%s: %s' % (key, value)) req.deferred.callback('\r\n'.join(debuglines)+'\r\n') def _CC_terminate(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Call id %s does not exist\r\n' % req.callid) else: self.factory.application.clean_call(req.callid) call.end(reason='admin', sendbye=True) req.deferred.callback('Ok\r\n') class CallControlFactory(Factory): protocol = CallControlProtocol def __init__(self, application): self.application = application class CallControlServer(object): def __init__(self): unlink(CallControlConfig.socket) self.path = CallControlConfig.socket self.group = CallControlConfig.group self.listening = None self.engines = None self.monitor = None - if CallControlConfig.timeout_detection.use_radius: - self.db = RadiusDatabase() - else: - self.db = None - self.calls = {} self.users = {} self._restore_calls() def clean_call(self, callid): try: call = self.calls[callid] except KeyError: return else: del self.calls[callid] user_calls = self.users.get(call.billingParty, []) try: user_calls.remove(callid) except ValueError: pass if not user_calls: self.users.pop(call.billingParty, None) self.engines.remove_user(call.billingParty) # log.debug('Call id %s removed from the list of controlled calls', callid) def run(self): reactor.addSystemEventTrigger('before', 'startup', self.on_startup) reactor.addSystemEventTrigger('before', 'shutdown', self.on_shutdown) reactor.run() def stop(self): reactor.stop() def on_startup(self): # First set up listening on the unix socket try: gid = grp.getgrnam(self.group)[2] mode = 0o660 except (KeyError, IndexError): gid = -1 mode = 0o666 self.listening = reactor.listenUNIX(address=self.path, factory=CallControlFactory(self)) # Make it writable only to the SIP proxy group members try: os.chown(self.path, -1, gid) os.chmod(self.path, mode) except OSError: log.warning("Couldn't set access rights for %s" % self.path) log.warning("OpenSIPS may not be able to communicate with us!") # Then setup the CallsMonitor self.monitor = CallsMonitor(CallControlConfig.checkInterval, self) # Open the connection to the rating engines self.engines = RatingEngineConnections() def on_shutdown(self): should_close = [] if self.listening is not None: self.listening.stopListening() if self.engines is not None: should_close.append(self.engines.shutdown()) if self.monitor is not None: self.monitor.shutdown() - if self.db is not None: - should_close.append(self.db.close()) d = defer.DeferredList(should_close) d.addBoth(self._save_calls) return d def _save_calls(self, result): if self.calls: log.info('Saving calls') calls_file = process.runtime.file(backup_calls_file) try: f = open(calls_file, 'w') except: pass else: for call in self.calls.values(): call.application = None # we will mark timers with 'running' or 'idle', depending on their current state, # to be able to correctly restore them later (Timer objects cannot be pickled) if call.timer is not None: if call.inprogress: call.timer.cancel() call.timer = 'running' # temporary mark that this timer was running else: call.timer = 'idle' # temporary mark that this timer was not running failed_dump = False try: try: cPickle.dump(self.calls, f) except Exception as e: log.warning('Failed to dump call list: %s', e) failed_dump = True finally: f.close() if failed_dump: unlink(calls_file) else: log.info("Saved calls: %s" % str(self.calls.keys())) self.calls = {} def _restore_calls(self): calls_file = process.runtime.file(backup_calls_file) try: f = open(calls_file, 'r') except: pass else: try: self.calls = cPickle.load(f) except Exception as e: log.warning('Failed to load calls saved in the previous session: %s', e) f.close() unlink(calls_file) if self.calls: log.info("Restoring calls saved previously: %s" % str(self.calls.keys())) # the calls in the 2 sets below are never overlapping because closed and terminated # calls have different database fingerprints. so the dictionary update below is safe try: - db = self.db if self.db is not None else RadiusDatabase() + db = RadiusDatabase() try: terminated = db.query(RadiusDatabase.RadiusTask(None, 'terminated', calls=self.calls)) # calls terminated by caller/called didtimeout = db.query(RadiusDatabase.RadiusTask(None, 'timedout', calls=self.calls)) # calls closed by mediaproxy after a media timeout finally: - if self.db is None: - db.close() + db.close() except RadiusDatabaseError, e: log.error("Could not query database: %s" % e) else: for callid, call in self.calls.items(): callinfo = terminated.get(callid) or didtimeout.get(callid) if callinfo: # call already terminated or did timeout in mediaproxy del self.calls[callid] callinfo['call'] = call call.timer = None continue # close all calls that were already terminated or did timeout count = 0 for callinfo in terminated.values(): call = callinfo.get('call') if call is not None: call.end(calltime=callinfo['duration']) count += 1 for callinfo in didtimeout.values(): call = callinfo.get('call') if call is not None: call.end(sendbye=True) count += 1 if count > 0: log.info("Removed %d already terminated call%s" % (count, 's'*(count!=1))) for callid, call in self.calls.items(): call.application = self if call.timer == 'running': now = time.time() remain = call.starttime + call.timelimit - now if remain < 0: call.timelimit = int(round(now - call.starttime)) remain = 0 call._setup_timer(remain) call.timer.start() elif call.timer == 'idle': call._setup_timer() # also restore users table self.users.setdefault(call.billingParty, []).append(callid) class Request(object): """A request parsed into a structure based on request type""" __methods = {'init': ('callid', 'diverter', 'ruri', 'sourceip', 'from'), 'start': ('callid', 'dialogid'), 'stop': ('callid',), 'debug': ('show',), 'terminate': ('callid',)} def __init__(self, cmd, params): if cmd not in self.__methods.keys(): raise InvalidRequestError("Unknown request: %s" % cmd) try: parameters = dict(re.split(r':\s+', l, 1) for l in params) except ValueError: raise InvalidRequestError("Badly formatted request") for p in self.__methods[cmd]: try: parameters[p] except KeyError: raise InvalidRequestError("Missing %s from request" % p) self.cmd = cmd self.deferred = defer.Deferred() self.__dict__.update(parameters) try: getattr(self, '_RE_%s' % self.cmd)() except AttributeError: pass def _RE_init(self): self.from_ = self.__dict__['from'] if self.cmd=='init' and self.diverter.lower()=='none': self.diverter = None try: self.prepaid except AttributeError: self.prepaid = None else: if self.prepaid.lower() == 'true': self.prepaid = True elif self.prepaid.lower() == 'false': self.prepaid = False else: self.prepaid = None try: self.call_limit = int(self.call_limit) except (AttributeError, ValueError): self.call_limit = None else: if self.call_limit <= 0: self.call_limit = None try: self.call_token except AttributeError: self.call_token = None else: if not self.call_token or self.call_token.lower() == 'none': self.call_token = None try: self.sip_application except AttributeError: self.sip_application = '' def _RE_debug(self): if self.show == 'session': try: if not self.callid: raise InvalidRequestError("Missing callid from request") except AttributeError: raise InvalidRequestError("Missing callid from request") elif self.show == 'sessions': try: self.user except AttributeError: self.user = None else: raise InvalidRequestError("Illegal value for 'show' attribute in request") def __str__(self): if self.cmd == 'init': return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s prepaid=%(prepaid)s call_limit=%(call_limit)s" % self.__dict__ elif self.cmd == 'start': return "%(cmd)s: callid=%(callid)s dialogid=%(dialogid)s" % self.__dict__ elif self.cmd == 'stop': return "%(cmd)s: callid=%(callid)s" % self.__dict__ elif self.cmd == 'debug': return "%(cmd)s: show=%(show)s" % self.__dict__ elif self.cmd == 'terminate': return "%(cmd)s: callid=%(callid)s" % self.__dict__ else: return object.__str__(self) diff --git a/config.ini.sample b/config.ini.sample index 34525b8..4ad4826 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,95 +1,88 @@ ; ; Configuration file for Call Control ; [CallControl] ; ; Section for configuring the call controller ; ; The following options are available here: ; ; socket Path to the UNIX socket where the controller receives ; commands from OpenSIPS. This should match the value for ; call_control_socket in opensips.cfg ; Default value: /run/callcontrol/socket ; ; group Put the socket in this group and make it group writable. ; Default value: opensips ; ; limit Limit call duration to this value (in seconds) for all calls. ; Prepaid calls will not use this value, if prepaid_limit is set. ; The value should be the number of seconds of the desired limit ; for the call duration or None for no limit. Warning: 0 means ; exactly that: 0 seconds, not diabled. Use None for disabled. ; Default value: None ; ; prepaid_limit Limit call duration of prepaid calls to this value. If this ; option is not set, then the global limit option applies. If ; this is not set either, the maximum duration of prepaid calls ; is set to 10h. If the limit calculated based on the credit ; value is smaller, then this will apply rather than the setting ; determined above. ; The value should be the number of seconds of the desired limit ; for the call duration or None for no limit. Warning: 0 means ; exactly that: 0 seconds, not diabled. Use None for disabled. ; Default value: None ; -; timeout_detection The mechanism used to find out about timed out calls. If -; set to `radius', it will consider the section -; RadiusDatabase in order to make a connection to the radius -; database. This is only needed if MediaProxy does not use -; OpenSIPS' dialog module to terminate a call that has timed -; out. Otherwise, set to `dialog'. -; ;socket = /run/callcontrol/socket ;group = opensips ;limit = None ;prepaid_limit = None -;timeout_detection = dialog [CDRTool] ; ; Configure where CDRTool rating engine is located ; ; address Network address where the rating engine is located. ; ; - address[:port] ; send rating requests (querying or changing the rating ; credit for a SIP account) to this address. address can ; be an IP or a hostname. If port is missing assume 9024. ; ; Default value: cdrtool. ; ; timeout Timeout after this many milliseconds while waiting for ; response from rating engine. ; ; Default value: 500 ; ;address = cdrtool. ;timeout = 500 [RadiusDatabase] ; -; Used to keep track of calls that timeout and are closed by external means -; (like mediaproxy) without sending BYEs that will close the call in the -; call controller the normal way. -; Database table can be defined as static (e.g. radacct) or as strftime like -; time specification (e.g. radacct%Y%m). This section is used if the option -; timeout_detection in CallControl section is set to `radius'. You need to -; escape the percent signs by doubling them (e.g. radacct%%Y%%m). +; This is used to detect calls that ended or did timeout while callcontrol +; was stopped/restarting. This check is only performed once when starting +; so that the list of calls that was saved before stopping can be updated +; based on what happened while callcontrol was stopped. +; +; The table name can contain strftime time specifications (like %Y %m). +; The percent character needs to be escaped by doubling it, so a dynamic +; table name would look like radacct%%Y%%m ; ;user = dbuser ;password = dbpass ;host = dbhost ;database = radius ;table = radacct%%Y%%m [OpenSIPS] ; ; Configure connection to OpenSIPS' MI ; ; socket_path Path to the OpenSIPS MI socket ; Default value: /run/opensips/socket ; ;socket_path = /run/opensips/socket