Unity 8
fixture_setup.py
1 # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2 #
3 # Unity Autopilot Test Suite
4 # Copyright (C) 2014, 2015 Canonical
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 
20 import os
21 import subprocess
22 import threading
23 import fixtures
24 import logging
25 
26 import ubuntuuitoolkit
27 from autopilot import introspection
28 from autopilot.matchers import Eventually
29 from testtools.matchers import Equals
30 from ubuntuuitoolkit import fixture_setup
31 
32 from unity8 import (
33  get_binary_path,
34  get_mocks_library_path,
35  get_default_extra_mock_libraries,
36  get_data_dirs,
37  sensors,
38  shell,
39  process_helpers
40 )
41 
42 
43 logger = logging.getLogger(__name__)
44 
45 
class LaunchUnityWithFakeSensors(fixtures.Fixture):

    """Fixture to launch Unity8 with an injectable sensors backend.

    :ivar unity_proxy: The Autopilot proxy object for the Unity shell.
    :ivar main_win: The ``shell.ShellView`` selected from ``unity_proxy``.
    :ivar fake_sensors: The ``sensors.FakePlatformSensors`` instance created
        at the end of ``setUp``.

    """

    unity_proxy = None
    main_win = None

    def setUp(self):
        """Restart Unity8 with testability and create sensors."""
        super().setUp()
        self.useFixture(
            fixture_setup.InitctlEnvironmentVariable(
                UBUNTU_PLATFORM_API_TEST_OVERRIDE='sensors'))

        self.addCleanup(process_helpers.stop_job, 'unity8')
        # The restart runs in its own thread so the sensors can be created
        # through the fifo while the restart call has not returned yet.
        restart_thread = threading.Thread(
            target=self._restart_unity_with_testability)
        restart_thread.start()

        self._create_sensors()

        restart_thread.join()
        self.fake_sensors = sensors.FakePlatformSensors()

    def _get_lightdm_mock_path(self):
        """Return the absolute path of the IntegratedLightDM mock library.

        :raises RuntimeError: if the path does not exist.

        """
        lib_path = get_mocks_library_path()
        lightdm_mock_path = os.path.abspath(
            os.path.join(lib_path, "IntegratedLightDM", "liblightdm")
        )

        if not os.path.exists(lightdm_mock_path):
            raise RuntimeError(
                "LightDM mock does not exist at path {}.".
                format(lightdm_mock_path)
            )
        return lightdm_mock_path

    def _get_qml_import_path_with_mock(self):
        """Return the QML2_IMPORT_PATH value with the mock path prepended."""
        qml_import_path = [get_mocks_library_path()]
        if os.getenv('QML2_IMPORT_PATH') is not None:
            qml_import_path.append(os.getenv('QML2_IMPORT_PATH'))

        return ':'.join(qml_import_path)

    def _restart_unity_with_testability(self):
        """Restart Unity8 with the mock environment and store its proxy.

        Sets ``unity_proxy`` and ``main_win`` on the fixture.

        """
        _environment = {}

        data_dirs = get_data_dirs(True)
        if data_dirs is not None:
            _environment['XDG_DATA_DIRS'] = data_dirs

        _environment['QML2_IMPORT_PATH'] = (
            self._get_qml_import_path_with_mock()
        )

        # Mock libraries go first so they take precedence over anything
        # already present in LD_LIBRARY_PATH.
        new_ld_library_path = [
            get_default_extra_mock_libraries(),
            self._get_lightdm_mock_path()
        ]
        if os.getenv('LD_LIBRARY_PATH') is not None:
            new_ld_library_path.append(os.getenv('LD_LIBRARY_PATH'))
        _environment['LD_LIBRARY_PATH'] = ':'.join(new_ld_library_path)

        self._unlink_mir_socket()

        binary_arg = "BINARY=%s" % get_binary_path()
        env_args = ["%s=%s" % (k, v) for k, v in _environment.items()]
        args = [binary_arg] + env_args
        self.unity_proxy = process_helpers.restart_unity_with_testability(
            *args)
        self.main_win = self.unity_proxy.select_single(shell.ShellView)

    def _unlink_mir_socket(self):
        """Remove stale mir socket files, ignoring any that cannot be removed.
        """
        # FIXME: we shouldn't be doing this
        # $MIR_SOCKET, fallback to $XDG_RUNTIME_DIR/mir_socket and
        # /tmp/mir_socket as last resort
        try:
            os.unlink(
                os.getenv('MIR_SOCKET',
                          os.path.join(os.getenv('XDG_RUNTIME_DIR', "/tmp"),
                                       "mir_socket")))
        except OSError:
            pass
        try:
            os.unlink("/tmp/mir_socket")
        except OSError:
            pass

    def _create_sensors(self):
        """Write the sensor creation commands to the Unity8 sensors fifo."""
        # Wait for unity to start running.
        Eventually(Equals(True)).match(
            lambda: process_helpers.is_job_running('unity8'))

        # Wait for the sensors fifo file to be created.
        fifo_path = '/tmp/sensor-fifo-{0}'.format(
            process_helpers._get_unity_pid())
        Eventually(Equals(True)).match(
            lambda: os.path.exists(fifo_path))

        with open(fifo_path, 'w') as fifo:
            fifo.write('create accel 0 1000 0.1\n')
            fifo.write('create light 0 10 1\n')
            fifo.write('create proximity\n')
153 
154 
class RestartUnityWithTestability(fixtures.Fixture):

    """Fixture to launch Unity8 with testability.

    :ivar unity_proxy: The Autopilot proxy object for the Unity shell.

    """

    unity_proxy = None

    def __init__(self, binary_path, variables):
        """Initialize the fixture instance.

        :param str binary_path: The path passed as ``BINARY`` when
            restarting Unity8.
        :param variables: The variables to use when launching the app.
        :type variables: A dictionary.

        """
        super().__init__()
        self.binary_path = binary_path
        self.variables = variables

    def setUp(self):
        """Restart unity with testability when the fixture is used."""
        super().setUp()
        self.addCleanup(self.stop_unity)
        self.restart_unity()

    def restart_unity(self):
        """Restart Unity8; delegates to restart_unity_with_testability."""
        self.restart_unity_with_testability()

    def restart_unity_with_testability(self):
        """Restart Unity8 with the configured binary and variables.

        The resulting proxy object is stored in ``unity_proxy``.

        """
        self._unlink_mir_socket()

        binary_arg = 'BINARY={}'.format(self.binary_path)
        variable_args = [
            '{}={}'.format(key, value) for key, value in self.variables.items()
        ]
        all_args = [binary_arg] + variable_args

        self.unity_proxy = process_helpers.restart_unity_with_testability(
            *all_args)

    def _unlink_mir_socket(self):
        """Remove stale mir socket files, ignoring any that cannot be removed.
        """
        # FIXME: we shouldn't be doing this
        # $MIR_SOCKET, fallback to $XDG_RUNTIME_DIR/mir_socket and
        # /tmp/mir_socket as last resort
        try:
            os.unlink(
                os.getenv('MIR_SOCKET',
                          os.path.join(os.getenv('XDG_RUNTIME_DIR', "/tmp"),
                                       "mir_socket")))
        except OSError:
            pass
        try:
            os.unlink("/tmp/mir_socket")
        except OSError:
            pass

    def stop_unity(self):
        """Stop the ``unity8`` job."""
        process_helpers.stop_job('unity8')
217 
218 
class LaunchDashApp(fixtures.Fixture):

    """Fixture to launch the Dash app.

    :ivar application_proxy: The proxy object returned by
        ``launch_application``, set during ``setUp``.

    """

    def __init__(self, binary_path, variables):
        """Initialize an instance.

        :param str binary_path: The path to the Dash app binary.
        :param variables: The variables to use when launching the app.
        :type variables: A dictionary.

        """
        super().__init__()
        self.binary_path = binary_path
        self.variables = variables

    def setUp(self):
        """Launch the dash app when the fixture is used."""
        super().setUp()
        # Register the cleanup before launching so the job is stopped even
        # if obtaining the proxy object fails after the job has started.
        self.addCleanup(self.stop_application)
        self.application_proxy = self.launch_application()

    def launch_application(self):
        """Start the ``unity8-dash`` job and return a proxy object for it."""
        binary_arg = 'BINARY={}'.format(self.binary_path)
        testability_arg = 'QT_LOAD_TESTABILITY={}'.format(1)
        env_args = [
            '{}={}'.format(key, value) for key, value in self.variables.items()
        ]
        all_args = [binary_arg, testability_arg] + env_args

        pid = process_helpers.start_job('unity8-dash', *all_args)
        return introspection.get_proxy_object_for_existing_process(
            pid=pid,
            emulator_base=ubuntuuitoolkit.UbuntuUIToolkitCustomProxyObjectBase
        )

    def stop_application(self):
        """Stop the ``unity8-dash`` job."""
        process_helpers.stop_job('unity8-dash')
257 
258 
class DisplayRotationLock(fixtures.Fixture):

    """Fixture to put the display rotation lock into a requested state.

    The state is read and written with the ``gsettings`` command, using the
    ``rotation-lock`` key of the ``com.ubuntu.touch.system`` schema.

    """

    def __init__(self, enable):
        """Initialize an instance.

        :param enable: The rotation lock state to apply in ``setUp``.

        """
        super().__init__()
        self.enable = enable

    def setUp(self):
        """Apply the requested state and restore the previous one on cleanup.

        Nothing is changed when the lock is already in the requested state.

        """
        super().setUp()
        previous_state = self._is_rotation_lock_enabled()
        if self.enable == previous_state:
            return
        self.addCleanup(self._set_rotation_lock, previous_state)
        self._set_rotation_lock(self.enable)

    def _is_rotation_lock_enabled(self):
        """Return True when the gsettings output contains 'true'."""
        query = ['gsettings', 'get', 'com.ubuntu.touch.system',
                 'rotation-lock']
        reply = subprocess.check_output(query, universal_newlines=True)
        return 'true' in reply

    def _set_rotation_lock(self, value):
        """Write the truthiness of ``value`` to the rotation-lock key."""
        if value:
            setting = 'true'
        else:
            setting = 'false'
        subprocess.check_output([
            'gsettings', 'set',
            'com.ubuntu.touch.system',
            'rotation-lock', setting
        ])
289 
290 
class LaunchMockIndicatorService(fixtures.Fixture):

    """Fixture to launch the indicator test service."""

    def __init__(self, action_delay, ensure_not_running=True):
        """Initialize an instance.

        :param action_delay: The delay to use when activating actions.
            Measured in milliseconds. Value of -1 will result in infinite
            delay.
        :type action_delay: An integer.
        :param boolean ensure_not_running: Make sure service is not running
            before it is launched.

        """
        super().__init__()
        self.action_delay = action_delay
        self.ensure_not_running = ensure_not_running

    def setUp(self):
        """Launch the mock indicator service when the fixture is used."""
        super().setUp()
        if self.ensure_not_running:
            self.ensure_service_not_running()
        self.addCleanup(self.stop_service)
        # launch_service() has no return value, so application_proxy is None.
        self.application_proxy = self.launch_service()

    def launch_service(self):
        """Start the ``unity-mock-indicator-service`` job."""
        logger.info("Starting unity-mock-indicator-service")
        binary_path = get_binary_path('unity-mock-indicator-service')
        binary_arg = 'BINARY={}'.format(binary_path)
        env_args = 'ARGS=-t {}'.format(self.action_delay)
        all_args = [binary_arg, env_args]
        process_helpers.start_job('unity-mock-indicator-service', *all_args)

    def stop_service(self):
        """Stop the ``unity-mock-indicator-service`` job."""
        logger.info("Stopping unity-mock-indicator-service")
        process_helpers.stop_job('unity-mock-indicator-service')

    def ensure_service_not_running(self):
        """Stop the service if its job is currently running."""
        if process_helpers.is_job_running('unity-mock-indicator-service'):
            self.stop_service()
def __init__(self, binary_path, variables)
def __init__(self, binary_path, variables)