PR

フライトログにフーリエ変換でローパスフィルターをかけてノイズを除去するPythonプログラム

igc形式のフライトログにフーリエ変換でローパスフィルターをかけてノイズを除去するPythonプログラムについて説明する

スポンサーリンク

はじめに

最終的にはこんな感じでノイズが取り除かれた.igcファイルを作成する
(左:元データ,右:ノイズ除去後)

↓.igcファイルの概要と読み取り方についてはこちら

それではいってみよう

ローパスフィルタ

GPSでフライトログを取得すると,下図のようなガタガタの飛行軌跡が得られるが,これはグライダーのフライトの滑らかな軌跡にGPSの誤差などの細かなノイズが加わっているためである

グライダーのフライトの時間スケールが数十秒(サーマル旋回)から数時間(フライト全体)なのに対してGPSのノイズの時間スケールが数秒(データの取得間隔の数倍)であることを考えると、得られたデータ信号の時間スケール(周期)に注目すれば真のデータとノイズとを分離できそうである

実際、上の画像のrawデータを以下のフーリエ変換によって周波数解析すると下図のようになる

\begin{equation}
F(\omega)=\int_{-\infty}^{\infty}{f(t)e^{-i \omega t}}dt
\end{equation}

\begin{equation}
T=\frac{1}{\omega}
\end{equation}

周期が数十秒以上の信号だけでなく,周期が数秒のノイズと思われる信号も混ざっていることが分かる

横軸:周期,縦軸:信号の強度

この信号に対して,以下のように周期が20秒以下のものを取り除き,逆フーリエ変換をかけると下図のようになる

\begin{equation}
F'(\omega)=\left\{\begin{array}{ll}
0&\left(T < 20\right)\\
F(\omega)&\left(T \geq 20\right) \\
\end{array}\right.
\end{equation}

\begin{equation}
f'(t)=\frac{1}{2\pi}\int_{-\infty}^{\infty}{F'(\omega)e^{i \omega t}}d\omega
\end{equation}

元データからGPSのノイズらしきものがきれいの取り除かれていることが分かる

このように,ノイズ混じりのデータに対して、周波数の低い真のデータをそのままに、周波数の高いノイズを除去する手法のことをローパスフィルタ(Low Pass Filtering)という

ローパスフィルタの閾値

今回のプログラムのローパスフィルタの閾値を決める際はグライダーのソアリング旋回の周期を参考にした

速度\(V\),バンク角\(\phi\)で定常旋回するグライダーの旋回周期\(T\)は次の式で計算できる

\begin{equation}
T=\frac{2 \pi V}{g \tan{\phi}}
\end{equation}

この式を使うと,速度90km/h(=25.0m/s)の45度旋回の周期は\(16.0\)s,速度80km/h(=22.2m/s)の30度旋回の周期は\(24.7\)sということがわかる

\begin{align}
&T=\frac{2 \pi \times 25.0}{9.8 \tan{45^\circ}} \simeq 16.0\\\\
&T=\frac{2 \pi \times 22.2}{9.8 \tan{30^\circ}} \simeq 24.7\\
\end{align}

以上より,ローパスフィルタの閾値を周期20秒とすれば概ねグライダーの飛行軌跡を取り出すことができると思われる

ソースコード全文

ソースコード全文はこれ

import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Import IGC file from current directory
path = '2023-02-22-XNA-9AD179D6D6F04DDD95474658245E6D4F-01.igc'
with open(path) as f:
    igcfile = f.readlines()

# Extract B-record from IGC file
date = datetime.date(int(path.split('-')[0]),int(path.split('-')[1]),int(path.split('-')[2]))
igcfile = [line.rstrip("\n") for line in igcfile]
igc_B = [[igcfile.index(lines),lines] for lines in igcfile if 'B' in lines[0] if 'A' in lines]
list_B = [[lines[0],lines[1][0],lines[1][1:7],lines[1][7:14],lines[1][14],lines[1][15:23],lines[1][23],lines[1][24],lines[1][25:30],lines[1][30:]] for lines in igc_B]

# Convert text of B-record to values
idx=0
for _ in list_B:
  list_B[idx][2] = datetime.datetime(date.year,date.month,date.day,int(list_B[idx][2][:2]),int(list_B[idx][2][2:4]),int(list_B[idx][2][4:]))
  list_B[idx][3] = float(list_B[idx][3])/100000.
  list_B[idx][5] = float(list_B[idx][5])/100000.
  list_B[idx][8] = float(list_B[idx][8])
  list_B[idx][9] = float(list_B[idx][9])
  idx+=1

# Create dataframe from list of records and export it as CSV file
df_B = pd.DataFrame(list_B,columns=['igc_index', 'record_type', 'UTC_time','latitude','NS','longitude','EW','fix_validity','press_alt','GNSS_alt'])
df_B.to_csv(path[:-4]+".csv")


# Low pass filtering by using FFT
df_B0 = df_B.copy()
for column in ['latitude','longitude','press_alt','GNSS_alt']:
  f = df_B[column].values
  F = np.fft.rfft(f)
  freq = np.fft.rfftfreq(len(f))
  F[(1/(freq+1e-10) < 20)] = 0
  df_B[column] = np.fft.irfft(F, len(f))

# Convert dataframe to list in same format as IGC
list_B_out = [[lines[0],lines[1]+lines[2].strftime('%H%M%S')+format(int(lines[3]*100000),'07')+lines[4]+format(int(lines[5]*100000),'08')+lines[6]+lines[7]+format(int(lines[8]),'05')+format(int(lines[9]),'05')] for lines in df_B.values.tolist()]
igcfile_out = igcfile
idx_df=0
for idx in np.arange(len(igcfile_out)):
  if idx <= df_B.igc_index.max() and df_B.values.tolist()[idx_df][0] == idx:
    line_B = df_B.values.tolist()[idx_df]
    igcfile_out[idx] = line_B[1]+line_B[2].strftime('%H%M%S')+format(int(line_B[3]*100000),'07')+line_B[4]+format(int(line_B[5]*100000),'08')+line_B[6]+line_B[7]+format(int(line_B[8]),'05')+format(int(line_B[9]),'05')
    idx_df+=1

# Save created list as new igc file
path_w = path[:-4]+"FFT"+path[-4:]
with open(path_w, mode='w') as f:
    f.write('\n'.join(igcfile_out))

# Plot 3D flight path
fig = plt.figure(figsize=(16,9))
ax = fig.add_subplot(projection='3d')

x = np.radians(df_B.longitude-df_B.longitude[0])*6378.137*1000
y = np.radians(df_B.latitude-df_B.latitude[0])*6378.137*1000
z = df_B.press_alt-df_B.press_alt.min()
range = max(abs(x.min()),abs(x.max()),abs(y.min()),abs(y.max()),abs(z.min()),abs(z.max()))

ax.view_init(elev=30, azim=-60)
ax.set_xlim3d(-range,range)
ax.set_ylim3d(-range,range)
ax.set_zlim3d(0,range)
ax.set_box_aspect((2,2,1))
ax.plot(x,y,z,c="red")
plt.show()

↓Git hub

https://github.com/mtkbirdman/mtkbirdman.com/tree/master/igc

↓Google colaboratoryの人はこっち

igc - Google ドライブ

.igcファイルの読み込みについては前回の記事(https://mtkbirdman.com/python-convert-igc-file-to-csv-data)で説明したので,それ以降のプログラムを解説する

プログラムの解説

# Perform Low pass filter by using FFT

df_B0 = df_B.copy()

.copy()でオリジナルのデータフレームをコピーしておく

データフレームは普通に=で複製すると参照渡し,.copy()で複製すると値渡しになる

pandas.DataFrame.copy — pandas 1.5.3 documentation

for column in ['latitude','longitude','press_alt','GNSS_alt']:
  f = df_B[column].values
  F = np.fft.rfft(f)
  freq = np.fft.rfftfreq(len(f))
  F[(1/(freq+1e-10) < 20)] = 0
  df_B[column] = np.fft.irfft(F, len(f))

ここでフーリエ変換を行っている

df.valuesでフーリエ変換したい値\(f\)をnp.ndarrayに変換する

np.fft.rfft(x)でフーリエ変換を行い,np.fft.rfftfreq(len(x))でフーリエ変換\(F\)のサンプル周波数\(\omega\) [Hz]を取得する

\begin{equation}
F(\omega)=\int_{-\infty}^{\infty}{f(t)e^{-i \omega t}}dt
\end{equation}

F[(1/(freq+1e-10) < 20)] = 0で周期\(T=1/\omega\) [s]が20より小さい信号の強度をゼロにする(ローパスフィルター)

最後にnp.fft.irfft(F, len(f))で\(F\)のフーリエ逆変換を行う

\begin{equation}
f(t)=\frac{1}{2\pi}\int_{-\infty}^{\infty}{F(\omega)e^{i \omega t}}d\omega
\end{equation}

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.values.html
https://numpy.org/doc/stable/reference/generated/numpy.fft.rfft.html

# Convert dataframe to list in same format as IGC

list_B_out = [[lines[0],lines[1]+lines[2].strftime('%H%M%S')+format(int(lines[3]*100000),'07')+lines[4]+format(int(lines[5]*100000),'08')+lines[6]+lines[7]+format(int(lines[8]),'05')+format(int(lines[9]),'05')] for lines in df_B.values.tolist()]

df_B.values.tolist()でデータフレームをリストに変換し,各数値をB recordのテキストに変換・結合する

igcfile_out = igcfile
idx_df=0
for idx in np.arange(len(igcfile_out)):
  if idx <= df_B.igc_index.max() and df_B.values.tolist()[idx_df][0] == idx:
    line_B = df_B.values.tolist()[idx_df]
    igcfile_out[idx] = line_B[1]+line_B[2].strftime('%H%M%S')+format(int(line_B[3]*100000),'07')+line_B[4]+format(int(line_B[5]*100000),'08')+line_B[6]+line_B[7]+format(int(line_B[8]),'05')+format(int(line_B[9]),'05')
    idx_df+=1

前回の記事で用意しておいた変数igc_index(抽出したB recordの元ファイルでのインデックス)を頼りにフーリエ変換後のB recordを.igcファイルのリストに戻していく

# Save created list as new igc file

path_w = path[:-4]+"FFT"+path[-4:]
with open(path_w, mode='w') as f:
    f.write('\n'.join(igcfile_out))

読み込んだファイル名の末尾に「FFT」を付け加え,.igcファイルを出力する

実行例

以下のコマンドでigcfile.pyを実行する

python igcfile.py

igcfile.pyと同じディレクトリに*FFT.igcと*.csvが作成される

/mnt/c/Users/mtkbirdman/Desktop/igc
├── 2023-02-22-XNA-9AD179D6D6F04DDD95474658245E6D4F-01.csv
├── 2023-02-22-XNA-9AD179D6D6F04DDD95474658245E6D4F-01.igc
├── 2023-02-22-XNA-9AD179D6D6F04DDD95474658245E6D4F-01FFT.igc
└── igcfile.py

サーマルでぐるぐる回りまくっても大丈夫である

おわりに

igc形式のフライトログにフーリエ変換でローパスフィルターをかけてノイズを除去するPythonプログラムについて説明した

↓フライトログ記録アプリ

SeeYou Navigator – Naviter.com
‎SeeYou Navigator
‎Want to fly light-weight? Just going for a quick flight or two and don’t want all the heavy equipment? Install SeeYou N...

↓関連記事

コメント