NIKKEI TECHNOLOGY AND CAREER

非同期処理の道具箱

この記事はNikkei Advent Calendar 2022の 13 日目の記事です。

こんにちは、Web チームの井手です。最近 Web チームで働く魅力について語ったので是非とも読んで欲しいです。

今日は非同期ランタイムについて書きます。 私は非同期ランタイムやサーバー進化論が好きで、たまにブログを書いたり、前職でもアドベントカレンダーに書いたりしていました。

本稿では効率的な非同期処理を実現するライブラリが中で何をしているのかを、低レイヤーの非同期処理そのものを解説しながら見ていきたいと思います。説明の都合上 Rust を使うので、Rust 特有の話もありますが、低レベルな API があればどの言語でも当てはまる話だと思います。この辺りは Rust, Scala(JVM), Erlang, Go などはお互いがお互いのアイデアを参考にしていて切磋琢磨しつつも似た仕組みをそれぞれが持つ、もしくは実装可能になっていると思います。

もし Rust も非同期処理も初めてという方は 並行プログラミング入門 ―Rust、C、アセンブリによる実装からのアプローチ という本をオススメします。Rust の解説から始まり、ロックなど並行処理に必要な道具を全部説明した上で、非同期処理を説明してくれる素晴らしい本です。

なのでこのブログに書かれている内容も特に前半はいくつかの部分で上記本の焼き直し感が非常に強いのですが、その本に書かれていることをもっと噛み砕いて説明していこうと思います。

なぜ非同期処理が必要か

コンピューターは一般的に CPU が与えられた命令を上から順に実行していきます。 このように指示された順序で処理を行うことを同期処理と呼びます。

いまファイル I/O やネットワーク I/O などなんらかの I/O 処理で手に入れたデータをもとになんらかの処理をしたいとします。 同期モデルだとこのとき I/O 中は後続の処理もできず、その CPU は他の仕事ができなくなってしまいます。 これをブロッキングと呼びます。 このとき CPU が他の処理をできるように、「I/O 中は該当の処理 A を中断しておき、別の処理 B を行い、I/O が終わってから処理 A を再開する」とするとこの問題は解決します。 そして I/O の完了を検知する方法は OS が提供してくれるため可能な方法です。 I/O 処理中は CPU が使われていない(無駄がある)ので、I/O 処理中に別の処理を割り当てることでその無駄を解消するというわけです。 これを非同期処理と呼びます。

マルチスレッド

非同期処理の解説に入る前に、同期処理によるブロッキングによる問題を回避する方法にはもう一つあるのでそれを紹介します。

それは処理を実行する主体を増やすことです。一般的にコンピュータはプログラムをプロセスという単位で起動し、プロセスはその中でスレッドを作ってそのスレッドに処理を割り当てます。このプロセスやスレッドを増やすことで処理を同時に複数実行できるようになります。 その結果、I/O でブロックされても他の処理を他のスレッドで行えます。

一見するとブロッキングの課題を解決したかに見えますがこれは不完全です。 なぜなら実際には CPU の数が増えたわけでもなく、コンピュータのネイティブスレッドはその裏で OS が限られた CPU 資源に処理を割り当てているだけだからです。 つまり処理能力というリソース自体は変わっていないのでスレッドをいくら増やしても処理能力が向上する限界があります。 それに同じスレッドを使い回した方が CPU のキャッシュを使い倒せてお得なので、スレッドを増やせば増やすほど良いというわけではないです。(このあたりの試行錯誤は cats-effect の blog がとても面白いので気になる方は読んでみてください。https://typelevel.org/blog/2021/02/21/fibers-fast-mkay.html

もし CPU コアの数以上のスレッドの実行をすると実行スレッドの切り替えが発生し、タスクを再開させる時はそのスレッドのタスクの進行状況を復元のためのコストがかかります。 こういったコストのことは概してコンテキストスイッチやスイッチングコストと呼ばれ、Web サーバーに限った話をすると C10K 問題として取り上げられました。 1 リクエストに 1 スレッドを割り当て続けるのは性能悪化を引き起こすためです。 ちなみにこの C10K 問題の解決法の一つがシングルスレッドによる非同期処理であり、Node.js が注目された理由でもあります。 クライアント 1 万台は多いように見えて意外と達成できてしまう数値であり、この問題の解決が求められていました。 Node.js ではイベントループという仕組みで実現しています。

https://nodejs.org/ja/docs/guides/event-loop-timers-and-nexttick/

非同期処理かマルチスレッドか

さて、非同期処理について語る記事でマルチスレッドを例にあげた理由は、現実的にはいま採用されているライブラリやアーキテクチャはマルチスレッドかつ非同期処理で扱われているからです。 それが実際には高効率です。 スレッドもコアの数以内でスレッドプールを作って割り当てる様にすれば、コンテキストスイッチの発生を防げます。

そのためマルチスレッド + 非同期処理というモデルは強力ですが、とはいえただのシングルスレッドの非同期処理でも、十分に性能は出せます。 Node.js が良い例です。 そこでまず非同期処理だけに着目して見ていきましょう。

Rust の非同期処理

Rust で非同期処理というと tokio が有名です。

簡単な例

tokio を使えば非同期処理は、

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    // Open a TCP stream to the socket address.
    //
    // Note that this is the Tokio TcpStream, which is fully async.
    let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
    println!("created stream");

    let result = stream.write_all(b"hello world\n").await;
    println!("wrote to stream; success={:?}", result.is_ok());

    Ok(())
}

として、main に対するマクロと async/await を書けば終わりです。

チュートリアルの例もとてもわかりやすいです。

https://tokio.rs/tokio/tutorial/hello-tokio

tokio と Rust

tokio は 3rd party のライブラリです。そこで JS などを経験した人は、「どうして非同期処理が 3rd party library なの?」と思うかもしれません。 そこにはランタイムの差し替えが可能になったり、(現実的にそれができるのかという疑問もありますが・・・)扱う問題によってはスケジューラの特性がパフォーマンスに直結するので色々な実装を使えるという点で利点があるのですが、これは JSer からすれば Promise がライブラリになっているようなもので疑問に思う点です。 非同期処理が言語でサポートされていないのではと思うかもしれません。 Rust では非同期処理が言語でサポートされていないのでしょうか。

Rust(std) そのものには非同期処理の仕組み自体はあります。 しかし非同期のタスク群をスケジューリングして実行する仕組みがありません。 これはユーザーもしくはライブラリに任されています。 tokio が提供しているのはそのための schedular や executor です。

では今から tokio などを使わずに Rust 単体で非同期処理をするためにはどうすればいいか見ていきましょう。 普段は考えなくて良いことですが、非同期処理そのものの考え方が見えてくるのでこの訓練はオススメします。

非同期処理の道具箱

そして非同期処理の説明をしたいのですが、先に説明に必要な道具の説明をします。

章の名前はマルチスレッド・プログラミングの道具箱 という記事があってそこからタイトルを真似ました。 これから説明することの前知識として並行プログラミングについて解説します。 もし説明に不十分を感じましたらその記事や並行プログラミング入門 ―Rust、C、アセンブリによる実装からのアプローチ をご参照ください。

channel

Rust では mpsc と呼ばれている機能です。 Multi Producer Single Consumer の頭文字です。

データが流れる queue があり、送信側が複数付いており、受け取り側は一つ付いているという形です。

このように使います。

use std::thread;
use std::sync::mpsc::channel;

// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
    tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);

別の thread からデータを一つの集積点に送れるので、マルチスレッドプログラミングでは必須級の道具です。 非同期処理においても非同期タスクが完了した通知を流すために使います。

lock

マルチスレッドプログラミングにおいてはレースコンディションという問題に気をつけなければいけません。 これは複数スレッドから一つのデータを操作した時に起きる問題です。( こちらが説明が平易でわかりやすいです。) この問題を解決するのがロックです。 ロックはその値のアクセスに対するガードとアクセス権だと思うとよく、アクセスする権利を持っている側しかアクセスできないようにします。 競合状態に陥るような処理をするときは事前にロックで対象となるデータをロック(施錠)し、ロック中は他の主体がアクセスできないようにしておき、その処理が終わるとアンロック(開錠)して他の主体もアクセスできるようにするという使い方をします。

実装としてはロックは Spin Lock , Mutex, Semaphore などがあり、Rust では Mutex が頻出です。 Mutex は lock を取った時だけデータを参照できる仕組みと、lock を取られている時は他の主体は lock を取れない仕組みを提供します。 そうすることでデータに同時にアクセスできる人は一つだけという保証を作れます。

非同期処理においても Mutex はよく出てきます。 ロックは非同期ランタイムの重要なコンポーネントです。 なぜならここがオーバーヘッドになりがちだからです。 特に非同期ランタイムの開発においては、実行キューの数、Executor の数をどれくらいにしてどのようなアーキテクチャにするかで大きくパフォーマンスが変わります。現場では実行キューも Executor も複数ある状態で work-stealing を実装することが多いですが、このアーキテクチャに辿り着く前はロックのオーバーヘッドはとても気にして実装されていましたし、これからもそうです。

Rust 自体が持つ非同期処理

非同期処理は言語によって Future, I/O, Promise など様々な呼ばれ方をしています。Rust では Future と呼ばれます。

Future は https://async-book-ja.netlify.app/02_execution/02_future.html の例を借りると

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

として表現できます。

future

Output は非同期処理が完了した時に得られる値の型、poll は非同期処理が完了したか問い合わせる関数、Poll は非同期処理完了を表す Ready と非同期処理中を表す Pending から成り立つ enum です。 なんらかの Future が作られたら、それを呼び出す側は poll を呼び出して「タスク完了した?」と聞いて完了していなければ完了するまで呼び出し続けることでいつかは Ready を得られるという算段です。 しかし何回も poll を呼んだり、ましてやそれを loop の中でしているとその実行スレッドを占有することになるので wake という仕組みで防ぐのが現実です。

busy_future

wake は Task という非同期処理の単位を表す struct に ArcWake を実装することで実現します。

wake
struct Task {
    future: Mutex<BoxFuture<'static, ()>>,
    sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(_arc_self: &Arc<Self>) {
      let self0 = arc_self.clone();
      arc_self.sender.send(self0).unwrap();
    }
}

wake のデフォルト実装が wake_by_ref なので、wake_by_ref を実装すれば良いです。 ここでは task の wake が呼ばれたら、そのタスク自分自身を executor に実行させるために queue に send しています。 これをスケジューリングと呼びます。

この Task は spawner という仕組みで作ります。

struct Spawner {
    sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(future),
            sender: self.sender.clone(),
        });

        self.sender.send(task).unwrap();
    }
}

spawn を呼び出せば、executor に繋がった queue への sender と future の組が手に入るようになります。

そして task の wake は Future の poll で実行されることを想定しています。 しかし実物の Future trait の poll のシグネチャからはどこにも wake は見当たりません。

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>

この Context が実は waker のラッパーです。

#[stable(feature = "futures_api", since = "1.36.0")]
pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

なので poll の呼び出し元がこの Context を作って poll の引数に渡せば実装できます。 それをしてくれるのが Executor です。

struct Executor {
    sender: SyncSender<Arc<Task>>,
    receiver: Receiver<Arc<Task>>,
}

impl Executor {
    fn run(&self) { // <3>
        while let Ok(task) = self.receiver.recv() {
            let mut future = task.future.lock().unwrap();
            let waker = waker_ref(&task);
            let mut ctx = Context::from_waker(&waker);
            let _ = future.as_mut().poll(&mut ctx);
        }
    }
}

executor は channel から task が送られてくるごとに、その task から waker を取り出し poll を実行してくれます。 ここでのポイントは receiver には I/O の待ちなどがないタスクしか流れてこないと言う点です。 それは poll は I/O が完了して進行できる時にしか wake を呼ばない様にできるからです。 むしろ loop のなかで poll 関数を polling せずにこんなたくさんのモジュールを使ったのは、それをしたかったからです。

フロー図

この辺りの詳細な実装や追跡は

をご参照ください。

またソースコードは 並行プログラミング入門 ―Rust、C、アセンブリによる実装からのアプローチ の例をそのまま使っていました。

キューの詰まりを解消する

さて、上の実装では、queue に流れるタスクは I/O 待ちが完了したものだけなので I/O 待ちによる詰まりを解消できています。 しかし実は問題点があります。 それは I/O の待ち時間にタスクが詰まることはないものの、仮に Executor に重たいタスクがたくさんくると queue が詰まってしまうことです。 I/O 完了後のデータを使ってとても計算量の大きい処理を実行すれば結局 executor が詰まってしまいます。

このように I/O 待ちを解消して高速になる状態は I/O bound と呼ぶのに対し、処理能力を向上して高速になる状態は CPU bound と呼びます。

https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean

そして executor の詰まりは CPU bound の問題です。 シングルスレッドの非同期処理は CPU bound な問題に弱いです。

これに対する解決策は垂直スケーリングや水平スケーリングがあるでしょう。 例えば高価なハイスペック CPU を使えば緩和できますし、サーバーの台数を増やすことでも queue の詰まりは緩和できます。 しかしそれは金の弾丸であり、お金がかかってしまいます。 まず今あるサーバーの性能を最大限使い潰してから実行した方が良い手段と言えるでしょう。

実は上の例ではサーバーの性能は完全に引き出せていません。 それはシングルスレッドで実行しているからです。 もしこれを CPU のコアの数だけスレッドを増やせば、コンテキストスイッチの無い範囲でサーバーの性能を全部使い倒すことができます。

つまり、Executor を複数に増やすのです。 そのためにマルチスレッドプログラミングをします。 そこでこれから Executor の複数実行を tokio はどのように実現しているのか見ていきます。

tokio code reading

tokio を使った時のコードは、

#[tokio::main]
async fn main() {
    println!("hello");
}

といったコードです。

https://tokio.rs/tokio/tutorial/hello-tokio

同期的なコードに見えますが、コンパイル時に Rust では async は Future を返す関数に、await は poll への呼び出しへ変換します。 そして先ほど Future を自分で実装した例にあったように非同期処理の実行には Executor が必要です。 それがどこにあるかというと tokio の場合は #[tokio::main] です。 実はこのマクロが展開されると、Executor や非同期 Runtime が生成されます。 先程のコードはこのようになります。

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

なので、ここからコードを追っていきましょう。 ただ tokio のコードを追うのはとても大変なので、どのように複数の Executor を作っているのかを見つけることを目的として読んでいきます。

runtime

まず tokio runtime のコードを読みます。

let mut rt = tokio::runtime::Runtime::new().unwrap();

の部分です。

一個ずつ追っていくと、

#[cfg(feature = "rt-multi-thread")]
        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
        pub fn new() -> std::io::Result<Runtime> {
            Builder::new_multi_thread().enable_all().build()
        }

(https://github.com/tokio-rs/tokio/blob/86ffabe2af69f2440be26d153fd692689c9947fb/tokio/src/runtime/runtime.rs#L131)

pub fn build(&mut self) -> io::Result<Runtime> {
        match &self.kind {
            Kind::CurrentThread => self.build_current_thread_runtime(),
            #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
            Kind::MultiThread => self.build_threaded_runtime(),
        }
    }

(https://github.com/tokio-rs/tokio/blob/36039d0bb94d1accf8ae5569f6c50ca5a0c661ef/tokio/src/runtime/builder.rs#L638)

と来ます。tokio の install では公式の例をそのまま使うと、

tokio = { version = "1", features = ["full"] }

として install します。 つまり全ての features が ON なので rt-multi-thread も ON になっているとします。 この rt-multi-thread モードにしないとマルチスレッドで executor の多重化をしてくれません。

そしてこのような本丸のコードが出てきます。

fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

            ///

            let (scheduler, handle, launch) = MultiThread::new(
                core_threads,
                driver,
                driver_handle,
                blocking_spawner,
                seed_generator_2,
                Config {
                    before_park: self.before_park.clone(),
                    after_unpark: self.after_unpark.clone(),
                    global_queue_interval: self.global_queue_interval,
                    event_interval: self.event_interval,
                    #[cfg(tokio_unstable)]
                    unhandled_panic: self.unhandled_panic.clone(),
                    disable_lifo_slot: self.disable_lifo_slot,
                    seed_generator: seed_generator_1,
                },
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }

(https://github.com/tokio-rs/tokio/blob/36039d0bb94d1accf8ae5569f6c50ca5a0c661ef/tokio/src/runtime/builder.rs#L1016)

この関数で返したランタイムに対して、エントリポイントで Future が渡されていくことがポイントです。

num_cpus や core_threads は明らかにマルチスレッドプログラミングをする宣言です。 明らかにこの辺りのコードが怪しいので重点的に見ていきます。

ここで

Spawn the thread pool workers

とある箇所が重要になるので launch のコードを追います。 そのために MultiThread::new を見ていきます。

let (handle, launch) = worker::create(
            size,
            parker,
            driver_handle,
            blocking_spawner,
            seed_generator,
            config,
        );

とあり、

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    ///

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

(https://github.com/tokio-rs/tokio/blob/36039d0bb94d1accf8ae5569f6c50ca5a0c661ef/tokio/src/runtime/scheduler/multi_thread/worker.rs#L188)

というコードにたどり着きます。 どうやら CPU のコアの数だけ launch が作られて返されています。

Worker は launch の一部でもあることから、この Worker や Handle は CPU のコア数だけ作られています。

そして launch.launch() を見てみると

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

とあり、コアの数だけ渡された worker に対して runtime::spawn_blocking(move || run(worker)); を行います。

この run が executor の run かのようにキモになっていて、

fn run(worker: Arc<Worker>) {
    ///

    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());

        // Check if there are any deferred tasks to notify. This can happen when
        // the worker core is lost due to `block_in_place()` being called from
        // within the task.
        wake_deferred_tasks();
    });
}

という実装です。

そして Context の run を呼ぶわけですが、

fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            ///

            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if did_defer_tasks() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

となっています。 そうです、コアが停止していない限りの無限ループです。 そして steal_work というものがあります。

残念ながらここから先は tokio の深淵を覗くことになるので今は説明は省略させてください。 僕も全てを説明できる自信はないです。 ただ公式が概念的な解説は出しているのでそれを見てみましょう。

https://tokio.rs/blog/2019-10-scheduler

ざっとどういうことかというと複数の executor を走らせる時に、queue の消化が詰まっているところと余裕があるところがあれば、余裕がある方へとタスクを流してくれるという仕組みです。

work_stealing

このモデルの有効性は cats-effect のドキュメントにも示されています。

https://typelevel.org/blog/2021/02/21/fibers-fast-mkay.html

tokio では worker はコアの数だけ作られて、その一つ一つが Executor の run として steal_work を駆使して、なるべくハードウェアリソースを使い切ろうと task を割り振っています。 このようになるべく多くの task をこなせるように最大限の工夫が施されています。 (正確には work-stealing は重たいタスクの消化ではなく、スレッドに対するタスクの偏りや single worker queue から multi executor への割り当てやロックのオーバーヘッドを減らすことが主目的ではあるが、それが結果として重たいタスクの消化にも役立っているという認識。)

まとめ

  • rust の非同期処理をそのまま実装すると executor が詰まるので、スケジューラには工夫が必要
  • tokio は CPU コアの数だけ executor を増やす
  • 重いタスクには work-stealing の仕組みで緩和する

余談ですが重いタスクに関しては tokio なりの工夫はあるものの、そもそも苦手としている節があります。公式ドキュメントにも When not to use Tokio というセクションに

Speeding up CPU-bound computations by running them in parallel on several threads. Tokio is designed for IO-bound applications where each individual task spends most of its time waiting for IO. If the only thing your application does is run computations in parallel, you should be using rayon. That said, it is still possible to "mix & match" if you need to do both.

と書かれています。

非同期ランタイムはアクセスの特性に応じて戦略が変わることや、OS や CPU などと密接に関わってくるので中々に難しいです。もっと勉強していきたいと思います。

最後に

来月、【NIKKEI Tech Talk #3】メディア企業のWebフロントエンド~多様な開発と技術選定~ というイベントを開催します。僕も登壇します。ぜひ遊びにきてください。

明日は藤田さんの「日経電子版が RFC 9116(security.txt)に対応した話」です。

井手優太
ENGINEER井手優太

Entry

各種エントリーはこちらから

キャリア採用
Entry
新卒採用
Entry
カジュアル面談
Entry