この記事はNikkei Advent Calendar 2022の21日目の記事です。
こんにちは。日本経済新聞社のプラットフォーム推進室でデータエンジニアをしている玉越です。 普段は日経内製のリアルタイムアクセスログ解析基盤 Atlas の開発、運用、保守を行なっています。
先日行われた【NIKKEI Tech Talk #2】メディア企業のデータエンジニアリング~日経のデータ基盤~」というイベントで日経でのデータエンジニアリングの取り組みについて紹介させていただきました。
日経におけるデータエンジニアリングの取り組み ~ NIKKEI Tech Talk #2
こちらの資料でも事前に告知したとおり、「BigQuery へのニアリアルタイム連携」について技術的な詳細をこちらの記事でお伝えします。
はじめに
BigQuery のニアリアルタイム連携についての話の前にまず Atlas とは何かを説明します。
Atlas は、2016年頃に 「リアルタイムに」「全てのデータを」扱える、データの計測から活用までを担うプラットフォームを目指して、AWS 上に構築されたリアルタイムアクセスログ解析基盤です。
この Atlas の導入(とその活用を進める施策と多数の人の数多の努力)により、日経内でデータ活用の風土が育ち、データドリブンな組織文化が醸成されつつあります。
リアルタイムアクセスログ分析基盤をAWSに構築した話 (JAWS UG ビッグデータ支部)
冒頭で BigQuery と言っておきながらなぜ AWS と思われるかもしれません。実は当初 AWS の Redshift で Atlas のセルフサービス分析基盤を構築していましたが、昨年から今年にかけて Redshift から BigQuery に移行しました。
本記事では移行した BigQuery を対象とします。
本題
そんな Atlas の分析基盤の中核を担う BigQuery への行動ログの連携をニアリアルタイム、つまり行動ログが収集されてから数秒 ~ 数分に短縮するための実装に現在進行系で取り組んでいます。 「現在進行系で」と書きましたが、実は今年の10月にリリースしたものの BigQuery と Dataflow でいくつかの問題が発生し(後ほど触れます)、旧来の仕組みに戻さざるを得なくなるということがおきました。そこで得られた反省点も踏まえ、アーキテクチャをどのように変えニアリアルタイム連携を実現しようとしているかについてお話します。
多少文章が長くなるため、ここで話の全体の流れについてまとめます。
目次
- 1. 旧来の仕組み
- 2. 旧来の仕組みの課題
- 3. BigQuery のニアリアルタイム連携への期待
- 4. BigQuery のニアリアルタイム連携の設計方針、アーキテクチャ(第一弾)
- 5. 第一弾で発生した問題
- 6. BigQuery のニアリアルタイム連携の新しいアーキテクチャ
- 7. まとめ
1. 旧来の仕組み
元々ニアリアルタイムにデータを参照したいケースでは Elasticsearch (with Kibana)を参照し、蓄積された大容量データは(ある程度時間が経過してもよいから) BigQuery のテーブルを参照するというようにデータベース使い分ける、というのが Atlas の設計思想でした。
Single Store (MemSQL) | BigQuery | Elasticsearch | |
---|---|---|---|
Latency | 1 秒未満 | 1 ~ 2 時間程度 | 数秒〜数分 |
データ構造 | カラムナ(※メモリ上のみ) | カラムナ | JSON ドキュメント |
クエリ | SQL | SQL | Lucene |
特徴 | MySQL 互換のインメモリー DB ユニーク制約可能、データ投入高速 クエリーも超高速 | 大容量データ対応、フラットレート契約 JSON 文字列も柔軟に扱う 分析者に開放している環境 | 新しいフィールドに迅速に対応できる Kibana 経由でデータを探索できる 最新データを簡単に参照できる環境 |
そのためストリーミング処理をしてニアリアルタイム処理するといったものはそもそも要件として存在しないため、バッチで安定的にデータの取り込みを行い、クエリ時の負荷を下げるためにデータの取り込み時に重複排除をする仕組みを構築しました。その概要が以下の図です。
ファイルからデータ連携されたデータを重複ありの状態で受付け、1 時間に 1 回重複排除したデータを行動ログテーブルにマージする、というのが大まかな手法の説明です。このコンポーネントは atlas-consumer-s3
と呼ばれます。

2. 旧来の仕組みの課題
上記の仕組みで大きな障害も発生せずに運用できていましたが、以下のような課題を抱えていました。
- 2-1. 原因不明の不具合によりデプロイ時に Kinesis からの 1 日分の読み直し作業が発生する
- 2-2. 新規カラム追加の実装/作業が煩雑
- 2-3. データの整合性の検証が不十分
2-1. 原因不明の不具合によりデプロイ時に Kinesis からの 1 日分の読み直し作業が発生する
既存の仕組みでは Kinesis から読み込んだレコードを CSV に変換する処理で KCL (Kinesis Consumer Library) を利用しています。KCL は各シャードをどのプロセスがどこまで読み込んだかを管理するための情報を DynamoDB テーブルで管理していますが、新規バージョンを Elastic Beanstalk にデプロイすると、この DynamoDB テーブルの情報に実態との齟齬が生じてシャードからの読み込みに失敗し続けてしまう現象が数回に一度の頻度で発生していました。
このような齟齬が生じた場合には、DynamoDB を新規作成しなおし、Kinesis の読み込みを 1 日前からやり直す必要が生じ、最新のデータ反映に追いつくまでに約半日を要していました。 データの反映が遅れるだけではなく、DynamoDB の作成し直しや設定の変更等いくつかのステップが別途必要となりリリース作業に 1 日を費やしてしまうこともざらにありました。
2-2. 新規カラム追加の実装/作業が煩雑
上記のリリース作業時のリカバリ作業がなくとも、行動ログのテーブルに新規カラムを追加して、atlas-consumer-s3 からデータ連携されるように修正するには以下のようなステップが必要で時間を要していました。
- atlas-consumer-s3 に追加するカラムがどの項目から取得するものなのかの定義を変更する。その際には必ずファイルの末尾に追記する必要がある。
- atlas-consumer-s3 を Elastic Beanstalk にデプロイする。
- atlas-consumer-s3 から連携される GCS ファイルの各行の末尾に追加したカラムの情報が存在していることを確認する。連携されるまで 2 時間程度かかるため、2 時間後に確認する。
- BigQuery の 行動ログを蓄積するテーブルと重複ありで最新の行動ログを格納する表それぞれにカラムを追加する。この際にカラムの順序を 1 で設定した順序に必ず指定する必要がある。実装者、レビュー者がそれぞれ確認する。また型指定を誤るとデータ連携が失敗するが、実際に流れてくるデータがその型のデータであることは保証されないため慎重に設定する必要がある。
- カラム追加後、また 2 時間待ちデータが反映されることを確認する。
2-3. データの整合性の検証が不十分
上記の手順の説明の中にも含まれていましたが、Kinesis まで流れてくるデータは JSON で十分なデータの検証を経ずにデータが流れてきます。例えば以下のようにいくつか型の不整合が生じているケースがありました。
- true or false の boolean 型を期待しているところに、文字列型が混入
- オブジェクトを期待しているのに、空文字列や空の配列が指定される
…etc
旧来の仕組みで利用した BigQuery のデータ転送は CSV データをある程度きれいになるように(例えば日付フォーマットの変換や先頭 BOM の削除等)してくれますが、型の不整合までは対処することができません。
また JSON 構造のデータを BigQuery 上では文字列型として扱っているため、不正な JSON 文字列や期待と異なる構造をしている JSON (例えば オブジェクトを期待しているのに配列がくる等)には対応できません。エラーにはなりませんが、クエリをする側が対処する必要がなり、クエリの複雑化やクエリパフォーマンスの低下を招きます。
3. BigQuery のニアリアルタイム連携への期待
前述したとおり、Atlas はレイテンシごとに異なるデータソースを提供する設計思想でしたが、SQL で(ニア)リアルタイムにあらゆる行動ログデータを参照したいという要望がいくつかあがっていました。
-
SEO 対策のデータを見るときに現在は GA360 でしかリアルタイム情報を参照できない。GA360 では大まかな情報しかない。 BigQuery へリアルタイムにデータ連携されると Atlas に対して(GA360 よりも)細かい情報を柔軟に取得できるようになる
-
MemSQL や ES でしか出来てない処理の一部を BigQuery に寄せられる
また、Atlas の主要なデータパイプラインをよりスケーラブルで安定的に高パフォーマンスで稼働する環境を目指すべく、GCP 上に移行する計画が浮上しました。 GCP 上にデータパイプラインを一度にすべてビッグバン移行することはリスクが高すぎるため、データパイプラインを部分的に移行していく方針をたてる中で、上記で説明した課題感や期待感、BigQuery に一番近いところで GCP 移行しやすいという点で BigQuery にデータ連携する仕組みを新たに構築することに決めました。
4. BigQuery のニアリアルタイム連携の設計方針、アーキテクチャ(第一弾)
まずデータパイプラインを流れるデータのスキーマを protobuf で定義することにしました。
これは以下の理由によります。
- スキーマ依存のデータフォーマットのため JSON よりも型の不整合を防ぐことができる
- Dataflow (Apache Beam) で標準で扱うことができるデータフォーマット
- AWS から GCP へのデータ転送で JSON 等のテキストフォーマットよりも転送サイズを抑える事ができる
- protobuf の定義ファイルからコードの自動生成、テーブルのスキーマの自動生成等今まで人手で生成した部分を自動化することができる
- JSON との相互運用性が高い
次にデータパイプラインの中心は、GCP PubSub をキューとして、Dataflow で ETL 処理を行うことにしました。これは以下の理由によります。
- 将来的にデータパイプラインを GCP に移行するにあたって PubSub のような分散キューサービスがあったほうが移行しやすい
- PubSub + Dataflow によって BigQuery にストリーミングインサート処理する事例が豊富で成熟している
- 重複排除の仕組みが簡単に実装できる
アーキテクチャの全体像は以下の通りです。 PubSub を Subscribe する Dataflow の Streaming Job が BigQuery に Storage WRITE API でデータを書き込むという比較的シンプルな構成です。

このアーキテクチャで構築された BigQuery ニアリアルタイム連携がリリースされ、当初1ヶ月間は大きな問題もなく安定的に稼働していました。 しかし 1 ヶ月後に突然 BigQuery へのデータ連携がストップし、最新のデータが全く BigQuery に連携されなくなりました。様々な対応をとりましたがすぐには元に戻せる見込みがなかったため、旧来の仕組みを復活させて今もなおその仕組みのまま稼働しています。
5. 第一弾で発生した問題
第一弾のアーキテクチャでは様々な問題が発生しました。その問題のほとんどは Google Cloud の製品不具合によるものでした。その代表的な問題についてここで取り上げます。これらの問題は現在進行系で Google Cloud のサポートと密にコミュニケーションを取りながら問題解決に向けて動いています。
5-1. Storage wRITE API の対象テーブルへの SELECT が失敗する
BigQuery ニアリアルタイム連携第一弾をリリースした後、BigQuery の Storage WRITE API で書き込む対象のテーブルに対して SELECT でクエリを投げると、Streaming data from <table_name> is temporarily unavailable
というエラーが発生するという報告を受けました。当初は発生頻度も少なく(3週間に1回程度)、すぐにリトライすればエラーを回避できたため無視していました。
しかし、BigQuery ニアリアルタイム連携においてデータの書き込みが進まない事象が発生する2,3日前あたりから発生頻度が上がり、リトライしても同じエラーが発生し続けるという事象が発生しました。
この件について Google Cloud のサポートに問い合わせたところ、BigQuery で発生した Incident の影響と BigQuery の内部的なメンテナンス時に発生する非常に稀な現象であるとのことでした。しかし継続的に調査を進めていただいた結果、非常に頻度は低いが現在も発生する可能性があると判明しました。現在修正に向けて取り組んでいただいている状況です。
5-2. Storage wRITE API の対象テーブルへの DELETE/UPDATE が失敗する
こちらは上記の問題に近いですが、SELECT ではなく DELETE 文や UPDATE 文を Storage WRITE API の対象テーブルに実行すると以下のエラーが発生するという問題です。
UPDATE or DELETE statement over table <table_name> would affect rows in the streaming buffer, which is not supported
BigQuery へのニアリアルタイム連携第一弾の更新が停止された後1日以上経過しても、DELETE 操作で上記のエラーが発生していました。BigQuery へのニアリアルタイム連携に問題が起きたタイミングで Dataflow Job を強制停止した影響で書き込み対象のテーブルに重複レコードが大量に発生する問題が起きていたためなんとか対処する必要がありました。
元々 Storage WRITE API を含めたストリーミングによって挿入された行は最大 90 分間、「更新、削除、コピー」操作ができないという仕様が存在します。(ドキュメントでは以前のストリーミング API の仕様として記載されていますが、Storage WRITE API にも適用されるようです。)
しかしこの仕様が適用されるのは、Storage WRITE API でデータが継続的に挿入され続けている状況のときで、データの更新が止まっている現状でも同様のエラーが発生する理由がよくわからなかったため Google Cloud のサポートに調査を依頼しました。
結果として判明したのは以下のとおりです。
- Storage WRITE API を使った書き込みを行う Dataflow Streaming Job がキャンセル、もしくは強制キャンセルされると Storage WRITE API で使用するバッファが残り続ける場合がある
- このバッファは最大7日間保持され、ユーザ側から明示的に削除することはできない
- バッファが削除されない限り UPDATE/DELETE 操作を行うことはできない(INSERT/SELECTは行える)
最終的には BigQuery のニアリアルタイム連携第一弾で問題が発生してから、7日経過後に重複レコードを削除する対応を取りました。
5-3. Dataflow Job の drain が終わらない
こちらは BigQuery ニアリアルタイム連携処理でデータの連携が全くされなくなったために、一度 Dataflow Streaming Job を安全に停止しようと Drain したところ、Drain の処理が2時間経過しても終わらない事象が発生しました。Streaming Job の Drain は流れるデータ量に応じて完了時間が伸びることは知っていましたが、これほど長時間化することはこれまでの経験上ありませんでした。
これに関しても Google Cloud の調査を依頼したところ、Dataflow の不具合にあたったことがわかりました。 Dataflow の Runner V2 では Runner Harness というプロセスを起動し、Apache Beam SDK を使ったユーザコードによるプロセスを管理します。Dataflow Job の Drain をリクエストすると、この Runner Harness がリクエスト受け取り、ユーザコード側のプロセスを終了させます。今回は Runner Harness とユーザコードのコミュニケーションが何らかの問題によってとれなくなったことで Drain の処理がいつまで経っても終わらない問題が起きたようです。本件については Public Issue Tracker に登録され (https://issuetracker.google.com/259630147) 現在 Google Cloud の Dataflow チームが解決に向けて取り組んでいただいています。
5-4. Dataflow Job にて「CreateWriteStream requests 」を超えたというエラーメッセージが出て、Storage WRITE API による更新ができなくなる
この問題が BigQuery ニアリアルタイム連携で BigQuery にデータが連携されなくなった根本原因です。 まずこの問題はさらに2つに分けられます。まず1つ目は 「CreateWriteStream Requests」の Quota 使用量が常に 0% と表示される問題です。これは以前から認識されている問題で、既に Public Issue Tracker に登録 (https://issuetracker.google.com/issues/233351402) されていました。
2つ目が 1) CreateWriteStream requests の Quota Limit を実際には超えていないにも関わらず上記のエラーが出る、 2) CreateWriteStream requests が予期せず大量に発行されて Quota Limit を超えてしまう、のうちのいずれかです。いずれかといったのは1つ目の問題のせいで実際にどのくらい消費されたがわからないためです。 本件についても Google Cloud のサポートと密に連携をとり、解決に向けて取り組んでいただいている最中です。根本原因ついては調査中ですが、どうやら BigQuery Storage WRITE API を Exaclty Once セマンティクスで利用した場合に発生するようです。
6. BigQuery のニアリアルタイム連携の新しいアーキテクチャ
上記で説明した問題の大部分は BigQuery Storage WRITE API (の Exaclty Once セマンティクス)を利用していることに起因しています。 BigQuery Storage WRITE API や Dataflow を使わない選択肢も考えましたが、実装コストやパフォーマンス等を考慮した結果第一弾のアーキテクチャの大枠を変えることなく、BigQuery ニアリアルタイム連携をより安全に実現するためのアーキテクチャに改善することに決めました。
現在進行系で取り組んでいるため今後変更する可能性もありますが、現在検討中のアーキテクチャは以下のとおりです。

第一弾と比べたときの大きな変更点としてはメインのパイプラインとバックアップのパイプラインを用意したことです。メインのパイプラインでは BigQuery の Storage WRITE API でニアリアルタイムにデータを処理します。バックアップのパイプラインでは、旧来の仕組みに近い仕組みでバックアップデータを GCS に保持しながら、バッチ処理でデータを連携します。 バックアップラインでは既にデータが存在する場合には重複してデータを連携しないようにスケジュールクエリで Merge 文を実行するようになっているため、メインのパイプラインと同時に実行し続けてもデータが重複して格納されることはありません。
このように上述したいくつかの BigQuery の Storage WRITE API の問題が発生したとしても、レイテンシは大きくなるがデータ連携は止まらない仕組みが構築できます。 以下ではこのアーキテクチャで重要な2つのトピックについて説明します。
6-1. 重複排除の仕組み
このパイプラインにおいて重要なトピックの一つが重複排除です。Atlas のデータパイプラインでは、AWS の SQS、Kinesis、GCP の PubSub 等基本的に At Least Once のセマンティクスでメッセージングサービスを構成しているため、データが重複する可能性があります。
BigQuery でクエリをなげるときに重複排除するという考え方もありますが、行動ログを蓄積するテーブルはサイズが大きく、いくらパーティション化しているとはいえ、クエリ実行時に重複排除するのはクエリを投げるコストが増大するため許容できません。 そのためデータ連携時に重複排除する必要があります。
今回設計したアーキテクチャでは3つのしくみで重複排除を実現します。
1つ目が Dataflow の Deduplicate トランスフォーマです。これは特定のキーを一定期間保存してそのキーの値に基づいて重複を排除する、というものです。Atlas ではリクエストIDというレコードを一意に識別するためのIDがあるため、それをキーとしてこのトランスフォーマを実行します。
2つ目が BigQuery にデータ連携するときに BigQuery Storage WRITE API を Exaclty Once のセマンティクスで利用します。これにより、Deduplicate で重複排除されたデータが必ず一回だけ書き込まれることが保証されます。
最後に 3つ目が保険的な仕組みです。Deduplicate で重複排除されるのは一定期間だけで、例えば30分と設定した場合には、30分以上経過して同一のデータが流れてきた際には重複排除できません。そうしたケースで稀に重複が発生しうるため、バックアップラインで定期的に実行されるスケジュールクエリのなかで重複を排除するための SQL を実行します。これにより完全に重複を排除することができます。

6-2. protobuf を中心としたスキーマ駆動開発
2つ目の重要なトピックがスキーマ駆動開発です。課題にあげたとおり、旧来の仕組みではテーブルにフィールドを追加する作業が煩雑でした。 これを protobuf のスキーマファイルを変更するだけで 1)テーブル定義の変更、2) データ変換処理の修正を実現できるようにします。そのフローが以下の図です。

まず protobuf のデータをデシリアライズするために必要な Java のクラスファイルは protoc コマンドで生成することができます。 これに加えて 2つの仕組みで必要なソースコードや定義ファイルを生成する仕組みを作ります。
protoc-gen-bq-schema
まず protobuf の定義から BigQuery のテーブルスキーマを生成するために、protoc-gen-bq-schema を使います。これは protoc の plugin で protobuf のフィールドオプションとして BigQuery のテーブルのフィールド定義を行うことができるものです。
例えば以下のように protobuf スキーマを定義すると
syntax = "proto3";
package foo;
import "bq_table.proto";
import "bq_field.proto";
message Bar {
option (gen_bq_schema.bigquery_opts).table_name = "bar_table";
uint64 column1 = 1 [
(gen_bq_schema.bigquery) = {
description: 'カラムの説明'
require: true
type_override: 'TIMESTAMP'
}
];
}
go install github.com/GoogleCloudPlatform/protoc-gen-bq-schema@latest
コマンドを実行して、protoc-gen-bq-schema
コマンドの PATH を通した状態で
protoc --bq-schema_out=. foo.proto
コマンドを実行すると以下のような JSON ファイルが生成されます。
[
{
"name": "column1",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "カラムの説明"
}
]
後は protobuf の定義ファイルが変更されたタイミングで protoc-gen-bq-schema を実行して JSON ファイルを生成した後にテーブルスキーマを管理しているリポジトリに PR を出す仕組みを Github Actions で構築してあげれば、スキーマの変更をするだけでテーブル定義の変更を自動的に行うことができます。
Pluggable Annotation Processing
BigQuery のテーブルへデータを格納する箇所ではデータの変換が必要です。変換元のデータのフィールドと変換先のテーブルのフィールドをマッピングして、変換処理を行うプログラムで対応する必要があります。これも protobuf のスキーマ定義で対応付けを行います。 以下のようにカスタムフィールドオプションとして、source_enrichment_options を用意し、変換元のデータの対応フィールド名を設定します。
optional string er_bot_bot_name = 5 [
(source_enrichment_options) = "enriched.bot.bot_name",
];
そして Java の Pluggable Annotation Processing という機能を利用します。 これは、コンパイル時点 (javac コマンド実行時点で) Annotation 処理を行うことができる機能です。Annotation 処理を行う Annotation Processor クラスを定義して、コンパイル時点で protobuf の定義ファイルを読み込み、上記の変換フィールドの対応付を読み込んで変換処理を自動生成します。
例えば上記の protobuf の定義から生成される変換処理の例がこちらです。
if(rEnrichedBot.hasBotName() && !rEnrichedBot.getBotName().isEmpty()) { builder.setErBotBotName(safeString(rEnrichedBot.getBotName())); }
ややわかりにくいですが、変換元のフィールドが存在するかどうかを確認し、存在する場合 builder パターンで対応する変換先フィールドに値を設定しています。 この仕組みにより、新しくフィールドが追加されるたびにソースコードを手動で変更する必要がなくなります。
7. まとめ
だいぶ長くなってしまいました。この文章のまとめです。
- BigQuery ニアリアルタイム連携処理を実装しようとしている。
- 一度リリースしたが、BigQuery Storage WRITE API や Dataflow の不具合にあたり、旧来の仕組みに戻した。
- 最初のリリースで発生した問題も踏まえて、バックアップパイプラインを同時に走らせて、Storage WRITE API でデータ処理するメインパイプラインが止まってもデータ連携自体は止まらないアーキテクチャに修正した。
- 重複排除も3つの仕組みで実現している。
- スキーマ駆動開発でジョブの変更開発コストも削減できる。
明日は伊藤さんによる「日経ビジュアルデータの仕事(新聞社で働くデザイナーのハナシ)」です。お楽しみに!