From 5058cd97b1521cddce3e53b943907a0befbd8269 Mon Sep 17 00:00:00 2001 From: David Schramm Date: Mon, 21 Mar 2022 09:48:43 +0100 Subject: [PATCH 1/2] add multiclient streaming --- app_httpd.cpp | 113 ++++--------------------- cam_streamer.c | 178 ++++++++++++++++++++++++++++++++++++++++ cam_streamer.h | 33 ++++++++ esp32-cam-webserver.ino | 1 - 4 files changed, 227 insertions(+), 98 deletions(-) create mode 100644 cam_streamer.c create mode 100644 cam_streamer.h diff --git a/app_httpd.cpp b/app_httpd.cpp index f2b11a9..bb69672 100644 --- a/app_httpd.cpp +++ b/app_httpd.cpp @@ -28,6 +28,9 @@ #include "src/logo.h" #include "storage.h" +extern "C"{ +#include "cam_streamer.h" +} // Functions from the main .ino extern void flashLED(int flashtime); extern void setLamp(int newVal); @@ -67,6 +70,8 @@ extern char otaPassword[]; extern unsigned long xclk; extern int sensorPID; +cam_streamer_t *cam_streamer; + typedef struct { httpd_req_t *req; size_t len; @@ -222,102 +227,13 @@ static esp_err_t capture_handler(httpd_req_t *req){ } static esp_err_t stream_handler(httpd_req_t *req){ - camera_fb_t * fb = NULL; - esp_err_t res = ESP_OK; - size_t _jpg_buf_len = 0; - uint8_t * _jpg_buf = NULL; - char * part_buf[64]; - - streamKill = false; - - Serial.println("Stream requested"); - if (autoLamp && (lampVal != -1)) setLamp(lampVal); - streamCount = 1; // at present we only have one stream handler, so values are 0 or 1.. - flashLED(75); // double flash of status LED - delay(75); - flashLED(75); - - static int64_t last_frame = 0; - if(!last_frame) { - last_frame = esp_timer_get_time(); - } - - res = httpd_resp_set_type(req, _STREAM_CONTENT_TYPE); - if(res != ESP_OK){ - streamCount = 0; - if (autoLamp && (lampVal != -1)) setLamp(0); - Serial.println("STREAM: failed to set HTTP response type"); - return res; - } - - httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); - - if(res == ESP_OK){ - res = httpd_resp_send_chunk(req, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY)); - } - - while(true){ - fb = esp_camera_fb_get(); - if (!fb) { - Serial.println("STREAM: failed to acquire frame"); - res = ESP_FAIL; - } else { - if(fb->format != PIXFORMAT_JPEG){ - Serial.println("STREAM: Non-JPEG frame returned by camera module"); - res = ESP_FAIL; - } else { - _jpg_buf_len = fb->len; - _jpg_buf = fb->buf; - } - } - if(res == ESP_OK){ - size_t hlen = snprintf((char *)part_buf, 64, _STREAM_PART, _jpg_buf_len); - res = httpd_resp_send_chunk(req, (const char *)part_buf, hlen); - } - if(res == ESP_OK){ - res = httpd_resp_send_chunk(req, (const char *)_jpg_buf, _jpg_buf_len); - } - if(res == ESP_OK){ - res = httpd_resp_send_chunk(req, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY)); - } - if(fb){ - esp_camera_fb_return(fb); - fb = NULL; - _jpg_buf = NULL; - } else if(_jpg_buf){ - free(_jpg_buf); - _jpg_buf = NULL; - } - if(res != ESP_OK){ - // This is the error exit point from the stream loop. - // We end the stream here only if a Hard failure has been encountered or the connection has been interrupted. - Serial.printf("Stream failed, code = %i : %s\r\n", res, esp_err_to_name(res)); - break; - } - if((res != ESP_OK) || streamKill){ - // We end the stream here when a kill is signalled. - Serial.printf("Stream killed\r\n"); - break; - } - int64_t frame_time = esp_timer_get_time() - last_frame; - frame_time /= 1000; - int32_t frame_delay = (minFrameTime > frame_time) ? minFrameTime - frame_time : 0; - delay(frame_delay); - - if (debugData) { - Serial.printf("MJPG: %uB %ums, delay: %ums, framerate (%.1ffps)\r\n", - (uint32_t)(_jpg_buf_len), - (uint32_t)frame_time, frame_delay, 1000.0 / (uint32_t)(frame_time + frame_delay)); - } - last_frame = esp_timer_get_time(); - } - - streamsServed++; - streamCount = 0; - if (autoLamp && (lampVal != -1)) setLamp(0); - Serial.println("Stream ended"); - last_frame = 0; - return res; + int fd=httpd_req_to_sockfd(req); + if(fd==-1){ + printf("[stream_handler] could not get socket fd!\n"); + return ESP_FAIL; + } + cam_streamer_enqueue_client(cam_streamer, fd); + return ESP_OK; } static esp_err_t cmd_handler(httpd_req_t *req){ @@ -877,7 +793,10 @@ void startCameraServer(int hPort, int sPort){ httpd_register_uri_handler(stream_httpd, &stream_uri); httpd_register_uri_handler(stream_httpd, &info_uri); httpd_register_uri_handler(stream_httpd, &streamviewer_uri); - } + cam_streamer=(cam_streamer_t *) malloc(sizeof(cam_streamer_t)); + cam_streamer_init(cam_streamer, stream_httpd, 3); + cam_streamer_start(cam_streamer); + } httpd_register_uri_handler(stream_httpd, &favicon_16x16_uri); httpd_register_uri_handler(stream_httpd, &favicon_32x32_uri); httpd_register_uri_handler(stream_httpd, &favicon_ico_uri); diff --git a/cam_streamer.c b/cam_streamer.c new file mode 100644 index 0000000..bb82b6e --- /dev/null +++ b/cam_streamer.c @@ -0,0 +1,178 @@ +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "cam_streamer.h" + +#define PART_BOUNDARY "123456789000000000000987654321" + +#define _STREAM_HEADERS "HTTP/1.1 200 OK\r\n"\ + "Access-Control-Allow-Origin: *\r\n"\ + "Connection: Keep-Alive\r\n"\ + "Keep-Alive: timeout=15\r\n"\ + "Content-Type: multipart/x-mixed-replace;boundary=" PART_BOUNDARY "\r\n" + +static const char* _STREAM_BOUNDARY = "\r\n--" PART_BOUNDARY "\r\n"; +static const char* _STREAM_PART = "Content-Type: image/jpeg\r\nContent-Length: %u\r\n\r\n"; + +static uint8_t is_send_error(int r) { + switch(r) { + case HTTPD_SOCK_ERR_INVALID: +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] invalid argument occured!\n"); +#endif + return 1; + case HTTPD_SOCK_ERR_TIMEOUT: +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] timeout/interrupt occured!\n"); +#endif + return 1; + case HTTPD_SOCK_ERR_FAIL: +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] unrecoverable error while send()!\n"); +#endif + return 1; + case ESP_ERR_INVALID_ARG: +#ifdef DEBUG_DEFAULT_ON + printf("[text-streamer] session closed!\n"); +#endif + return 1; + default: +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] sent %d bytes!\n", r); +#endif + return 0; + } +} + +void cam_streamer_init(cam_streamer_t *s, httpd_handle_t server, uint16_t fps) { + memset(s, 0, sizeof(cam_streamer_t)); + s->frame_delay=1000000/fps; + s->clients=xQueueCreate(CAM_STREAMER_MAX_CLIENTS*2, sizeof(int)); + s->server=server; +} + +static void cam_streamer_update_frame(cam_streamer_t *s) { + uint8_t l=0; + while(!__atomic_compare_exchange_n(&s->buf_lock, &l, 1, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { + l=0; + vTaskDelay(1/portTICK_PERIOD_MS); + } + + if(s->buf) + esp_camera_fb_return(s->buf); + + s->buf=esp_camera_fb_get(); + + s->last_updated=esp_timer_get_time(); + s->part_len=snprintf(s->part_buf, 64, _STREAM_PART, s->buf->len); + __atomic_store_n(&s->buf_lock, 0, __ATOMIC_RELAXED); +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] fetched new frame\n"); +#endif +} + +void cam_streamer_task(void *p) { + cam_streamer_t *s=(cam_streamer_t *) p; + + uint8_t res; + uint64_t last_time=0, current_time; + int fd; + unsigned int n_entries; + for(;;) { + while(!(n_entries=uxQueueMessagesWaiting(s->clients))) + vTaskSuspend(NULL); + + current_time=esp_timer_get_time(); + if((current_time-last_time)frame_delay) + vTaskDelay((s->frame_delay-(current_time-last_time))/(1000*portTICK_PERIOD_MS)); + last_time=current_time; + + cam_streamer_update_frame(s); + + for(unsigned int i=0; iclients, &fd, 10/portTICK_PERIOD_MS)==pdFALSE) { +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] failed to dequeue fd!\n"); +#endif + continue; + } + +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] dequeued fd %d\n", fd); + printf("[cam_streamer] sending part: \"%.*s\"\n", (int) s->part_len, s->part_buf); +#endif + if((res=is_send_error(httpd_socket_send(s->server, fd, s->part_buf, s->part_len, 0)))) + continue; + + if((res|=is_send_error(httpd_socket_send(s->server, fd, s->buf->buf, s->buf->len, 0)))) + continue; + + if((res|=is_send_error(httpd_socket_send(s->server, fd, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY), 0)))) + continue; + + if(!res) { +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] fd %d requeued\n", fd); +#endif + xQueueSend(s->clients, (void *) &fd, 10/portTICK_PERIOD_MS); + } + } + } +} + +void cam_streamer_start(cam_streamer_t *s) { + BaseType_t r=xTaskCreate(cam_streamer_task, "cam_streamer", 10*1024, (void *) s, tskIDLE_PRIORITY+3, &s->task); + +#ifdef DEBUG_DEFAULT_ON + if(r!=pdPASS) + printf("[cam_streamer] failed to create task!\n"); +#endif +} + +void cam_streamer_stop(cam_streamer_t *s) { + vTaskDelete(s->task); +} + +bool cam_streamer_enqueue_client(cam_streamer_t *s, int fd) { +#ifdef DEBUG_DEFAULT_ON + printf("sending stream headers:\n%s\nLength: %d\n", _STREAM_HEADERS, strlen(_STREAM_HEADERS)); +#endif + if(is_send_error(httpd_socket_send(s->server, fd, _STREAM_HEADERS, strlen(_STREAM_HEADERS), 0))) { +#ifdef DEBUG_DEFAULT_ON + printf("failed sending headers!\n"); +#endif + return false; + } + + if(is_send_error(httpd_socket_send(s->server, fd, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY), 0))) { +#ifdef DEBUG_DEFAULT_ON + printf("failed sending boundary!\n"); +#endif + return false; + } + + const BaseType_t r=xQueueSend(s->clients, (void *) &fd, 10*portTICK_PERIOD_MS); + if(r!=pdTRUE) { +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] failed to enqueue fd %d\n", fd); +#endif +#define EMSG "failed to enqueue" + httpd_socket_send(s->server, fd, EMSG, strlen(EMSG), 0); +#undef EMSG + } else { +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] socket %d enqueued\n", fd); +#endif + vTaskResume(s->task); + } + + return r==pdTRUE?true:false; +} + diff --git a/cam_streamer.h b/cam_streamer.h new file mode 100644 index 0000000..504da23 --- /dev/null +++ b/cam_streamer.h @@ -0,0 +1,33 @@ +#ifndef _INC_CAM_STREAMER +#define _INC_CAM_STREAMER + +#include +#include +#include +#include +#include + +#include +#include +#include + +#define CAM_STREAMER_MAX_CLIENTS 10 +typedef struct { + QueueHandle_t clients; + TaskHandle_t task; + uint64_t last_updated; + int64_t frame_delay; + uint8_t buf_lock; + camera_fb_t *buf; + char part_buf[64]; + size_t part_len; + httpd_handle_t server; +} cam_streamer_t; + +void cam_streamer_init(cam_streamer_t *s, httpd_handle_t server, uint16_t fps); +void cam_streamer_task(void *p); +void cam_streamer_start(cam_streamer_t *s); +void cam_streamer_stop(cam_streamer_t *s); +bool cam_streamer_enqueue_client(cam_streamer_t *s, int fd); + +#endif diff --git a/esp32-cam-webserver.ino b/esp32-cam-webserver.ino index 38b22fa..f03ae5b 100644 --- a/esp32-cam-webserver.ino +++ b/esp32-cam-webserver.ino @@ -9,7 +9,6 @@ #include "time.h" #include - /* This sketch is a extension/expansion/reork of the 'official' ESP32 Camera example * sketch from Expressif: * https://github.com/espressif/arduino-esp32/tree/master/libraries/ESP32/examples/Camera/CameraWebServer From ba3c09fdbd291639747bb19f27bfd848e0ec73bb Mon Sep 17 00:00:00 2001 From: David Schramm Date: Mon, 21 Mar 2022 19:22:57 +0100 Subject: [PATCH 2/2] add stream counter --- app_httpd.cpp | 24 +++++++++++++----------- cam_streamer.c | 38 ++++++++++++++++++++++++++++++-------- cam_streamer.h | 21 ++++++++++++--------- esp32-cam-webserver.ino | 1 - 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/app_httpd.cpp b/app_httpd.cpp index bb69672..6265f1a 100644 --- a/app_httpd.cpp +++ b/app_httpd.cpp @@ -51,7 +51,6 @@ extern int streamPort; extern char httpURL[]; extern char streamURL[]; extern char default_index[]; -extern int8_t streamCount; extern unsigned long streamsServed; extern unsigned long imagesServed; extern int myRotation; @@ -85,9 +84,6 @@ static const char* _STREAM_PART = "Content-Type: image/jpeg\r\nContent-Length: % httpd_handle_t stream_httpd = NULL; httpd_handle_t camera_httpd = NULL; -// Flag that can be set to kill all active streams -bool streamKill; - #ifdef __cplusplus extern "C" { #endif @@ -152,7 +148,7 @@ void serialDump() { int McuTc = (temprature_sens_read() - 32) / 1.8; // celsius int McuTf = temprature_sens_read(); // fahrenheit Serial.printf("System up: %" PRId64 ":%02i:%02i:%02i (d:h:m:s)\r\n", upDays, upHours, upMin, upSec); - Serial.printf("Active streams: %i, Previous streams: %lu, Images captured: %lu\r\n", streamCount, streamsServed, imagesServed); + Serial.printf("Active streams: %lu, Previous streams: %lu, Images captured: %lu\r\n", cam_streamer_get_num_clients(cam_streamer), streamsServed, imagesServed); Serial.printf("CPU Freq: %i MHz, Xclk Freq: %i MHz\r\n", ESP.getCpuFreqMHz(), xclk); Serial.printf("MCU temperature : %i C, %i F (approximate)\r\n", McuTc, McuTf); Serial.printf("Heap: %i, free: %i, min free: %i, max block: %i\r\n", ESP.getHeapSize(), ESP.getFreeHeap(), ESP.getMinFreeHeap(), ESP.getMaxAllocHeap()); @@ -232,7 +228,10 @@ static esp_err_t stream_handler(httpd_req_t *req){ printf("[stream_handler] could not get socket fd!\n"); return ESP_FAIL; } - cam_streamer_enqueue_client(cam_streamer, fd); + + if(cam_streamer_enqueue_client(cam_streamer, fd)) + ++streamsServed; + return ESP_OK; } @@ -307,7 +306,7 @@ static esp_err_t cmd_handler(httpd_req_t *req){ else if(!strcmp(variable, "autolamp") && (lampVal != -1)) { autoLamp = val; if (autoLamp) { - if (streamCount > 0) setLamp(lampVal); + if (cam_streamer_get_num_clients(cam_streamer) > 0) setLamp(lampVal); else setLamp(0); } else { setLamp(lampVal); @@ -316,7 +315,7 @@ static esp_err_t cmd_handler(httpd_req_t *req){ else if(!strcmp(variable, "lamp") && (lampVal != -1)) { lampVal = constrain(val,0,100); if (autoLamp) { - if (streamCount > 0) setLamp(lampVal); + if (cam_streamer_get_num_clients(cam_streamer) > 0) setLamp(lampVal); else setLamp(0); } else { setLamp(lampVal); @@ -513,7 +512,7 @@ static esp_err_t dump_handler(httpd_req_t *req){ int McuTf = temprature_sens_read(); // fahrenheit d+= sprintf(d,"Up: %" PRId64 ":%02i:%02i:%02i (d:h:m:s)
\n", upDays, upHours, upMin, upSec); - d+= sprintf(d,"Active streams: %i, Previous streams: %lu, Images captured: %lu
\n", streamCount, streamsServed, imagesServed); + d+= sprintf(d,"Active streams: %i, Previous streams: %lu, Images captured: %lu
\n", cam_streamer_get_num_clients(cam_streamer), streamsServed, imagesServed); d+= sprintf(d,"CPU Freq: %i MHz, Xclk Freq: %i MHz
\n", ESP.getCpuFreqMHz(), xclk); d+= sprintf(d,""); d+= sprintf(d,"MCU temperature : %i °C, %i °F\n
", McuTc, McuTf); @@ -550,7 +549,7 @@ static esp_err_t dump_handler(httpd_req_t *req){ static esp_err_t stop_handler(httpd_req_t *req){ flashLED(75); Serial.println("\r\nStream stop requested via Web"); - streamKill = true; + cam_streamer_dequeue_all_clients(cam_streamer); httpd_resp_set_hdr(req, "Access-Control-Allow-Origin", "*"); return httpd_resp_send(req, NULL, 0); } @@ -794,7 +793,10 @@ void startCameraServer(int hPort, int sPort){ httpd_register_uri_handler(stream_httpd, &info_uri); httpd_register_uri_handler(stream_httpd, &streamviewer_uri); cam_streamer=(cam_streamer_t *) malloc(sizeof(cam_streamer_t)); - cam_streamer_init(cam_streamer, stream_httpd, 3); +#ifndef CAM_STREAMER_DESIRED_FPS +#define CAM_STREAMER_DESIRED_FPS 2 +#endif + cam_streamer_init(cam_streamer, stream_httpd, CAM_STREAMER_DESIRED_FPS); cam_streamer_start(cam_streamer); } httpd_register_uri_handler(stream_httpd, &favicon_16x16_uri); diff --git a/cam_streamer.c b/cam_streamer.c index bb82b6e..7dc8fe0 100644 --- a/cam_streamer.c +++ b/cam_streamer.c @@ -78,10 +78,17 @@ static void cam_streamer_update_frame(cam_streamer_t *s) { #endif } +static void cam_streamer_decrement_num_clients(cam_streamer_t *s) { + size_t num_clients=s->num_clients; + while(num_clients>0 && !__atomic_compare_exchange_n(&s->num_clients, &num_clients, num_clients-1, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); +#ifdef DEBUG_DEFAULT_ON + printf("[cam_streamer] num_clients decremented\n"); +#endif +} + void cam_streamer_task(void *p) { cam_streamer_t *s=(cam_streamer_t *) p; - uint8_t res; uint64_t last_time=0, current_time; int fd; unsigned int n_entries; @@ -108,21 +115,26 @@ void cam_streamer_task(void *p) { printf("[cam_streamer] dequeued fd %d\n", fd); printf("[cam_streamer] sending part: \"%.*s\"\n", (int) s->part_len, s->part_buf); #endif - if((res=is_send_error(httpd_socket_send(s->server, fd, s->part_buf, s->part_len, 0)))) + + if(is_send_error(httpd_socket_send(s->server, fd, s->part_buf, s->part_len, 0))) { + cam_streamer_decrement_num_clients(s); continue; + } - if((res|=is_send_error(httpd_socket_send(s->server, fd, s->buf->buf, s->buf->len, 0)))) + if(is_send_error(httpd_socket_send(s->server, fd, s->buf->buf, s->buf->len, 0))) { + cam_streamer_decrement_num_clients(s); continue; + } - if((res|=is_send_error(httpd_socket_send(s->server, fd, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY), 0)))) + if(is_send_error(httpd_socket_send(s->server, fd, _STREAM_BOUNDARY, strlen(_STREAM_BOUNDARY), 0))) { + cam_streamer_decrement_num_clients(s); continue; + } - if(!res) { + xQueueSend(s->clients, (void *) &fd, 10/portTICK_PERIOD_MS); #ifdef DEBUG_DEFAULT_ON - printf("[cam_streamer] fd %d requeued\n", fd); + printf("[cam_streamer] fd %d requeued\n", fd); #endif - xQueueSend(s->clients, (void *) &fd, 10/portTICK_PERIOD_MS); - } } } } @@ -140,6 +152,15 @@ void cam_streamer_stop(cam_streamer_t *s) { vTaskDelete(s->task); } +size_t cam_streamer_get_num_clients(cam_streamer_t *s) { + return s->num_clients; +} + +void cam_streamer_dequeue_all_clients(cam_streamer_t *s) { + xQueueReset(s->clients); + __atomic_exchange_n(&s->num_clients, 0, __ATOMIC_RELAXED); +} + bool cam_streamer_enqueue_client(cam_streamer_t *s, int fd) { #ifdef DEBUG_DEFAULT_ON printf("sending stream headers:\n%s\nLength: %d\n", _STREAM_HEADERS, strlen(_STREAM_HEADERS)); @@ -170,6 +191,7 @@ bool cam_streamer_enqueue_client(cam_streamer_t *s, int fd) { #ifdef DEBUG_DEFAULT_ON printf("[cam_streamer] socket %d enqueued\n", fd); #endif + __atomic_fetch_add(&s->num_clients, 1, __ATOMIC_RELAXED); vTaskResume(s->task); } diff --git a/cam_streamer.h b/cam_streamer.h index 504da23..a128926 100644 --- a/cam_streamer.h +++ b/cam_streamer.h @@ -13,15 +13,16 @@ #define CAM_STREAMER_MAX_CLIENTS 10 typedef struct { - QueueHandle_t clients; - TaskHandle_t task; - uint64_t last_updated; - int64_t frame_delay; - uint8_t buf_lock; - camera_fb_t *buf; - char part_buf[64]; - size_t part_len; - httpd_handle_t server; + QueueHandle_t clients; + TaskHandle_t task; + uint64_t last_updated; + int64_t frame_delay; + uint8_t buf_lock; + camera_fb_t *buf; + char part_buf[64]; + size_t part_len; + httpd_handle_t server; + size_t num_clients; } cam_streamer_t; void cam_streamer_init(cam_streamer_t *s, httpd_handle_t server, uint16_t fps); @@ -29,5 +30,7 @@ void cam_streamer_task(void *p); void cam_streamer_start(cam_streamer_t *s); void cam_streamer_stop(cam_streamer_t *s); bool cam_streamer_enqueue_client(cam_streamer_t *s, int fd); +size_t cam_streamer_get_num_clients(cam_streamer_t *s); +void cam_streamer_dequeue_all_clients(cam_streamer_t *s); #endif diff --git a/esp32-cam-webserver.ino b/esp32-cam-webserver.ino index f03ae5b..a39c013 100644 --- a/esp32-cam-webserver.ino +++ b/esp32-cam-webserver.ino @@ -136,7 +136,6 @@ char httpURL[64] = {"Undefined"}; char streamURL[64] = {"Undefined"}; // Counters for info screens and debug -int8_t streamCount = 0; // Number of currently active streams unsigned long streamsServed = 0; // Total completed streams unsigned long imagesServed = 0; // Total image requests