1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62use crate::kafka::codec::KafkaServerCodec;
use anyhow::Result;
use futures::SinkExt;
use kafka_protocol::messages::{RequestKind, ResponseHeader, ResponseKind};
use tokio::sync::oneshot;
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc::UnboundedSender,
};
use tokio::sync::oneshot::Sender;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, FramedWrite};
use crate::Shutdown;
/// Accepts TCP connections on `listener` until `shutdown` fires, serving each
/// accepted connection on its own spawned task.
///
/// Every connection gets a clone of `in_tx`; the spawned task runs
/// `stream_messages` on the socket with that clone.
///
/// # Parameters
/// - `listener`: the bound listener to accept connections from.
/// - `in_tx`: channel on which each decoded request is forwarded together
///   with a one-shot sender for its response.
/// - `shutdown`: the accept loop exits as soon as `shutdown.wait()` completes.
///
/// # Returns
/// `Ok(())` once the shutdown signal has been observed. Failures of
/// individual connections and of `accept()` itself are not propagated.
pub async fn receive_task(
    listener: TcpListener,
    in_tx: UnboundedSender<(RequestKind, oneshot::Sender<ResponseKind>)>,
    mut shutdown: Shutdown,
) -> Result<()> {
    loop {
        tokio::select! {
            _ = shutdown.wait() => break,
            // Bind the whole `Result` instead of destructuring `Ok(..)` in the
            // branch pattern: a pattern that fails to match disables the
            // branch for the rest of this `select!`, which would leave the
            // loop waiting on `shutdown.wait()` only and never accept again
            // after a single failed `accept()`.
            accepted = listener.accept() => {
                match accepted {
                    Ok((socket, _addr)) => {
                        let peer_in_tx = in_tx.clone();
                        tokio::spawn(async move {
                            // A failing connection only ends its own task; the
                            // outcome is intentionally discarded.
                            let _ = stream_messages(socket, peer_in_tx).await;
                        });
                    }
                    Err(_err) => {
                        // Keep the server alive on accept errors. Yield so
                        // that a persistently failing `accept()` does not
                        // starve other tasks on this worker.
                        tokio::task::yield_now().await;
                    }
                }
            }
        }
    }
    Ok(())
}
/// Serves a single client connection: decodes requests from `stream`,
/// forwards each one over `in_tx`, and writes the matching response back.
///
/// Requests are handled strictly one at a time: the next frame is not read
/// until the response to the current one has been received and sent.
///
/// # Parameters
/// - `stream`: the accepted client socket; it is split into a framed reader
///   and a framed writer, each with its own `KafkaServerCodec`.
/// - `in_tx`: channel receiving `(request, reply_sender)` pairs; the response
///   is awaited on the paired one-shot receiver.
///
/// # Returns
/// `Ok(())` when the inbound stream ends.
///
/// # Errors
/// Returns an error if decoding a frame fails, if `in_tx` can no longer
/// accept messages, if the reply sender is dropped without a response, or if
/// writing the response fails.
async fn stream_messages(
    mut stream: TcpStream,
    in_tx: UnboundedSender<(RequestKind, oneshot::Sender<ResponseKind>)>,
) -> Result<()> {
    let (read_half, write_half) = stream.split();
    let mut requests = FramedRead::new(read_half, KafkaServerCodec::new());
    let mut responses = FramedWrite::new(write_half, KafkaServerCodec::new());

    loop {
        // `None` means the inbound stream has ended.
        let (request_header, request) = match requests.try_next().await? {
            Some(frame) => frame,
            None => break,
        };

        // Hand the request off and wait for whoever owns the other end of
        // `in_tx` to answer through the one-shot channel.
        let (reply_tx, reply_rx) = oneshot::channel();
        in_tx.send((request, reply_tx))?;
        let response = reply_rx.await?;

        // The response carries the request's correlation id and is sent
        // alongside the request's API version.
        let response_header = ResponseHeader {
            correlation_id: request_header.correlation_id,
            ..Default::default()
        };
        responses
            .send((request_header.request_api_version, response_header, response))
            .await?;
    }

    Ok(())
}