QPython与Termux的交互:PIOS.py
PIOS = Python Interactive Over Socket 基于套接字的Python交互
Version 版本:v1.1
默认端口号 Default Port : 12000 .
In Termux:
python -i PIOS/server.py
In QPython:
from PIOS.client import *
Connect()
In Termux:
python -i PIOS/server.py 15234
In QPython:
from PIOS.client import *
Connect(15234)
In QPython:
from PIOS.server import *
Start()
In Termux:
python -i PIOS/client.py
In QPython:
from PIOS.server import *
Start(18379)
In Termux:
python -i PIOS/client.py 18379
Server Functions 服务器函数:
Start Server 启动服务器:
Start(port) # port 端口号,Return None
Start() # port = 12000,Return None
Client Functions 客户端函数:
Connect to Server 连接服务器:
Connect(port) # port 端口号,Return None
Connect() # port = 12000,Return None
Remote Functions 远程函数:
RemoteEval(code) # Return: (Result,Error)
RemoteExec(code) # Return: (Result,Error)
RemoteSl4a(method, *params) # Return: (Result,Error). Only available when QPython is the server 仅当QPython作为服务器时可用. Example 示例: RemoteSl4a('makeToast','hello')
RemoteShell(code) # Return: (Result,Error). If QPython 3C is the client and Termux is the server, Termux is launched as well 如果QPython 3C是客户端且Termux是服务器,将会打开Termux
by 乘着船 20220515
源代码server.py:
def Now():
    """Return the current local time formatted as ``YYYY-MM-DD,HH:MM:SS``."""
    now = time.localtime()
    date_part = "%04d-%02d-%02d" % (now.tm_year, now.tm_mon, now.tm_mday)
    clock_part = "%02d:%02d:%02d" % (now.tm_hour, now.tm_min, now.tm_sec)
    return date_part + "," + clock_part
def Accept():
    """Accept one client on the listening ``Socket``.

    The accepted connection is stored in the module-level ``Conn`` and the
    peer address is logged with a timestamp.
    """
    global Conn
    accepted = Socket.accept()
    Conn = accepted[0]
    peer_address = accepted[1]
    print(Now(), '对接地址', peer_address)
def Send(Result,Error):
    """Send one reply to the connected client over ``Conn``.

    The wire format is ``repr(Result)``, a NUL separator, then ``Error``,
    all encoded as UTF-8.
    """
    message = '\x00'.join((repr(Result), Error))
    Conn.sendall(bytes(message, 'utf-8'))
def Recv():
    """Read one request from ``Conn`` and return it as text.

    Chunks of up to 1024 bytes are read until a chunk shorter than 1024
    bytes arrives; the collected bytes are then decoded as UTF-8.
    """
    # NOTE(review): a short chunk is the only end-of-message marker, so a
    # message whose length is an exact multiple of 1024 bytes triggers one
    # more recv() call before the function can return.
    chunk_size = 1024
    received = bytearray()
    chunk = Conn.recv(chunk_size)
    received.extend(chunk)
    while len(chunk) >= chunk_size:
        chunk = Conn.recv(chunk_size)
        received.extend(chunk)
    return str(received, 'utf-8')
from traceback import format_exc
import socket,time,os,sys
class Out:
    """Holder for the outcome of the current remote command.

    The class object itself (not an instance) is assigned to ``sys.stdout``
    by ``RunExec``, so it exposes the ``write``/``flush`` pair that
    ``print`` relies on.
    """

    # Evaluation result, or the list of text fragments captured via write().
    result = []
    # Traceback / error text of the current command; '' when there is none.
    error = ''
    # Captured output is appended to the ``result`` list.
    write = result.append

    @staticmethod
    def flush():
        """Do nothing; present so ``Out`` can stand in for a stream.

        Declared static so it is callable both on the class (how the
        server uses it) and on an instance.
        """
        pass
def RunEval(code):
    """Evaluate ``code`` as an expression in ``Env`` and record the outcome.

    On success ``Out.result`` holds the value. If the expression raises,
    ``Out.error`` holds the formatted traceback and ``Out.result`` is ``''``.
    """
    # SECURITY: ``code`` is text received from the socket peer and is
    # evaluated without restriction; only use this with a trusted peer.
    try:
        Out.result = eval(code, Env, Env)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that SystemExit
        # (raised when the peer calls Exit()) and KeyboardInterrupt
        # propagate instead of being reported as an evaluation error.
        Out.error = format_exc()
        Out.result = ''
def RunExec(code):
    """Execute ``code`` as statements in ``Env`` and capture what it prints.

    While the code runs, ``sys.stdout`` is replaced by ``Out`` so printed
    text is collected in ``Out.result``; afterwards the fragments are joined
    into one string. If the code raises, ``Out.error`` holds the formatted
    traceback.
    """
    # SECURITY: ``code`` is text received from the socket peer and is
    # executed without restriction; only use this with a trusted peer.
    sys.stdout = Out
    try:
        exec(code, Env, Env)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that SystemExit
        # (raised when the peer calls Exit()) and KeyboardInterrupt
        # propagate.
        Out.error = format_exc()
    finally:
        # Always give the real stdout back, even when an exception escapes.
        sys.stdout = fileOut
    Out.result = ''.join(Out.result)
def RunShell(code):
    """Run ``code`` as a shell command through ``os.system``.

    The command is wrapped in an ``os.system(...)`` expression and handed
    to ``RunEval``, whose return value is passed through.
    """
    expression = 'os.system({!r})'.format(code)
    # Emit an empty line before the command runs.
    print()
    return RunEval(expression)
def RunSl4a(code):
    """Evaluate ``droid.<code>`` in ``Env`` and record its result and error.

    The returned object's ``result`` and ``error`` attributes are copied to
    ``Out``; an ``error`` of ``None`` is normalised to ``''``. If the call
    raises, ``Out.error`` holds the formatted traceback and ``Out.result``
    is ``''``.
    """
    # SECURITY: ``code`` is text received from the socket peer and is
    # evaluated without restriction; only use this with a trusted peer.
    expression = 'droid.' + code
    try:
        data = eval(expression, Env, Env)
        Out.result = data.result
        Out.error = data.error
        if Out.error is None:
            Out.error = ''
    except Exception:
        Out.error = format_exc()
        Out.result = ''
def Run(cmd):
    """Dispatch one ``"<Mode>:<code>"`` command and return ``(result, error)``.

    The text before the first colon selects the module-level handler named
    ``Run<Mode>``; the rest is passed to it. After the handler returns,
    ``Out`` is reset to an empty capture state for the next command.
    """
    mode, code = cmd.split(':', 1)
    handler = Glb['Run' + mode]
    handler(code)
    outcome = (Out.result, Out.error)
    # Start the next command with a fresh capture list and no error.
    fresh = []
    Out.result = fresh
    Out.write = fresh.append
    Out.error = ''
    return outcome


# Module namespace used by Run() to look up the Run<Mode> handlers by name.
Glb = globals()
def Once():
    """Serve a single request: receive it, run it, and send the reply back.

    Each step is logged with a timestamp. If running the request raises
    anything, a disconnect message is logged and the process exits.
    """
    request = Recv()
    print(Now(), '传入指令', request)
    try:
        reply = Run(request)
    except:
        # NOTE(review): the bare except also catches SystemExit and
        # KeyboardInterrupt; every failure here ends the server.
        print(Now(), '连接中断')
        exit()
    print(Now(), '返回结果', reply)
    result, error = reply
    Send(result, error)
def Exit(Text='操作结束,谢谢使用!'):
    """End the session: send ``Text`` to the client, close up, and exit.

    ``sys.stdout`` is restored first, then ``str(Text)`` is sent with an
    empty error, the connection and the listening socket are closed, the
    end of the session is logged, and the process exits.
    """
    sys.stdout = fileOut
    farewell = str(Text)
    Send(farewell, '')
    Conn.close()
    Socket.close()
    print(Now(), '对接结束')
    exit()


# Namespace in which remote code is evaluated and executed.
Env = dict(Exit=Exit, os=os)
def Start(port=12000):
    """Run the server: bind, wait for one client, then serve requests forever.

    Args:
        port: TCP port to listen on (default 12000).

    The listening socket is stored in the module-level ``Socket``. If
    binding fails, the failure is logged, the socket is closed, and the
    process exits.
    """
    # Connection workflow.
    global Socket
    print(Now(), '对接开始 端口号:%s' % port)
    Socket = socket.socket()
    host = socket.gethostname()
    try:
        Socket.bind((host, port))
    except Exception:
        print(Now(), '连接失败,对接结束', format_exc(), host, port)
        # Release the socket: under ``python -i`` exit() returns to the
        # prompt, so the descriptor would otherwise stay open.
        Socket.close()
        exit()
    else:
        print(Now(), '已连接上', host, port)
    Socket.listen(5)
    # Give up waiting for a client after 120 seconds.
    Socket.settimeout(120)
    Accept()
    while True:
        Once()
# Work from the directory that contains this file. dirname/abspath also
# handles a bare file name (no '/'), where slicing at rfind('/') == -1
# would drop the last character and make chdir fail.
projectPath = os.path.dirname(os.path.abspath(__file__))
os.chdir(projectPath)

# Optional first command-line argument: the port number (default 12000).
try:
    port = int(sys.argv[1])
except Exception:
    port = 12000

# When HOME contains 'qpython', expose the Android helper to remote code as
# ``droid``. ``.get`` avoids a KeyError when HOME is not set.
if 'qpython' in os.environ.get('HOME', ''):
    from androidhelper import Android as droid
    droid = droid()
    Env['droid'] = droid

# Keep a handle on the real stdout so it can be restored after capturing.
fileOut = sys.__stdout__
print('日志文件:', fileOut)
sys.stdout = fileOut

if __name__ == '__main__':
    Start(port)
源代码client.py:
import time,socket,os,sys
def Connect(port=12000):
    """Connect to the server on this host, retrying for about five seconds.

    Args:
        port: TCP port the server listens on (default 12000).

    The connected socket is stored in the module-level ``Socket``. If no
    connection succeeds within the retry window, a timeout message is
    printed and the process exits. After connecting, ``isTermux`` is set
    from a remote check that the server's HOME contains ``termux`` combined
    with a local check that HOME contains ``indi.czc.qpython``.
    """
    global Socket, isTermux
    host = socket.gethostname()
    startTime = time.time()
    while time.time() - startTime <= 5:
        # Use a fresh socket per attempt: the state of a socket after a
        # failed connect() is unspecified, so it should not be reused.
        Socket = socket.socket()
        try:
            Socket.connect((host, port))
            break
        except OSError:
            Socket.close()
            # Short pause so the retry loop does not spin a CPU core.
            time.sleep(0.1)
    else:
        print('连接到服务端超时')
        exit()
    isTermux = RemoteEval('os.environ.get("HOME","").find("termux")!=-1')[0] and os.environ.get('HOME', '').find('indi.czc.qpython') != -1
def Recv():
    """Read one reply from ``Socket`` and split it into result and error.

    Chunks of up to 1024 bytes are read until a chunk shorter than 1024
    bytes arrives. The UTF-8 text is split at its last NUL character into
    the ``repr`` of the result and the error text.

    Returns:
        ``(value, error)`` when the result text can be evaluated back into
        a Python object; ``[text, error]`` when it cannot; ``[text, '']``
        when the reply has no NUL separator at all (for example an empty
        read after the peer closed the connection).
    """
    received = bytearray()
    while True:
        chunk = Socket.recv(1024)
        received.extend(chunk)
        if len(chunk) < 1024:
            break
    text = str(received, 'utf-8')
    parts = text.rsplit('\x00', 1)
    if len(parts) < 2:
        # No separator: return the raw text instead of failing on parts[1].
        return [text, '']
    payload, error = parts
    try:
        # SECURITY: eval() runs text supplied by the peer; only use this
        # with a trusted server. Module globals are passed explicitly so
        # this function's local names cannot leak into the evaluation.
        return eval(payload, globals()), error
    except Exception:
        # The repr could not be turned back into an object; hand the text
        # through unchanged.
        return [payload, error]
def Send(Data):
    """Encode the text ``Data`` as UTF-8 and send all of it over ``Socket``."""
    payload = bytes(Data, 'utf-8')
    Socket.sendall(payload)
def RemoteEval(code):
    """Ask the server to evaluate the expression ``code``.

    Sends ``Eval:<code>`` and returns whatever ``Recv()`` yields for the
    reply.
    """
    try:
        Send('Eval:' + code)
    except:
        # Sending failed: send the placeholder request instead, presumably
        # so that the Recv() below still has a reply to read.
        SendErrCode()
    reply = Recv()
    return reply
def RemoteExec(code):
    """Ask the server to execute the statements in ``code``.

    Sends ``Exec:<code>`` and returns whatever ``Recv()`` yields for the
    reply.
    """
    try:
        Send('Exec:' + code)
    except:
        # Sending failed: send the placeholder request instead, presumably
        # so that the Recv() below still has a reply to read.
        SendErrCode()
    reply = Recv()
    return reply
def RemoteShell(code):
    """Ask the server to run ``code`` as a shell command.

    Sends ``Shell:<code>``. When ``isTermux`` is truthy, the Termux app is
    additionally launched through ``droid`` (failures are ignored). Returns
    whatever ``Recv()`` yields for the reply.
    """
    try:
        Send('Shell:' + code)
    except:
        # Sending failed: send the placeholder request instead.
        SendErrCode()
    if not isTermux:
        return Recv()
    try:
        droid.launch(None, 'com.termux', False)
    except:
        # Best effort only; a failed launch must not block reading the reply.
        pass
    return Recv()
def RemoteSl4a(*args):
    """Ask the server to call a ``droid`` method.

    ``args[0]`` is the method name; the remaining arguments are rendered
    with ``repr`` as the call's argument tuple. Sends ``Sl4a:<call>`` and
    returns whatever ``Recv()`` yields for the reply.
    """
    call_text = args[0] + repr(args[1:])
    try:
        Send('Sl4a:' + call_text)
    except:
        # Sending failed: send the placeholder request instead.
        SendErrCode()
    reply = Recv()
    return reply
def SendErrCode():
    """Send a fixed placeholder ``Eval`` request.

    The Remote* helpers call this when sending the real request failed.
    """
    placeholder = 'Eval:"Traceback: Error Code"'
    Send(placeholder)
# Set by Connect(); None until a connection has been made.
isTermux = None

# When HOME contains 'qpython', create the Android helper as ``droid``.
if 'qpython' in os.environ.get('HOME', ''):
    from androidhelper import Android as droid
    droid = droid()

# Optional first command-line argument: the port number (default 12000).
try:
    port = int(sys.argv[1])
except:
    port = 12000

if __name__ == '__main__':
    Connect(port)
PIOS用于在QPython各分支之间,以及QPython、Termux、PyDroid之间互相通信传送数据。
视频演示:BV1LP411r7Wh
作者:乘着船@Bilibili
更多文章+下载链接:https://www.bilibili.com/read/readlist/rl321663