diff --git a/slige/runtime/Makefile b/slige/runtime/Makefile index 5aee3f4..37f9892 100644 --- a/slige/runtime/Makefile +++ b/slige/runtime/Makefile @@ -14,7 +14,7 @@ C_FLAGS = \ -Wall -Wextra -Wpedantic -Wconversion \ -pedantic -pedantic-errors \ -L_FLAGS = -lm +L_FLAGS = -lm -pthread F_FLAGS = OPTIMIZATION = diff --git a/slige/runtime/src/http_server.c b/slige/runtime/src/http_server.c index e6a5b51..c8b8aed 100644 --- a/slige/runtime/src/http_server.c +++ b/slige/runtime/src/http_server.c @@ -1,22 +1,18 @@ #include "http_server.h" +#include "http_server_internal.h" #include +#include +#include #include +#include #include #include +#include #include +#include #include #include -typedef struct sockaddr SockAddr; -typedef struct sockaddr_in AddrIn; - - -struct HttpServer { - int server_fd; - AddrIn server_addr; - thrd_t* worker_threads; -}; - HttpServer* http_server_new(HttpServerOpts opts) { @@ -26,11 +22,14 @@ HttpServer* http_server_new(HttpServerOpts opts) return NULL; } - AddrIn server_addr; + SockAddrIn server_addr; server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(opts.port); + int reuse = 1; + setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + int res = bind(server_fd, (SockAddr*)&server_addr, sizeof(server_addr)); if (res != 0) { fprintf(stderr, "error: could not bind socket\n"); @@ -43,38 +42,208 @@ HttpServer* http_server_new(HttpServerOpts opts) return NULL; } - thrd_t* worker_threads = malloc(sizeof(thrd_t) * opts.worker_threads); - for (size_t i = 0; i < opts.worker_threads; ++i) { - // worker_threads[i] = thrd_create(thrd_t *thr, thrd_start_t func, void *arg) - } - HttpServer* server = malloc(sizeof(HttpServer)); *server = (HttpServer) { - server_fd, - server_addr, + .file = server_fd, + .addr = server_addr, + // .ctx = {}, + .workers = malloc(sizeof(Worker) * opts.workers_amount), + .workers_size = opts.workers_amount, }; + server_ctx_construct(&server->ctx); + for (size_t i = 0; i < opts.workers_amount; ++i) { + worker_construct(&server->workers[i], &server->ctx); + } + return server; } void http_server_free(HttpServer* server) { - close(server->server_fd); + close(server->file); + for (size_t i = 0; i < server->workers_size; ++i) { + worker_destroy(&server->workers[i]); + } + server_ctx_destroy(&server->ctx); free(server); } int http_server_listen(HttpServer* server) { - while (true) { - AddrIn client_addr; - socklen_t client_addr_size = sizeof(client_addr); + ServerCtx* ctx = &server->ctx; - int res = accept( - server->server_fd, (SockAddr*)&client_addr, &client_addr_size); + while (true) { + SockAddrIn client_addr; + socklen_t addr_size = sizeof(client_addr); + + int res = accept(server->file, (SockAddr*)&client_addr, &addr_size); if (res == -1) { fprintf(stderr, "error: could not accept\n"); return -1; } + Request req = { .client_file = res, client_addr }; + pthread_mutex_lock(&ctx->mutex); + + res = request_queue_push(&ctx->req_queue, req); + if (res != 0) { + fprintf(stderr, "error: request queue full\n"); + return -1; + } + pthread_mutex_unlock(&ctx->mutex); + pthread_cond_signal(&ctx->cond); } } + +void server_ctx_construct(ServerCtx* ctx) +{ + pthread_mutex_init(&ctx->mutex, NULL); + pthread_cond_init(&ctx->cond, NULL); + request_queue_construct(&ctx->req_queue, 8192); +} + +void server_ctx_destroy(ServerCtx* ctx) +{ + pthread_mutex_destroy(&ctx->mutex); + pthread_cond_destroy(&ctx->cond); + request_queue_destroy(&ctx->req_queue); +} + +int request_queue_construct(RequestQueue* queue, size_t capacity) +{ + *queue = (RequestQueue) { + .data = malloc(sizeof(Request) * capacity), + .capacity = capacity, + .back = 0, + .front = 0, + }; + return 0; +} + +void request_queue_destroy(RequestQueue* queue) +{ + free(queue->data); +} + +int request_queue_push(RequestQueue* queue, Request req) +{ + size_t front = queue->front + 1; + if (front >= queue->capacity) { + front = 0; + } + if (front == queue->back) { + return -1; + } + queue->data[queue->front] = req; + queue->front = front; + return 0; +} + +size_t request_queue_size(const RequestQueue* queue) +{ + return queue->front >= queue->back + ? queue->front - queue->back + : (queue->capacity - queue->back) + queue->front; +} + +int request_queue_pop(RequestQueue* queue, Request* req) +{ + if (queue->back == queue->front) { + return -1; + } + *req = queue->data[queue->back]; + size_t back = queue->back + 1; + if (back >= queue->capacity) { + back = 0; + } + queue->back = back; + return 0; +} + +int worker_construct(Worker* worker, ServerCtx* ctx) +{ + *worker = (Worker) { + // .thread = {}, + .ctx = ctx, + }; + + pthread_create(&worker->thread, NULL, worker_thread_fn, worker); + return 0; +} + +void worker_destroy(Worker* worker) +{ + if (worker->thread != 0) { + pthread_cancel(worker->thread); + + // a bit ugly, but who cares? + pthread_cond_broadcast(&worker->ctx->cond); + + pthread_join(worker->thread, NULL); + } +} + +void* worker_thread_fn(void* data) +{ + Worker* worker = data; + worker_listen(worker); + return NULL; +} + +void worker_listen(Worker* worker) +{ + ServerCtx* ctx = worker->ctx; + while (true) { + pthread_testcancel(); + + pthread_mutex_lock(&ctx->mutex); + pthread_cond_wait(&ctx->cond, &ctx->mutex); + + if (request_queue_size(&ctx->req_queue) == 0) { + pthread_mutex_unlock(&ctx->mutex); + continue; + } + + Request req; + request_queue_pop(&ctx->req_queue, &req); + pthread_mutex_unlock(&ctx->mutex); + + worker_handle_request(worker, &req); + } +} + +void worker_handle_request(Worker* worker, Request* req) +{ + (void)worker; + + uint8_t buffer[MAX_HEADER_SIZE] = { 0 }; + ssize_t bytes_received = recv(req->client_file, &buffer, sizeof(buffer), 0); + + if (bytes_received == -1) { + fprintf(stderr, "error: could not receive request\n"); + return; + } + + size_t header_end = 0; + for (ssize_t i = 0; i < bytes_received - 3; ++i) { + if (memcmp((char*)&buffer[i], "\r\n\r\n", 4)) { + header_end = (size_t)i + 5; + } + } + if (header_end == 0) { + fprintf(stderr, "error: header too big, exceeded %d bytes\n", + MAX_HEADER_SIZE); + return; + } + puts((char*)buffer); + + HttpReq http_req; + http_parse_header(&http_req, (char*)buffer, header_end); + + close(req->client_file); +} + +int http_parse_header(HttpReq* req, const char* header, size_t header_size) +{ +} diff --git a/slige/runtime/src/http_server.h b/slige/runtime/src/http_server.h index 6475141..c508ebf 100644 --- a/slige/runtime/src/http_server.h +++ b/slige/runtime/src/http_server.h @@ -7,13 +7,13 @@ typedef struct HttpServer HttpServer; typedef struct { uint16_t port; - size_t worker_threads; + size_t workers_amount; } HttpServerOpts; +/// On ok, HttpServer /// On error, returns NULL and prints. HttpServer* http_server_new(HttpServerOpts opts); void http_server_free(HttpServer* server); - -/// On error, returns -1; +/// On ok, returns 0. +/// On error, returns -1 and prints; int http_server_listen(HttpServer* server); - diff --git a/slige/runtime/src/http_server_internal.h b/slige/runtime/src/http_server_internal.h new file mode 100644 index 0000000..b060a54 --- /dev/null +++ b/slige/runtime/src/http_server_internal.h @@ -0,0 +1,94 @@ +#pragma once + +#include "http_server.h" +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct sockaddr SockAddr; +typedef struct sockaddr_in SockAddrIn; + +typedef struct { + int client_file; + SockAddrIn client_addr; +} Request; + +/// This shit is implemented as a static size fifo buffer. +typedef struct { + Request* data; + size_t capacity; + size_t back; + size_t front; +} RequestQueue; + +/// On ok, returns 0. +/// On error, returns -1. +int request_queue_construct(RequestQueue* queue, size_t capacity); +void request_queue_destroy(RequestQueue* queue); + +/// On ok, returns 0. +/// On error, returns -1 if queue is full. +int request_queue_push(RequestQueue* queue, Request req); +size_t request_queue_size(const RequestQueue* queue); + +/// On ok, returns 0. +/// On error, returns -1 if queue is empty. +int request_queue_pop(RequestQueue* queue, Request* req); + +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + RequestQueue req_queue; +} ServerCtx; + +void server_ctx_construct(ServerCtx* ctx); +void server_ctx_destroy(ServerCtx* ctx); + +typedef struct { + pthread_t thread; + ServerCtx* ctx; +} Worker; + +/// On ok, returns 0. +/// On error, returns -1; +int worker_construct(Worker* worker, ServerCtx* ctx); +void worker_destroy(Worker* worker); +void* worker_thread_fn(void* data); +void worker_thread_cleanup(void* data); +void worker_listen(Worker* worker); +void worker_handle_request(Worker* worker, Request* req); + +struct HttpServer { + int file; + SockAddrIn addr; + ServerCtx ctx; + Worker* workers; + size_t workers_size; +}; + +#define MAX_HEADER_SIZE 8192 + +typedef enum { + HttpMethod_GET, + HttpMethod_POST, +} HttpMethod; + +typedef struct { + char* key; + char* value; +} HttpHeader; + +typedef struct { + HttpMethod method; + char* path; + HttpHeader* headers; + size_t headers_size; +} HttpReq; + +/// On error, returns -1. +int http_parse_header(HttpReq* req, const char* header, size_t header_size); diff --git a/slige/runtime/src/main.c b/slige/runtime/src/main.c index 1039211..17151ce 100644 --- a/slige/runtime/src/main.c +++ b/slige/runtime/src/main.c @@ -1,14 +1,24 @@ #include "http_server.h" +#include #include +#include + +HttpServer* server; int main(void) { printf("hello world\n"); - HttpServer* server = http_server_new((HttpServerOpts) { + server = http_server_new((HttpServerOpts) { .port = 8080, - .worker_threads = 8, + .workers_amount = 8, }); + if (!server) { + return -1; + } + + printf("listening at http://127.0.0.1:8080/\n"); + http_server_listen(server); http_server_free(server); } diff --git a/slige/runtime/src/request.c b/slige/runtime/src/request.c deleted file mode 100644 index e69de29..0000000 diff --git a/slige/runtime/src/request.h b/slige/runtime/src/request.h deleted file mode 100644 index 4eb394a..0000000 --- a/slige/runtime/src/request.h +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once - -typedef struct { - int v; -} RequestQueue; -