📦 tychedelia / josefine

📄 tcp.rs · 62 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use crate::kafka::codec::KafkaServerCodec;
use anyhow::Result;
use futures::SinkExt;
use kafka_protocol::messages::{RequestKind, ResponseHeader, ResponseKind};

use tokio::sync::oneshot;

use tokio::{
    net::{TcpListener, TcpStream},
    sync::mpsc::UnboundedSender,
};
use tokio::sync::oneshot::Sender;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, FramedWrite};
use crate::Shutdown;

/// Accepts TCP connections on `listener` until `shutdown.wait()` completes.
///
/// Each accepted connection is handed to `stream_messages` on its own spawned
/// task, together with a clone of `in_tx` so that requests decoded from that
/// connection can be forwarded to whoever owns the receiving end.
///
/// A failed `accept()` does not stop the loop: the error is dropped and the
/// listener is polled again.
///
/// # Errors
///
/// This function currently has no failing path of its own; it returns
/// `Ok(())` once the shutdown branch fires.
pub async fn receive_task(
    listener: TcpListener,
    in_tx: UnboundedSender<(RequestKind, oneshot::Sender<ResponseKind>)>,
    mut shutdown: Shutdown,
) -> Result<()> {
    loop {
        tokio::select! {
            _ = shutdown.wait() => break,

            // Bind the whole `Result` rather than pattern-matching `Ok(..)` in
            // the branch head. `select!` disables a branch whose pattern does
            // not match, so a single failed `accept()` would otherwise leave
            // only the shutdown branch alive and silently stop the server from
            // accepting any further connections.
            accepted = listener.accept() => {
                match accepted {
                    Ok((s, _addr)) => {
                        let peer_in_tx = in_tx.clone();
                        tokio::spawn(async move {
                            // A failure on one connection only ends that
                            // connection's task; it is intentionally not
                            // propagated to the accept loop.
                            let _ = stream_messages(s, peer_in_tx).await;
                        });
                    }
                    Err(_err) => {
                        // Keep serving. Yield first so that a persistent
                        // accept error cannot monopolise the executor while
                        // we retry.
                        tokio::task::yield_now().await;
                    }
                }
            }
        }
    }

    Ok(())
}

/// Serves one client connection, handling a single request at a time.
///
/// For every frame decoded from `stream`, the request body is sent into
/// `in_tx` paired with a fresh oneshot sender. The function then waits for the
/// reply on the matching receiver and writes it back to the client, echoing
/// the request's correlation id and encoding with the request's API version.
///
/// Returns `Ok(())` when the inbound frame stream ends.
///
/// # Errors
///
/// Returns an error if decoding a frame fails, if `in_tx` can no longer
/// accept messages, if the oneshot sender is dropped without a reply, or if
/// writing the response fails.
async fn stream_messages(
    mut stream: TcpStream,
    in_tx: UnboundedSender<(RequestKind, oneshot::Sender<ResponseKind>)>,
) -> Result<()> {
    let (read_half, write_half) = stream.split();
    let mut requests = FramedRead::new(read_half, KafkaServerCodec::new());
    let mut responses = FramedWrite::new(write_half, KafkaServerCodec::new());

    loop {
        // `None` means the inbound frame stream has ended.
        let (request_header, request) = match requests.try_next().await? {
            Some(frame) => frame,
            None => break,
        };

        // Hand the request off and wait for its reply before reading the next
        // frame, so responses are written in request order.
        let (reply_tx, reply_rx) = oneshot::channel();
        in_tx.send((request, reply_tx))?;
        let response = reply_rx.await?;

        let response_header = ResponseHeader {
            correlation_id: request_header.correlation_id,
            ..Default::default()
        };
        responses
            .send((request_header.request_api_version, response_header, response))
            .await?;
    }

    Ok(())
}