diff --git a/sortix/include/sortix/kernel/pipe.h b/sortix/include/sortix/kernel/pipe.h
new file mode 100644
index 00000000..f7b0e387
--- /dev/null
+++ b/sortix/include/sortix/kernel/pipe.h
@@ -0,0 +1,51 @@
+/*******************************************************************************
+
+ Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013.
+
+ This file is part of Sortix.
+
+ Sortix is free software: you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation, either version 3 of the License, or (at your option) any later
+ version.
+
+ Sortix is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ details.
+
+ You should have received a copy of the GNU General Public License along with
+ Sortix. If not, see .
+
+ sortix/kernel/pipe.h
+ Embeddedable one-way data stream.
+
+*******************************************************************************/
+
+#ifndef INCLUDE_SORTIX_KERNEL_PIPE_H
+#define INCLUDE_SORTIX_KERNEL_PIPE_H
+
+namespace Sortix {
+
+class PipeChannel;
+
+class PipeEndpoint
+{
+public:
+ PipeEndpoint();
+ ~PipeEndpoint();
+ bool Connect(PipeEndpoint* destination);
+ void Disconnect();
+ ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
+ ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
+ int poll(ioctx_t* ctx, PollNode* node);
+
+private:
+ PipeChannel* channel;
+ bool reading;
+
+};
+
+} // namespace Sortix
+
+#endif
diff --git a/sortix/pipe.cpp b/sortix/pipe.cpp
index 715d79e4..f075e5bd 100644
--- a/sortix/pipe.cpp
+++ b/sortix/pipe.cpp
@@ -41,8 +41,9 @@
#include
#include
#include
-#include
#include
+#include
+#include
#include
#include
@@ -63,8 +64,6 @@ class PipeChannel
public:
PipeChannel(uint8_t* buffer, size_t buffersize);
~PipeChannel();
- void StartReading();
- void StartWriting();
void CloseReading();
void CloseWriting();
void PerhapsShutdown();
@@ -97,7 +96,7 @@ PipeChannel::PipeChannel(uint8_t* buffer, size_t buffersize)
this->buffer = buffer;
this->buffersize = buffersize;
bufferoffset = bufferused = 0;
- anyreading = anywriting = false;
+ anyreading = anywriting = true;
}
PipeChannel::~PipeChannel()
@@ -105,20 +104,6 @@ PipeChannel::~PipeChannel()
delete[] buffer;
}
-void PipeChannel::StartReading()
-{
- ScopedLock lock(&pipelock);
- assert(!anyreading);
- anyreading = true;
-}
-
-void PipeChannel::StartWriting()
-{
- ScopedLock lock(&pipelock);
- assert(!anywriting);
- anywriting = true;
-}
-
void PipeChannel::CloseReading()
{
anyreading = false;
@@ -231,48 +216,44 @@ int PipeChannel::poll(ioctx_t* /*ctx*/, PollNode* node)
return errno = EAGAIN, -1;
}
-class PipeEndpoint : public AbstractInode
+PipeEndpoint::PipeEndpoint()
{
-public:
- PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode,
- PipeChannel* channel, bool reading);
- ~PipeEndpoint();
- virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
- virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
- virtual int poll(ioctx_t* ctx, PollNode* node);
-
-private:
- kthread_mutex_t pipelock;
- PipeChannel* channel;
- bool reading;
-
-};
-
-PipeEndpoint::PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode,
- PipeChannel* channel, bool reading)
-{
- inode_type = INODE_TYPE_STREAM;
- this->dev = dev;
- this->ino = (ino_t) this;
- this->channel = channel;
- this->reading = reading;
- if ( reading )
- channel->StartReading();
- else
- channel->StartWriting();
- pipelock = KTHREAD_MUTEX_INITIALIZER;
- this->stat_uid = owner;
- this->stat_gid = group;
- this->type = S_IFCHR;
- this->stat_mode = (mode & S_SETABLE) | this->type;
+ channel = NULL;
+ reading = false;
}
PipeEndpoint::~PipeEndpoint()
{
+ if ( channel )
+ Disconnect();
+}
+
+bool PipeEndpoint::Connect(PipeEndpoint* destination)
+{
+ assert(!channel);
+ assert(!destination->channel);
+ const size_t BUFFER_SIZE = 4096;
+ size_t size = BUFFER_SIZE;
+ uint8_t* buffer = new uint8_t[size];
+ if ( !buffer )
+ return false;
+ destination->reading = !(reading = false);
+ if ( !(destination->channel = channel = new PipeChannel(buffer, size)) )
+ {
+ delete[] buffer;
+ return false;
+ }
+ return true;
+}
+
+void PipeEndpoint::Disconnect()
+{
+ assert(channel);
if ( reading )
channel->CloseReading();
else
channel->CloseWriting();
+ reading = false;
}
ssize_t PipeEndpoint::read(ioctx_t* ctx, uint8_t* buf, size_t count)
@@ -292,9 +273,57 @@ int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node)
return channel->poll(ctx, node);
}
-namespace Pipe {
+class PipeNode : public AbstractInode
+{
+public:
+ PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode);
+ ~PipeNode();
+ bool Connect(PipeNode* destination);
+ virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
+ virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
+ virtual int poll(ioctx_t* ctx, PollNode* node);
-const size_t BUFFER_SIZE = 4096UL;
+private:
+ PipeEndpoint endpoint;
+
+};
+
+bool PipeNode::Connect(PipeNode* destination)
+{
+ return endpoint.Connect(&destination->endpoint);
+}
+
+PipeNode::PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode)
+{
+ inode_type = INODE_TYPE_STREAM;
+ this->dev = dev;
+ this->ino = (ino_t) this;
+ this->stat_uid = owner;
+ this->stat_gid = group;
+ this->type = S_IFCHR;
+ this->stat_mode = (mode & S_SETABLE) | this->type;
+}
+
+PipeNode::~PipeNode()
+{
+}
+
+ssize_t PipeNode::read(ioctx_t* ctx, uint8_t* buf, size_t count)
+{
+ return endpoint.read(ctx, buf, count);
+}
+
+ssize_t PipeNode::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
+{
+ return endpoint.write(ctx, buf, count);
+}
+
+int PipeNode::poll(ioctx_t* ctx, PollNode* node)
+{
+ return endpoint.poll(ctx, node);
+}
+
+namespace Pipe {
static int sys_pipe(int pipefd[2])
{
@@ -303,18 +332,14 @@ static int sys_pipe(int pipefd[2])
uid_t gid = process->gid;
mode_t mode = 0600;
- size_t buffersize = BUFFER_SIZE;
- uint8_t* buffer = new uint8_t[buffersize];
- if ( !buffer ) return -1;
-
- PipeChannel* channel = new PipeChannel(buffer, buffersize);
- if ( !channel ) { delete[] buffer; return -1; }
-
- Ref recv_inode(new PipeEndpoint(0, uid, gid, mode, channel, true));
- if ( !recv_inode ) { delete channel; return -1; }
- Ref send_inode(new PipeEndpoint(0, uid, gid, mode, channel, false));
+ Ref recv_inode(new PipeNode(0, uid, gid, mode));
+ if ( !recv_inode ) return -1;
+ Ref send_inode(new PipeNode(0, uid, gid, mode));
if ( !send_inode ) return -1;
+ if ( !send_inode->Connect(recv_inode.Get()) )
+ return -1;
+
Ref recv_vnode(new Vnode(recv_inode, Ref(NULL), 0, 0));
Ref send_vnode(new Vnode(send_inode, Ref(NULL), 0, 0));
if ( !recv_vnode || !send_vnode ) return -1;