28 if sys.version >=
'3':
29 from autopilot.exceptions
import ProcessSearchError
31 from autopilot.introspection
import ProcessSearchError
32 from autopilot.introspection
import get_proxy_object_for_existing_process
37 logger = logging.getLogger(__name__)
40 class JobError(Exception):
44 class CannotAccessUnity(Exception):
48 def unlock_unity(unity_proxy_obj=None):
49 """Helper function that attempts to unlock the unity greeter.
51 If unity_proxy_object is None create a proxy object by querying for the
52 running unity process.
53 Otherwise re-use the passed proxy object.
55 :raises RuntimeError: if the greeter attempts and fails to be unlocked.
57 :raises RuntimeWarning: when the greeter cannot be found because it is
59 :raises CannotAccessUnity: if unity is not introspectable or cannot be
61 :raises CannotAccessUnity: if unity's upstart status is not "start" or the
62 upstart job cannot be found at all.
65 if unity_proxy_obj
is None:
67 pid = _get_unity_pid()
68 unity = _get_unity_proxy_object(pid)
69 main_window = unity.select_single(main_window_emulator.QQuickView)
70 except ProcessSearchError
as e:
71 raise CannotAccessUnity(
72 "Cannot introspect unity, make sure that it has been started "
73 "with testability. Perhaps use the function "
74 "'restart_unity_with_testability' this module provides."
79 main_window = unity_proxy_obj.select_single(
80 main_window_emulator.QQuickView)
82 greeter = main_window.get_greeter()
83 if greeter.created
is False:
84 raise RuntimeWarning(
"Greeter appears to be already unlocked.")
86 bus = dbus.SessionBus()
87 dbus_proxy = bus.get_object(
"com.canonical.UnityGreeter",
"/")
89 dbus_proxy.HideGreeter()
90 except dbus.DBusException:
91 logger.info(
"Failed to unlock greeter")
94 greeter.created.wait_for(
False)
95 logger.info(
"Greeter unlocked, continuing.")
98 def lock_unity(unity_proxy_obj=None):
99 """Helper function that attempts to lock the unity greeter."""
102 uinput = evdev.UInput(name=
'unity8-autopilot-power-button',
103 devnode=
'/dev/autopilot-uinput')
105 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
106 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
110 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
111 uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
115 def restart_unity_with_testability(*args):
116 """Restarts (or starts) unity with testability enabled.
118 Passes *args arguments to the launched process.
121 args += (
"QT_LOAD_TESTABILITY=1",)
122 return restart_unity(*args)
125 def restart_unity(*args):
126 """Restarts (or starts) unity8 using the provided arguments.
128 Passes *args arguments to the launched process.
130 :raises subprocess.CalledProcessError: if unable to stop or start the
134 status = _get_unity_status()
135 if "start/" in status:
138 pid = start_job(
'unity8', *args)
139 return _get_unity_proxy_object(pid)
142 def start_job(name, *args):
145 :param str name: The name of the job.
146 :param args: The arguments to be used when starting the job.
147 :return: The process id of the started job.
148 :raises CalledProcessError: if the job failed to start.
151 logger.info(
'Starting job {} with arguments {}.'.format(name, args))
152 command = [
'/sbin/initctl',
'start', name] + list(args)
154 output = subprocess.check_output(
156 stderr=subprocess.STDOUT,
157 universal_newlines=
True,
160 pid = get_job_pid(name)
161 except subprocess.CalledProcessError
as e:
162 e.args += (
'Failed to start {}: {}.'.format(name, e.output),)
168 def get_job_pid(name):
169 """Return the process id of a running job.
171 :param str name: The name of the job.
172 :raises JobError: if the job is not running.
175 status = get_job_status(name)
176 if "start/" not in status:
177 raise JobError(
'{} is not in the running state.'.format(name))
178 return int(status.split()[-1])
181 def get_job_status(name):
182 """Return the status of a job.
184 :param str name: The name of the job.
185 :raises JobError: if it's not possible to get the status of the job.
189 return subprocess.check_output([
193 ], universal_newlines=
True)
194 except subprocess.CalledProcessError
as error:
196 "Unable to get {}'s status: {}".format(name, error)
203 :param str name: The name of the job.
204 :raises CalledProcessError: if the job failed to stop.
207 logger.info(
'Stoping job {}.'.format(name))
208 command = [
'/sbin/initctl',
'stop', name]
210 output = subprocess.check_output(
212 stderr=subprocess.STDOUT,
213 universal_newlines=
True,
216 except subprocess.CalledProcessError
as e:
217 e.args += (
'Failed to stop {}: {}.'.format(name, e.output),)
221 def is_job_running(name):
222 """Return True if the job is running. Otherwise, False.
224 :param str name: The name of the job.
225 :raises JobError: if it's not possible to get the status of the job.
228 return 'start/' in get_job_status(name)
231 def _get_unity_status():
233 return get_job_status(
'unity8')
234 except JobError
as error:
235 raise CannotAccessUnity(str(error))
238 def _get_unity_pid():
240 return get_job_pid(
'unity8')
241 except JobError
as error:
242 raise CannotAccessUnity(str(error))
245 def _get_unity_proxy_object(pid):
246 return get_proxy_object_for_existing_process(
248 emulator_base=emulators.UnityEmulatorBase,