LSTM AutoEncoderによる異常検知
1. はじめに
この記事では、KerasでLSTM AutoEncoderを実装し、時系列データの簡単な異常検知を試します。
LSTM AutoEncoderとは
LSTM AutoEncoderは、時系列データの扱いに優れたLSTM(Long Short-Term Memory)モデルを用いたオートエンコーダです。主にネットワーク監視や設備の予兆保全など、時系列データの異常検知に活用されています。この手法を活用することで、サーバ運用者やシステム管理者の作業負荷を軽減し、運用コストの削減にもつながります。
LSTM AutoEncoderの仕組みとして、LSTMモデルを前後に2つ並べています。前段のLSTM(エンコーダ)がログやトラフィックを圧縮し、後段のLSTM(デコーダ)が復元します。
正常なデータで学習するため、正常データの復元誤差は小さくなりますが、パターンが異なる異常時は誤差が大きくなります。正常データの復元誤差を基に閾値を設定してやれば、閾値超過で異常を検知できます。
実施内容
1. 学習データの生成
・学習に使うデータの生成を行います。
・学習に使用するデータとして、周期の異なるsin波を1,000個ほど生成します。
2. モデルの構築と学習
・生成した学習データをもとにモデルを構築します。
・学習データをモデルに通し、正しく復元できることを確認します。
3. 異常データの生成
・異常検知を試すために、異常データの生成を行います。
・異常検知に使用するデータとして、sin波の一部分に+0.5と-0.5を交互に加算したものを使用します。
・加算していない範囲は正常、数値を加算した範囲は異常として扱います。
・異常データをモデルに通し、正しく復元できないことを確認します。
4. 異常検知の実装
・異常データを正しく検知できるよう閾値を設定します。
・設定した閾値で異常を検知できるかを確認します。
2. 環境
・言語およびライブラリのバージョン
Python version: 3.11.13
Keras version: 3.8.0
・実行環境
Kaggle Notebook
(記事に記載したコードブロックはJupyter Notebookのセル上で順番に実行することを想定しています)
3. 必要なライブラリのインポート
from keras.layers import Input, LSTM, RepeatVector, Dense
from keras.models import Model
import matplotlib.pyplot as plt
import numpy as np
4. 学習データの生成
データ生成用の関数
# 今回使うデータ(1次元のランダム周期のsin関数)を生成するための関数
def make_random_sin_data(num_patterns, timesteps):
# 正規分布(平均1, 標準偏差0.1)に従うランダムな周期を作成
mean_period = 1.0
std_dev_period = 0.1
periods = np.random.normal(loc=mean_period, scale=std_dev_period, size=num_patterns)
# 行列を初期化
matrix = np.zeros((num_patterns, timesteps, 1))
# 各パターンに対してsin関数を計算
for i in range(num_patterns):
x = np.linspace(0, 2 * np.pi, timesteps)
matrix[i, :, 0] = np.sin(x / periods[i])
return matrix
データ生成用の関数
# 学習データ生成
X_train = make_random_sin_data(1000,100)
# プロットの準備
plt.figure(figsize=(10,5))
# 各学習データ(sin波)をプロット
for i in range(X_train.shape[0]):
plt.plot(X_train[i, :, 0], alpha=0.5) # alphaを指定して透明度調整
# ラベルの設定
plt.xlabel(‘timesteps’)
plt.ylabel(‘value’)
# グラフの表示
plt.show()
周期の異なる1000個のsin波が今回の学習データです。
5. モデルの構築と学習
# パラメータ設定
timesteps = 100
input_dim = 1
latent_dim = 10
# モデル構築
inputs = Input(shape=(timesteps, input_dim))
encoded = LSTM(latent_dim, activation=”tanh”, return_sequences=False)(inputs)
hidden = RepeatVector(timesteps)(encoded)
decoded = LSTM(latent_dim, activation=”tanh”, return_sequences=True)(hidden)
decoded = Dense(input_dim, activation=”tanh”)(decoded)
# モデルコンパイル
autoencoder = Model(inputs, decoded)
autoencoder.compile(optimizer=’adam’, loss=’mse’)
# モデル訓練
autoencoder.fit(X_train, X_train, epochs=30, batch_size=32, shuffle=True)
# 再構築データの取得
X_train_reconstructed = autoencoder.predict(X_train)
学習データの再構築結果の確認
学習データのsin波の1つと、それを基に再構築したデータを同時に並べてみます。
# x軸のデータ
x = np.arange(1, 101)
# y軸のデータを2つ用意
y1 = X_train[0]
y2 = X_train_reconstructed[0]
# プロットの作成
plt.figure(figsize=(10, 5))
# 1つ目のプロット
plt.plot(x, y1, label=’train’, color=’b’)
# 2つ目のプロット
plt.plot(x, y2, label=’reconstructed’, color=’r’)
# ラベルの設定
plt.xlabel(‘timesteps’)
plt.ylabel(‘value’)
# 凡例の表示
plt.legend()
# グラフの表示
plt.show()
青い波が学習データ、赤い波がそれを基に再構築したデータです。
6. 異常データの生成
# 周期ランダムなsin波を1つ生成(要素数は100)
X_test = make_random_sin_data(1,100)
# 50個目の要素から59個目の要素までに0.5と-0.5を交互に加算
for i in range(50, 60):
if (i – 50) % 2 == 0:
X_test[0, i, 0] += 0.5
else:
X_test[0, i, 0] -= 0.5
# 異常データを基にモデルで再構築
X_test_reconstructed = autoencoder.predict(X_test)
異常データの再構築結果の確認
# x軸のデータ
x = np.arange(1, 101)
# y軸のデータを2つ用意
y1 = X_test[0]
y2 = X_test_reconstructed[0]
# プロットの作成
plt.figure(figsize=(10, 5))
# 1つ目のプロット
plt.plot(x, y1, label=’test’, color=’b’)
# 2つ目のプロット
plt.plot(x, y2, label=’reconstructed’, color=’r’)
# ラベルの設定
plt.xlabel(‘timesteps’)
plt.ylabel(‘value’)
# 凡例の表示
plt.legend()
# グラフの表示
plt.show()
青い波が異常データ(timesteps=50から59の範囲に異常発生)、赤い波がそれを基に再構築したデータです。
異常が発生している範囲において、モデルが異常な振る舞いに反応できず、
入力データと再構築データの間で誤差が発生していることが分かります。
7. 異常検知の実装
異常判定する閾値を学習データから設定
#各学習データと各再構築データの平均二乗誤差を計算し、リストに格納していく
anom_train = list()
for i in range(X_train.shape[0]):
anom = np.sum(np.mean((X_train_reconstructed[i]-X_train[i])**2, axis=0))
anom_train.append(anom)
anom_train = np.array(anom_train)
#リスト内の99.9percentileを、異常判定に使用する閾値に設定
th_value = np.percentile(anom_train, q=99.9)
テストデータの異常度を計算
#テストデータと再構築データの平均二乗誤差を計算
anom_test = list()
for i in range(X_test.shape[1]):
anom = (X_test_reconstructed[0][i]-X_test[0][i])**2
anom_test.append(anom)
anom_test = np.array(anom_test)
閾値を超過したか確認
# x軸のデータ
x = np.arange(1, 101)
# y軸のデータを2つ用意
y1 = np.full(100, th_value)
y2 = anom_test
# プロットの作成
plt.figure(figsize=(10, 5))
# 1つ目のプロット
plt.plot(x, y1, label=’th_value’, color=’b’)
# 2つ目のプロット
plt.plot(x, y2, label=’anom_test’, color=’r’)
# ラベルの設定
plt.xlabel(‘timesteps’)
plt.ylabel(‘value’)
# 凡例の表示
plt.legend()
# グラフの表示
plt.show()
青い線が設定した閾値、赤い波が異常度(テストデータと再構築データの平均二乗誤差)です。異常が発生している範囲において、異常度が閾値を超過しています。正しく異常を検知できていることが分かります。
