use std::{
net::{SocketAddr, TcpListener, ToSocketAddrs},
sync::{atomic::AtomicBool, Arc},
};
use crate::{
common::thread_pool::ThreadPool,
handler::RequestHandler,
protocol::{connection::Connection, h1::handle_incoming},
};
/// Cloneable handle for observing and stopping a running [`Server`].
///
/// Both flags sit behind `Arc`s, so every clone of a handle reads and writes the same state.
#[derive(Clone)]
pub struct ServerHandle {
    /// Becomes `true` once [`ServerHandle::shutdown`] has been called.
    is_closed: Arc<AtomicBool>,
    /// `true` while `Server::listen` is running with a bound listener.
    is_ready: Arc<AtomicBool>,
}

impl ServerHandle {
    /// Memory ordering used for every flag access made through this handle.
    const FLAG_ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::Relaxed;

    /// Requests shutdown by raising the shared closed flag.
    ///
    /// Consumes this handle; clones made earlier keep observing the flag.
    pub fn shutdown(self) {
        let Self { is_closed, .. } = self;
        is_closed.store(true, Self::FLAG_ORDERING);
    }

    /// Returns `true` once shutdown has been requested through any clone of this handle.
    pub fn is_closed(&self) -> bool {
        let closed = &self.is_closed;
        closed.load(Self::FLAG_ORDERING)
    }

    /// Returns the current value of the shared ready flag.
    pub fn is_ready(&self) -> bool {
        let ready = &self.is_ready;
        ready.load(Self::FLAG_ORDERING)
    }
}
/// Scope guard that stores `false` into the wrapped flag when it is dropped.
struct Guard(Arc<AtomicBool>);

impl Drop for Guard {
    /// Clears the wrapped flag.
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(false, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Per-server settings; `Server::listen` clones them into every connection task.
///
/// The fields are interpreted by the protocol layer outside this file, so the notes below
/// describe their presumed meaning only.
#[derive(Clone, Debug)]
pub struct Config {
    /// Presumably toggles the `Date` response header (confirm in the protocol layer).
    pub include_date_header: bool,
    /// Upper bound on body size in bytes; `None` presumably means unlimited (confirm).
    pub max_body_size: Option<usize>,
    /// Presumably exposes connection details to the handler (confirm in the protocol layer).
    pub include_conn_info: bool,
    /// Presumably exposes server details in responses (confirm in the protocol layer).
    pub include_server_info: bool,
}

impl Default for Config {
    /// Date header and server info enabled, connection info disabled, and the body size
    /// capped at `DEFAULT_MAX_BODY_SIZE`.
    fn default() -> Self {
        let max_body_size = Some(crate::constants::DEFAULT_MAX_BODY_SIZE);
        Config {
            include_date_header: true,
            max_body_size,
            include_conn_info: false,
            include_server_info: true,
        }
    }
}
/// Boxed one-shot callback that receives the listener's bound address.
///
/// Stored by `Server::on_ready` and invoked at most once by `Server::listen`.
type OnReady = Box<dyn FnOnce(&SocketAddr) + Send>;
/// Server that accepts TCP connections and runs each one on the executor `E`.
///
/// Configured through by-value setter methods and consumed by `listen`.
pub struct Server<E = ThreadPool> {
// Runs the per-connection closure built in `listen`.
executor: E,
// Settings cloned into every connection task.
config: Config,
// Shared closed/ready flags; `handle` hands out clones of this.
server_handle: ServerHandle,
// Optional one-shot callback invoked with the bound address in `listen`.
on_ready: Option<OnReady>,
}
impl Server<()> {
    /// Creates a server backed by a freshly constructed [`ThreadPool`].
    ///
    /// # Panics
    ///
    /// Panics if the thread pool cannot be initialized.
    pub fn new() -> Server<ThreadPool> {
        Self::with_executor(
            ThreadPool::new().expect("failed to initialize thread pool executor"),
        )
    }

    /// Creates a server that runs connection tasks on `executor`.
    ///
    /// The server starts with the default [`Config`], both handle flags cleared and no
    /// `on_ready` callback.
    pub fn with_executor<E: Executor>(executor: E) -> Server<E> {
        let cleared_flag = || Arc::new(AtomicBool::new(false));
        Server {
            executor,
            config: Config::default(),
            server_handle: ServerHandle {
                is_closed: cleared_flag(),
                is_ready: cleared_flag(),
            },
            on_ready: None,
        }
    }
}
impl<E> Server<E> {
pub fn include_date_header(mut self, include: bool) -> Self {
self.config.include_date_header = include;
self
}
pub fn max_body_size(mut self, max_body_size_bytes: Option<usize>) -> Self {
if let Some(max_body_size_bytes) = max_body_size_bytes {
assert!(max_body_size_bytes > 0);
}
self.config.max_body_size = max_body_size_bytes;
self
}
pub fn include_conn_info(mut self, insert_conn_info: bool) -> Self {
self.config.include_conn_info = insert_conn_info;
self
}
pub fn include_server_info(mut self, include_server_info: bool) -> Self {
self.config.include_server_info = include_server_info;
self
}
pub fn on_ready<F>(mut self, f: F) -> Self
where
F: FnOnce(&SocketAddr) + Send + 'static,
{
self.on_ready = Some(Box::new(f));
self
}
pub fn handle(&self) -> ServerHandle {
self.server_handle.clone()
}
}
impl<E> Server<E>
where
E: Executor,
{
pub fn listen<H, A: ToSocketAddrs>(self, addr: A, handler: H) -> std::io::Result<()>
where
H: RequestHandler + Send + Sync + 'static,
{
let Server {
config,
server_handle,
executor,
mut on_ready,
} = self;
let listener = TcpListener::bind(addr)?;
let local_addr = listener.local_addr()?;
if let Some(on_ready) = on_ready.take() {
on_ready(&local_addr)
}
server_handle
.is_ready
.store(true, std::sync::atomic::Ordering::Relaxed);
let handler = Arc::new(handler);
let _guard = Guard(server_handle.is_ready.clone());
loop {
if server_handle.is_closed() {
log::debug!("Closing server...");
break;
}
let (stream, _) = listener.accept()?;
let config = config.clone();
let handler = handler.clone();
let result = executor.execute(move || {
match handle_incoming(&handler, &config, Connection::Tcp(stream)) {
Ok(..) => {}
Err(err) => log::error!("{err}"),
}
});
if let Err(err) = result {
return Err(err.into());
}
}
Ok(())
}
}
/// Strategy for running the closure that serves one accepted connection.
pub trait Executor {
/// Error returned when a task cannot be run; `Server::listen` converts it into `std::io::Error`.
type Err: Into<std::io::Error>;
/// Runs `f`, or returns an error if it could not be run.
///
/// Whether `f` has finished by the time this call returns is up to the implementation.
fn execute<F>(&self, f: F) -> Result<(), Self::Err>
where
F: FnOnce() + Send + 'static;
}
/// Executor that runs each task inline on the calling thread.
pub struct BlockingExecutor;

impl Executor for BlockingExecutor {
    type Err = std::io::Error;

    /// Calls `f` to completion before returning; always succeeds.
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) -> Result<(), std::io::Error> {
        f();
        Ok(())
    }
}
impl Executor for ThreadPool {
type Err = std::io::Error;
fn execute<F>(&self, f: F) -> Result<(), Self::Err>
where
F: FnOnce() + Send + 'static,
{
self.execute(f)
}
}
/// Executor that runs every task on its own freshly spawned, detached OS thread.
pub struct SpawnExecutor;

impl Executor for SpawnExecutor {
    type Err = std::io::Error;

    /// Spawns a new thread running `f` and returns without waiting for it.
    ///
    /// # Errors
    ///
    /// Returns the OS error if the thread cannot be created. `std::thread::spawn` would
    /// panic in that situation, so `std::thread::Builder` is used to report it instead.
    fn execute<F>(&self, f: F) -> Result<(), Self::Err>
    where
        F: FnOnce() + Send + 'static,
    {
        // Dropping the join handle detaches the thread.
        std::thread::Builder::new().spawn(f).map(drop)
    }
}