欢迎光临散文网 会员登陆 & 注册

卫星导航伪距单点定位程序Python实现参考

2023-07-03 21:36 作者:春江花月夜-ac  | 我要投稿

    此程序忽略了实际定位过程中的诸多因素,只用作一个原理的参考!要交作业的同学请勿复制粘贴,看懂原理才是最重要的,代码很长包含了很多工具函数、工具类,熟悉Python的同学可以参考,在此不作讲解,只给出源码和注释。卫星导航定位的原理请自行学习。

    观测文件(.o)格式: https://wenku.baidu.com/view/57163341f6335a8102d276a20029bd64783e6231.html 此文章在我编写程序的过程中起到了重要作用,用以解析观测文件。精密星历文件(.sp3)类似可自行查找。

    注意事项:Lagrange插值函数一定要自己写!千万别用SciPy库里的那个lagrange函数,否则计算得到的最后结果与正确理论值误差能达到五个量级(约十万米),这个问题困扰了我好几天,最后发现是lagrange插值函数的问题,它计算出来的插值结果有误差,在后面的处理中将被放大,是真的拳头硬了。

    一些基本问题:

1. PC是用线性组合法(具体讲是双频改正)修正得到的无电离层误差的信号传播路径。

2. 使用Lagrange插值是为了利用精密星历文件中观测到的卫星精密数据(隔一段时间观测记录一次,通常是15min)插值得到待求观测时刻的卫星数据,包含卫星位置、钟差。

3. 使用最小二乘法是因为观测方程个数(等于卫星个数)多于待求解未知数个数,可能无法求得同时满足所有观测方程的解,因此只能求解一个尽可能靠近所有观测方程的解。

import numpy as np
#---------------------------------------工具函数与工具类-------------------------------------------
class LagrangeInterpolationFunc:#Lagrange插值函数
   n=0#插值阶数
   x_arr=None
   y_arr=None
   def __init__(self,x_arr,y_arr):
       if(len(x_arr)!=len(y_arr)):
           print("Array size is not eq. The larger array will be cut off.")
           self.n=min(len(x_arr),len(y_arr))-1
       else:
           self.n=len(x_arr)-1
       self.x_arr=x_arr[0:self.n+1]#截断数组,选取较小的size
       self.y_arr=y_arr[0:self.n+1]

   def __call__(self,x):
       r=0
       for k in range(0,len(self.x_arr)):
           lk=1
           for i in range(0,len(self.x_arr)):
               if(i!=k):
                   lk=lk*((x-self.x_arr[i])/(self.x_arr[k]-self.x_arr[i]))
           r=r+lk*self.y_arr[k]
       return r

def str_to_float(str):
   if(str.isspace() or len(str)==0):#全是空格表示未知或不存在,取0
       return 0.0
   else:
       return float(str.strip())

def str_to_int(str):
   if(str.isspace() or len(str)==0):#全是空格表示未知或不存在,取0
       return 0
   else:
       return int(str.strip())

def time_to_float(standard_fmt_time):#使用标准时间得到插值横坐标
   h=str_to_float(standard_fmt_time[11:13])
   m=str_to_float(standard_fmt_time[14:16])
   s=str_to_float(standard_fmt_time[17:])
   return h*3600+m*60+s

def time_arr_to_float_arr(standard_fmt_time_arr):#使用标准时间得到插值横坐标
   arr=[]
   for t in range(0,len(standard_fmt_time_arr)):
       arr[t]=time_to_float(standard_fmt_time_arr[t])
   return arr

def get_interpolation_time(sp3_file,time,forward_num,backward_num):#得到时间time的前forward_num组和后backward_num组星历的时间
   times=[]#记录时间字符串对应的时间
   timesf=[]#记录时间字符串对应的时间数字
   timef=time_to_float(time)
   for t in sp3_file.sp3_data_fields:
       this_time=sp3_file.get(t).time
       times.append(this_time)
       timesf.append(time_to_float(this_time))
   times.sort(key=time_to_float)#按时间先后排序
   timesf.sort()
   start_idx=0
   end_idx=-1
   for i in range(0,len(timesf)):
       if(timesf[i]>timef):#遍历时间第一次越过time
           start_idx=i-forward_num
           end_idx=i-1+backward_num
           break
       elif(timesf[i]==timef):
           start_idx=i-forward_num
           end_idx=i+backward_num
           break
   return times[start_idx:end_idx+1],timesf[start_idx:end_idx+1]

def standardize_time(time):#将时间格式标准化,把sp3文件记录的时间格式转换成o文件的格式(空位补零),小数点精确到后8位,不够则补零(o文件需要补零)
   time=list(time.strip())#字符串转换成列表
   if(time[5]==' '):
       time[5]='0'
   if(time[8]==' '):
       time[8]='0'
   if (time[11]==' '):
       time[11]='0'
   if (time[14]==' '):
       time[14]='0'
   if (time[17]==' '):
       time[17]='0'
   time=''.join(time)
   if(len(time)>28):#大于28位则截断末尾小数
       time=time[0:28]
   elif(len(time)<28):
       time=time.ljust(28,'0')#不够28位字符则末位补0
   return time

def select_satellite(o_satellite_name):#传入观测文件中的卫星名称,返回sp3文件的卫星名称
   return "PG"+o_satellite_name[1:]

class ODataPack:#16位字符的数据包
   name=None
   data=0
   lli=0
   ssi=0

   def __init__(self,name,data,lli,ssi):#每个观测数据包由数据、失锁标识符LLI、信号强度SSI组成
       self.name=name
       self.data=data
       self.lli=lli
       self.ssi=ssi

   def print(self):
       print(self.name," Data:", self.data, " LLI:", self.lli, " SSI:", self.ssi)

class OSatelliteData:#观测文件中一个卫星的数据
   name=None
   data_packs=None#数据类型为ODataPack
   def __init__(self,name,data_packs=None):
       if(data_packs!=None):
           self.data_packs=data_packs
       else:
           self.data_packs={}
       self.name=name

   def print(self):
       print("Satellite ", self.name," Total Data ",len(self.data_packs))
       for p in self.data_packs:
           self.data_packs[p].print()

   def add(self,name,data_pack):
       self.data_packs[name]=data_pack

   def get(self,name):
       return self.data_packs[name]

class OSatelliteDataField:#一段时间内的观测数据
   time=None
   epoch_mark=0
   satellite_num=0
   satellite_data=None#数据类型为OSatelliteData

   def set_info(self,time,epoch_mark,satellite_num):
       self.time=time
       self.epoch_mark=epoch_mark
       self.satellite_num=satellite_num
       self.satellite_data={}

   def print(self):
       print("Time:",self.time," Epoch Mark:",self.epoch_mark," Satellite Num:",self.satellite_num)
       for s in self.satellite_data:
           self.satellite_data[s].print()

   def __init__(self,str):
       time=standardize_time(str[0:27])
       epoch_mark=str[27:30].strip()
       satellite_num=str[30:].strip()
       self.set_info(time, epoch_mark, satellite_num)

   def add(self,satellite_no,satellite_data):
       self.satellite_data[satellite_no]=satellite_data

   def get(self,name):
       return self.satellite_data[name]

   def extract_sys_data(self,sys_name):
       sys_data={}
       for s in self.satellite_data:
           satellite_name=self.satellite_data[s].name
           if(satellite_name[0]==sys_name):
               sys_data[satellite_name]=self.satellite_data[s]
       return sys_data

class OFile:#记录所有时间段的观测数据
   satellite_data_fields=None

   def __init__(self):
       self.satellite_data_fields={}

   def add(self,time,satellite_data_field):
       self.satellite_data_fields[time]=satellite_data_field

   def get(self,time):#以时间为索引
       return self.satellite_data_fields[time]

class SP3File:
   sp3_data_fields=None

   def __init__(self):
       self.sp3_data_fields={}

   def add(self,time,sp3_data_field):
       self.sp3_data_fields[time]=sp3_data_field

   def get(self,time): # 以时间为索引
       return self.sp3_data_fields[time]

   def print(self):
       for f in self.sp3_data_fields:
           self.sp3_data_fields[f].print()

class SP3DataField:
   time=None
   satellite_sp3_data_packs=None

   def __init__(self,time):
       self.time=time
       self.satellite_sp3_data_packs={}

   def add(self,satellite_no,sp3_data_pack):
       self.satellite_sp3_data_packs[satellite_no]=sp3_data_pack

   def add_str(self,str):
       data_pack=SP3DataPack(str)
       self.add(data_pack.name,data_pack)

   def get(self,satellite_no): # 以时间为索引
       return self.satellite_sp3_data_packs[satellite_no]

   def print(self):
       print("Time ", self.time," Total Data ",len(self.satellite_sp3_data_packs))
       for p in self.satellite_sp3_data_packs:
           self.satellite_sp3_data_packs[p].print()

class SP3DataPack:
   name=None
   position_x=0
   position_y=0
   position_z=0
   clock_offset=0
   position_x_sd=0
   position_y_sd=0
   position_z_sd=0
   clock_offset_sd=0

   def set_info(self,name,position_x,position_y,position_z,clock_offset,position_x_sd,position_y_sd,position_z_sd,clock_offset_sd):
       self.name=name
       self.position_x=position_x
       self.position_y=position_y
       self.position_z=position_z
       self.clock_offset=clock_offset
       self.position_x_sd=position_x_sd
       self.position_y_sd=position_y_sd
       self.position_z_sd=position_z_sd
       self.clock_offset_sd=clock_offset_sd

   def __init__(self,str):#直接传入一行数据进行构造
       name=str[0:5].strip()
       position_x=str_to_float(str[5:19])#每个数据14个字符
       position_y=str_to_float(str[19:33])#每个数据14个字符
       position_z=str_to_float(str[33:47])#每个数据14个字符
       clock_offset=str_to_float(str[47:61])#每个数据14个字符
       position_x_sd=str_to_int(str[61:64])#每个数据3个字符
       position_y_sd=str_to_int(str[64:67])#每个数据3个字符
       position_z_sd=str_to_int(str[67:70])#每个数据3个字符
       clock_offset_sd=str_to_int(str[70:73])#每个数据3个字符
       self.set_info(name,position_x,position_y,position_z,clock_offset,position_x_sd,position_y_sd,position_z_sd,clock_offset_sd)

   def print(self):
       print("Satellite No. ",self.name," Position X:",self.position_x," Position Y:",self.position_y," Position Z:",self.position_z," Clock Offset:",self.clock_offset," Position X SD:",self.position_x_sd," Position Y SD:",self.position_y_sd," Position Z SD:",self.position_z_sd," Clock Offset SD:",self.clock_offset_sd)

#读取GPS文件
def read_sp3(path):
   time=None
   sp3=SP3File()
   sp3_data_field=None
   for line in open(path):
       if(line[0]=='#' or line[0]=='+' or line[0]=='%' or line.startswith("/*")):
           continue#注释行,跳过
       elif(line[0]=='*'):#*开头的行标注时间
           if(sp3_data_field!=None):
               sp3.add(time,sp3_data_field)
           time=standardize_time(line[1:])#标准化sp3的时间,以便使用相同的时间进行索引
           sp3_data_field=SP3DataField(time)
       elif(line.startswith("EOF")):#读取到EOF标识时结束
           sp3.add(time,sp3_data_field)#记录最后一个时间段的数据
           break
       else:#数据行
           sp3_data_field.add_str(line)
   return sp3

#观测文件格式参考 https://wenku.baidu.com/view/57163341f6335a8102d276a20029bd64783e6231.html
def read_o(path):
   data_attribs_text=""
   data_attribs={}#数据储存格式
   in_data_field=False#是否进入数据段
   o=OFile()#数据数组
   o_data_field=None
   for line in open(path):
       if(line[-1]=='\n'):#去掉末尾的换行符
           line=line[0:-1]
       if(in_data_field):#文件头结束进入数据段
           if(line[0]=='>'):#记录观测时间、卫星个数的行
               if(o_data_field!=None):
                   o.add(o_data_field.time,o_data_field)
               o_data_field=OSatelliteDataField(line[2:])
               continue
           #处理每一行的数据
           satellite_no=line[0:4].strip()#卫星名称(序号)
           data_attrib=data_attribs[satellite_no[0]]#获取该卫星所属系统对应的数据格式
           line=line.ljust(4+len(data_attrib)*16,' ') # 对其字符串,最右边空位补零,补齐后长度为4+16*n
           o_data_line=OSatelliteData(satellite_no)
           for i in range(0,len(data_attrib)):#分割数据,每个数据占16字符,包含空格
               data=str_to_float(line[4+16*i:4+16*(i+1)-3])
               lli=str_to_int(line[4+16*(i+1)-3])
               ssi=str_to_int(line[4+16*(i+1)-2])
               o_data_line.add(data_attrib[i],ODataPack(data_attrib[i],data,lli,ssi))
           o_data_field.add(satellite_no,o_data_line)#根据卫星序号索引对应行的数据
       else:#文件头
           if (line.endswith("SYS / # / OBS TYPES")):#数据储存格式
               data_attribs_text+=line[0:-21]
           elif(line.endswith("END OF HEADER")):#文件头结束时解析文件头
               in_data_field=True
               att_arr=data_attribs_text.split()#解析数据储存格式
               for i in range(0,len(att_arr)):#读取除最后一个系统的其他系统的数据格式
                   str=att_arr[i]
                   if(str.isdigit()):#定位系统数据个数,是一个数字
                       sys_name=att_arr[i-1]#数字前一个元素是系统名称
                       data_attribs[sys_name]=att_arr[i+1:i+1+str_to_int(str)]
   return o

#调试选项
print_o=False#是否打印观测文件数据
print_sp3=False#是否打印精密星历数据
print_A=False#打印A矩阵
print_L=False#打印L向量
print_step_result=False#打印每一次迭代的误差和结果
#程序参数
o_file_path="D:/JFNG1680.20o"
sp3_file_path="D:/igs21102.sp3"
sys_name='G'
time=standardize_time("2020 06 16 10 25 30.0000000")#观测文件时间
interpolation_forward_num=5#插值时,从目标时间往前的时间段数
interpolation_backward_num=5#插值时,从目标时间往后的时间段数
f1=1575.42#MHz
f2=1227.6
position_error_limit=0.0001#位置误差限制
#---------------------------------------伪距定位主程序-------------------------------------------
#读取文件和数据
o_file=read_o(o_file_path)
o_field=o_file.get(time)#读取该时间段的观测数据
if(print_o):
   print("Obeservation Data")
   o_field.print()
   print()
o_data=o_field.extract_sys_data(sys_name)#提取该时间段内的GPS系统(这里是G)的数据
sp3_file=read_sp3(sp3_file_path)#读取精密星历文件
#sp3文件九阶Lagrange插值使用的时间
interpolation_time,interpolation_timef=get_interpolation_time(sp3_file,time,interpolation_forward_num,interpolation_backward_num)#获取插值使用的时间,前5组后5组
timef=time_to_float(time)#目标插值时间的数值
if(print_sp3):
   print("SP3 Data")
   for t in interpolation_time:
       sp3_file.get(t).print()
   print()
it_data_arr={}#索引为卫星序号PGxx,值为插值时间段内它的各种数据数组
start_field=sp3_file.get(interpolation_time[0])#插值的第一个时间段,此变量用于遍历获取卫星名称,因为卫星的序号不连续
for i in start_field.satellite_sp3_data_packs:
   satellite_name=start_field.satellite_sp3_data_packs[i].name
   x_data_arr=[]
   y_data_arr=[]
   z_data_arr=[]
   co_data_arr=[]
   data_arr={}
   for j in range(0,len(interpolation_time)):
       sp3_field=sp3_file.get(interpolation_time[j])
       data_pack=sp3_field.get(satellite_name)#获取PGi的数据
       x_data_arr.append(data_pack.position_x*1E3)#km换算成m
       y_data_arr.append(data_pack.position_y*1E3)
       z_data_arr.append(data_pack.position_z*1E3)
       co_data_arr.append(data_pack.clock_offset*1E-6)#μs换算成s
   data_arr["Position X"]=x_data_arr
   data_arr["Position Y"]=y_data_arr
   data_arr["Position Z"]=z_data_arr
   data_arr["Clock Offset"]=co_data_arr
   it_data_arr[satellite_name]=data_arr
#通过拉格朗日插值法求出2020 06 16 10 25 30.0000000时刻的精密星历数据
sp3_it_function={}#插值的结果,索引为卫星名称,使用方法同sp3_it_function["PG01"]["Position X"](timef)
for i in it_data_arr:
   data_arr=it_data_arr[i]
   x_func=LagrangeInterpolationFunc(interpolation_timef,data_arr["Position X"])#Lagrange插值函数
   y_func=LagrangeInterpolationFunc(interpolation_timef,data_arr["Position Y"])
   z_func=LagrangeInterpolationFunc(interpolation_timef,data_arr["Position Z"])
   co_func=LagrangeInterpolationFunc(interpolation_timef,data_arr["Clock Offset"])
   result_arr={}
   result_arr["Position X"]=x_func
   result_arr["Position Y"]=y_func
   result_arr["Position Z"]=z_func
   result_arr["Clock Offset"]=co_func
   sp3_it_function[i]=result_arr
#-----计算部分-----
n=len(o_data)#观测卫星总个数
c=299792458#光速m/s
X=np.zeros(shape=(1,4))#未知数向量初始化为零向量,设置成估计值可以减少迭代次数。矩阵左上角为(0,0)
L=np.empty(shape=(n,1))
A=np.empty(shape=(n,4))
position_error=float("inf")#位置误差初始化为无穷大
last_rho={}#上一次的距离ρ
step=0#迭代步数
if(print_step_result):
   print("Initial Value\nPosition X:",X.item(0,0),"Position Y:",X.item(0,1),"Position Z:",X.item(0,2)," Clock Offset:",X.item(0,3))
while position_error>=position_error_limit:
   i=0#数字索引
   step=step+1
   if(print_step_result or print_L or print_A):
       print("\nStep ",step)
   #构造观测方程
   for o_satellite_name in o_data:
       sp3_satellite_name=select_satellite(o_satellite_name)
       o_data_pack=o_data[o_satellite_name]
       P1=o_data_pack.get("C1C").data
       P2=o_data_pack.get("C2W").data
       PC=np.square(f1)/(np.square(f1)-np.square(f2))*P1-np.square(f2)/(np.square(f1)-np.square(f2))*P2
       if(len(last_rho)<len(o_data)):#last_rho的元素个数小于卫星个数则表示这是第一次迭代
           last_rho[o_satellite_name]=PC#第一次迭代由于未知距离ρ,因此需要单独修正时间
       modified_timef=timef-last_rho[o_satellite_name]/c#钟差修正,第一次迭代时ρ没有值所以修正为PC
       xs=sp3_it_function[sp3_satellite_name]["Position X"](modified_timef)
       ys=sp3_it_function[sp3_satellite_name]["Position Y"](modified_timef)
       zs=sp3_it_function[sp3_satellite_name]["Position Z"](modified_timef)
       dts=sp3_it_function[sp3_satellite_name]["Clock Offset"](modified_timef)
       rho=np.sqrt(np.square(xs-X.item(0,0))+np.square(ys-X.item(0,1))+np.square(zs-X.item(0,2)))
       ax=(xs-X.item(0,0))/rho
       ay=(ys-X.item(0,1))/rho
       az=(zs-X.item(0,2))/rho
       adt=c
       #计算Li和Ai
       Li=PC-rho-c*X.item(0,3)+c*dts
       Ai=np.array([ax,ay,az,adt])
       #将向量分量和矩阵元都装载进向量L和矩阵A
       L[i][0]=Li
       A[i]=Ai
       i=i+1
       #将此次计算得到的ρ储存起来用于下一次迭代的插值
       last_rho[o_satellite_name]=rho
   if(print_L):
       print("L:\n",L)
   if (print_A):
       print("A:\n", A)
   #计算dX
   Q=np.linalg.inv(np.matmul(A.transpose(),A))#A.shape=(n,4), L.shape=(n,1)
   dX=np.matmul(Q,(np.matmul(A.transpose(),L))).transpose()#转置后未知数向量改正量,shape=(1,4)
   position_error=np.sqrt(np.square(dX.item(0,0))+np.square(dX.item(0,1))+np.square(dX.item(0,2)))
   X=X-dX#改正X
   if(print_step_result):
       print("Position error:", position_error)
       print("Position X:",X.item(0,0),"Position Y:",X.item(0,1),"Position Z:",X.item(0,2)," Clock Offset:",X.item(0,3))
print("\nTotal step",step,", Final Result\nPosition X:",X.item(0,0),"Position Y:",X.item(0,1),"Position Z:",X.item(0,2)," Clock Offset:",X.item(0,3))

卫星导航伪距单点定位程序Python实现参考的评论 (共 条)

分享到微博请遵守国家法律