From 658defdbc304ad096ab7ebcc6d762ef55b93ed9e Mon Sep 17 00:00:00 2001 From: Jonas 'Sortie' Termansen Date: Wed, 24 Apr 2013 17:32:36 +0200 Subject: [PATCH] Refactor pipe implementation. --- sortix/include/sortix/kernel/pipe.h | 51 ++++++++++ sortix/pipe.cpp | 151 ++++++++++++++++------------ 2 files changed, 139 insertions(+), 63 deletions(-) create mode 100644 sortix/include/sortix/kernel/pipe.h diff --git a/sortix/include/sortix/kernel/pipe.h b/sortix/include/sortix/kernel/pipe.h new file mode 100644 index 00000000..f7b0e387 --- /dev/null +++ b/sortix/include/sortix/kernel/pipe.h @@ -0,0 +1,51 @@ +/******************************************************************************* + + Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013. + + This file is part of Sortix. + + Sortix is free software: you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation, either version 3 of the License, or (at your option) any later + version. + + Sortix is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + details. + + You should have received a copy of the GNU General Public License along with + Sortix. If not, see . + + sortix/kernel/pipe.h + Embeddedable one-way data stream. + +*******************************************************************************/ + +#ifndef INCLUDE_SORTIX_KERNEL_PIPE_H +#define INCLUDE_SORTIX_KERNEL_PIPE_H + +namespace Sortix { + +class PipeChannel; + +class PipeEndpoint +{ +public: + PipeEndpoint(); + ~PipeEndpoint(); + bool Connect(PipeEndpoint* destination); + void Disconnect(); + ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count); + ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count); + int poll(ioctx_t* ctx, PollNode* node); + +private: + PipeChannel* channel; + bool reading; + +}; + +} // namespace Sortix + +#endif diff --git a/sortix/pipe.cpp b/sortix/pipe.cpp index 715d79e4..f075e5bd 100644 --- a/sortix/pipe.cpp +++ b/sortix/pipe.cpp @@ -41,8 +41,9 @@ #include #include #include -#include #include +#include +#include #include #include @@ -63,8 +64,6 @@ class PipeChannel public: PipeChannel(uint8_t* buffer, size_t buffersize); ~PipeChannel(); - void StartReading(); - void StartWriting(); void CloseReading(); void CloseWriting(); void PerhapsShutdown(); @@ -97,7 +96,7 @@ PipeChannel::PipeChannel(uint8_t* buffer, size_t buffersize) this->buffer = buffer; this->buffersize = buffersize; bufferoffset = bufferused = 0; - anyreading = anywriting = false; + anyreading = anywriting = true; } PipeChannel::~PipeChannel() @@ -105,20 +104,6 @@ PipeChannel::~PipeChannel() delete[] buffer; } -void PipeChannel::StartReading() -{ - ScopedLock lock(&pipelock); - assert(!anyreading); - anyreading = true; -} - -void PipeChannel::StartWriting() -{ - ScopedLock lock(&pipelock); - assert(!anywriting); - anywriting = true; -} - void PipeChannel::CloseReading() { anyreading = false; @@ -231,48 +216,44 @@ int PipeChannel::poll(ioctx_t* /*ctx*/, PollNode* node) return errno = EAGAIN, -1; } -class PipeEndpoint : public AbstractInode +PipeEndpoint::PipeEndpoint() { -public: - PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode, - PipeChannel* channel, bool reading); - ~PipeEndpoint(); - virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count); - virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count); - virtual int poll(ioctx_t* ctx, PollNode* node); - -private: - kthread_mutex_t pipelock; - PipeChannel* channel; - bool reading; - -}; - -PipeEndpoint::PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode, - PipeChannel* channel, bool reading) -{ - inode_type = INODE_TYPE_STREAM; - this->dev = dev; - this->ino = (ino_t) this; - this->channel = channel; - this->reading = reading; - if ( reading ) - channel->StartReading(); - else - channel->StartWriting(); - pipelock = KTHREAD_MUTEX_INITIALIZER; - this->stat_uid = owner; - this->stat_gid = group; - this->type = S_IFCHR; - this->stat_mode = (mode & S_SETABLE) | this->type; + channel = NULL; + reading = false; } PipeEndpoint::~PipeEndpoint() { + if ( channel ) + Disconnect(); +} + +bool PipeEndpoint::Connect(PipeEndpoint* destination) +{ + assert(!channel); + assert(!destination->channel); + const size_t BUFFER_SIZE = 4096; + size_t size = BUFFER_SIZE; + uint8_t* buffer = new uint8_t[size]; + if ( !buffer ) + return false; + destination->reading = !(reading = false); + if ( !(destination->channel = channel = new PipeChannel(buffer, size)) ) + { + delete[] buffer; + return false; + } + return true; +} + +void PipeEndpoint::Disconnect() +{ + assert(channel); if ( reading ) channel->CloseReading(); else channel->CloseWriting(); + reading = false; } ssize_t PipeEndpoint::read(ioctx_t* ctx, uint8_t* buf, size_t count) @@ -292,9 +273,57 @@ int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node) return channel->poll(ctx, node); } -namespace Pipe { +class PipeNode : public AbstractInode +{ +public: + PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode); + ~PipeNode(); + bool Connect(PipeNode* destination); + virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count); + virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count); + virtual int poll(ioctx_t* ctx, PollNode* node); -const size_t BUFFER_SIZE = 4096UL; +private: + PipeEndpoint endpoint; + +}; + +bool PipeNode::Connect(PipeNode* destination) +{ + return endpoint.Connect(&destination->endpoint); +} + +PipeNode::PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode) +{ + inode_type = INODE_TYPE_STREAM; + this->dev = dev; + this->ino = (ino_t) this; + this->stat_uid = owner; + this->stat_gid = group; + this->type = S_IFCHR; + this->stat_mode = (mode & S_SETABLE) | this->type; +} + +PipeNode::~PipeNode() +{ +} + +ssize_t PipeNode::read(ioctx_t* ctx, uint8_t* buf, size_t count) +{ + return endpoint.read(ctx, buf, count); +} + +ssize_t PipeNode::write(ioctx_t* ctx, const uint8_t* buf, size_t count) +{ + return endpoint.write(ctx, buf, count); +} + +int PipeNode::poll(ioctx_t* ctx, PollNode* node) +{ + return endpoint.poll(ctx, node); +} + +namespace Pipe { static int sys_pipe(int pipefd[2]) { @@ -303,18 +332,14 @@ static int sys_pipe(int pipefd[2]) uid_t gid = process->gid; mode_t mode = 0600; - size_t buffersize = BUFFER_SIZE; - uint8_t* buffer = new uint8_t[buffersize]; - if ( !buffer ) return -1; - - PipeChannel* channel = new PipeChannel(buffer, buffersize); - if ( !channel ) { delete[] buffer; return -1; } - - Ref recv_inode(new PipeEndpoint(0, uid, gid, mode, channel, true)); - if ( !recv_inode ) { delete channel; return -1; } - Ref send_inode(new PipeEndpoint(0, uid, gid, mode, channel, false)); + Ref recv_inode(new PipeNode(0, uid, gid, mode)); + if ( !recv_inode ) return -1; + Ref send_inode(new PipeNode(0, uid, gid, mode)); if ( !send_inode ) return -1; + if ( !send_inode->Connect(recv_inode.Get()) ) + return -1; + Ref recv_vnode(new Vnode(recv_inode, Ref(NULL), 0, 0)); Ref send_vnode(new Vnode(send_inode, Ref(NULL), 0, 0)); if ( !recv_vnode || !send_vnode ) return -1;