add http server threads

This commit is contained in:
SimonFJ20 2025-03-04 16:24:28 +01:00
parent dc01bcdb84
commit 49cf0f4db9
7 changed files with 304 additions and 37 deletions

View File

@ -14,7 +14,7 @@ C_FLAGS = \
-Wall -Wextra -Wpedantic -Wconversion \
-pedantic -pedantic-errors \
L_FLAGS = -lm
L_FLAGS = -lm -pthread
F_FLAGS =
OPTIMIZATION =

View File

@ -1,22 +1,18 @@
#include "http_server.h"
#include "http_server_internal.h"
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <threads.h>
#include <unistd.h>
typedef struct sockaddr SockAddr;
typedef struct sockaddr_in AddrIn;
struct HttpServer {
int server_fd;
AddrIn server_addr;
thrd_t* worker_threads;
};
HttpServer* http_server_new(HttpServerOpts opts)
{
@ -26,11 +22,14 @@ HttpServer* http_server_new(HttpServerOpts opts)
return NULL;
}
AddrIn server_addr;
SockAddrIn server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(opts.port);
int reuse = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
int res = bind(server_fd, (SockAddr*)&server_addr, sizeof(server_addr));
if (res != 0) {
fprintf(stderr, "error: could not bind socket\n");
@ -43,38 +42,208 @@ HttpServer* http_server_new(HttpServerOpts opts)
return NULL;
}
thrd_t* worker_threads = malloc(sizeof(thrd_t) * opts.worker_threads);
for (size_t i = 0; i < opts.worker_threads; ++i) {
// worker_threads[i] = thrd_create(thrd_t *thr, thrd_start_t func, void *arg)
}
HttpServer* server = malloc(sizeof(HttpServer));
*server = (HttpServer) {
server_fd,
server_addr,
.file = server_fd,
.addr = server_addr,
// .ctx = {},
.workers = malloc(sizeof(Worker) * opts.workers_amount),
.workers_size = opts.workers_amount,
};
server_ctx_construct(&server->ctx);
for (size_t i = 0; i < opts.workers_amount; ++i) {
worker_construct(&server->workers[i], &server->ctx);
}
return server;
}
void http_server_free(HttpServer* server)
{
close(server->server_fd);
close(server->file);
for (size_t i = 0; i < server->workers_size; ++i) {
worker_destroy(&server->workers[i]);
}
server_ctx_destroy(&server->ctx);
free(server);
}
int http_server_listen(HttpServer* server)
{
while (true) {
AddrIn client_addr;
socklen_t client_addr_size = sizeof(client_addr);
ServerCtx* ctx = &server->ctx;
int res = accept(
server->server_fd, (SockAddr*)&client_addr, &client_addr_size);
while (true) {
SockAddrIn client_addr;
socklen_t addr_size = sizeof(client_addr);
int res = accept(server->file, (SockAddr*)&client_addr, &addr_size);
if (res == -1) {
fprintf(stderr, "error: could not accept\n");
return -1;
}
Request req = { .client_file = res, client_addr };
pthread_mutex_lock(&ctx->mutex);
res = request_queue_push(&ctx->req_queue, req);
if (res != 0) {
fprintf(stderr, "error: request queue full\n");
return -1;
}
pthread_mutex_unlock(&ctx->mutex);
pthread_cond_signal(&ctx->cond);
}
}
void server_ctx_construct(ServerCtx* ctx)
{
pthread_mutex_init(&ctx->mutex, NULL);
pthread_cond_init(&ctx->cond, NULL);
request_queue_construct(&ctx->req_queue, 8192);
}
void server_ctx_destroy(ServerCtx* ctx)
{
pthread_mutex_destroy(&ctx->mutex);
pthread_cond_destroy(&ctx->cond);
request_queue_destroy(&ctx->req_queue);
}
int request_queue_construct(RequestQueue* queue, size_t capacity)
{
*queue = (RequestQueue) {
.data = malloc(sizeof(Request) * capacity),
.capacity = capacity,
.back = 0,
.front = 0,
};
return 0;
}
void request_queue_destroy(RequestQueue* queue)
{
free(queue->data);
}
int request_queue_push(RequestQueue* queue, Request req)
{
size_t front = queue->front + 1;
if (front >= queue->capacity) {
front = 0;
}
if (front == queue->back) {
return -1;
}
queue->data[queue->front] = req;
queue->front = front;
return 0;
}
size_t request_queue_size(const RequestQueue* queue)
{
return queue->front >= queue->back
? queue->front - queue->back
: (queue->capacity - queue->back) + queue->front;
}
int request_queue_pop(RequestQueue* queue, Request* req)
{
if (queue->back == queue->front) {
return -1;
}
*req = queue->data[queue->back];
size_t back = queue->back + 1;
if (back >= queue->capacity) {
back = 0;
}
queue->back = back;
return 0;
}
int worker_construct(Worker* worker, ServerCtx* ctx)
{
*worker = (Worker) {
// .thread = {},
.ctx = ctx,
};
pthread_create(&worker->thread, NULL, worker_thread_fn, worker);
return 0;
}
void worker_destroy(Worker* worker)
{
if (worker->thread != 0) {
pthread_cancel(worker->thread);
// a bit ugly, but who cares?
pthread_cond_broadcast(&worker->ctx->cond);
pthread_join(worker->thread, NULL);
}
}
void* worker_thread_fn(void* data)
{
Worker* worker = data;
worker_listen(worker);
return NULL;
}
void worker_listen(Worker* worker)
{
ServerCtx* ctx = worker->ctx;
while (true) {
pthread_testcancel();
pthread_mutex_lock(&ctx->mutex);
pthread_cond_wait(&ctx->cond, &ctx->mutex);
if (request_queue_size(&ctx->req_queue) == 0) {
pthread_mutex_unlock(&ctx->mutex);
continue;
}
Request req;
request_queue_pop(&ctx->req_queue, &req);
pthread_mutex_unlock(&ctx->mutex);
worker_handle_request(worker, &req);
}
}
void worker_handle_request(Worker* worker, Request* req)
{
(void)worker;
uint8_t buffer[MAX_HEADER_SIZE] = { 0 };
ssize_t bytes_received = recv(req->client_file, &buffer, sizeof(buffer), 0);
if (bytes_received == -1) {
fprintf(stderr, "error: could not receive request\n");
return;
}
size_t header_end = 0;
for (ssize_t i = 0; i < bytes_received - 3; ++i) {
if (memcmp((char*)&buffer[i], "\r\n\r\n", 4)) {
header_end = (size_t)i + 5;
}
}
if (header_end == 0) {
fprintf(stderr, "error: header too big, exceeded %d bytes\n",
MAX_HEADER_SIZE);
return;
}
puts((char*)buffer);
HttpReq http_req;
http_parse_header(&http_req, (char*)buffer, header_end);
close(req->client_file);
}
int http_parse_header(HttpReq* req, const char* header, size_t header_size)
{
}

View File

@ -7,13 +7,13 @@ typedef struct HttpServer HttpServer;
typedef struct {
uint16_t port;
size_t worker_threads;
size_t workers_amount;
} HttpServerOpts;
/// On ok, HttpServer
/// On error, returns NULL and prints.
HttpServer* http_server_new(HttpServerOpts opts);
void http_server_free(HttpServer* server);
/// On error, returns -1;
/// On ok, returns 0.
/// On error, returns -1 and prints;
int http_server_listen(HttpServer* server);

View File

@ -0,0 +1,94 @@
#pragma once
#include "http_server.h"
#include <bits/pthreadtypes.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>
typedef struct sockaddr SockAddr;
typedef struct sockaddr_in SockAddrIn;
typedef struct {
int client_file;
SockAddrIn client_addr;
} Request;
/// This shit is implemented as a static size fifo buffer.
typedef struct {
Request* data;
size_t capacity;
size_t back;
size_t front;
} RequestQueue;
/// On ok, returns 0.
/// On error, returns -1.
int request_queue_construct(RequestQueue* queue, size_t capacity);
void request_queue_destroy(RequestQueue* queue);
/// On ok, returns 0.
/// On error, returns -1 if queue is full.
int request_queue_push(RequestQueue* queue, Request req);
size_t request_queue_size(const RequestQueue* queue);
/// On ok, returns 0.
/// On error, returns -1 if queue is empty.
int request_queue_pop(RequestQueue* queue, Request* req);
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
RequestQueue req_queue;
} ServerCtx;
void server_ctx_construct(ServerCtx* ctx);
void server_ctx_destroy(ServerCtx* ctx);
typedef struct {
pthread_t thread;
ServerCtx* ctx;
} Worker;
/// On ok, returns 0.
/// On error, returns -1;
int worker_construct(Worker* worker, ServerCtx* ctx);
void worker_destroy(Worker* worker);
void* worker_thread_fn(void* data);
void worker_thread_cleanup(void* data);
void worker_listen(Worker* worker);
void worker_handle_request(Worker* worker, Request* req);
struct HttpServer {
int file;
SockAddrIn addr;
ServerCtx ctx;
Worker* workers;
size_t workers_size;
};
#define MAX_HEADER_SIZE 8192
typedef enum {
HttpMethod_GET,
HttpMethod_POST,
} HttpMethod;
typedef struct {
char* key;
char* value;
} HttpHeader;
typedef struct {
HttpMethod method;
char* path;
HttpHeader* headers;
size_t headers_size;
} HttpReq;
/// On error, returns -1.
int http_parse_header(HttpReq* req, const char* header, size_t header_size);

View File

@ -1,14 +1,24 @@
#include "http_server.h"
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
HttpServer* server;
int main(void)
{
printf("hello world\n");
HttpServer* server = http_server_new((HttpServerOpts) {
server = http_server_new((HttpServerOpts) {
.port = 8080,
.worker_threads = 8,
.workers_amount = 8,
});
if (!server) {
return -1;
}
printf("listening at http://127.0.0.1:8080/\n");
http_server_listen(server);
http_server_free(server);
}

View File

@ -1,6 +0,0 @@
#pragma once
typedef struct {
int v;
} RequestQueue;