, None) # Because we have only one object in code (our function), we can use code.co_consts[0] func = FunctionType(code.co_consts[0], locals(), method.__name__) return func def _polkit_auth(self, action_name, *args): action_id = self._namespace + "." + action_name caller = args[-1] log.debug("checking authorization for action '%s' requested by caller '%s'" % (action_id, caller)) ret = self._polkit.check_authorization(caller, action_id) args_copy = args if ret == 1: log.debug("action '%s' requested by caller '%s' was successfully authorized by polkit" % (action_id, caller)) elif ret == 2: log.warning("polkit error, but action '%s' requested by caller '%s' was successfully authorized by fallback method" % (action_id, caller)) elif ret == 0: log.info("action '%s' requested by caller '%s' wasn't authorized, ignoring the request" % (action_id, caller)) args_copy = list(args[:-1]) + [""] elif ret == -1: log.warning("polkit error and action '%s' requested by caller '%s' wasn't authorized by fallback method, ignoring the request" % (action_id, caller)) args_copy = list(args[:-1]) + [""] else: log.error("polkit error and unable to use fallback method to authorize action '%s' requested by caller '%s', ignoring the request" % (action_id, caller)) args_copy = list(args[:-1]) + [""] return args_copy def export(self, method, in_signature, out_signature, action_name=None): if not ismethod(method): raise Exception("Only bound methods can be exported.") method_name = method.__name__ if method_name in self._dbus_methods: raise Exception("Method with this name is already exported.") action_name = action_name or method_name def auth_wrapper(owner, *args, **kwargs): return method(*self._polkit_auth(action_name, *args), **kwargs) wrapper = self._prepare_for_dbus(method, auth_wrapper) wrapper = dbus.service.method(self._interface_name, in_signature, out_signature, sender_keyword = "caller")(wrapper) self._dbus_methods[method_name] = wrapper def signal(self, method, out_signature): if not ismethod(method): raise Exception("Only bound methods can be exported.") method_name = method.__name__ if method_name in self._dbus_methods: raise Exception("Method with this name is already exported.") def wrapper(owner, *args, **kwargs): return method(*args, **kwargs) wrapper = self._prepare_for_dbus(method, wrapper) wrapper = dbus.service.signal(self._interface_name, out_signature)(wrapper) self._dbus_methods[method_name] = wrapper self._signals.add(method_name) def send_signal(self, signal, *args, **kwargs): err = False if not signal in self._signals or self._bus_object is None: err = True try: method = getattr(self._bus_object, signal) except AttributeError: err = True if err: raise Exception("Signal '%s' doesn't exist." % signal) else: method(*args, **kwargs) def _construct_dbus_object_class(self): if self._dbus_object_cls is not None: raise Exception("The exporter class was already build.") unique_name = "DBusExporter_%d" % id(self) cls = type(unique_name, (dbus.service.Object,), self._dbus_methods) self._dbus_object_cls = cls def start(self): if self.running(): return if self._dbus_object_cls is None: self._construct_dbus_object_class() self.stop() bus = dbus.SystemBus() bus_name = dbus.service.BusName(self._bus_name, bus) self._bus_object = self._dbus_object_cls(bus, self._object_name, bus_name) self._thread = threading.Thread(target=self._thread_code) self._thread.start() def stop(self): if self._thread is not None and self._thread.is_alive(): self._main_loop.quit() self._thread.join() self._thread = None def _thread_code(self): self._main_loop.run() del self._bus_object self._bus_object = None