Unity 8
process_helpers.py
1 # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2 #
3 # Unity Autopilot Utilities
4 # Copyright (C) 2013, 2014 Canonical
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 
20 import logging
21 import subprocess
22 import sys
23 import dbus
24 
25 # This has to work in both python 3 (ap 1.5) and py2 (ap legacy 1.4.1) so we
26 # pick the correct location in each case. Remove the py2 branch once we no
27 # longer need to support python 2.
28 if sys.version >= '3':
29  from autopilot.exceptions import ProcessSearchError
30 else:
31  from autopilot.introspection import ProcessSearchError
32 from autopilot.introspection import get_proxy_object_for_existing_process
33 
34 from unity8.shell import emulators
35 from unity8.shell.emulators import main_window as main_window_emulator
36 
37 logger = logging.getLogger(__name__)
38 
39 
class JobError(Exception):
    """Raised when a job's status cannot be read or the job is not running."""
42 
43 
class CannotAccessUnity(Exception):
    """Raised when unity cannot be introspected or its job cannot be queried."""
46 
47 
def unlock_unity(unity_proxy_obj=None):
    """Helper function that attempts to unlock the unity greeter.

    If unity_proxy_obj is None, create a proxy object by querying for the
    running unity8 job. Otherwise re-use the passed proxy object.

    :param unity_proxy_obj: optional proxy object of an already running,
        introspectable unity process.

    :raises RuntimeWarning: when the greeter reports that it is not created,
        i.e. it appears to be already unlocked.
    :raises dbus.DBusException: if the request to hide the greeter fails.
    :raises CannotAccessUnity: if unity is not introspectable, if the status
        of the unity8 job cannot be read, or if that job is not running.

    """
    if unity_proxy_obj is None:
        try:
            pid = _get_unity_pid()
            unity = _get_unity_proxy_object(pid)
            main_window = unity.select_single(main_window_emulator.QQuickView)
        except ProcessSearchError as e:
            raise CannotAccessUnity(
                "Cannot introspect unity, make sure that it has been started "
                "with testability. Perhaps use the function "
                "'restart_unity_with_testability' this module provides."
                "(%s)"
                % str(e)
            )
    else:
        main_window = unity_proxy_obj.select_single(
            main_window_emulator.QQuickView)

    greeter = main_window.get_greeter()
    # `created` is an introspected attribute (it offers wait_for() below), so
    # it is never the `False` singleton itself: an identity test against
    # False can not succeed. Test its value instead.
    if not greeter.created:
        raise RuntimeWarning("Greeter appears to be already unlocked.")

    bus = dbus.SessionBus()
    dbus_proxy = bus.get_object("com.canonical.UnityGreeter", "/")
    try:
        dbus_proxy.HideGreeter()
    except dbus.DBusException:
        logger.info("Failed to unlock greeter")
        raise
    else:
        # Block until the shell reports that the greeter is gone.
        greeter.created.wait_for(False)
        logger.info("Greeter unlocked, continuing.")
96 
97 
def lock_unity(unity_proxy_obj=None):
    """Attempt to lock the unity greeter by simulating the power button.

    A virtual input device is created and the power key is tapped twice,
    one second apart.

    :param unity_proxy_obj: not used by this helper.

    """
    import evdev
    import time

    power_button = evdev.UInput(name='unity8-autopilot-power-button',
                                devnode='/dev/autopilot-uinput')
    key_event = evdev.ecodes.EV_KEY
    power_key = evdev.ecodes.KEY_POWER

    def tap_power_key():
        # Key down (1) followed by key up (0), then flush the events.
        for key_state in (1, 0):
            power_button.write(key_event, power_key, key_state)
        power_button.syn()

    # One press and release to turn screen off (locking unity)
    tap_power_key()
    time.sleep(1)
    # And another press and release to turn screen back on
    tap_power_key()
113 
114 
def restart_unity_with_testability(*args):
    """Restart (or start) unity with testability enabled.

    The given *args are passed to the launched process, followed by the
    setting that makes Qt load the testability driver.

    :return: whatever restart_unity returns for the restarted process.

    """
    testability_args = args + ("QT_LOAD_TESTABILITY=1",)
    return restart_unity(*testability_args)
123 
124 
def restart_unity(*args):
    """Restart (or start) unity8 using the provided arguments.

    The unity8 job is stopped first when its status shows that it is
    started; it is then started with *args and a proxy object for the new
    process is returned.

    :raises subprocess.CalledProcessError: if unable to stop or start the
        unity8 upstart job.

    """
    unity_is_started = "start/" in _get_unity_status()
    if unity_is_started:
        stop_job('unity8')

    return _get_unity_proxy_object(start_job('unity8', *args))
140 
141 
def start_job(name, *args):
    """Start a job and report the process id it is running as.

    :param str name: The name of the job.
    :param args: The arguments to be used when starting the job.
    :return: The process id of the started job.
    :raises CalledProcessError: if the job failed to start.

    """
    logger.info('Starting job {} with arguments {}.'.format(name, args))
    start_command = ['/sbin/initctl', 'start', name]
    start_command.extend(args)
    try:
        start_output = subprocess.check_output(
            start_command,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as error:
        # Attach the command output so the failure can be diagnosed.
        failure = 'Failed to start {}: {}.'.format(name, error.output)
        error.args = error.args + (failure,)
        raise
    logger.info(start_output)
    return get_job_pid(name)
166 
167 
def get_job_pid(name):
    """Return the process id of a running job.

    The process id is read from the last whitespace-separated token of the
    job's status.

    :param str name: The name of the job.
    :return: The process id of the job, as an int.
    :raises JobError: if the job is not running, or if its status does not
        end with a process id.

    """
    status = get_job_status(name)
    if "start/" not in status:
        raise JobError('{} is not in the running state.'.format(name))
    try:
        return int(status.split()[-1])
    except ValueError:
        # The status mentions the start goal but carries no trailing pid
        # (presumably the job has not spawned its process yet). Report it
        # as the documented JobError rather than leaking a ValueError.
        raise JobError(
            'Unable to read the process id of {} from its status: {}'.format(
                name, status.strip())
        )
179 
180 
def get_job_status(name):
    """Return the status of a job, as printed by initctl.

    :param str name: The name of the job.
    :return: The text output of the status command.
    :raises JobError: if it's not possible to get the status of the job.

    """
    status_command = ['/sbin/initctl', 'status', name]
    try:
        status = subprocess.check_output(
            status_command, universal_newlines=True)
    except subprocess.CalledProcessError as error:
        message = "Unable to get {}'s status: {}".format(name, error)
        raise JobError(message)
    return status
198 
199 
def stop_job(name):
    """Stop a job.

    :param str name: The name of the job.
    :raises CalledProcessError: if the job failed to stop.

    """
    logger.info('Stoping job {}.'.format(name))
    try:
        stop_output = subprocess.check_output(
            ['/sbin/initctl', 'stop', name],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as error:
        # Attach the command output so the failure can be diagnosed.
        failure = 'Failed to stop {}: {}.'.format(name, error.output)
        error.args = error.args + (failure,)
        raise
    logger.info(stop_output)
219 
220 
def is_job_running(name):
    """Tell whether the job's status shows it as started.

    :param str name: The name of the job.
    :return: True if the job is running. Otherwise, False.
    :raises JobError: if it's not possible to get the status of the job.

    """
    job_status = get_job_status(name)
    return 'start/' in job_status
229 
230 
def _get_unity_status():
    """Return the status of the unity8 job.

    :raises CannotAccessUnity: if the status of the job cannot be read.

    """
    try:
        unity_status = get_job_status('unity8')
    except JobError as error:
        raise CannotAccessUnity(str(error))
    return unity_status
236 
237 
def _get_unity_pid():
    """Return the process id of the unity8 job.

    :raises CannotAccessUnity: if the process id of the job cannot be
        obtained.

    """
    try:
        unity_pid = get_job_pid('unity8')
    except JobError as error:
        raise CannotAccessUnity(str(error))
    return unity_pid
243 
244 
def _get_unity_proxy_object(pid):
    """Return a proxy object for the already running process with this pid.

    The proxy object is built on top of the unity emulator base class.

    :param pid: The process id of the process to introspect.

    """
    proxy_options = {
        'pid': pid,
        'emulator_base': emulators.UnityEmulatorBase,
    }
    return get_proxy_object_for_existing_process(**proxy_options)