diff --git a/call-control b/call-control index ef5158c..3f4a417 100755 --- a/call-control +++ b/call-control @@ -1,120 +1,122 @@ #!/usr/bin/env python """Implementation of a call controller for OpenSIPS.""" + def send_command(command, **kwargs): import socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.connect('%s/socket' % process.runtime_directory) + sock.connect(process.runtime.file('socket')) sock.sendall('%s\r\n' % command + '\r\n'.join(['%s: %s' % (key, value) for key, value in kwargs.items()]) + '\r\n\r\n') response = '' while True: data = sock.recv(4096) response += data if not data or data.endswith('\r\n\r\n'): break sock.close() for line in response.splitlines(): if line: print line if __name__ == '__main__': import sys from optparse import OptionParser from application.process import process, ProcessError from application import log import callcontrol name = 'call-control' fullname = 'SIP call-control engine' description = 'Implementation of a call-control engine for SIP' - config_directory = '/etc/callcontrol' - runtime_directory = '/var/run/callcontrol' - default_pid = "%s/%s.pid" % (runtime_directory, name) + process.configuration.user_directory = None + process.configuration.subdirectory = 'callcontrol' + process.runtime.subdirectory = 'callcontrol' default_pid = process.runtime.file('{}.pid'.format(name)) parser = OptionParser(version="%%prog %s" % callcontrol.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=True, help="run the process in the foreground") - parser.add_option("--pid", dest="pid_file", default='/var/run/callcontrol/call-control.pid', - help="pid file (/var/run/callcontrol/call-control.pid)", + parser.add_option("--pid", dest="pid_file", default=default_pid, + help="pid file when forking ({})".format(default_pid), metavar="FILE") parser.add_option("--debug", dest="debug", default=None, help="get information about a currently running call-control daemon", metavar="COMMAND") parser.add_option("--terminate", dest="terminate", default=None, help="terminate an on-going session", metavar="CALLID") (options, args) = parser.parse_args() - process.system_config_directory = config_directory + log.Formatter.prefix_format = '{record.levelname:<8s} ' + try: - process.runtime_directory = runtime_directory - except ProcessError, e: + process.runtime.create_directory() + except ProcessError as e: log.critical('Cannot start %s: %s', fullname, e) sys.exit(1) if options.debug is not None and options.terminate is not None: log.error('cannot run with both --debug and --terminate options in the same time') sys.exit(1) if options.debug is not None: if options.debug == '': log.error('must specify debug command') sys.exit(1) try: send_command('debug', show=options.debug, **dict([arg.split('=',1) for arg in args if arg.find('=') >= 0])) except Exception, e: log.error('failed to complete debug command: %s', e) sys.exit(1) else: sys.exit(0) if options.terminate is not None: if options.terminate == '': log.error('must specify callid to terminate') sys.exit(1) try: send_command('terminate', callid=options.terminate) except Exception, e: log.error('failed to terminate session: %s' % e) else: sys.exit(0) if options.fork: try: process.daemonize(options.pid_file) except ProcessError, e: log.critical('Cannot start %s: %s', fullname, e) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s', fullname, callcontrol.__version__) from callcontrol.controller import CallControlServer if not options.fork: from application.debug.memory import memory_dump try: cserver = CallControlServer() except Exception, e: log.critical('Could not create %s: %s', fullname, e) if type(e) is not RuntimeError: log.exception() sys.exit(1) try: cserver.run() except Exception, e: log.critical('Could not run %s: %s', fullname, e) if type(e) is not RuntimeError: log.exception() if not options.fork: memory_dump() diff --git a/callcontrol/controller.py b/callcontrol/controller.py index e99ecb4..b9e2115 100644 --- a/callcontrol/controller.py +++ b/callcontrol/controller.py @@ -1,560 +1,560 @@ """Implementation of a call control server for OpenSIPS.""" import os import grp import re import cPickle import time from application.configuration import ConfigSection, ConfigSetting from application.process import process from application import log from twisted.internet.protocol import Factory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet import reactor, defer from twisted.python import failure from callcontrol.scheduler import RecurrentCall, KeepRunning from callcontrol.raddb import RadiusDatabase, RadiusDatabaseError from callcontrol.sip import Call from callcontrol.rating import RatingEngineConnections from callcontrol import configuration_file, backup_calls_file class TimeLimit(int): """A positive time limit (in seconds) or None""" def __new__(typ, value): if value.lower() == 'none': return None try: limit = int(value) except: raise ValueError("invalid time limit value: %r" % value) if limit < 0: raise ValueError("invalid time limit value: %r. should be positive." % value) return limit class TimeoutDetection(str): _values = ('dialog', 'radius') def __new__(cls, value): value = value.lower() if value not in cls._values: raise ValueError('invalid timeout detection value: %r' % value) instance = super(TimeoutDetection, cls).__new__(cls, value) instance.use_radius = value == 'radius' return instance class CallControlConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'CallControl' - socket = "%s/socket" % process.runtime_directory + socket = process.runtime.file('socket') group = 'opensips' limit = ConfigSetting(type=TimeLimit, value=None) timeout = 24*60*60 # timeout calls that are stale for more than 24 hours. setupTime = 90 # timeout calls that take more than 1'30" to setup. checkInterval = 60 # check for staled calls and calls that did timeout at every minute. timeout_detection = TimeoutDetection('dialog') # whether or not to use the radius database to find out terminated calls class CommandError(Exception): pass class InvalidRequestError(Exception): pass class CallsMonitor(object): """Check for staled calls and calls that did timeout and were closed by external means""" def __init__(self, period, application): self.application = application self.reccall = RecurrentCall(period, self.run) def run(self): if CallControlConfig.timeout_detection.use_radius: # Find out terminated calls deferred1 = self.application.db.getTerminatedCalls(self.application.calls) deferred1.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_terminated]) deferred2 = self.application.db.getTimedoutCalls(self.application.calls) deferred2.addCallbacks(callback=self._clean_calls, errback=self._err_handle, callbackArgs=[self._handle_timedout]) defer.DeferredList([deferred1, deferred2]).addCallback(self._finish_checks) else: self._finish_checks(None) return KeepRunning def shutdown(self): self.reccall.cancel() def _clean_calls(self, calls, clean_function): for callid, callinfo in calls.items(): call = self.application.calls.get(callid) if call: self.application.clean_call(callid) clean_function(call, callinfo) def _err_handle(self, fail): log.error("Couldn't query database for terminated/timedout calls: %s" % fail.value) def _handle_terminated(self, call, callinfo): call.end(calltime=callinfo['duration'], reason='calls monitor as terminated') def _handle_timedout(self, call, callinfo): call.end(reason='calls monitor as timedout', sendbye=True) def _finish_checks(self, value): # Also do the rest of the checking now = time.time() staled = [] nosetup = [] for callid, call in self.application.calls.items(): if not call.complete and (now - call.created >= CallControlConfig.setupTime): self.application.clean_call(callid) nosetup.append(call) elif call.inprogress and call.timer is not None: continue # this call will be expired by its own timer elif now - call.created >= CallControlConfig.timeout: self.application.clean_call(callid) staled.append(call) # Terminate staled for call in staled: call.end(reason='calls monitor as staled', sendbye=True) # Terminate calls that didn't setup in setupTime for call in nosetup: call.end(reason="calls monitor as it didn't setup in %d seconds" % CallControlConfig.setupTime) class CallControlProtocol(LineOnlyReceiver): def lineReceived(self, line): if line.strip() == "": if self.line_buf: self._process() self.line_buf = [] else: self.line_buf.append(line.strip()) def _process(self): try: req = Request(self.line_buf[0], self.line_buf[1:]) except InvalidRequestError, e: self._send_error_reply(failure.Failure(e)) else: # log.debug('Got request: %s', req) def _unknown_handler(req): req.deferred.errback(failure.Failure(CommandError(req))) try: getattr(self, '_CC_%s' % req.cmd, _unknown_handler)(req) except Exception, e: self._send_error_reply(failure.Failure(e)) else: req.deferred.addCallbacks(callback=self._send_reply, errback=self._send_error_reply) def connectionMade(self): self.line_buf = [] def _send_reply(self, msg): # log.debug('Sent reply: %s', msg) self.sendLine(msg) def _send_error_reply(self, fail): log.error(fail.value) # log.debug("Sent 'Error' reply") self.sendLine('Error') def _CC_init(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: call = Call(req, self.factory.application) if call.billingParty is None: req.deferred.callback('Error') return self.factory.application.calls[req.callid] = call # log.debug('Call id %s added to list of controlled calls', call.callid) else: if call.token != req.call_token: log.error("Call id %s is duplicated" % call.callid) req.deferred.callback('Duplicated callid') return # The call was previously setup which means it could be in the the users table try: user_calls = self.factory.application.users[call.billingParty] user_calls.remove(call.callid) if len(user_calls) == 0: del self.factory.application.users[call.billingParty] self.factory.application.engines.remove_user(call.billingParty) except (ValueError, KeyError): pass deferred = call.setup(req) deferred.addCallbacks(callback=self._CC_finish_init, errback=self._CC_init_failed, callbackArgs=[req], errbackArgs=[req]) def _CC_finish_init(self, value, req): try: call = self.factory.application.calls[req.callid] except KeyError: log.error("Call id %s disappeared before we could finish initializing it" % req.callid) req.deferred.callback('Error') else: if req.call_limit is not None and len(self.factory.application.users.get(call.billingParty, ())) >= req.call_limit: self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Call limit reached') elif call.locked: # prepaid account already locked by another call log.info("Call id %s of %s to %s forbidden because the account is locked" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('Locked') elif call.timelimit == 0: # prepaid account with no credit log.info("Call id %s of %s to %s forbidden because credit is too low" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No credit') elif req.call_limit is not None or call.timelimit is not None: # call limited by credit value, a global time limit or number of calls log.info("User %s can make %s concurrent calls" % (call.billingParty, req.call_limit or "unlimited")) self.factory.application.users.setdefault(call.billingParty, []).append(call.callid) req.deferred.callback('Limited') else: # no limit for call log.info("Call id %s of %s to %s is postpaid not limited" % (req.callid, call.user, call.ruri)) self.factory.application.clean_call(req.callid) call.end() req.deferred.callback('No limit') def _CC_init_failed(self, fail, req): self._send_error_reply(fail) self.factory.application.clean_call(req.callid) def _CC_start(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: call.start(req) req.deferred.callback('Ok') def _CC_stop(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Not found') else: self.factory.application.clean_call(req.callid) call.end(reason='user') req.deferred.callback('Ok') def _CC_debug(self, req): debuglines = [] if req.show == 'sessions': for callid, call in self.factory.application.calls.items(): if not req.user or call.user.startswith(req.user): debuglines.append('Call id %s of %s to %s: %s' % (callid, call.user, call.ruri, call.status)) elif req.show == 'session': try: call = self.factory.application.calls[req.callid] except KeyError: debuglines.append('Call id %s does not exist' % req.callid) else: for key, value in call.items(): debuglines.append('%s: %s' % (key, value)) req.deferred.callback('\r\n'.join(debuglines)+'\r\n') def _CC_terminate(self, req): try: call = self.factory.application.calls[req.callid] except KeyError: req.deferred.callback('Call id %s does not exist\r\n' % req.callid) else: self.factory.application.clean_call(req.callid) call.end(reason='admin', sendbye=True) req.deferred.callback('Ok\r\n') class CallControlFactory(Factory): protocol = CallControlProtocol def __init__(self, application): self.application = application class CallControlServer(object): def __init__(self, path=None, group=None): self.path = path or CallControlConfig.socket self.group = group or CallControlConfig.group try: os.unlink(self.path) except OSError: pass self.listening = None self.engines = None self.monitor = None if CallControlConfig.timeout_detection.use_radius: self.db = RadiusDatabase() else: self.db = None self.calls = {} self.users = {} self._restore_calls() def clean_call(self, callid): try: call = self.calls[callid] except KeyError: return else: del self.calls[callid] user_calls = self.users.get(call.billingParty, []) try: user_calls.remove(callid) except ValueError: pass if not user_calls: self.users.pop(call.billingParty, None) self.engines.remove_user(call.billingParty) # log.debug('Call id %s removed from the list of controlled calls', callid) def run(self): reactor.addSystemEventTrigger('before', 'startup', self.on_startup) reactor.addSystemEventTrigger('before', 'shutdown', self.on_shutdown) reactor.run() def stop(self): reactor.stop() def on_startup(self): # First set up listening on the unix socket try: gid = grp.getgrnam(self.group)[2] mode = 0o660 except (KeyError, IndexError): gid = -1 mode = 0o666 self.listening = reactor.listenUNIX(address=self.path, factory=CallControlFactory(self)) # Make it writable only to the SIP proxy group members try: os.chown(self.path, -1, gid) os.chmod(self.path, mode) except OSError: log.warning("Couldn't set access rights for %s" % self.path) log.warning("OpenSIPS may not be able to communicate with us!") # Then setup the CallsMonitor self.monitor = CallsMonitor(CallControlConfig.checkInterval, self) # Open the connection to the rating engines self.engines = RatingEngineConnections() def on_shutdown(self): should_close = [] if self.listening is not None: self.listening.stopListening() if self.engines is not None: should_close.append(self.engines.shutdown()) if self.monitor is not None: self.monitor.shutdown() if self.db is not None: should_close.append(self.db.close()) d = defer.DeferredList(should_close) d.addBoth(self._save_calls) return d def _save_calls(self, result): if self.calls: log.info('Saving calls') - calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) + calls_file = process.runtime.file(backup_calls_file) try: f = open(calls_file, 'w') except: pass else: for call in self.calls.values(): call.application = None # we will mark timers with 'running' or 'idle', depending on their current state, # to be able to correctly restore them later (Timer objects cannot be pickled) if call.timer is not None: if call.inprogress: call.timer.cancel() call.timer = 'running' # temporary mark that this timer was running else: call.timer = 'idle' # temporary mark that this timer was not running failed_dump = False try: try: cPickle.dump(self.calls, f) except Exception as e: log.warning('Failed to dump call list: %s', e) failed_dump = True finally: f.close() if failed_dump: try: os.unlink(calls_file) except: pass else: log.info("Saved calls: %s" % str(self.calls.keys())) self.calls = {} def _restore_calls(self): - calls_file = '%s/%s' % (process.runtime_directory, backup_calls_file) + calls_file = process.runtime.file(backup_calls_file) try: f = open(calls_file, 'r') except: pass else: try: self.calls = cPickle.load(f) except Exception as e: log.warning('Failed to load calls saved in the previous session: %s', e) f.close() try: os.unlink(calls_file) except: pass if self.calls: log.info("Restoring calls saved previously: %s" % str(self.calls.keys())) # the calls in the 2 sets below are never overlapping because closed and terminated # calls have different database fingerprints. so the dictionary update below is safe try: db = self.db if self.db is not None else RadiusDatabase() try: terminated = db.query(RadiusDatabase.RadiusTask(None, 'terminated', calls=self.calls)) # calls terminated by caller/called didtimeout = db.query(RadiusDatabase.RadiusTask(None, 'timedout', calls=self.calls)) # calls closed by mediaproxy after a media timeout finally: if self.db is None: db.close() except RadiusDatabaseError, e: log.error("Could not query database: %s" % e) else: for callid, call in self.calls.items(): callinfo = terminated.get(callid) or didtimeout.get(callid) if callinfo: # call already terminated or did timeout in mediaproxy del self.calls[callid] callinfo['call'] = call call.timer = None continue # close all calls that were already terminated or did timeout count = 0 for callinfo in terminated.values(): call = callinfo.get('call') if call is not None: call.end(calltime=callinfo['duration']) count += 1 for callinfo in didtimeout.values(): call = callinfo.get('call') if call is not None: call.end(sendbye=True) count += 1 if count > 0: log.info("Removed %d already terminated call%s" % (count, 's'*(count!=1))) for callid, call in self.calls.items(): call.application = self if call.timer == 'running': now = time.time() remain = call.starttime + call.timelimit - now if remain < 0: call.timelimit = int(round(now - call.starttime)) remain = 0 call._setup_timer(remain) call.timer.start() elif call.timer == 'idle': call._setup_timer() # also restore users table self.users.setdefault(call.billingParty, []).append(callid) class Request(object): """A request parsed into a structure based on request type""" __methods = {'init': ('callid', 'diverter', 'ruri', 'sourceip', 'from'), 'start': ('callid', 'dialogid'), 'stop': ('callid',), 'debug': ('show',), 'terminate': ('callid',)} def __init__(self, cmd, params): if cmd not in self.__methods.keys(): raise InvalidRequestError("Unknown request: %s" % cmd) try: parameters = dict([re.split(r':\s+', l, 1) for l in params]) except ValueError: raise InvalidRequestError("Badly formatted request") for p in self.__methods[cmd]: try: parameters[p] except KeyError: raise InvalidRequestError("Missing %s from request" % p) self.cmd = cmd self.deferred = defer.Deferred() self.__dict__.update(parameters) try: getattr(self, '_RE_%s' % self.cmd)() except AttributeError: pass def _RE_init(self): self.from_ = self.__dict__['from'] if self.cmd=='init' and self.diverter.lower()=='none': self.diverter = None try: self.prepaid except AttributeError: self.prepaid = None else: if self.prepaid.lower() == 'true': self.prepaid = True elif self.prepaid.lower() == 'false': self.prepaid = False else: self.prepaid = None try: self.call_limit = int(self.call_limit) except (AttributeError, ValueError): self.call_limit = None else: if self.call_limit <= 0: self.call_limit = None try: self.call_token except AttributeError: self.call_token = None else: if not self.call_token or self.call_token.lower() == 'none': self.call_token = None try: self.sip_application except AttributeError: self.sip_application = '' def _RE_debug(self): if self.show == 'session': try: if not self.callid: raise InvalidRequestError("Missing callid from request") except AttributeError: raise InvalidRequestError("Missing callid from request") elif self.show == 'sessions': try: self.user except AttributeError: self.user = None else: raise InvalidRequestError("Illegal value for 'show' attribute in request") def __str__(self): if self.cmd == 'init': return "%(cmd)s: callid=%(callid)s from=%(from_)s ruri=%(ruri)s diverter=%(diverter)s sourceip=%(sourceip)s prepaid=%(prepaid)s call_limit=%(call_limit)s" % self.__dict__ elif self.cmd == 'start': return "%(cmd)s: callid=%(callid)s dialogid=%(dialogid)s" % self.__dict__ elif self.cmd == 'stop': return "%(cmd)s: callid=%(callid)s" % self.__dict__ elif self.cmd == 'debug': return "%(cmd)s: show=%(show)s" % self.__dict__ elif self.cmd == 'terminate': return "%(cmd)s: callid=%(callid)s" % self.__dict__ else: return object.__str__(self) diff --git a/callcontrol/opensips.py b/callcontrol/opensips.py index e58bfb4..776fa19 100644 --- a/callcontrol/opensips.py +++ b/callcontrol/opensips.py @@ -1,249 +1,249 @@ import json import socket import urlparse from abc import ABCMeta, abstractmethod, abstractproperty from application import log from application.configuration import ConfigSection from application.python.types import Singleton from application.process import process from application.system import unlink from random import getrandbits from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.python.failure import Failure from callcontrol import configuration_file class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'OpenSIPS' socket_path = '/run/opensips/socket' location_table = 'location' class Error(Exception): pass class TimeoutError(Error): pass class OpenSIPSError(Error): pass class NegativeReplyError(OpenSIPSError): def __init__(self, code, message): super(NegativeReplyError, self).__init__(code, message) self.code = code self.message = message def __repr__(self): return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) def __str__(self): return '[{0.code}] {0.message}'.format(self) class Request(object): __metaclass__ = ABCMeta method = abstractproperty() @abstractmethod def __init__(self, *args): self.id = '{:x}'.format(getrandbits(32)) self.args = list(args) self.deferred = defer.Deferred() @property def __data__(self): return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) @abstractmethod def process_response(self, response): raise NotImplementedError # noinspection PyAbstractClass class BooleanRequest(Request): """A request that returns True if successful, False otherwise""" def process_response(self, response): return not isinstance(response, Failure) class AddressReload(BooleanRequest): method = 'address_reload' def __init__(self): super(AddressReload, self).__init__() class DomainReload(BooleanRequest): method = 'domain_reload' def __init__(self): super(DomainReload, self).__init__() class EndDialog(BooleanRequest): method = 'dlg_end_dlg' def __init__(self, dialog_id): super(EndDialog, self).__init__(dialog_id) class RefreshWatchers(BooleanRequest): method = 'refresh_watchers' def __init__(self, account, refresh_type): super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) class UpdateSubscriptions(BooleanRequest): method = 'rls_update_subscriptions' def __init__(self, account): super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) class GetOnlineDevices(Request): method = 'ul_show_contact' def __init__(self, account): super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) def process_response(self, response): if isinstance(response, Failure): if response.type is NegativeReplyError and response.value.code == 404: return [] return response return [ContactData(contact) for contact in response[u'Contacts']] class ContactData(dict): __fields__ = {u'contact', u'expires', u'received', u'user_agent'} def __init__(self, data): super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__}) self.setdefault(u'user_agent', None) if u'received' in self: parsed_received = urlparse.parse_qs(self[u'received']) if u'target' in parsed_received: self[u'NAT_contact'] = parsed_received[u'target'][0] else: self[u'NAT_contact'] = self[u'received'] del self[u'received'] else: self[u'NAT_contact'] = self[u'contact'] class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): log.debug('Got MI response: {}'.format(data)) try: response = json.loads(data) except ValueError: code, _, message = data.partition(' ') try: code = int(code) except ValueError: log.error('MI response from OpenSIPS cannot be parsed (neither JSON nor status reply)') return # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to # which request the response corresponds. The failed request will fail with timeout later. if len(self.transport.requests) == 1: _, request = self.transport.requests.popitem() request.deferred.errback(Failure(NegativeReplyError(code, message))) log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) else: log.error('Got MI status reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: try: request_id = response['id'] except KeyError: log.error('MI JSON response from OpenSIPS lacks id field') return if request_id not in self.transport.requests: log.error('MI JSON response from OpenSIPS has unknown id: {!r}'.format(request_id)) return request = self.transport.requests.pop(request_id) if 'result' in response: request.deferred.callback(response['result']) elif 'error' in response: log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) else: log.error('Invalid MI JSON response from OpenSIPS') request.deferred.errback(Failure(OpenSIPSError('Invalid MI JSON response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 def __init__(self): - socket_path = process.runtime_file('opensips.sock') + socket_path = process.runtime.file('opensips.sock') unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) def close(self): for request in self.transport.requests.values(): if not request.deferred.called: request.deferred.errback(Error('shutting down')) self.transport.requests.clear() self.transport.stopListening() unlink(self.path) def send(self, request): try: self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) except socket.error as e: log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: self.transport.requests[request.id] = request request.deferred.addBoth(request.process_response) reactor.callLater(self.timeout, self._did_timeout, request) log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred def _did_timeout(self, request): if not request.deferred.called: request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) self.transport.requests.pop(request.id) class ManagementInterface(object): __metaclass__ = Singleton def __init__(self): self.connection = UNIXSocketConnection() def reload_domains(self): return self.connection.send(DomainReload()) def reload_addresses(self): return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): return self.connection.send(EndDialog(dialog_id)) def get_online_devices(self, account): return self.connection.send(GetOnlineDevices(account)) def refresh_watchers(self, account, refresh_type): return self.connection.send(RefreshWatchers(account, refresh_type)) def update_subscriptions(self, account): return self.connection.send(UpdateSubscriptions(account)) diff --git a/callcontrol/tls.py b/callcontrol/tls.py index 440a769..f6a643a 100644 --- a/callcontrol/tls.py +++ b/callcontrol/tls.py @@ -1,51 +1,51 @@ """TLS helper classes""" __all__ = ['Certificate', 'PrivateKey'] from gnutls.crypto import X509Certificate, X509PrivateKey from application import log from application.process import process def file_content(file): - path = process.config_file(file) + path = process.configuration.file(file) if path is None: raise Exception("File '%s' does not exist" % file) try: f = open(path, 'rt') except Exception: raise Exception("File '%s' could not be open" % file) try: return f.read() finally: f.close() class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509Certificate(file_content(value)) except Exception, e: log.warning("Certificate file '%s' could not be loaded: %s" % (value, e)) return None else: raise TypeError, 'value should be a string' class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509PrivateKey(file_content(value)) except Exception, e: log.warning("Private key file '%s' could not be loaded: %s" % (value, e)) return None else: raise TypeError, 'value should be a string' diff --git a/debian/control b/debian/control index 13c2e1d..ab4cb52 100644 --- a/debian/control +++ b/debian/control @@ -1,22 +1,22 @@ Source: callcontrol Section: net Priority: optional Maintainer: Dan Pascu Uploaders: Adrian Georgescu , Tijmen de Mes Build-Depends: debhelper (>= 9), python (>= 2.7) Standards-Version: 3.9.8 Package: callcontrol Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 1.2.8), python-gnutls (>= 3.0.0), python-twisted-core, python-sqlobject +Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 2.8.0), python-gnutls (>= 3.0.0), python-twisted-core, python-sqlobject Description: Call Control prepaid application for OpenSIPS Call Control is a prepaid application that can be used together with OpenSIPS call_control module and CDRTool rating engine to limit the duration of SIP sessions based on a prepaid balance. It can also be used to limit the duration of any session to a predefined maximum value without debiting a balance. . Call Control achieves this by maintaining a timer for each session and sending BYE messages to both SIP end-points, if the session exceeds its maximum session limit or if the Call Control receives a command to forcefully close the call from outside.