http1/body/
buf_body_reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::io::{BufReader, Read};

use super::{http_body::HttpBody, Body};

/// Size in bytes of the chunk buffer used when the caller does not request a specific size.
const DEFAULT_BUFFER_SIZE: usize = 4 * 1024; // 4 KiB

/// A body source that pulls bytes from a buffered reader in fixed-size chunks.
///
/// Each chunk is read into an internal scratch buffer and then copied out, so the
/// size of that buffer is the upper bound on the size of a single chunk.
pub struct BufBodyReader<R> {
    /// Buffered reader the body bytes are pulled from.
    reader: BufReader<R>,
    /// Scratch buffer that each read lands in before being copied into a chunk.
    buf: Box<[u8]>,
    /// Set once the body is finished; no further reads are attempted afterwards.
    eof: bool,
    /// Total number of bytes handed out in chunks so far.
    read: usize,
    /// Expected body length in bytes, when known up front.
    content_length: Option<usize>,
}

impl<R> BufBodyReader<R>
where
    R: Read + Send + 'static,
{
    /// Creates a body reader over `reader` using the default chunk buffer size.
    ///
    /// `content_length` is the expected body length in bytes, if known.
    pub fn new(reader: R, content_length: Option<usize>) -> Self {
        let buffered = BufReader::new(reader);
        Self::with_buf_reader_and_buffer_size(buffered, Some(DEFAULT_BUFFER_SIZE), content_length)
    }

    /// Creates a body reader over `reader` whose chunk buffer holds `buffer_size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_size` is zero.
    pub fn with_buffer_size(reader: R, buffer_size: usize, content_length: Option<usize>) -> Self {
        let buffered = BufReader::new(reader);
        Self::with_buf_reader_and_buffer_size(buffered, Some(buffer_size), content_length)
    }

    /// Creates a body reader on top of an already constructed [`BufReader`].
    ///
    /// When `buffer_size` is `None` the default chunk buffer size is used.
    ///
    /// # Panics
    ///
    /// Panics if the resolved buffer size is zero.
    pub fn with_buf_reader_and_buffer_size(
        reader: BufReader<R>,
        buffer_size: Option<usize>,
        content_length: Option<usize>,
    ) -> Self {
        let buffer_size = match buffer_size {
            Some(requested) => requested,
            None => DEFAULT_BUFFER_SIZE,
        };

        // A zero-sized chunk buffer could never make progress.
        assert!(buffer_size > 0);

        Self {
            reader,
            buf: vec![0u8; buffer_size].into_boxed_slice(),
            eof: false,
            read: 0,
            content_length,
        }
    }
}

/// Streams the body as owned byte chunks read from the underlying buffered reader.
impl<R> HttpBody for BufBodyReader<R>
where
    R: Read + Send + 'static,
{
    type Err = std::io::Error;
    type Data = Vec<u8>;

    /// Reads the next chunk of the body.
    ///
    /// Returns `Ok(Some(chunk))` with at most one chunk buffer's worth of bytes, or
    /// `Ok(None)` once the body is finished: either the reader reported end of input
    /// or `content_length` bytes have been handed out. After the first `Ok(None)`
    /// every subsequent call returns `Ok(None)` without reading.
    ///
    /// When a content length is known, a single read never asks for more than the
    /// bytes still missing from the body, so bytes that follow the body in the
    /// stream are not returned as part of it.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the underlying reader.
    fn read_next(&mut self) -> Result<Option<Self::Data>, Self::Err> {
        if self.eof {
            return Ok(None);
        }

        // Cap the read by what is still missing from the body, not by the total
        // length: otherwise the final read could pull in bytes past the body.
        let limit = match self.content_length {
            Some(content_length) => content_length
                .saturating_sub(self.read)
                .min(self.buf.len()),
            None => self.buf.len(),
        };

        // Nothing left to read (e.g. a zero-length body): finish without touching
        // the reader at all.
        if limit == 0 {
            self.eof = true;
            return Ok(None);
        }

        let buf = &mut self.buf[..limit];
        let n = self.reader.read(buf)?;

        // A zero-byte read into a non-empty buffer means the reader is exhausted.
        if n == 0 {
            self.eof = true;
            return Ok(None);
        }

        let chunk = buf[..n].to_vec();
        self.read += n;

        // If the declared length has been reached there is no need to read again.
        if let Some(content_length) = self.content_length {
            if self.read >= content_length {
                self.eof = true;
            }
        }

        Ok(Some(chunk))
    }
}

impl<R> From<BufBodyReader<R>> for Body
where
    R: Read + Send + 'static,
{
    fn from(value: BufBodyReader<R>) -> Self {
        Body::new(value)
    }
}