Rustにおけるasync/.awaitの裏側

  • 2024/11/22
  • Rustにおけるasync/.awaitの裏側 はコメントを受け付けていません

イントロダクション

非同期処理は、タスクの完了を待つことなく多くのタスクを並行して処理することができる、現代のプログラミングにおいて欠かせない技術です。Rustでは async/.await 構文が導入され、直感的な非同期コードが記述できるようになりました。しかし、その背後で何が起きているのかを理解することは、非同期プログラムのパフォーマンスを最適化する上で非常に重要です。

この記事では、Rust公式ガイド「Asynchronous Programming in Rust」第2章を基に、Rustの async/.await 構文の裏側を探ります。具体的には、カスタム非同期ランタイムを実装し、非同期処理がどのように動作するかを確認します。

async/.awaitの裏側

Rust言語において、 asyncと await の裏側でどのような処理が動いているのか確認するために、カスタム非同期ランタイムを実装してみます。Asynchronous Programming in Rust 第2章のカスタムタイマーの例を見ていきます。この例では、ExecutorとSpawnerという2つの主要なコンポーネントを持つ非同期タスク実行環境を構築し、TimerFutureを使用してタイマーの非同期タスクを実行します。 lib.rs と main.rs の二つのファイルに実装します。
まずは、lib.rsファイルです。

lib.rs

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });
        TimerFuture { shared_state }
    }
}

lib.rs ファイルでは、TimerFutureというカスタム非同期タイマーを実装しています。このタイマーは指定された期間が経過するまで非同期に待機します。
各要素に関して説明します。

TimerFuture構造体

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

TimerFutureは、 Futureトレイトの実装クラスで、一定の期間後に完了する非同期タスクを表します。SharedStateは、タイマーの時間が経過したかどうかの状態(completed)と、時間経過後にタスクが再びポーリングされるためのwakerを保持します。

Futureトレイトの実装

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

Futureトレイトを実装することで、TimerFutureは非同期タスクとして扱えるようになります。
pollメソッドはエグゼキューターによって実行されるメソッドで、タイマーが完了しているかをチェックします。完了していればPoll::Readyを返します。完了していない場合、Contextに設定されたwakerをshared_stateに保存し、Poll::Pendingを返します。Contextへのwakerの設定はエグゼキュータ側で行います。

TimerFutureの新規作成

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });
        TimerFuture { shared_state }
    }
}

newメソッドは、TimerFutureのインスタンスを作成します。shared_stateを初期化し、タイマーの完了状態をfalseに設定します。
また、時間カウント用のスレッドを生成します。このスレッドは、指定された期間が経過した後にcompletedフラグをtrueに設定し、保存されたwakerを呼び出すことでタスクを再スケジュールします。

次に、main.rsです。こちらでは、非同期タスクを管理・実行するカスタム非同期ランタイムを構築します。これには、タスクの生成と実行を管理するエグゼキュータとスポーナーが含まれています。

main.rs

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
use timer::TimerFuture;
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        };
    }
}
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
    fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}
struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    task_sender: SyncSender<Arc<Task>>,
}
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}
fn main() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy!");
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!")
    });
    drop(spawner);
    executor.run();
}

各要素に関して説明します。

Executor

struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        };
    }
}

Executorは、タスクを実行する責任を持つ構造体です。タスクはReceiver<Arc<Task>>というキューに保存され、順次実行されます。
runメソッドは、キューからタスクを取り出し、タスクのFutureのpollメソッドを実行します。タスクが完了していない場合(Poll::Pendingが返ってきた場合)は、再びfutureをタスクに設定します。

Spawner

#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
    fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

Spawnerは、新しい非同期タスクを生成し、エグゼキュータのキューに送信します。

Task

struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    task_sender: SyncSender<Arc<Task>>,
}
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

Taskは、個々の非同期タスクを表す構造体です。ArcWakeトレイトのwake_by_refによって、ウェイクアップ時の処理を設定しています。TimerFuture::newのメソッドで起動した時間経過カウント用の別スレッドは、指定時間経過後にWaker.wake()を呼び出してwake_by_refの処理を実行することで、エグゼキュータのキューへタスクを再度送信しています。

main メソッドを実行すると、 howdy! が表示され、2秒後に done! が表示されます。

全体処理の流れ

全体の処理の流れをまとめると、以下のようになります。

  1. ExecutorとSpawnerの作成
    new_executor_and_spawner関数で、非同期タスクを管理・実行するためのエグゼキュータとタスクを生成するスポーナーを作成します。
  2. 非同期タスクの生成と送信
    Spawnerが非同期タスクを生成し、エグゼキュータのキューに送信します。このタスクはTimerFutureを使用して2秒間待機する非同期タスクであり、ウェイクアップ用の処理(今回の場合は、エグゼキュータのキューへタスクを送信する処理)もArcWakeのwake_by_refにより設定されています。
  3. エグゼキュータの実行
    エグゼキュータはキューからタスクを取り出します。そのタスクのTimerFuture::pollを実行します。その際、タスクに設定してあったWakerを引数のContextに含めて渡します。
  4. TimerFutureのポーリング
    エグゼキュータによってTimerFuture::pollメソッドが実行され、タスクが完了していない場合(shared_state.completed = false)はPoll::Pendingを返します。このとき、引数で受け取ったWakerをshared_stateに設定します。
  5. カウント用スレッドの実行
    TimerFuture::newのタイミングで別スレッドでカウントが開始され、指定された時間が経過すると、shared_state.completedをtrueに設定し、shared_stateに設定されたWakerを呼び出します。
  6. Waker の呼び出し
    Wakerにより、タスクが再度エグゼキュータに送信されます。
  7. エグゼキュータの再ポーリング
    エグゼキュータがキューからタスクを再度取り出し、TimerFuture::pollを実行します。今度はshared_state.completed = trueとなっているため、Poll::Readyを返します。
  8. タスクの完了
    エグゼキュータはタスクが完了したと判定し、続きの処理を実行します。

このように、async/.awaitのコルーチン的な書き方の背後では、Futureの状態機械とExecutorのカスタムランタイムによるイベントループの処理が動いていることが分かります。

結論

Rustの async/.await は、その直感的な構文の裏で、タスクを効率的に処理するための強力な仕組みを提供しています。本記事で取り上げたカスタム非同期ランタイムの実装を通じてこれらの知識を深めることで、非同期プログラムの仕組みをより深く理解できるだけでなく、性能を引き出すコードの設計が可能になります。今回のカスタムランタイムの例を参考に、より複雑なシステムやユースケースに応じた最適な非同期処理を設計してみてください。

    関連記事

    カテゴリー:

    ブログ

    情シス求人

    1. チームメンバーで作字やってみた#1

    ページ上部へ戻る