1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55use async_trait::async_trait;
use std::io::Write;
use crate::broker::handler::Handler;
use crate::broker::Broker;
use kafka_protocol::messages::ProduceRequest;
use kafka_protocol::protocol::Request;
#[async_trait]
impl Handler<ProduceRequest> for Broker {
async fn handle(
&self,
req: ProduceRequest,
res: <ProduceRequest as Request>::Response,
) -> anyhow::Result<<ProduceRequest as Request>::Response> {
for (t, td) in req.topic_data.iter() {
let _topic = self.store.get_topic(t)?.expect("TODO: topic doesn't exist");
for pd in td.partition_data.iter() {
if let Some(bytes) = &pd.records {
let p = self
.store
.get_partition(t, pd.index)?
.expect("TODO: partition doesn't exist");
let replica = self
.replicas
.get(p.id)
.expect("TODO: replica doesn't exist");
let mut replica = replica.lock().expect("mutex poisoned");
replica.log.write(&bytes[..])?;
}
}
}
Ok(res)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::broker::handler::test::new_broker;
use anyhow::Result;
use kafka_protocol::messages::ProduceResponse;
#[tokio::test]
async fn execute() -> Result<()> {
let (_rx, broker) = new_broker();
let _res = broker
.handle(ProduceRequest::default(), ProduceResponse::default())
.await?;
Ok(())
}
}