[zion] Move IPC objects to share code

Drew Galbraith 2023-06-21 23:42:21 -07:00
parent 58df2c0ed2
commit 9dd457391c
14 changed files with 125 additions and 196 deletions

zion/object/channel.cpp

@@ -5,44 +5,9 @@
 glcr::Pair<glcr::RefPtr<Channel>, glcr::RefPtr<Channel>>
 Channel::CreateChannelPair() {
-  auto c1 = glcr::MakeRefCounted<Channel>();
-  auto c2 = glcr::MakeRefCounted<Channel>();
+  auto c1 = glcr::AdoptPtr(new Channel);
+  auto c2 = glcr::AdoptPtr(new Channel);
   c1->SetPeer(c2);
   c2->SetPeer(c1);
   return {c1, c2};
 }
-
-z_err_t Channel::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps,
-                       const z_cap_t* caps) {
-  return peer_->WriteInternal(num_bytes, bytes, num_caps, caps);
-}
-
-z_err_t Channel::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps,
-                      z_cap_t* caps) {
-  mutex_.Lock();
-  while (message_queue_.empty()) {
-    auto thread = gScheduler->CurrentThread();
-    thread->SetState(Thread::BLOCKED);
-    blocked_threads_.PushBack(thread);
-    mutex_.Unlock();
-    gScheduler->Yield();
-    mutex_.Lock();
-  }
-  mutex_.Unlock();
-
-  MutexHolder lock(mutex_);
-  return message_queue_.PopFront(num_bytes, bytes, num_caps, caps);
-}
-
-z_err_t Channel::WriteInternal(uint64_t num_bytes, const void* bytes,
-                               uint64_t num_caps, const z_cap_t* caps) {
-  MutexHolder lock(mutex_);
-  RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps));
-
-  if (blocked_threads_.size() > 0) {
-    auto thread = blocked_threads_.PopFront();
-    thread->SetState(Thread::RUNNABLE);
-    gScheduler->Enqueue(thread);
-  }
-  return glcr::OK;
-}

zion/object/channel.h

@@ -1,6 +1,5 @@
 #pragma once
 
-#include <glacier/container/intrusive_list.h>
 #include <glacier/container/pair.h>
 #include <glacier/memory/ref_ptr.h>
@@ -8,6 +7,7 @@
 #include "include/ztypes.h"
 #include "lib/message_queue.h"
 #include "lib/mutex.h"
+#include "object/ipc_object.h"
 #include "object/kernel_object.h"
 #include "usr/zcall_internal.h"
@@ -18,7 +18,7 @@ struct KernelObjectTag<Channel> {
   static const uint64_t type = KernelObject::CHANNEL;
 };
 
-class Channel : public KernelObject {
+class Channel : public IpcObject {
  public:
   uint64_t TypeTag() override { return KernelObject::CHANNEL; }
   static glcr::Pair<glcr::RefPtr<Channel>, glcr::RefPtr<Channel>>
@@ -26,25 +26,20 @@ class Channel : public KernelObject {
   glcr::RefPtr<Channel> peer() { return peer_; }
 
-  z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps,
-                const z_cap_t* caps);
-  z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps,
-               z_cap_t* caps);
+  virtual MessageQueue& GetSendMessageQueue() override {
+    return peer_->message_queue_;
+  }
+  virtual MessageQueue& GetRecvMessageQueue() override {
+    return message_queue_;
+  }
 
  private:
   // FIXME: We will likely never close the channel based on this
   // circular dependency.
   glcr::RefPtr<Channel> peer_{nullptr};
 
-  Mutex mutex_{"channel"};
   UnboundedMessageQueue message_queue_;
-  glcr::IntrusiveList<Thread> blocked_threads_;
 
-  friend class glcr::MakeRefCountedFriend<Channel>;
-
   Channel() {}
 
   void SetPeer(const glcr::RefPtr<Channel>& peer) { peer_ = peer; }
-
-  z_err_t WriteInternal(uint64_t num_bytes, const void* bytes,
-                        uint64_t num_caps, const z_cap_t* caps);
 };
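With Write, Read, and WriteInternal gone from Channel, a channel call now runs through the inherited IpcObject::Send/Recv, and the only channel-specific behaviour left is the pair of queue accessors above: sends land in the peer's queue, receives drain the object's own queue. A rough caller-side sketch of that flow (how the glcr::Pair is unpacked and the zero-capability arguments are assumptions, not part of this commit):

#include "object/channel.h"

// Hedged sketch: error handling elided, accessor names on glcr::Pair assumed.
void ChannelRoundTripSketch() {
  auto ends = Channel::CreateChannelPair();
  glcr::RefPtr<Channel> client = ends.first();   // accessor name assumed
  glcr::RefPtr<Channel> server = ends.second();  // accessor name assumed

  uint64_t req = 42;
  // IpcObject::Send pushes into GetSendMessageQueue(), which Channel
  // overrides to return peer_->message_queue_, i.e. the server end's queue.
  client->Send(sizeof(req), &req, /*num_caps=*/0, /*caps=*/nullptr);

  uint64_t buf = 0;
  uint64_t num_bytes = sizeof(buf);
  uint64_t num_caps = 0;
  // IpcObject::Recv pops from GetRecvMessageQueue(): the server end's own queue.
  server->Recv(&num_bytes, &buf, &num_caps, /*caps=*/nullptr);
}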

zion/object/endpoint.cpp

@@ -5,39 +5,3 @@
 glcr::RefPtr<Endpoint> Endpoint::Create() {
   return glcr::AdoptPtr(new Endpoint);
 }
-
-glcr::ErrorCode Endpoint::Write(uint64_t num_bytes, const void* data,
-                                z_cap_t reply_port_cap) {
-  MutexHolder h(mutex_);
-  RET_ERR(message_queue_.PushBack(num_bytes, data, 1, &reply_port_cap));
-
-  if (blocked_threads_.size() > 0) {
-    auto thread = blocked_threads_.PopFront();
-    thread->SetState(Thread::RUNNABLE);
-    gScheduler->Enqueue(thread);
-  }
-  return glcr::OK;
-}
-
-glcr::ErrorCode Endpoint::Read(uint64_t* num_bytes, void* data,
-                               z_cap_t* reply_port_cap) {
-  mutex_.Lock();
-  while (message_queue_.empty()) {
-    auto thread = gScheduler->CurrentThread();
-    thread->SetState(Thread::BLOCKED);
-    mutex_.Unlock();
-    gScheduler->Yield();
-    mutex_.Lock();
-  }
-  mutex_.Unlock();
-
-  MutexHolder h(mutex_);
-  uint64_t num_caps = 1;
-  RET_ERR(message_queue_.PopFront(num_bytes, data, &num_caps, reply_port_cap));
-  if (num_caps != 1) {
-    return glcr::INTERNAL;
-  }
-  return glcr::OK;
-}

zion/object/endpoint.h

@@ -6,6 +6,7 @@
 #include "lib/message_queue.h"
 #include "lib/mutex.h"
+#include "object/ipc_object.h"
 #include "object/kernel_object.h"
 
 class Endpoint;
@@ -16,22 +17,23 @@ struct KernelObjectTag<Endpoint> {
   static const uint64_t type = KernelObject::ENDPOINT;
 };
 
-class Endpoint : public KernelObject {
+class Endpoint : public IpcObject {
  public:
   uint64_t TypeTag() override { return KernelObject::ENDPOINT; }
 
   static glcr::RefPtr<Endpoint> Create();
 
-  glcr::ErrorCode Write(uint64_t num_bytes, const void* data,
-                        z_cap_t reply_port_cap);
-  glcr::ErrorCode Read(uint64_t* num_bytes, void* data,
-                       z_cap_t* reply_port_cap);
-
- private:
-  Mutex mutex_{"endpoint"};
-  UnboundedMessageQueue message_queue_;
+  virtual MessageQueue& GetSendMessageQueue() override {
+    return message_queue_;
+  }
+  virtual MessageQueue& GetRecvMessageQueue() override {
+    return message_queue_;
+  }
 
-  glcr::IntrusiveList<Thread> blocked_threads_;
+ private:
+  UnboundedMessageQueue message_queue_;
 
   Endpoint() {}
 };
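Endpoint wires both accessors to the same message_queue_, so unlike Channel a Send on an endpoint is consumed by whoever calls Recv on that same endpoint; the removed Write/Read also show the reply capability travelling as the message's single cap. A minimal sketch of that one-queue behaviour (the cap value and surrounding function are placeholders, not from this commit):

#include "object/endpoint.h"

// Hedged sketch: both queue accessors return message_queue_, so the Recv
// below picks up exactly what the Send pushed.
void EndpointSketch(z_cap_t reply_port_cap) {
  glcr::RefPtr<Endpoint> ep = Endpoint::Create();

  uint64_t req = 7;
  ep->Send(sizeof(req), &req, /*num_caps=*/1, &reply_port_cap);

  uint64_t buf = 0;
  uint64_t num_bytes = sizeof(buf);
  uint64_t num_caps = 1;
  z_cap_t received_cap = 0;
  ep->Recv(&num_bytes, &buf, &num_caps, &received_cap);
}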

zion/object/ipc_object.cpp

@@ -0,0 +1,35 @@
+#include "object/ipc_object.h"
+
+#include "scheduler/scheduler.h"
+
+glcr::ErrorCode IpcObject::Send(uint64_t num_bytes, const void* bytes,
+                                uint64_t num_caps, const z_cap_t* caps) {
+  auto& message_queue = GetSendMessageQueue();
+  MutexHolder lock(mutex_);
+  RET_ERR(message_queue.PushBack(num_bytes, bytes, num_caps, caps));
+
+  if (blocked_threads_.size() > 0) {
+    auto thread = blocked_threads_.PopFront();
+    thread->SetState(Thread::RUNNABLE);
+    gScheduler->Enqueue(thread);
+  }
+  return glcr::OK;
+}
+
+glcr::ErrorCode IpcObject::Recv(uint64_t* num_bytes, void* bytes,
+                                uint64_t* num_caps, z_cap_t* caps) {
+  auto& message_queue = GetRecvMessageQueue();
+  mutex_.Lock();
+  while (message_queue.empty()) {
+    auto thread = gScheduler->CurrentThread();
+    thread->SetState(Thread::BLOCKED);
+    blocked_threads_.PushBack(thread);
+    mutex_.Unlock();
+    gScheduler->Yield();
+    mutex_.Lock();
+  }
+  mutex_.Unlock();
+
+  MutexHolder lock(mutex_);
+  return message_queue.PopFront(num_bytes, bytes, num_caps, caps);
+}
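These two methods are the whole shared path: Send pushes onto whatever queue the subclass exposes and wakes one blocked receiver, while Recv blocks (yielding through the scheduler) until that queue is non-empty. A new IPC primitive therefore only has to supply the two queue accessors; a hypothetical single-queue subclass, shown purely to illustrate the contract and not part of this commit:

#include "lib/message_queue.h"
#include "object/ipc_object.h"

// Hypothetical primitive: one unbounded queue used for both directions,
// so it inherits blocking Recv and wake-on-Send from IpcObject unchanged.
class Mailbox : public IpcObject {
 public:
  uint64_t TypeTag() override { return KernelObject::PORT; }  // placeholder tag

  virtual MessageQueue& GetSendMessageQueue() override { return message_queue_; }
  virtual MessageQueue& GetRecvMessageQueue() override { return message_queue_; }

 private:
  UnboundedMessageQueue message_queue_;
};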

zion/object/ipc_object.h

@@ -0,0 +1,33 @@
+#pragma once
+
+#include <glacier/container/intrusive_list.h>
+#include <glacier/status/error.h>
+
+#include "include/ztypes.h"
+#include "lib/message_queue.h"
+#include "lib/mutex.h"
+#include "object/kernel_object.h"
+
+class IpcObject : public KernelObject {
+ public:
+  IpcObject(){};
+  virtual ~IpcObject() {}
+
+  virtual glcr::ErrorCode Send(uint64_t num_bytes, const void* bytes,
+                               uint64_t num_caps, const z_cap_t* caps) final;
+  virtual glcr::ErrorCode Recv(uint64_t* num_bytes, void* bytes,
+                               uint64_t* num_caps, z_cap_t* caps) final;
+
+  bool HasMessages() {
+    MutexHolder h(mutex_);
+    return !GetRecvMessageQueue().empty();
+  }
+
+  virtual MessageQueue& GetSendMessageQueue() = 0;
+  virtual MessageQueue& GetRecvMessageQueue() = 0;
+
+ protected:
+  // FIXME: move locking and blocked threads to the message queue itself.
+  Mutex mutex_{"ipc"};
+  glcr::IntrusiveList<Thread> blocked_threads_;
+};
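Since Channel, Endpoint, Port, and ReplyPort now all derive from this class, code that only needs to move bytes and capabilities can accept an IpcObject& without caring which primitive it is. The helper below is an assumption about how such a call site could look, not code from this commit:

#include "object/ipc_object.h"

// Hypothetical helper: forward one message from any IPC object to another.
glcr::ErrorCode ForwardOneMessage(IpcObject& from, IpcObject& to) {
  uint64_t buffer[64];  // sketch-sized buffers, not a real kernel limit
  z_cap_t caps[4];
  uint64_t num_bytes = sizeof(buffer);
  uint64_t num_caps = 4;

  // Blocks until `from` has a message (IpcObject::Recv above).
  glcr::ErrorCode err = from.Recv(&num_bytes, buffer, &num_caps, caps);
  if (err != glcr::OK) {
    return err;
  }
  // Wakes one thread blocked on `to`, if any (IpcObject::Send above).
  return to.Send(num_bytes, buffer, num_caps, caps);
}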

zion/object/port.cpp

@@ -2,43 +2,7 @@
 #include "scheduler/scheduler.h"
 
-Port::Port() {}
-
-z_err_t Port::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps,
-                    const z_cap_t* caps) {
-  MutexHolder h(mutex_);
-  RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps));
-
-  if (blocked_threads_.size() > 0) {
-    auto thread = blocked_threads_.PopFront();
-    thread->SetState(Thread::RUNNABLE);
-    gScheduler->Enqueue(thread);
-  }
-  return glcr::OK;
-}
-
-z_err_t Port::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps,
-                   z_cap_t* caps) {
-  mutex_.Lock();
-  while (message_queue_.empty()) {
-    auto thread = gScheduler->CurrentThread();
-    thread->SetState(Thread::BLOCKED);
-    blocked_threads_.PushBack(thread);
-    mutex_.Unlock();
-    gScheduler->Yield();
-    mutex_.Lock();
-  }
-  mutex_.Unlock();
-
-  MutexHolder lock(mutex_);
-  return message_queue_.PopFront(num_bytes, bytes, num_caps, caps);
-}
 
 void Port::WriteKernel(uint64_t init, glcr::RefPtr<Capability> cap) {
   MutexHolder h(mutex_);
   message_queue_.WriteKernel(init, cap);
 }
-
-bool Port::HasMessages() {
-  MutexHolder h(mutex_);
-  return !message_queue_.empty();
-}

zion/object/port.h

@@ -6,6 +6,7 @@
 #include "capability/capability.h"
 #include "lib/message_queue.h"
 #include "lib/mutex.h"
+#include "object/ipc_object.h"
 #include "object/kernel_object.h"
 #include "object/thread.h"
 #include "usr/zcall_internal.h"
@@ -17,24 +18,21 @@ struct KernelObjectTag<Port> {
   static const uint64_t type = KernelObject::PORT;
 };
 
-class Port : public KernelObject {
+class Port : public IpcObject {
  public:
   uint64_t TypeTag() override { return KernelObject::PORT; }
 
-  Port();
-
-  z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps,
-                const z_cap_t* caps);
-  z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps,
-               z_cap_t* caps);
+  Port() = default;
 
   void WriteKernel(uint64_t init, glcr::RefPtr<Capability> cap);
-  bool HasMessages();
+
+  virtual MessageQueue& GetSendMessageQueue() override {
+    return message_queue_;
+  }
+  virtual MessageQueue& GetRecvMessageQueue() override {
+    return message_queue_;
+  }
 
  private:
   UnboundedMessageQueue message_queue_;
-  glcr::IntrusiveList<Thread> blocked_threads_;
-  Mutex mutex_{"Port"};
 };
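Port keeps its kernel-side WriteKernel entry point but otherwise behaves like Endpoint: one queue for both directions, with HasMessages() now inherited from IpcObject. A hedged polling sketch (the drain loop and zero-capability arguments are illustrative only, not from this commit):

#include "object/port.h"

// Hypothetical consumer: drain whatever is already queued without blocking.
void DrainPortSketch(Port& port) {
  while (port.HasMessages()) {
    uint64_t value = 0;
    uint64_t num_bytes = sizeof(value);
    uint64_t num_caps = 0;
    port.Recv(&num_bytes, &value, &num_caps, /*caps=*/nullptr);
    // ... handle `value` ...
  }
}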

zion/object/reply_port.cpp

@@ -5,37 +5,3 @@
 glcr::RefPtr<ReplyPort> ReplyPort::Create() {
   return glcr::AdoptPtr(new ReplyPort);
 }
-
-uint64_t ReplyPort::Write(uint64_t num_bytes, const void* data,
-                          uint64_t num_caps, uint64_t* caps) {
-  MutexHolder h(mutex_);
-  RET_ERR(message_holder_.PushBack(num_bytes, data, num_caps, caps));
-
-  if (blocked_thread_) {
-    // FIXME: We need to handle the case where the blocked thread has died I
-    // think.
-    blocked_thread_->SetState(Thread::RUNNABLE);
-    gScheduler->Enqueue(blocked_thread_);
-    blocked_thread_ = nullptr;
-  }
-  return glcr::OK;
-}
-
-uint64_t ReplyPort::Read(uint64_t* num_bytes, void* data, uint64_t* num_caps,
-                         uint64_t* caps) {
-  mutex_.Lock();
-  if (message_holder_.empty()) {
-    // Multiple threads can't block on a reply port.
-    if (blocked_thread_) {
-      mutex_.Unlock();
-      return glcr::FAILED_PRECONDITION;
-    }
-    blocked_thread_ = gScheduler->CurrentThread();
-    blocked_thread_->SetState(Thread::BLOCKED);
-    mutex_.Unlock();
-    gScheduler->Yield();
-    mutex_.Lock();
-  }
-  return message_holder_.PopFront(num_bytes, data, num_caps, caps);
-}

zion/object/reply_port.h

@@ -4,6 +4,7 @@
 #include "lib/message_queue.h"
 #include "lib/mutex.h"
+#include "object/ipc_object.h"
 #include "object/kernel_object.h"
 
 class ReplyPort;
@@ -13,21 +14,20 @@ struct KernelObjectTag<ReplyPort> {
   static const uint64_t type = KernelObject::REPLY_PORT;
 };
 
-class ReplyPort : public KernelObject {
+class ReplyPort : public IpcObject {
  public:
   uint64_t TypeTag() override { return KernelObject::REPLY_PORT; }
 
   static glcr::RefPtr<ReplyPort> Create();
 
-  uint64_t Write(uint64_t num_bytes, const void* data, uint64_t num_caps,
-                 uint64_t* caps);
-  uint64_t Read(uint64_t* num_bytes, void* data, uint64_t* num_caps,
-                uint64_t* caps);
+  virtual MessageQueue& GetSendMessageQueue() override {
+    return message_holder_;
+  }
+  virtual MessageQueue& GetRecvMessageQueue() override {
+    return message_holder_;
+  }
 
  private:
-  Mutex mutex_{"reply_port"};
   SingleMessageQueue message_holder_;
-  glcr::RefPtr<Thread> blocked_thread_;
 
   ReplyPort() {}
 };