diff --git a/src/util/http_downloader.cpp b/src/util/http_downloader.cpp index fd2a38936..802be317d 100644 --- a/src/util/http_downloader.cpp +++ b/src/util/http_downloader.cpp @@ -174,7 +174,11 @@ void HTTPDownloader::WaitForAllRequests() { std::unique_lock lock(m_pending_http_request_lock); while (!m_pending_http_requests.empty()) + { + // Don't burn too much CPU. + Common::Timer::NanoSleep(1000000); LockedPollRequests(lock); + } } void HTTPDownloader::LockedAddRequest(Request* request) diff --git a/src/util/http_downloader_curl.cpp b/src/util/http_downloader_curl.cpp index ba77103c2..ecd62f8ff 100644 --- a/src/util/http_downloader_curl.cpp +++ b/src/util/http_downloader_curl.cpp @@ -19,7 +19,11 @@ HTTPDownloaderCurl::HTTPDownloaderCurl() : HTTPDownloader() { } -HTTPDownloaderCurl::~HTTPDownloaderCurl() = default; +HTTPDownloaderCurl::~HTTPDownloaderCurl() +{ + if (m_multi_handle) + curl_multi_cleanup(m_multi_handle); +} std::unique_ptr HTTPDownloader::Create(const char* user_agent) { @@ -54,8 +58,14 @@ bool HTTPDownloaderCurl::Initialize(const char* user_agent) } } + m_multi_handle = curl_multi_init(); + if (!m_multi_handle) + { + Log_ErrorPrint("curl_multi_init() failed"); + return false; + } + m_user_agent = user_agent; - m_thread_pool = std::make_unique(m_max_active_requests); return true; } @@ -70,56 +80,6 @@ size_t HTTPDownloaderCurl::WriteCallback(char* ptr, size_t size, size_t nmemb, v return nmemb; } -void HTTPDownloaderCurl::ProcessRequest(Request* req) -{ - std::unique_lock cancel_lock(m_cancel_mutex); - if (req->closed.load()) - return; - - cancel_lock.unlock(); - - // Apparently OpenSSL can fire SIGPIPE... - sigset_t old_block_mask = {}; - sigset_t new_block_mask = {}; - sigemptyset(&old_block_mask); - sigemptyset(&new_block_mask); - sigaddset(&new_block_mask, SIGPIPE); - if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0) - Log_WarningPrint("Failed to block SIGPIPE"); - - req->start_time = Common::Timer::GetCurrentValue(); - int ret = curl_easy_perform(req->handle); - if (ret == CURLE_OK) - { - long response_code = 0; - curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &response_code); - req->status_code = static_cast(response_code); - - char* content_type = nullptr; - if (!curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) && content_type) - req->content_type = content_type; - - Log_DevPrintf("Request for '%s' returned status code %d and %zu bytes", req->url.c_str(), req->status_code, - req->data.size()); - } - else - { - Log_ErrorPrintf("Request for '%s' returned %d", req->url.c_str(), ret); - } - - curl_easy_cleanup(req->handle); - - if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0) - Log_WarningPrint("Failed to unblock SIGPIPE"); - - cancel_lock.lock(); - req->state = Request::State::Complete; - if (req->closed.load()) - delete req; - else - req->closed.store(true); -} - HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest() { Request* req = new Request(); @@ -135,7 +95,62 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest() void HTTPDownloaderCurl::InternalPollRequests() { - // noop - uses thread pool + // Apparently OpenSSL can fire SIGPIPE... + sigset_t old_block_mask = {}; + sigset_t new_block_mask = {}; + sigemptyset(&old_block_mask); + sigemptyset(&new_block_mask); + sigaddset(&new_block_mask, SIGPIPE); + if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0) + Log_WarningPrint("Failed to block SIGPIPE"); + + int running_handles; + const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles); + if (err != CURLM_OK) + Log_ErrorFmt("curl_multi_perform() returned {}", static_cast(err)); + + for (;;) + { + int msgq; + struct CURLMsg* msg = curl_multi_info_read(m_multi_handle, &msgq); + if (!msg) + break; + + if (msg->msg != CURLMSG_DONE) + { + Log_WarningFmt("Unexpected multi message {}", static_cast(msg->msg)); + continue; + } + + Request* req; + if (curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &req) != CURLE_OK) + { + Log_ErrorPrint("curl_easy_getinfo() failed"); + continue; + } + + if (msg->data.result == CURLE_OK) + { + long response_code = 0; + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &response_code); + req->status_code = static_cast(response_code); + + char* content_type = nullptr; + if (curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) == CURLE_OK && content_type) + req->content_type = content_type; + + Log_DevFmt("Request for '{}' returned status code {} and {} bytes", req->url, req->status_code, req->data.size()); + } + else + { + Log_ErrorFmt("Request for '{}' returned error {}", req->url, static_cast(msg->data.result)); + } + + req->state.store(Request::State::Complete, std::memory_order_release); + } + + if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0) + Log_WarningPrint("Failed to unblock SIGPIPE"); } bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) @@ -146,6 +161,7 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) curl_easy_setopt(req->handle, CURLOPT_WRITEFUNCTION, &HTTPDownloaderCurl::WriteCallback); curl_easy_setopt(req->handle, CURLOPT_WRITEDATA, req); curl_easy_setopt(req->handle, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(req->handle, CURLOPT_PRIVATE, req); if (request->type == Request::Type::Post) { @@ -154,18 +170,27 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) } Log_DevPrintf("Started HTTP request for '%s'", req->url.c_str()); - req->state = Request::State::Started; + req->state.store(Request::State::Started, std::memory_order_release); req->start_time = Common::Timer::GetCurrentValue(); - m_thread_pool->Schedule(std::bind(&HTTPDownloaderCurl::ProcessRequest, this, req)); + + const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle); + if (err != CURLM_OK) + { + Log_ErrorFmt("curl_multi_add_handle() returned {}", static_cast(err)); + req->callback(HTTP_STATUS_ERROR, std::string(), req->data); + curl_easy_cleanup(req->handle); + delete req; + return false; + } + return true; } void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request) { - std::unique_lock cancel_lock(m_cancel_mutex); Request* req = static_cast(request); - if (req->closed.load()) - delete req; - else - req->closed.store(true); + DebugAssert(req->handle); + curl_multi_remove_handle(m_multi_handle, req->handle); + curl_easy_cleanup(req->handle); + delete req; } diff --git a/src/util/http_downloader_curl.h b/src/util/http_downloader_curl.h index b8333fe9a..bd16e2a19 100644 --- a/src/util/http_downloader_curl.h +++ b/src/util/http_downloader_curl.h @@ -4,8 +4,6 @@ #pragma once #include "http_downloader.h" -#include "common/thirdparty/thread_pool.h" - #include #include #include @@ -29,13 +27,10 @@ private: struct Request : HTTPDownloader::Request { CURL* handle = nullptr; - std::atomic_bool closed{false}; }; static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata); - void ProcessRequest(Request* req); + CURLM* m_multi_handle = nullptr; std::string m_user_agent; - std::unique_ptr m_thread_pool; - std::mutex m_cancel_mutex; };