24 from autopilot.exceptions
import ProcessSearchError
25 from autopilot.introspection
import get_proxy_object_for_existing_process
30 logger = logging.getLogger(__name__)
33 class JobError(Exception):
37 class CannotAccessUnity(Exception):
41 def unlock_unity(unity_proxy_obj=None):
42 """Helper function that attempts to unlock the unity greeter.
44 If unity_proxy_object is None create a proxy object by querying for the
45 running unity process.
46 Otherwise re-use the passed proxy object.
48 :raises RuntimeError: if the greeter attempts and fails to be unlocked.
50 :raises RuntimeWarning: when the greeter cannot be found because it is
52 :raises CannotAccessUnity: if unity is not introspectable or cannot be
54 :raises CannotAccessUnity: if unity's upstart status is not "start" or the
55 upstart job cannot be found at all.
58 if unity_proxy_obj
is None:
60 pid = _get_unity_pid()
61 unity = _get_unity_proxy_object(pid)
62 main_window = unity.select_single(main_window_emulator.QQuickView)
63 except ProcessSearchError
as e:
64 raise CannotAccessUnity(
65 "Cannot introspect unity, make sure that it has been started "
66 "with testability. Perhaps use the function "
67 "'restart_unity_with_testability' this module provides."
72 main_window = unity_proxy_obj.select_single(
73 main_window_emulator.QQuickView)
75 greeter = main_window.get_greeter()
76 if greeter.created
is False:
77 raise RuntimeWarning(
"Greeter appears to be already unlocked.")
79 bus = dbus.SessionBus()
80 dbus_proxy = bus.get_object(
"com.canonical.UnityGreeter",
"/")
82 dbus_proxy.HideGreeter()
83 except dbus.DBusException:
84 logger.info(
"Failed to unlock greeter")
87 greeter.created.wait_for(
False)
88 logger.info(
"Greeter unlocked, continuing.")
91 def lock_unity(unity_proxy_obj=None):
92 """Helper function that attempts to lock the unity greeter."""
95 uinput = evdev.UInput(name=
'unity8-autopilot-power-button',
96 devnode=
'/dev/autopilot-uinput')
98 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
99 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
103 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
104 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
def restart_unity_with_testability(*args):
    """Restart (or start) unity with testability enabled.

    The given arguments are forwarded to restart_unity with
    "QT_LOAD_TESTABILITY=1" appended after them.

    :param args: Arguments passed on to the launched process.
    :return: Whatever restart_unity returns.

    """
    testability_args = args + ("QT_LOAD_TESTABILITY=1",)
    return restart_unity(*testability_args)
118 def restart_unity(*args):
119 """Restarts (or starts) unity8 using the provided arguments.
121 Passes *args arguments to the launched process.
123 :raises subprocess.CalledProcessError: if unable to stop or start the
127 status = _get_unity_status()
128 if "start/" in status:
131 pid = start_job(
'unity8', *args)
132 return _get_unity_proxy_object(pid)
135 def start_job(name, *args):
138 :param str name: The name of the job.
139 :param args: The arguments to be used when starting the job.
140 :return: The process id of the started job.
141 :raises CalledProcessError: if the job failed to start.
144 logger.info(
'Starting job {} with arguments {}.'.format(name, args))
145 command = [
'/sbin/initctl',
'start', name] + list(args)
147 output = subprocess.check_output(
149 stderr=subprocess.STDOUT,
150 universal_newlines=
True,
153 pid = get_job_pid(name)
154 except subprocess.CalledProcessError
as e:
155 e.args += (
'Failed to start {}: {}.'.format(name, e.output),)
def get_job_pid(name):
    """Return the process id of a running job.

    The pid is read from the last whitespace-separated token of the text
    returned by get_job_status.

    :param str name: The name of the job.
    :return: The process id, as an int.
    :raises JobError: if the job is not running, or if its status does not
        end with a process id.

    """
    status = get_job_status(name)
    if "start/" not in status:
        raise JobError('{} is not in the running state.'.format(name))
    try:
        return int(status.split()[-1])
    except ValueError:
        # The status mentions "start/" but carries no trailing pid. Report
        # it with the exception type callers of this helper already handle
        # instead of leaking a bare ValueError.
        raise JobError(
            "Unable to find the process id in {}'s status: {}".format(
                name, status))
174 def get_job_status(name):
175 """Return the status of a job.
177 :param str name: The name of the job.
178 :raises JobError: if it's not possible to get the status of the job.
182 return subprocess.check_output([
186 ], universal_newlines=
True)
187 except subprocess.CalledProcessError
as error:
189 "Unable to get {}'s status: {}".format(name, error)
196 :param str name: The name of the job.
197 :raises CalledProcessError: if the job failed to stop.
200 logger.info(
'Stoping job {}.'.format(name))
201 command = [
'/sbin/initctl',
'stop', name]
203 output = subprocess.check_output(
205 stderr=subprocess.STDOUT,
206 universal_newlines=
True,
209 except subprocess.CalledProcessError
as e:
210 e.args += (
'Failed to stop {}: {}.'.format(name, e.output),)
def is_job_running(name):
    """Tell whether a job is running.

    A job counts as running when the text returned by get_job_status
    contains 'start/'.

    :param str name: The name of the job.
    :return: True if the job is running. Otherwise, False.
    :raises JobError: if it's not possible to get the status of the job.

    """
    status = get_job_status(name)
    return 'start/' in status
def _get_unity_status():
    """Return the status of the 'unity8' job, as given by get_job_status.

    :raises CannotAccessUnity: if get_job_status raises JobError; the
        message of that error is reused.

    """
    try:
        status = get_job_status('unity8')
    except JobError as job_error:
        raise CannotAccessUnity(str(job_error))
    return status
def _get_unity_pid():
    """Return the process id of the 'unity8' job, as given by get_job_pid.

    :raises CannotAccessUnity: if get_job_pid raises JobError; the message
        of that error is reused.

    """
    try:
        unity_pid = get_job_pid('unity8')
    except JobError as job_error:
        raise CannotAccessUnity(str(job_error))
    return unity_pid
238 def _get_unity_proxy_object(pid):
239 return get_proxy_object_for_existing_process(
241 emulator_base=emulators.UnityEmulatorBase,