22from .io
import start_json_writer, read_json
28 send_gdb_with_response,
33from .typecheck
import type_check
47 """The DAP server class."""
49 def __init__(self, in_stream, out_stream, child_stream):
57 if sys.version_info[0] == 3
and sys.version_info[1] <= 6:
71 "request_seq": params[
"seq"],
73 "command": params[
"command"],
76 if "arguments" in params:
77 args = params[
"arguments"]
81 body = _commands[params[
"command"]](**args)
84 result[
"success"] =
True
85 except BaseException
as e:
87 result[
"success"] =
False
88 result[
"message"] = str(e)
106 log(
"WROTE: <<<" + json.dumps(obj) +
">>>")
111 """The main loop of the DAP server."""
118 log(
"READ: <<<" + json.dumps(cmd) +
">>>")
123 for event, body
in events:
132 """Send a DAP event back to the client, but only after the
133 current request has completed."""
140 """Send an event to the DAP client.
141 EVENT is the name of the event, a string.
142 BODY is the body of the event, an arbitrary object."""
152 """Request that the server shut down."""
160 """Send an event to the DAP client.
161 EVENT is the name of the event, a string.
162 BODY is the body of the event, an arbitrary object."""
164 _server.send_event(event, body)
169 @functools.wraps(func)
170 def check(*args, **kwargs):
173 from .events
import inferior_running
176 raise Exception(
"notStopped")
177 return func(*args, **kwargs)
185 response: bool =
True,
186 on_dap_thread: bool =
False,
187 expect_stopped: bool =
True
189 """A decorator for DAP requests.
191 This registers the function as the implementation of the DAP
192 request NAME. By default, the function is invoked in the gdb
193 thread, and its result is returned as the 'body' of the DAP
196 Some keyword arguments are provided as well:
198 If RESPONSE is False, the result of the function will not be
199 waited for and no 'body' will be in the response.
201 If ON_DAP_THREAD is True, the function will be invoked in the DAP
202 thread. When ON_DAP_THREAD is True, RESPONSE may not be False.
204 If EXPECT_STOPPED is True (the default), then the request will
205 fail with the 'notStopped' reason if it is processed while the
206 inferior is running. When EXPECT_STOPPED is False, the request
207 will proceed regardless of the inferior's state.
211 assert not on_dap_thread
or response
217 assert code.co_posonlyargcount == 0
218 except AttributeError:
221 assert code.co_argcount == 0
223 assert code.co_flags & inspect.CO_VARKEYWORDS
226 func = type_check(func)
230 cmd = in_dap_thread(func)
232 func = in_gdb_thread(func)
236 def sync_call(**args):
237 return send_gdb_with_response(
lambda:
func(**args))
242 def non_sync_call(**args):
243 return send_gdb(
lambda:
func(**args))
254 _commands[name] = cmd
261 """A decorator that indicates that the wrapper function implements
262 the DAP capability NAME."""
266 _capabilities[name] = value
273 """Return the value of a boolean client capability.
275 If the capability was not specified, or did not have boolean type,
276 False is returned."""
278 if name
in _server.config
and isinstance(_server.config[name], bool):
279 return _server.config[name]
283@request("initialize", on_dap_thread=True)
285 global _server, _capabilities
286 _server.config = args
287 _server.send_event_later(
"initialized")
288 return _capabilities.copy()
291@request("terminate", expect_stopped=False)
292@capability("supportsTerminateRequest")
297@request("disconnect", on_dap_thread=True, expect_stopped=False)
298@capability("supportTerminateDebuggee")
300 if terminateDebuggee:
301 send_gdb_with_response(
"kill")
send_event(self, event, body=None)
_handle_command(self, params)
_read_inferior_output(self)
__init__(self, in_stream, out_stream, child_stream)
send_event_later(self, event, body=None)
capability(name, value=True)
request(str name, *bool response=True, bool on_dap_thread=False, bool expect_stopped=True)
send_event(event, body=None)
client_bool_capability(name)
disconnect(*bool terminateDebuggee=False, **args)
void(* func)(remote_target *remote, char *)
static void check(BOOL ok, const char *file, int line)