1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# build/android/pylib/utils/instrumentation_tracing.py
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions to instrument all Python function calls.
This generates a JSON file readable by Chrome's about:tracing. To use it,
either call start_instrumenting and stop_instrumenting at the appropriate times,
or use the Instrument context manager.
A function is only traced if it is from a Python module that matches at least
one regular expression object in to_include, and does not match any in
to_exclude. In between the start and stop events, every function call of a
function from such a module will be added to the trace.
"""
import contextlib
import functools
import inspect
import os
import re
import sys
import threading
from py_trace_event import trace_event
# Modules to exclude by default (to avoid problems like infinite loops).
# These regex strings are compiled and added to the caller-supplied exclusion
# patterns every time a trace function is generated.
DEFAULT_EXCLUDE = [r'py_trace_event\..*']
class _TraceArguments:
  """Wraps a dictionary to ensure safe evaluation of repr().

  Keys and values are converted to strings at the moment they are added, so
  that calling repr() on this object later only has to format plain strings
  and cannot run a failing __repr__/__str__ of a traced argument.
  """

  def __init__(self):
    # Maps stringified argument name -> stringified argument value.
    self._arguments = {}

  @staticmethod
  def _safeStringify(item):
    """Returns repr(item), falling back to str(item), then to "<ERROR>"."""
    for stringify in (repr, str):
      try:
        return stringify(item)
      except Exception:  # pylint: disable=broad-except
        # Arbitrary user objects may raise anything here; try the next
        # converter instead of letting tracing break the traced program.
        continue
    return "<ERROR>"

  def add(self, key, val):
    """Stores the stringified val under the stringified key."""
    key_str = _TraceArguments._safeStringify(key)
    val_str = _TraceArguments._safeStringify(val)
    self._arguments[key_str] = val_str

  def __repr__(self):
    return repr(self._arguments)
# Thread idents whose thread name has already been reported to trace_event, so
# that the name is only emitted once per ident. Shared by every trace function
# created in this module.
saved_thread_ids = set()
def _shouldTrace(frame, to_include, to_exclude, included, excluded):
  """Decides whether or not the function called in frame should be traced.

  Args:
    frame: The Python frame object of this function call.
    to_include: Set of regex objects for modules which should be traced.
    to_exclude: Set of regex objects for modules which should not be traced.
    included: Set of module names we've determined should be traced.
    excluded: Set of module names we've determined should not be traced.

  Returns:
    True if the frame's module is included and no frame on the call stack
    (the frame itself and all of its callers) belongs to an excluded module;
    False otherwise, including when the frame's module cannot be determined.
    |included| and |excluded| are updated in place as caches.
  """
  module = inspect.getmodule(frame)
  if module is None:
    return False
  module_name = module.__name__

  # An empty to_include means "include everything".
  if module_name not in included:
    if to_include and not any(
        pattern.match(module_name) for pattern in to_include):
      return False
    included.add(module_name)

  if not to_exclude:
    return True

  # Return False for anything with an excluded module's function anywhere in the
  # stack trace (even if the function itself is in an included module).
  # The stack is walked through f_back rather than inspect.getouterframes(),
  # which also collects source context for every frame that is never used here.
  caller = frame
  while caller is not None:
    caller_module = inspect.getmodule(caller)
    caller = caller.f_back
    if caller_module is None:
      # Frames without a resolvable module (e.g. code run through exec) have
      # no name to match against, so they cannot be excluded.
      continue
    calling_module = caller_module.__name__
    if calling_module in excluded:
      return False
    if any(pattern.match(calling_module) for pattern in to_exclude):
      excluded.add(calling_module)
      return False
  return True
def _generate_trace_function(to_include, to_exclude):
  """Builds a trace function restricted to the given module patterns.

  Args:
    to_include: Iterable of regex strings for modules which should be traced.
    to_exclude: Iterable of regex strings for modules which should not be
      traced. DEFAULT_EXCLUDE is always added to these.

  Returns:
    A function taking (frame, event, arg) that records begin/end events through
    trace_event for the "call" and "return" events of traced functions.
  """
  include_patterns = {re.compile(regex) for regex in to_include}
  exclude_patterns = {
      re.compile(regex) for regex in (*to_exclude, *DEFAULT_EXCLUDE)}
  # Per-trace-function caches of module names already classified.
  seen_included = set()
  seen_excluded = set()
  owner_pid = os.getpid()

  def traceFunction(frame, event, arg):
    del arg

    # Don't try to trace in subprocesses.
    if os.getpid() != owner_pid:
      sys.settrace(None)
      return None

    # pylint: disable=unused-argument
    if event != "call" and event != "return":
      return None

    if not _shouldTrace(frame, include_patterns, exclude_patterns,
                        seen_included, seen_excluded):
      return None

    code = frame.f_code
    function_name = code.co_name

    if event == "return":
      trace_event.trace_end(function_name)
      return None

    # From here on this is a "call" event: the function is beginning. Save the
    # thread name (if that hasn't been done), record the Begin event, and hand
    # back this function as the local trace function.
    current = threading.current_thread()
    thread_id = current.ident
    if thread_id not in saved_thread_ids:
      trace_event.trace_set_thread_name(current.name)
      saved_thread_ids.add(thread_id)

    # The argument names are the first |co_argcount| entries of |co_varnames|
    # (local variables follow them); their values are looked up in f_locals.
    arguments = _TraceArguments()
    local_values = frame.f_locals
    for arg_name in code.co_varnames[:code.co_argcount]:
      arguments.add(arg_name, local_values[arg_name])

    trace_event.trace_begin(function_name,
                            arguments=arguments,
                            module=inspect.getmodule(frame).__name__,
                            filename=code.co_filename,
                            line_number=frame.f_lineno)

    # Returning this function makes it the "local trace function" for this
    # frame, so it is also called for this function's "return" event.
    return traceFunction

  return traceFunction
def no_tracing(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
trace_func = sys.gettrace()
try:
sys.settrace(None)
threading.settrace(None)
return f(*args, **kwargs)
finally:
sys.settrace(trace_func)
threading.settrace(trace_func)
return wrapper
def start_instrumenting(output_file, to_include=(), to_exclude=()):
  """Enable tracing of all function calls (from specified modules).

  Args:
    output_file: Passed to trace_event.trace_enable as the trace output.
    to_include: Iterable of regex strings for modules which should be traced.
    to_exclude: Iterable of regex strings for modules which should not be
      traced.
  """
  trace_event.trace_enable(output_file)
  tracer = _generate_trace_function(to_include, to_exclude)
  # Install for the current thread first, then for threads started later.
  for install in (sys.settrace, threading.settrace):
    install(tracer)
def stop_instrumenting():
  """Disable trace_event recording and clear the installed trace functions."""
  trace_event.trace_disable()
  # Clear the current thread's trace function, then the threading hook.
  for uninstall in (sys.settrace, threading.settrace):
    uninstall(None)
@contextlib.contextmanager
def Instrument(output_file, to_include=(), to_exclude=()):
  """Context manager that traces function calls made inside its body.

  Calls start_instrumenting with the given arguments on entry and
  stop_instrumenting on exit. stop_instrumenting runs even if the body, or
  start_instrumenting itself, raises.

  Args:
    output_file: Forwarded to start_instrumenting.
    to_include: Forwarded to start_instrumenting.
    to_exclude: Forwarded to start_instrumenting.
  """
  try:
    start_instrumenting(
        output_file, to_include=to_include, to_exclude=to_exclude)
    yield
  finally:
    stop_instrumenting()