Unity 8
test_upstart.py
1 # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2 #
3 # Unity Autopilot Test Suite
4 # Copyright (C) 2013, 2014 Canonical
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 
20 """Tests for upstart integration."""
21 
22 import os
23 import stat
24 import signal
25 import subprocess
26 import time
27 
28 import fixtures
29 from testtools.matchers import Equals, MismatchError
30 from autopilot.matchers import Eventually
31 from autopilot.introspection import get_proxy_object_for_existing_process
32 
33 from unity8 import get_binary_path
34 from unity8.shell.emulators import UnityEmulatorBase
35 from unity8.shell.tests import UnityTestCase, _get_device_emulation_scenarios
36 
37 import logging
38 
39 logger = logging.getLogger(__name__)
40 
41 
42 class UpstartIntegrationTests(UnityTestCase):
43 
44  scenarios = _get_device_emulation_scenarios()
45 
46  def _get_status(self):
47  pid, status = os.waitpid(
48  self.process.pid, os.WUNTRACED | os.WCONTINUED | os.WNOHANG)
49  logger.debug(
50  "Got status: {0}; stopped: {1}; continued: {2}".format(
51  status, os.WIFSTOPPED(status), os.WIFCONTINUED(status)))
52  return status
53 
54  def _launch_unity(self):
55  self.patch_environment("QT_LOAD_TESTABILITY", "1")
56 
57  try:
58  host_socket = os.getenv("MIR_SOCKET", "/run/mir_socket")
59  if stat.S_ISSOCK(os.stat(host_socket).st_mode):
60  self.patch_environment("MIR_SERVER_HOST_SOCKET",
61  host_socket)
62  socket = os.path.join(os.getenv("XDG_RUNTIME_DIR", "/tmp"),
63  "mir_socket")
64  self.patch_environment("MIR_SERVER_FILE", socket)
65  except OSError:
66  pass
67 
68  self.process = subprocess.Popen(
69  [get_binary_path()] + self.unity_geometry_args)
70 
71  def ensure_stopped():
72  self.process.terminate()
73  for i in range(10):
74  try:
75  self._get_status()
76  except OSError:
77  break
78  else:
79  time.sleep(1)
80  try:
81  self._get_status()
82  except OSError:
83  pass
84  else:
85  self.process.kill()
86 
87  self.process.wait()
88 
89  self.addCleanup(ensure_stopped)
90 
91  def _set_proxy(self):
92  super()._set_proxy(
93  get_proxy_object_for_existing_process(
94  pid=self.process.pid,
95  emulator_base=UnityEmulatorBase,))
96 
97  def test_no_sigstop(self):
98  self.useFixture(
99  fixtures.EnvironmentVariable(
100  'UNITY_MIR_EMITS_SIGSTOP', newvalue=None))
101  self._launch_unity()
102 
103  try:
104  self.assertThat(
105  lambda: os.WIFSTOPPED(self._get_status()),
106  Eventually(Equals(True)))
107  except MismatchError:
108  pass
109  else:
110  self.process.send_signal(signal.SIGCONT)
111  self.fail('Unity8 raised SIGSTOP')
112 
113  self._set_proxy()
114 
115  logger.debug("Unity started, waiting for it to be ready.")
116  self.wait_for_unity()
117  logger.debug("Unity loaded and ready.")
118 
119  def test_expect_sigstop(self):
120  self.useFixture(
121  fixtures.EnvironmentVariable('UNITY_MIR_EMITS_SIGSTOP', '1'))
122  self._launch_unity()
123  self.assertThat(
124  lambda: os.WIFSTOPPED(self._get_status()),
125  Eventually(Equals(True)), "Unity8 should raise SIGSTOP when ready")
126 
127  self.process.send_signal(signal.SIGCONT)
128  self.assertThat(
129  lambda: os.WIFCONTINUED(self._get_status()),
130  Eventually(Equals(True)), "Unity8 should have resumed")
131 
132  logger.debug("Unity started, waiting for it to be ready.")
133  self._set_proxy()
134  self.wait_for_unity()
135  logger.debug("Unity loaded and ready.")