๐Ÿ“ฆ tychedelia / josefine

๐Ÿ“„ produce.rs ยท 55 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55use async_trait::async_trait;
use std::io::Write;

use crate::broker::handler::Handler;
use crate::broker::Broker;

use kafka_protocol::messages::ProduceRequest;
use kafka_protocol::protocol::Request;

#[async_trait]
impl Handler<ProduceRequest> for Broker {
    async fn handle(
        &self,
        req: ProduceRequest,
        res: <ProduceRequest as Request>::Response,
    ) -> anyhow::Result<<ProduceRequest as Request>::Response> {
        for (t, td) in req.topic_data.iter() {
            let _topic = self.store.get_topic(t)?.expect("TODO: topic doesn't exist");
            for pd in td.partition_data.iter() {
                if let Some(bytes) = &pd.records {
                    let p = self
                        .store
                        .get_partition(t, pd.index)?
                        .expect("TODO: partition doesn't exist");
                    let replica = self
                        .replicas
                        .get(p.id)
                        .expect("TODO: replica doesn't exist");
                    let mut replica = replica.lock().expect("mutex poisoned");
                    replica.log.write(&bytes[..])?;
                }
            }
        }

        Ok(res)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::handler::test::new_broker;
    use anyhow::Result;
    use kafka_protocol::messages::ProduceResponse;

    #[tokio::test]
    async fn execute() -> Result<()> {
        let (_rx, broker) = new_broker();
        let _res = broker
            .handle(ProduceRequest::default(), ProduceResponse::default())
            .await?;
        Ok(())
    }
}