axum框架使用unix socket提供本地http服務

字號+ 編輯: 国内TP粉 修訂: 种花家 來源: axum 2023-09-29 我要說兩句(1)

因爲應用的不甚廣泛,axum對unix socket的封裝程度不如tcp版本。有極致優化的小夥伴可以拿去參考。

通過unix socket進行通訊要比tcp通訊的延遲和吞吐能力都強, 遺憾的就是只能搞本地, 像筆者這種獨立開發者家裡很窮, 搞不了那麽多服務器, 剛好這個unix socket優勢如此大, 也就符合了的類似需求。

網上的simple unix socket版本還是不要用爲好。

參考代碼如下:

#[cfg(unix)]
#[tokio::main]
async fn main() {
    unix::server().await;
}
#[cfg(not(unix))]
fn main() {
    println!("This example requires unix")
}
#[cfg(unix)]
mod unix {
    use axum::{
        body::Body,
        extract::connect_info::{self, ConnectInfo},
        http::{Method, Request, StatusCode, Uri},
        routing::get,
        Router,
    };
    use futures::ready;
    use hyper::{
        client::connect::{Connected, Connection},
        server::accept::Accept,
    };
    use std::{
        io,
        path::PathBuf,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    };
    use tokio::{
        io::{AsyncRead, AsyncWrite},
        net::{unix::UCred, UnixListener, UnixStream},
    };
    use tower::BoxError;
    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
    pub async fn server() {
        tracing_subscriber::registry()
            .with(
                tracing_subscriber::EnvFilter::try_from_default_env()
                    .unwrap_or_else(|_| "debug".into()),
            )
            .with(tracing_subscriber::fmt::layer())
            .init();
        let path = PathBuf::from("/tmp/axum/helloworld");
        let _ = tokio::fs::remove_file(&path).await;
        tokio::fs::create_dir_all(path.parent().unwrap())
            .await
            .unwrap();
        let uds = UnixListener::bind(path.clone()).unwrap();
        tokio::spawn(async {
            let app = Router::new().route("/", get(handler));
            hyper::Server::builder(ServerAccept { uds })
                .serve(app.into_make_service_with_connect_info::<UdsConnectInfo>())
                .await
                .unwrap();
        });
        let connector = tower::service_fn(move |_: Uri| {
            let path = path.clone();
            Box::pin(async move {
                let stream = UnixStream::connect(path).await?;
                Ok::<_, io::Error>(ClientConnection { stream })
            })
        });
        let client = hyper::Client::builder().build(connector);
        let request = Request::builder()
            .method(Method::GET)
            .uri("http://uri-doesnt-matter.com")
            .body(Body::empty())
            .unwrap();
        let response = client.request(request).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
        let body = String::from_utf8(body.to_vec()).unwrap();
        assert_eq!(body, "Hello, World!");
    }
    async fn handler(ConnectInfo(info): ConnectInfo<UdsConnectInfo>) -> &'static str {
        println!("new connection from `{info:?}`");
        "Hello, World!"
    }
    struct ServerAccept {
        uds: UnixListener,
    }
    impl Accept for ServerAccept {
        type Conn = UnixStream;
        type Error = BoxError;
        fn poll_accept(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
            let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
            Poll::Ready(Some(Ok(stream)))
        }
    }
    struct ClientConnection {
        stream: UnixStream,
    }
    impl AsyncWrite for ClientConnection {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize, io::Error>> {
            Pin::new(&mut self.stream).poll_write(cx, buf)
        }
        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Result<(), io::Error>> {
            Pin::new(&mut self.stream).poll_flush(cx)
        }
        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Result<(), io::Error>> {
            Pin::new(&mut self.stream).poll_shutdown(cx)
        }
    }
    impl AsyncRead for ClientConnection {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut tokio::io::ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            Pin::new(&mut self.stream).poll_read(cx, buf)
        }
    }
    impl Connection for ClientConnection {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }
    #[derive(Clone, Debug)]
    #[allow(dead_code)]
    struct UdsConnectInfo {
        peer_addr: Arc<tokio::net::unix::SocketAddr>,
        peer_cred: UCred,
    }
    impl connect_info::Connected<&UnixStream> for UdsConnectInfo {
        fn connect_info(target: &UnixStream) -> Self {
            let peer_addr = target.peer_addr().unwrap();
            let peer_cred = target.peer_cred().unwrap();
            Self {
                peer_addr: Arc::new(peer_addr),
                peer_cred,
            }
        }
    }
}

 

閲完此文,您的感想如何?
  • 有用

    37

  • 沒用

    2

  • 開心

    7

  • 憤怒

    2

  • 可憐

    2

1.如文章侵犯了您的版權,請發郵件通知本站,該文章將在24小時内刪除;
2.本站標注原創的文章,轉發時煩請注明來源;
3.交流群: 2702237 13835667

相關課文
  • 在rust/axum框架中操作redis

  • rust編譯新的wasm項目操作流程(原文: 編譯 Rust 爲 WebAssembly)

  • rust視圖模板庫askama的使用

  • axum框架當中獲取請求header, 和獲取header指定字段的方法

我要說說
網上賓友點評
1 樓 IP 222.128.***.254 的嘉賓 说道 : 很久前
真香!!