Unity 8
process_helpers.py
1 # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2 #
3 # Unity Autopilot Utilities
4 # Copyright (C) 2013, 2014, 2015 Canonical
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 
20 import logging
21 import subprocess
22 import dbus
23 
24 from autopilot.exceptions import ProcessSearchError
25 from autopilot.introspection import get_proxy_object_for_existing_process
26 
27 from unity8.shell import emulators
28 from unity8.shell.emulators import main_window as main_window_emulator
29 
30 logger = logging.getLogger(__name__)
31 
32 
33 class JobError(Exception):
34  pass
35 
36 
37 class CannotAccessUnity(Exception):
38  pass
39 
40 
41 def unlock_unity(unity_proxy_obj=None):
42  """Helper function that attempts to unlock the unity greeter.
43 
44  If unity_proxy_object is None create a proxy object by querying for the
45  running unity process.
46  Otherwise re-use the passed proxy object.
47 
48  :raises RuntimeError: if the greeter attempts and fails to be unlocked.
49 
50  :raises RuntimeWarning: when the greeter cannot be found because it is
51  already unlocked.
52  :raises CannotAccessUnity: if unity is not introspectable or cannot be
53  found on dbus.
54  :raises CannotAccessUnity: if unity's upstart status is not "start" or the
55  upstart job cannot be found at all.
56 
57  """
58  if unity_proxy_obj is None:
59  try:
60  pid = _get_unity_pid()
61  unity = _get_unity_proxy_object(pid)
62  main_window = unity.select_single(main_window_emulator.QQuickView)
63  except ProcessSearchError as e:
64  raise CannotAccessUnity(
65  "Cannot introspect unity, make sure that it has been started "
66  "with testability. Perhaps use the function "
67  "'restart_unity_with_testability' this module provides."
68  "(%s)"
69  % str(e)
70  )
71  else:
72  main_window = unity_proxy_obj.select_single(
73  main_window_emulator.QQuickView)
74 
75  greeter = main_window.get_greeter()
76  if greeter.created is False:
77  raise RuntimeWarning("Greeter appears to be already unlocked.")
78 
79  bus = dbus.SessionBus()
80  dbus_proxy = bus.get_object("com.canonical.UnityGreeter", "/")
81  try:
82  dbus_proxy.HideGreeter()
83  except dbus.DBusException:
84  logger.info("Failed to unlock greeter")
85  raise
86  else:
87  greeter.created.wait_for(False)
88  logger.info("Greeter unlocked, continuing.")
89 
90 
91 def lock_unity(unity_proxy_obj=None):
92  """Helper function that attempts to lock the unity greeter."""
93  import evdev
94  import time
95  uinput = evdev.UInput(name='unity8-autopilot-power-button',
96  devnode='/dev/autopilot-uinput')
97  # One press and release to turn screen off (locking unity)
98  uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
99  uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
100  uinput.syn()
101  time.sleep(1)
102  # And another press and release to turn screen back on
103  uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 1)
104  uinput.write(evdev.ecodes.EV_KEY, evdev.ecodes.KEY_POWER, 0)
105  uinput.syn()
106 
107 
108 def restart_unity_with_testability(*args):
109  """Restarts (or starts) unity with testability enabled.
110 
111  Passes *args arguments to the launched process.
112 
113  """
114  args += ("QT_LOAD_TESTABILITY=1",)
115  return restart_unity(*args)
116 
117 
118 def restart_unity(*args):
119  """Restarts (or starts) unity8 using the provided arguments.
120 
121  Passes *args arguments to the launched process.
122 
123  :raises subprocess.CalledProcessError: if unable to stop or start the
124  unity8 upstart job.
125 
126  """
127  status = _get_unity_status()
128  if "start/" in status:
129  stop_job('unity8')
130 
131  pid = start_job('unity8', *args)
132  return _get_unity_proxy_object(pid)
133 
134 
135 def start_job(name, *args):
136  """Start a job.
137 
138  :param str name: The name of the job.
139  :param args: The arguments to be used when starting the job.
140  :return: The process id of the started job.
141  :raises CalledProcessError: if the job failed to start.
142 
143  """
144  logger.info('Starting job {} with arguments {}.'.format(name, args))
145  command = ['/sbin/initctl', 'start', name] + list(args)
146  try:
147  output = subprocess.check_output(
148  command,
149  stderr=subprocess.STDOUT,
150  universal_newlines=True,
151  )
152  logger.info(output)
153  pid = get_job_pid(name)
154  except subprocess.CalledProcessError as e:
155  e.args += ('Failed to start {}: {}.'.format(name, e.output),)
156  raise
157  else:
158  return pid
159 
160 
161 def get_job_pid(name):
162  """Return the process id of a running job.
163 
164  :param str name: The name of the job.
165  :raises JobError: if the job is not running.
166 
167  """
168  status = get_job_status(name)
169  if "start/" not in status:
170  raise JobError('{} is not in the running state.'.format(name))
171  return int(status.split()[-1])
172 
173 
174 def get_job_status(name):
175  """Return the status of a job.
176 
177  :param str name: The name of the job.
178  :raises JobError: if it's not possible to get the status of the job.
179 
180  """
181  try:
182  return subprocess.check_output([
183  '/sbin/initctl',
184  'status',
185  name
186  ], universal_newlines=True)
187  except subprocess.CalledProcessError as error:
188  raise JobError(
189  "Unable to get {}'s status: {}".format(name, error)
190  )
191 
192 
193 def stop_job(name):
194  """Stop a job.
195 
196  :param str name: The name of the job.
197  :raises CalledProcessError: if the job failed to stop.
198 
199  """
200  logger.info('Stoping job {}.'.format(name))
201  command = ['/sbin/initctl', 'stop', name]
202  try:
203  output = subprocess.check_output(
204  command,
205  stderr=subprocess.STDOUT,
206  universal_newlines=True,
207  )
208  logger.info(output)
209  except subprocess.CalledProcessError as e:
210  e.args += ('Failed to stop {}: {}.'.format(name, e.output),)
211  raise
212 
213 
214 def is_job_running(name):
215  """Return True if the job is running. Otherwise, False.
216 
217  :param str name: The name of the job.
218  :raises JobError: if it's not possible to get the status of the job.
219 
220  """
221  return 'start/' in get_job_status(name)
222 
223 
224 def _get_unity_status():
225  try:
226  return get_job_status('unity8')
227  except JobError as error:
228  raise CannotAccessUnity(str(error))
229 
230 
231 def _get_unity_pid():
232  try:
233  return get_job_pid('unity8')
234  except JobError as error:
235  raise CannotAccessUnity(str(error))
236 
237 
238 def _get_unity_proxy_object(pid):
239  return get_proxy_object_for_existing_process(
240  pid=pid,
241  emulator_base=emulators.UnityEmulatorBase,
242  )