From 983f2b70741f17f30fe2321451f10cabecc013d2 Mon Sep 17 00:00:00 2001 From: Liam Date: Sun, 16 Oct 2022 01:53:56 -0400 Subject: kernel: invert session request handling flow --- src/core/hle/kernel/service_thread.cpp | 232 ++++++++++++++++++++++++--------- 1 file changed, 167 insertions(+), 65 deletions(-) (limited to 'src/core/hle/kernel/service_thread.cpp') diff --git a/src/core/hle/kernel/service_thread.cpp b/src/core/hle/kernel/service_thread.cpp index d23d76706..1fc2edf52 100644 --- a/src/core/hle/kernel/service_thread.cpp +++ b/src/core/hle/kernel/service_thread.cpp @@ -1,15 +1,17 @@ -// SPDX-FileCopyrightText: Copyright 2020 yuzu Emulator Project +// SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later -#include #include #include #include #include -#include #include "common/scope_exit.h" #include "common/thread.h" +#include "core/hle/ipc_helpers.h" +#include "core/hle/kernel/hle_ipc.h" +#include "core/hle/kernel/k_event.h" +#include "core/hle/kernel/k_scoped_resource_reservation.h" #include "core/hle/kernel/k_session.h" #include "core/hle/kernel/k_thread.h" #include "core/hle/kernel/kernel.h" @@ -19,101 +21,201 @@ namespace Kernel { class ServiceThread::Impl final { public: - explicit Impl(KernelCore& kernel, std::size_t num_threads, const std::string& name); + explicit Impl(KernelCore& kernel, const std::string& service_name); ~Impl(); - void QueueSyncRequest(KSession& session, std::shared_ptr&& context); + void WaitAndProcessImpl(); + void SessionClosed(KServerSession* server_session, + std::shared_ptr manager); + void LoopProcess(); + + void RegisterServerSession(KServerSession* session, + std::shared_ptr manager); private: - std::vector threads; - std::queue> requests; - std::mutex queue_mutex; - std::condition_variable_any condition; - const std::string service_name; + KernelCore& kernel; + + std::jthread m_thread; + std::mutex m_session_mutex; + std::vector m_sessions; + std::vector> m_managers; + KEvent* m_wakeup_event; + KProcess* m_process; + std::atomic m_shutdown_requested; + const std::string m_service_name; }; -ServiceThread::Impl::Impl(KernelCore& kernel, std::size_t num_threads, const std::string& name) - : service_name{name} { - for (std::size_t i = 0; i < num_threads; ++i) { - threads.emplace_back([this, &kernel](std::stop_token stop_token) { - Common::SetCurrentThreadName(std::string{service_name}.c_str()); +void ServiceThread::Impl::WaitAndProcessImpl() { + // Create local list of waitable sessions. + std::vector objs; + std::vector> managers; - // Wait for first request before trying to acquire a render context - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, stop_token, [this] { return !requests.empty(); }); - } + { + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; - if (stop_token.stop_requested()) { - return; - } + // Resize to the needed quantity. + objs.resize(m_sessions.size() + 1); + managers.resize(m_managers.size()); - // Allocate a dummy guest thread for this host thread. - kernel.RegisterHostThread(); + // Copy to our local list. + std::copy(m_sessions.begin(), m_sessions.end(), objs.begin()); + std::copy(m_managers.begin(), m_managers.end(), managers.begin()); - while (true) { - std::function task; + // Insert the wakeup event at the end. + objs.back() = &m_wakeup_event->GetReadableEvent(); + } - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, stop_token, [this] { return !requests.empty(); }); + // Wait on the list of sessions. + s32 index{-1}; + Result rc = KSynchronizationObject::Wait(kernel, &index, objs.data(), + static_cast(objs.size()), -1); + ASSERT(!rc.IsFailure()); + + // If this was the wakeup event, clear it and finish. + if (index >= static_cast(objs.size() - 1)) { + m_wakeup_event->Clear(); + return; + } - if (stop_token.stop_requested()) { - return; - } + // This event is from a server session. + auto* server_session = static_cast(objs[index]); + auto& manager = managers[index]; - if (requests.empty()) { - continue; - } + // Fetch the HLE request context. + std::shared_ptr context; + rc = server_session->ReceiveRequest(&context, manager); - task = std::move(requests.front()); - requests.pop(); - } + // If the session was closed, handle that. + if (rc == ResultSessionClosed) { + SessionClosed(server_session, manager); - task(); - } - }); + // Finish. + return; } + + // TODO: handle other cases + ASSERT(rc == ResultSuccess); + + // Perform the request. + Result service_rc = manager->CompleteSyncRequest(server_session, *context); + + // Reply to the client. + rc = server_session->SendReply(true); + + if (rc == ResultSessionClosed || service_rc == IPC::ERR_REMOTE_PROCESS_DEAD) { + SessionClosed(server_session, manager); + return; + } + + // TODO: handle other cases + ASSERT(rc == ResultSuccess); + ASSERT(service_rc == ResultSuccess); } -void ServiceThread::Impl::QueueSyncRequest(KSession& session, - std::shared_ptr&& context) { +void ServiceThread::Impl::SessionClosed(KServerSession* server_session, + std::shared_ptr manager) { { - std::unique_lock lock{queue_mutex}; + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; + + // Get the index of the session. + const auto index = + std::find(m_sessions.begin(), m_sessions.end(), server_session) - m_sessions.begin(); + ASSERT(index < static_cast(m_sessions.size())); + + // Remove the session and its manager. + m_sessions.erase(m_sessions.begin() + index); + m_managers.erase(m_managers.begin() + index); + } - auto* server_session{&session.GetServerSession()}; + // Close our reference to the server session. + server_session->Close(); +} - // Open a reference to the session to ensure it is not closes while the service request - // completes asynchronously. - server_session->Open(); +void ServiceThread::Impl::LoopProcess() { + Common::SetCurrentThreadName(m_service_name.c_str()); - requests.emplace([server_session, context{std::move(context)}]() { - // Close the reference. - SCOPE_EXIT({ server_session->Close(); }); + kernel.RegisterHostThread(); - // Complete the service request. - server_session->CompleteSyncRequest(*context); - }); + while (!m_shutdown_requested.load()) { + WaitAndProcessImpl(); } - condition.notify_one(); +} + +void ServiceThread::Impl::RegisterServerSession(KServerSession* server_session, + std::shared_ptr manager) { + // Open the server session. + server_session->Open(); + + { + // Lock to get the list. + std::scoped_lock lk{m_session_mutex}; + + // Insert the session and manager. + m_sessions.push_back(server_session); + m_managers.push_back(manager); + } + + // Signal the wakeup event. + m_wakeup_event->Signal(); } ServiceThread::Impl::~Impl() { - condition.notify_all(); - for (auto& thread : threads) { - thread.request_stop(); - thread.join(); + // Shut down the processing thread. + m_shutdown_requested.store(true); + m_wakeup_event->Signal(); + m_thread.join(); + + // Lock mutex. + m_session_mutex.lock(); + + // Close all remaining sessions. + for (size_t i = 0; i < m_sessions.size(); i++) { + m_sessions[i]->Close(); } + + // Close event. + m_wakeup_event->GetReadableEvent().Close(); + m_wakeup_event->Close(); + + // Close process. + m_process->Close(); +} + +ServiceThread::Impl::Impl(KernelCore& kernel_, const std::string& service_name) + : kernel{kernel_}, m_service_name{service_name} { + // Initialize process. + m_process = KProcess::Create(kernel); + KProcess::Initialize(m_process, kernel.System(), service_name, + KProcess::ProcessType::KernelInternal, kernel.GetSystemResourceLimit()); + + // Reserve a new event from the process resource limit + KScopedResourceReservation event_reservation(m_process, LimitableResource::Events); + ASSERT(event_reservation.Succeeded()); + + // Initialize event. + m_wakeup_event = KEvent::Create(kernel); + m_wakeup_event->Initialize(m_process); + + // Commit the event reservation. + event_reservation.Commit(); + + // Register the event. + KEvent::Register(kernel, m_wakeup_event); + + // Start thread. + m_thread = std::jthread([this] { LoopProcess(); }); } -ServiceThread::ServiceThread(KernelCore& kernel, std::size_t num_threads, const std::string& name) - : impl{std::make_unique(kernel, num_threads, name)} {} +ServiceThread::ServiceThread(KernelCore& kernel, const std::string& name) + : impl{std::make_unique(kernel, name)} {} ServiceThread::~ServiceThread() = default; -void ServiceThread::QueueSyncRequest(KSession& session, - std::shared_ptr&& context) { - impl->QueueSyncRequest(session, std::move(context)); +void ServiceThread::RegisterServerSession(KServerSession* session, + std::shared_ptr manager) { + impl->RegisterServerSession(session, manager); } } // namespace Kernel -- cgit v1.2.3 From 7837185f0ab54f76aa57c65676b10179465fe42d Mon Sep 17 00:00:00 2001 From: Liam Date: Mon, 24 Oct 2022 21:23:53 -0400 Subject: service_thread: convert to map for session management --- src/core/hle/kernel/service_thread.cpp | 44 ++++++++++++++++------------------ 1 file changed, 21 insertions(+), 23 deletions(-) (limited to 'src/core/hle/kernel/service_thread.cpp') diff --git a/src/core/hle/kernel/service_thread.cpp b/src/core/hle/kernel/service_thread.cpp index 1fc2edf52..1d8775504 100644 --- a/src/core/hle/kernel/service_thread.cpp +++ b/src/core/hle/kernel/service_thread.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include +#include #include #include #include @@ -37,8 +38,7 @@ private: std::jthread m_thread; std::mutex m_session_mutex; - std::vector m_sessions; - std::vector> m_managers; + std::map> m_sessions; KEvent* m_wakeup_event; KProcess* m_process; std::atomic m_shutdown_requested; @@ -51,19 +51,21 @@ void ServiceThread::Impl::WaitAndProcessImpl() { std::vector> managers; { - // Lock to get the list. + // Lock to get the set. std::scoped_lock lk{m_session_mutex}; - // Resize to the needed quantity. - objs.resize(m_sessions.size() + 1); - managers.resize(m_managers.size()); + // Reserve the needed quantity. + objs.reserve(m_sessions.size() + 1); + managers.reserve(m_sessions.size()); // Copy to our local list. - std::copy(m_sessions.begin(), m_sessions.end(), objs.begin()); - std::copy(m_managers.begin(), m_managers.end(), managers.begin()); + for (const auto& [session, manager] : m_sessions) { + objs.push_back(session); + managers.push_back(manager); + } // Insert the wakeup event at the end. - objs.back() = &m_wakeup_event->GetReadableEvent(); + objs.push_back(&m_wakeup_event->GetReadableEvent()); } // Wait on the list of sessions. @@ -116,17 +118,11 @@ void ServiceThread::Impl::WaitAndProcessImpl() { void ServiceThread::Impl::SessionClosed(KServerSession* server_session, std::shared_ptr manager) { { - // Lock to get the list. + // Lock to get the set. std::scoped_lock lk{m_session_mutex}; - // Get the index of the session. - const auto index = - std::find(m_sessions.begin(), m_sessions.end(), server_session) - m_sessions.begin(); - ASSERT(index < static_cast(m_sessions.size())); - - // Remove the session and its manager. - m_sessions.erase(m_sessions.begin() + index); - m_managers.erase(m_managers.begin() + index); + // Erase the session. + ASSERT(m_sessions.erase(server_session) == 1); } // Close our reference to the server session. @@ -149,12 +145,11 @@ void ServiceThread::Impl::RegisterServerSession(KServerSession* server_session, server_session->Open(); { - // Lock to get the list. + // Lock to get the set. std::scoped_lock lk{m_session_mutex}; // Insert the session and manager. - m_sessions.push_back(server_session); - m_managers.push_back(manager); + m_sessions[server_session] = manager; } // Signal the wakeup event. @@ -171,10 +166,13 @@ ServiceThread::Impl::~Impl() { m_session_mutex.lock(); // Close all remaining sessions. - for (size_t i = 0; i < m_sessions.size(); i++) { - m_sessions[i]->Close(); + for (const auto& [server_session, manager] : m_sessions) { + server_session->Close(); } + // Destroy remaining managers. + m_sessions.clear(); + // Close event. m_wakeup_event->GetReadableEvent().Close(); m_wakeup_event->Close(); -- cgit v1.2.3 From 7aa91c8d9ceb21e631e52c7ac30e47af2ec5a089 Mon Sep 17 00:00:00 2001 From: Liam Date: Wed, 26 Oct 2022 17:32:14 -0400 Subject: k_server_session: add SendReplyHLE --- src/core/hle/kernel/service_thread.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/hle/kernel/service_thread.cpp') diff --git a/src/core/hle/kernel/service_thread.cpp b/src/core/hle/kernel/service_thread.cpp index 1d8775504..c8fe42537 100644 --- a/src/core/hle/kernel/service_thread.cpp +++ b/src/core/hle/kernel/service_thread.cpp @@ -103,7 +103,7 @@ void ServiceThread::Impl::WaitAndProcessImpl() { Result service_rc = manager->CompleteSyncRequest(server_session, *context); // Reply to the client. - rc = server_session->SendReply(true); + rc = server_session->SendReplyHLE(); if (rc == ResultSessionClosed || service_rc == IPC::ERR_REMOTE_PROCESS_DEAD) { SessionClosed(server_session, manager); -- cgit v1.2.3