1
0
Fork 0
mirror of https://github.com/Atmosphere-NX/Atmosphere.git synced 2025-01-24 10:03:56 +00:00

htc: Implement RpcClient::ReceiveThread + SendThread

This commit is contained in:
Michael Scire 2021-02-10 02:05:45 -08:00 committed by SciresM
parent d60b1abed0
commit 3be005b638
6 changed files with 157 additions and 10 deletions

View file

@ -61,7 +61,7 @@ namespace ams::htc::server::rpc {
s64 body_size; s64 body_size;
u32 task_id; u32 task_id;
u64 params[5]; u64 params[5];
u8 data[]; char data[];
}; };
static_assert(sizeof(HtcmiscRpcPacket) == 0x40); static_assert(sizeof(HtcmiscRpcPacket) == 0x40);

View file

@ -154,12 +154,145 @@ namespace ams::htc::server::rpc {
} }
} }
void RpcClient::ReceiveThread() { Result RpcClient::ReceiveThread() {
AMS_ABORT("RpcClient::ReceiveThread"); /* Loop forever. */
auto *header = reinterpret_cast<RpcPacket *>(m_receive_buffer);
while (true) {
/* Try to receive a packet header. */
R_TRY(this->ReceiveHeader(header));
/* Track how much we've received. */
size_t received = sizeof(*header);
/* If the packet has one, receive its body. */
if (header->body_size > 0) {
/* Sanity check the task id. */
AMS_ABORT_UNLESS(header->task_id < static_cast<int>(MaxRpcCount));
/* Sanity check the body size. */
AMS_ABORT_UNLESS(util::IsIntValueRepresentable<size_t>(header->body_size));
AMS_ABORT_UNLESS(static_cast<size_t>(header->body_size) <= sizeof(m_receive_buffer) - received);
/* Receive the body. */
R_TRY(this->ReceiveBody(header->data, header->body_size));
/* Note that we received the body. */
received += header->body_size;
}
/* Acquire exclusive access to the task tables. */
std::scoped_lock lk(m_mutex);
/* Get the specified task. */
Task *task = m_task_table.Get<Task>(header->task_id);
R_UNLESS(task != nullptr, htc::ResultInvalidTaskId());
/* If the task is canceled, free it. */
if (task->GetTaskState() == RpcTaskState::Cancelled) {
m_task_active[header->task_id] = false;
m_task_table.Delete(header->task_id);
m_task_id_free_list.Free(header->task_id);
continue;
}
/* Handle the packet. */
switch (header->category) {
case PacketCategory::Response:
R_TRY(task->ProcessResponse(m_receive_buffer, received));
break;
case PacketCategory::Notification:
R_TRY(task->ProcessNotification(m_receive_buffer, received));
break;
default:
return htc::ResultInvalidCategory();
}
/* If we used the receive buffer, signal that we're done with it. */
if (task->IsReceiveBufferRequired()) {
os::SignalEvent(std::addressof(m_receive_buffer_available_events[header->task_id]));
}
}
} }
void RpcClient::SendThread() { Result RpcClient::ReceiveHeader(RpcPacket *header) {
AMS_ABORT("RpcClient::SendThread"); /* Receive. */
s64 received;
R_TRY(m_driver->Receive(std::addressof(received), reinterpret_cast<char *>(header), sizeof(*header), m_channel_id, htclow::ReceiveOption_ReceiveAllData));
/* Check size. */
R_UNLESS(static_cast<size_t>(received) == sizeof(*header), htc::ResultInvalidSize());
return ResultSuccess();
}
Result RpcClient::ReceiveBody(char *dst, size_t size) {
/* Receive. */
s64 received;
R_TRY(m_driver->Receive(std::addressof(received), dst, size, m_channel_id, htclow::ReceiveOption_ReceiveAllData));
/* Check size. */
R_UNLESS(static_cast<size_t>(received) == size, htc::ResultInvalidSize());
return ResultSuccess();
}
Result RpcClient::SendThread() {
while (true) {
/* Get a task. */
Task *task;
u32 task_id;
PacketCategory category;
do {
/* Dequeue a task. */
R_TRY(m_task_queue.Take(std::addressof(task_id), std::addressof(category)));
/* Get the task from the table. */
std::scoped_lock lk(m_mutex);
task = m_task_table.Get<Task>(task_id);
} while (task == nullptr);
/* If required, wait for the send buffer to become available. */
if (task->IsSendBufferRequired()) {
os::WaitEvent(std::addressof(m_send_buffer_available_events[task_id]));
/* Check if we've been cancelled. */
if (m_cancelled) {
break;
}
}
/* Handle the task. */
size_t packet_size;
switch (category) {
case PacketCategory::Request:
R_TRY(task->CreateRequest(std::addressof(packet_size), m_send_buffer, sizeof(m_send_buffer), task_id));
break;
case PacketCategory::Notification:
R_TRY(task->CreateNotification(std::addressof(packet_size), m_send_buffer, sizeof(m_send_buffer), task_id));
break;
AMS_UNREACHABLE_DEFAULT_CASE();
}
/* Send the request. */
R_TRY(this->SendRequest(m_send_buffer, packet_size));
}
return htc::ResultCancelled();
}
Result RpcClient::SendRequest(const char *src, size_t size) {
/* Sanity check our size. */
AMS_ASSERT(util::IsIntValueRepresentable<s64>(size));
/* Send the data. */
s64 sent;
R_TRY(m_driver->Send(std::addressof(sent), src, static_cast<s64>(size), m_channel_id));
/* Check that we sent the right amount. */
R_UNLESS(sent == static_cast<s64>(size), htc::ResultInvalidSize());
return ResultSuccess();
} }
} }

View file

@ -43,14 +43,14 @@ namespace ams::htc::server::rpc {
bool m_thread_running; bool m_thread_running;
os::EventType m_receive_buffer_available_events[MaxRpcCount]; os::EventType m_receive_buffer_available_events[MaxRpcCount];
os::EventType m_send_buffer_available_events[MaxRpcCount]; os::EventType m_send_buffer_available_events[MaxRpcCount];
u8 m_receive_buffer[BufferSize]; char m_receive_buffer[BufferSize];
u8 m_send_buffer[BufferSize]; char m_send_buffer[BufferSize];
private: private:
static void ReceiveThreadEntry(void *arg) { static_cast<RpcClient *>(arg)->ReceiveThread(); } static void ReceiveThreadEntry(void *arg) { static_cast<RpcClient *>(arg)->ReceiveThread(); }
static void SendThreadEntry(void *arg) { static_cast<RpcClient *>(arg)->SendThread(); } static void SendThreadEntry(void *arg) { static_cast<RpcClient *>(arg)->SendThread(); }
void ReceiveThread(); Result ReceiveThread();
void SendThread(); Result SendThread();
public: public:
RpcClient(driver::IDriver *driver, htclow::ChannelId channel); RpcClient(driver::IDriver *driver, htclow::ChannelId channel);
public: public:
@ -62,6 +62,10 @@ namespace ams::htc::server::rpc {
void Wait(); void Wait();
int WaitAny(htclow::ChannelState state, os::EventType *event); int WaitAny(htclow::ChannelState state, os::EventType *event);
private:
Result ReceiveHeader(RpcPacket *header);
Result ReceiveBody(char *dst, size_t size);
Result SendRequest(const char *src, size_t size);
}; };
} }

View file

@ -104,6 +104,12 @@ namespace ams::htc::server::rpc {
/* Mark the task as invalid. */ /* Mark the task as invalid. */
m_valid[index] = false; m_valid[index] = false;
} }
void Delete(u32 index) {
if (this->IsValid(index)) {
this->Delete<Task>(index);
}
}
}; };
} }

View file

@ -34,7 +34,7 @@ namespace ams::htc::server::rpc {
s64 body_size; s64 body_size;
u32 task_id; u32 task_id;
u64 params[5]; u64 params[5];
u8 data[]; char data[];
}; };
static_assert(sizeof(RpcPacket) == 0x40); static_assert(sizeof(RpcPacket) == 0x40);

View file

@ -28,6 +28,10 @@ namespace ams::htc {
R_DEFINE_ERROR_RESULT(Unknown, 1023); R_DEFINE_ERROR_RESULT(Unknown, 1023);
R_DEFINE_ERROR_RESULT(InvalidTaskId, 2003);
R_DEFINE_ERROR_RESULT(InvalidSize, 2011);
R_DEFINE_ERROR_RESULT(OutOfRpcTask, 2102); R_DEFINE_ERROR_RESULT(OutOfRpcTask, 2102);
R_DEFINE_ERROR_RESULT(InvalidCategory, 2123);
} }