1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
build / android / pylib / local / local_test_server_spawner.py [blame]
# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import time
from devil.android import forwarder
from devil.android import ports
from pylib.base import test_server
from pylib.constants import host_paths
with host_paths.SysPath(host_paths.BUILD_UTIL_PATH):
from lib.common import chrome_test_server_spawner
# The tests should not need more than one test server instance.
MAX_TEST_SERVER_INSTANCES = 1
def _WaitUntil(predicate, max_attempts=5):
"""Blocks until the provided predicate (function) is true.
Returns:
Whether the provided predicate was satisfied once (before the timeout).
"""
sleep_time_sec = 0.025
for _ in range(1, max_attempts):
if predicate():
return True
time.sleep(sleep_time_sec)
sleep_time_sec = min(1, sleep_time_sec * 2) # Don't wait more than 1 sec.
return False
class PortForwarderAndroid(chrome_test_server_spawner.PortForwarder):
def __init__(self, device):
self.device = device
def Map(self, port_pairs):
forwarder.Forwarder.Map(port_pairs, self.device)
def GetDevicePortForHostPort(self, host_port):
return forwarder.Forwarder.DevicePortForHostPort(host_port)
def WaitHostPortAvailable(self, port):
return _WaitUntil(lambda: ports.IsHostPortAvailable(port))
def WaitPortNotAvailable(self, port):
return _WaitUntil(lambda: not ports.IsHostPortAvailable(port))
def WaitDevicePortReady(self, port):
return _WaitUntil(lambda: ports.IsDevicePortUsed(self.device, port))
def Unmap(self, device_port):
forwarder.Forwarder.UnmapDevicePort(device_port, self.device)
class LocalTestServerSpawner(test_server.TestServer):
def __init__(self, port, device):
super().__init__()
self._device = device
self._spawning_server = chrome_test_server_spawner.SpawningServer(
port, PortForwarderAndroid(device), MAX_TEST_SERVER_INSTANCES)
@property
def server_address(self):
return self._spawning_server.server.server_address
@property
def port(self):
return self.server_address[1]
#override
def SetUp(self):
# See net/test/spawned_test_server/remote_test_server.h for description of
# the fields in the config file.
test_server_config = json.dumps({
'spawner_url_base': 'http://localhost:%d' % self.port
})
self._device.WriteFile(
'%s/net-test-server-config' % self._device.GetExternalStoragePath(),
test_server_config)
forwarder.Forwarder.Map([(self.port, self.port)], self._device)
self._spawning_server.Start()
#override
def Reset(self):
self._spawning_server.CleanupState()
#override
def TearDown(self):
self.Reset()
self._spawning_server.Stop()
forwarder.Forwarder.UnmapDevicePort(self.port, self._device)