欢迎光临散文网 会员登陆 & 注册

QPython与Termux的交互:PIOS.py

2022-06-18 19:22 作者:乘着船  | 我要投稿

PIOS = Python Interactive Over Socket 基于套接字的Python交互

Version 版本:v1.1


默认端口号 Default Port : 12000 .


In Termux:

  python -i PIOS/server.py

In QPython:

  from PIOS.client import *

  Connect()


In Termux:

  python -i PIOS/server.py 15234

In QPython:

  from PIOS.client import *

  Connect(15234)


In QPython:

  from PIOS.server import *

  Start()

In Termux:

  python -i PIOS/client.py


In QPython:

  from PIOS.server import *

  Start(18379)

In Termux:

  python -i PIOS/client.py 18379


Server Functions 服务器函数:

  Start Server 启动服务器:

    Start(port) # port 端口号,Return None

    Start() # port = 12000,Return None


Client Functions 客户端函数:

  Connect to Server 连接服务器:

    Connect(port) # port 端口号,Return None

    Connect() # port = 12000,Return None

  Remote Functions 远程函数:

    RemoteEval(code) # Return: (Result,Error)

    RemoteExec(code) # Return: (Result,Error)

    RemoteSl4a(args) # Return: (Result,Error),Only for QPython is a server 仅针对QPython服务器

    RemoteShell(code) # Return: (Result,Error), if QPython is a client and Termux is a server, it will open Termux 如果QPython 3C是客户端且Termux是服务器,将会打开Termux


by 乘着船 20220515


源代码server.py:


def Now():
    t=time.localtime()
    return "%04d-%02d-%02d,%02d:%02d:%02d"%(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec)
def Accept():
    global Conn
    Conn,addr = Socket.accept()
    print(Now(),'对接地址',addr)
def Send(Result,Error):
    Data=bytes(repr(Result)+'\x00'+Error,'utf-8')
    Conn.sendall(Data)
def Recv():
    a=bytearray()
    while True:
        b=Conn.recv(1024)
        a.extend(b)
        if len(b)<1024:
            break
    return str(a,'utf-8')
from traceback import format_exc
import socket,time,os,sys
class Out:
    result=[]
    error=''
    write=result.append
    def flush():
        pass
def RunEval(code):
    try:
        Out.result=eval(code,Env,Env)
    except:
        Out.error=format_exc()
        Out.result=''
def RunExec(code):
    sys.stdout=Out
    try:
        exec(code,Env,Env)
    except:
        Out.error=format_exc()
    Out.result=''.join(Out.result)
    sys.stdout=fileOut
def RunShell(code):
    code='os.system('+repr(code)+')'
    print()
    return RunEval(code)
def RunSl4a(code):
    code='droid.'+code
    try:
        data=eval(code,Env,Env)
        Out.result=data.result
        Out.error=data.error
        if Out.error==None:
            Out.error=''
    except:
        Out.error=format_exc()
        Out.result=''
def Run(cmd):
    mode,code=cmd.split(':',1)
    Glb['Run'+mode](code)
    data=Out.result,Out.error
    Out.result=[]
    Out.write=Out.result.append
    Out.error=''
    return data
Glb=globals()
def Once():
    data=Recv()
    print(Now(),'传入指令',data)
    try:
        data=Run(data)
    except:
        print(Now(),'连接中断')
        exit()
    print(Now(),'返回结果',data)
    Send(*data)
def Exit(Text='操作结束,谢谢使用!'):
    sys.stdout=fileOut
    Text=str(Text)
    Send(Text,'')
    Conn.close()
    Socket.close()
    print(Now(),'对接结束')
    exit()
Env={'Exit':Exit,'os':os}
def Start(port=12000):#对接流程
    global Socket
    print(Now(),'对接开始 端口号:%s'%port)
    Socket=socket.socket()
    host=socket.gethostname()
    try:
        Socket.bind((host,port))
        print(Now(),'已连接上',host,port)
    except:
        print(Now(),'连接失败,对接结束',format_exc(),host,port)
        exit()
    Socket.listen(5)
    Socket.settimeout(120)
    Accept()
    while True:
        Once()
projectPath=__file__[:__file__.rfind('/')]
os.chdir(projectPath)
try:
    port=int(sys.argv[1])
except:
    port=12000
if os.environ['HOME'].find('qpython')!=-1:
    from androidhelper import Android as droid
    droid=droid()
    Env['droid']=droid
fileOut=sys.__stdout__
print('日志文件:',fileOut)
sys.stdout=fileOut
if __name__=='__main__':
    Start(port)


源代码client.py:


import time,socket,os,sys
def Connect(port=12000):
    global Socket
    Socket = socket.socket()
    host = socket.gethostname()
    startTime = time.time()
    while time.time()-startTime<=5:
        try:
            Socket.connect((host, port))
            break
        except:
            pass
    else:
        print('连接到服务端超时')
        exit()
    global isTermux
    isTermux=RemoteEval('os.environ.get("HOME","").find("termux")!=-1')[0] and os.environ.get('HOME','').find('indi.czc.qpython')!=-1
def Recv():
    a=bytearray()
    while True:
        b=Socket.recv(1024)
        a.extend(b)
        if len(b)<1024:
            break
    s=str(a,'utf-8').rsplit('\x00',1)
    try:
        return eval(s[0]),s[1]
        #数据类型解析成功
    except:
        #数据类型解析失败
        return [s[0],s[1]]
def Send(Data):
    Data=bytes(Data,'utf-8')
    Socket.sendall(Data)
def RemoteEval(code):
    try:
        Send('Eval:'+code)
    except:
        SendErrCode()
    return Recv()
def RemoteExec(code):
    try:
        Send('Exec:'+code)
    except:
        SendErrCode()
    return Recv()
def RemoteShell(code):
    try:
        Send('Shell:'+code)
    except:
        SendErrCode()
    if isTermux:
        try:
            droid.launch(None,'com.termux',False)
        except:
            pass
    return Recv()
def RemoteSl4a(*args):
    method=args[0]
    params=repr(args[1:])
    code=method+params
    try:
        Send('Sl4a:'+code)
    except:
        SendErrCode()
    return Recv()
def SendErrCode():
    Send('Eval:"Traceback: Error Code"')
isTermux=None
if os.environ.get('HOME','').find('qpython')!=-1:
    from androidhelper import Android as droid
    droid=droid()
try:
    port=int(sys.argv[1])
except:
    port=12000
if __name__=='__main__':
    Connect(port)


PIOS用于在QPython各分支之间,以及QPython、Termux、PyDroid之间互相通信传送数据。


视频演示:BV1LP411r7Wh


作者:乘着船@Bilibili

更多文章+下载链接:https://www.bilibili.com/read/readlist/rl321663


QPython与Termux的交互:PIOS.py的评论 (共 条)

分享到微博请遵守国家法律