I'm making a program where I want to have "node" objects that communicate with each other in a pipe.
stdin -> node1 -> node2 -> node3 -> stdout
When node1 tries to read, it reads from System.in.
When node1 tries to write, it writes to node2.
When node2 tries to read, it reads from node1.
The system should work like a pipe from Unix.
But that's not all.
I want to be able to create new nodes and insert them into the pipe, while the pipe is running.
stdin -> node1 -> newNode -> node2 -> node3 -> stdout
And I want to be able to remove nodes from the pipe while it is running.
stdin -> newNode -> node2 -> node3 -> stdout
But I have some headache-inducing constraints:
1. Each node must run in its own thread.
2. If I use PipedInputStream and PipedOutputStream,
2.1. once a PipedInputStream and PipedOutputStream are connected, they cannot be separated;
2.2. every PipedInputStream or PipedOutputStream must only be used by the thread that created it (this is a recommendation from the Java API docs).
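A quick demonstration of constraint 2.1, using nothing beyond java.io (class name is just for the demo):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeRigidityDemo {
    public static void main(String[] args) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out); // connected for life
        try {
            out.connect(new PipedInputStream()); // try to re-point the pipe
            System.out.println("reconnected");   // never reached
        } catch (IOException e) {
            // a second connect() is rejected; the pair cannot be separated
            System.out.println("reconnect refused: " + e.getMessage());
        }
    }
}
```

So once a pair is wired up, the only way to change the topology is to abandon the pair entirely.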
What approach can I take in implementing this communication system?
By the way, I understand the concepts of blocking and non-blocking calls, Java's event mechanisms, polling, and race conditions.
Hmm, how about something that internally (between nodes) works with a kind of event system? The first input stream is read (single bytes, or buffers of bytes?) and stuffed into an event packet. Each node keeps a list of listeners (usually just one here) that get notified when the event happens, and each node also has its own thread.
So, a special first node:
inputstream > listeners
onRead() // fire an event to all (usually one) subscribed listeners
Node implements EventListener
handleEvent() // invoked from the other node's thread; just stores the event in a vector (queue) in this object
run() // this node's thread reads event objects from its queue, does its work, and maybe fires a local event or uses a piped out/input-stream pair
addListener(EventListener) // downstream nodes attach to this node's outputs
And the last node receives events and writes the results to standard out.
Maybe the nodes can all be built from a single generic node class, which can optionally be initialized to work in the different modes.
But I think this kind of abstraction, turning the IO-stream traffic into internal tokens handed around event-listener style, with a queue on each node, is the only way to reorder the nodes interactively in a running system, because those piped stream classes are pretty rigid about only working with the instance they were set up with.
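As a rough sketch of what I mean (every name here, ByteEvent, EventNode, drainOnce, is just a placeholder, and run() busy-polls only to keep the sketch short):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Placeholder event type: a chunk of bytes travelling down the pipe.
class ByteEvent {
    final byte[] data;
    ByteEvent(byte[] data) { this.data = data; }
}

interface ByteEventListener {
    void handleEvent(ByteEvent e);  // invoked from the upstream node's thread
}

// A pipe node: queues incoming events, processes them on its own thread,
// and notifies its listeners (usually exactly one downstream node).
class EventNode implements ByteEventListener, Runnable {
    private final Queue<ByteEvent> queue = new ConcurrentLinkedQueue<>();
    private final List<ByteEventListener> listeners = new CopyOnWriteArrayList<>();

    public void addListener(ByteEventListener l) { listeners.add(l); }

    @Override public void handleEvent(ByteEvent e) {
        queue.add(e);               // just store it; work happens on our thread
    }

    // Process at most one queued event; returns false if the queue was empty.
    public boolean drainOnce() {
        ByteEvent e = queue.poll();
        if (e == null) return false;
        ByteEvent out = process(e);
        for (ByteEventListener l : listeners) l.handleEvent(out);
        return true;
    }

    protected ByteEvent process(ByteEvent e) { return e; } // identity by default

    @Override public void run() {   // busy-polls for brevity; real code should block
        while (!Thread.currentThread().isInterrupted()) drainOnce();
    }
}

// The special first node: reads the real InputStream and fires events.
class SourceNode extends EventNode {
    private final InputStream in;
    SourceNode(InputStream in) { this.in = in; }
    public void pump() throws IOException {
        int b = in.read();
        if (b >= 0) handleEvent(new ByteEvent(new byte[] { (byte) b }));
    }
}
```

Inserting or removing a node is then just addListener()/removeListener() bookkeeping, since nothing is wired to a stream instance.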
Thank you for thinking about my problem.
I went and implemented a class called PipeNode that reads and writes, and can join with other PipeNodes into a linked list.
Each PipeNode has a queue.
When a non-head PipeNode is asked to write data, it calls the enqueue() method of the PipeNode before it. The enqueue() method puts the data into the prior PipeNode's queue.
When a PipeNode is asked to read data, it returns the first byte in its queue. If the queue is empty, the node Thread.sleeps until it is interrupted and then it checks again.
The enqueue() method wakes its node up with Thread.interrupt().
Nearly everything is working well. The nodes form a successful pipe of arbitrary length when no insertions or removals occur between read or write operations.
But there is a problem with removing nodes. When a PipeNode is removed with data in its queue, the data should be passed onto the next PipeNode in the list. (If this doesn't happen, any data that the PipeNode receives between its final read and its removal will be lost.) But the approach I have used has not worked properly.
I decided to make each node write() all the bytes in its queue just before it is removed from the pipe. (PipeNodes remove themselves from the pipe.) That way, all the queued input should be read by the next node, just as if the input was never intercepted by the node that is being removed.
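A stripped-down sketch of the shape I mean (not my real code; the upstream/downstream field names and the locking are simplified stand-ins, and I have glossed over which direction my linked list runs):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the PipeNode idea: the queue holds a node's pending input,
// enqueue() is called from a neighbouring node's thread, and read() runs
// on the node's own thread, sleeping until enqueue() interrupts it.
class PipeNode {
    private final Queue<Byte> queue = new ArrayDeque<>();
    private volatile Thread readerThread;   // thread currently parked in read()
    PipeNode upstream;                      // node whose writes feed our queue
    PipeNode downstream;                    // node our write() feeds

    // Called from a neighbouring node's thread.
    synchronized void enqueue(byte b) {
        queue.add(b);
        Thread t = readerThread;
        if (t != null) t.interrupt();       // wake a read() that went to sleep
    }

    void write(byte b) {
        if (downstream != null) downstream.enqueue(b);
    }

    // Called from this node's own thread.
    byte read() {
        while (true) {
            synchronized (this) {
                Byte b = queue.poll();
                if (b != null) return b;
                readerThread = Thread.currentThread();
            }
            try {
                Thread.sleep(Long.MAX_VALUE); // parked until enqueue() interrupts
            } catch (InterruptedException woken) {
                // fall through and re-check the queue
            } finally {
                readerThread = null;
            }
        }
    }

    // Remove this node: splice it out of the list, then flush queued bytes
    // onward so nothing received after our last read() is lost.
    synchronized void remove() {
        if (upstream != null) upstream.downstream = downstream;
        if (downstream != null) downstream.upstream = upstream;
        Byte b;
        while ((b = queue.poll()) != null) write(b);
    }
}
```

One thing that looks suspicious even in the sketch: remove() synchronizes on the node being removed, while an upstream writer synchronizes on whichever node it is enqueueing into, so a byte can still land in the old node mid-splice. The relink and the flush are not atomic with respect to the upstream thread.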
But the output of my test program for this situation is wrong, and it gives different results for the same input.
I'm not sure where the error is.
I tested it with a program that creates three threads, each with a PipeNode, and then passes a fixed number of bytes through them like this:
R: read one byte into memory
W: write the byte in memory
+n: add n to the byte in memory
With input "0000", the output should be "3700". The first two bytes should be processed by all three nodes. The final one should be only processed by the first node.
The result is the threading fairy's choice of the following three outputs:
'ÿ' is printed when a node writes -1 to stdout.
Does anyone know what is causing this error?
I can provide more information (eg source code) if it helps.
[SOLVED] Dynamic Pipe
My only complete test program was in fact a Forking SNUSP interpreter, and it had too many moving parts for me to pinpoint the error at the time, or even to explain afterwards how I solved it.
But I have solved my problem, and the solution still uses the architecture I described in my previous post.