
Thread: Dynamic pipe

  1. #1
    deschutron is offline Member
    Join Date
    Sep 2009
    Posts
    9
    Rep Power
    0

    Dynamic pipe

    Hello

    I'm making a program where I want to have "node" objects that communicate with each other in a pipe.

    stdin -> node1 -> node2 -> node3 -> stdout

    When node1 tries to read, it reads from System.in
    When node1 tries to write, it writes to node2.
    When node2 tries to read, it reads from node1.

    The system should work like a pipe from Unix.

    But that's not all.

    I want to be able to create new nodes and insert them into the pipe, while the pipe is running.
    e.g.
    stdin -> node1 -> newNode -> node2 -> node3 -> stdout

    And I want to be able to remove nodes from the pipe while it is running.
    e.g.
    stdin -> newNode -> node2 -> node3 -> stdout

    But I have some headache-inducing constraints:

    1. Each node must run in its own thread.
    2. If I use PipedInputStream and PipedOutputStream,
    2.1. once a PipedInputStream and PipedOutputStream are connected, they cannot be separated;
    2.2. every PipedInputStream or PipedOutputStream must only be used by the thread that created it. (this is a recommendation from the Java API).
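
Constraint 2.1 can be checked with a few lines of Java. This is only a minimal sketch: the class and method names (PipedReconnectDemo, reconnectIsRejected) are made up for illustration. It relies on PipedOutputStream.connect() throwing an IOException when the stream is already connected.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the original program.
class PipedReconnectDemo {
    /** Returns true if re-pointing an already connected output at a new input is rejected. */
    static boolean reconnectIsRejected() {
        try {
            PipedInputStream first = new PipedInputStream();
            // The constructor connects the output to 'first'.
            PipedOutputStream out = new PipedOutputStream(first);
            PipedInputStream second = new PipedInputStream();
            try {
                out.connect(second); // try to splice in a different reader
                return false;        // would mean the pair can be separated
            } catch (IOException alreadyConnected) {
                return true;         // the existing connection is permanent
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reconnect rejected: " + reconnectIsRejected());
    }
}
```

So inserting a node between two running nodes would mean discarding the old stream pair and building new ones, rather than re-pointing the existing pair.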

    What approach can I take in implementing this communication system?

    By the way, I understand the concepts of blocking and nonblocking calls, the events implementation in Java, polling and race conditions.

    Deschutron

  2. #2
    travishein's Avatar
    travishein is offline Senior Member
    Join Date
    Sep 2009
    Location
    Canada
    Posts
    684
    Rep Power
    6


    Hmm, how about something that works internally (between the nodes) with a kind of event system? The first input stream is read (bytes, or byte[] buffers?) and each chunk is stuffed into an event packet. Each node has a list of listeners (or just one here) that are notified when the event happens. Each node also has its own thread.

    So, a special first node:

    inputstream > listeners
    onRead() // invoke an event to all (usually 1 here) subscribed listeners
    addListener(EventListener)


    inside nodes:

    Node implements EventListener
    handleEvent() // other node's thread will invoke this, just store things into a vector (queue) in this object
    Vector<Events>
    run() // this node's thread reads event objects from its queue, does stuff, maybe a local event to a piped out/inputstream pair
    addListener(EventListener) // downstream nodes attach to this node's outputs


    And the last node receives events and writes them to standard out.

    Maybe the nodes can be built as a single generic node type, which can optionally be initialized to work in the different modes.

    But I think abstracting the IO streams into an internal token-and-bucket scheme, like the event and event listener with a queue on each node, is the only way to change the order of nodes interactively in a running system, because those stream classes are pretty rigid about working only with the instance they were set up with.
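
The outline above could be sketched roughly as follows. This is an illustration under assumptions, not a finished design: the names ByteListener, EventNode and EventNodeDemo are hypothetical, events are single bytes carried as int, -1 marks end of input, and a LinkedBlockingQueue stands in for the Vector mentioned in the outline.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntUnaryOperator;

// Hypothetical listener interface: one event is one byte (or -1 for end of input).
interface ByteListener { void handleEvent(int b); }

class EventNode implements ByteListener, Runnable {
    static final int EOF = -1;
    private final BlockingQueue<Integer> inbox = new LinkedBlockingQueue<>();
    // Copy-on-write list, so listeners can be swapped while run() is iterating.
    private final List<ByteListener> listeners = new CopyOnWriteArrayList<>();
    private final IntUnaryOperator transform;

    EventNode(IntUnaryOperator transform) { this.transform = transform; }

    void addListener(ByteListener l) { listeners.add(l); }
    void removeListener(ByteListener l) { listeners.remove(l); }

    // Called on the upstream node's thread: only stores the event.
    @Override public void handleEvent(int b) { inbox.add(b); }

    // Runs on this node's own thread: take, transform, notify downstream.
    @Override public void run() {
        try {
            while (true) {
                int b = inbox.take();
                int out = (b == EOF) ? EOF : transform.applyAsInt(b);
                for (ByteListener l : listeners) l.handleEvent(out);
                if (b == EOF) return; // EOF is forwarded, then the node stops
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class EventNodeDemo {
    /** Pushes 'input' through two +1 nodes and returns what the sink collected. */
    static String runPipeline(String input) {
        EventNode n1 = new EventNode(b -> b + 1);
        EventNode n2 = new EventNode(b -> b + 1);
        StringBuilder sink = new StringBuilder();
        n1.addListener(n2);
        n2.addListener(b -> { if (b != EventNode.EOF) sink.append((char) b); });
        Thread t1 = new Thread(n1);
        Thread t2 = new Thread(n2);
        t1.start();
        t2.start();
        for (char c : input.toCharArray()) n1.handleEvent(c);
        n1.handleEvent(EventNode.EOF);
        try {
            t1.join();
            t2.join(); // join() makes the sink's contents visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sink.toString();
    }
}
```

Re-ordering would then come down to removeListener/addListener calls on the neighbouring nodes. The sketch does not address what happens to events still sitting in a removed node's inbox.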

  3. #3
    deschutron is offline Member
    Join Date
    Sep 2009
    Posts
    9
    Rep Power
    0


    Hi travishein,

    Thank you for thinking about my problem.

    I went and implemented a class called PipeNode that reads and writes, and can join with other PipeNodes into a linked list.

    Each PipeNode has a queue.
    When a non-head PipeNode is asked to write data, it calls an enqueue() method from the PipeNode before it. The enqueue() method puts the data into the prior PipeNode's queue.

    When a PipeNode is asked to read data, it returns the first byte in its queue. If the queue is empty, the node Thread.sleeps until it is interrupted and then it checks again.
    The enqueue() method wakes its node up with Thread.interrupt().
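
For comparison, here is a minimal sketch of the same blocking read, with wait()/notifyAll() swapped in for the Thread.sleep/Thread.interrupt pairing described above. It is not the PipeNode code from this post; BlockingByteQueue and its demo class are hypothetical names. The point of the monitor is that the emptiness check and the wake-up happen under one lock.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the per-node queue; not the poster's PipeNode.
class BlockingByteQueue {
    private final ArrayDeque<Integer> queue = new ArrayDeque<>();

    /** Called by the writing thread: stores the byte and wakes any waiting reader. */
    synchronized void enqueue(int b) {
        queue.addLast(b);
        notifyAll();
    }

    /** Called by the owning thread: blocks until a byte is available. */
    synchronized int read() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // releases the lock while waiting, re-checks after waking
        }
        return queue.removeFirst();
    }
}

class BlockingByteQueueDemo {
    /** The reader starts first and blocks; the writer delivers one byte later. */
    static int readAfterDelayedWrite() {
        BlockingByteQueue q = new BlockingByteQueue();
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                return;
            }
            q.enqueue('7');
        });
        writer.start();
        try {
            int b = q.read();
            writer.join();
            return b;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A java.util.concurrent.LinkedBlockingQueue would give the same behaviour through take() and add(); the hand-written monitor is shown only because it maps directly onto the read/enqueue description.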

    Nearly everything is working well. The nodes form a successful pipe of arbitrary length when no insertions or removals occur between read or write operations.

    But there is a problem with removing nodes. When a PipeNode is removed with data in its queue, the data should be passed on to the next PipeNode in the list. (If this doesn't happen, any data that the PipeNode receives between its final read and its removal will be lost.) But the approach I have used has not worked properly.

    I decided to make each node write() all the bytes in its queue just before it is removed from the pipe. (PipeNodes remove themselves from the pipe.) That way, all the queued input should be read by the next node, just as if the input was never intercepted by the node that is being removed.
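
One way to make the flush-then-remove step safe against concurrent writes is to do the flush and the re-linking under a single lock. The sketch below only illustrates that idea; the thread never states what the actual fault was, so this is not a diagnosis. SpliceNode, PIPE_LOCK and the demo are hypothetical, and the single pipe-wide lock is an assumption made to keep the example short.

```java
import java.util.ArrayDeque;

// Hypothetical node type; data flows prev -> this -> next.
class SpliceNode {
    // Assumption of this sketch: one lock guards every queue and every link,
    // so a removal can never interleave with a neighbour's write.
    private static final Object PIPE_LOCK = new Object();

    private final ArrayDeque<Integer> queue = new ArrayDeque<>();
    private SpliceNode prev;
    private SpliceNode next;

    static void link(SpliceNode up, SpliceNode down) {
        synchronized (PIPE_LOCK) {
            up.next = down;
            down.prev = up;
        }
    }

    /** Puts a byte into the downstream node's queue, if there is one. */
    void write(int b) {
        synchronized (PIPE_LOCK) {
            if (next != null) {
                next.queue.addLast(b);
                PIPE_LOCK.notifyAll();
            }
        }
    }

    /** Blocks until this node's queue has a byte. */
    int read() throws InterruptedException {
        synchronized (PIPE_LOCK) {
            while (queue.isEmpty()) PIPE_LOCK.wait();
            return queue.removeFirst();
        }
    }

    /** Forwards unread bytes downstream and unlinks this node in one atomic step. */
    void removeSelf() {
        synchronized (PIPE_LOCK) {
            if (next != null) {
                next.queue.addAll(queue); // keeps the bytes in arrival order
                next.prev = prev;
            }
            if (prev != null) prev.next = next;
            queue.clear();
            prev = null;
            next = null;
            PIPE_LOCK.notifyAll();
        }
    }
}

class SpliceNodeDemo {
    /** source -> mid -> sink; mid is removed while it still holds 'b'. */
    static String demo() {
        try {
            SpliceNode source = new SpliceNode();
            SpliceNode mid = new SpliceNode();
            SpliceNode sink = new SpliceNode();
            SpliceNode.link(source, mid);
            SpliceNode.link(mid, sink);
            source.write('a');
            source.write('b');
            mid.write(mid.read()); // mid passes 'a' along; 'b' stays queued
            mid.removeSelf();      // 'b' moves to sink, source now feeds sink
            source.write('c');
            StringBuilder out = new StringBuilder();
            for (int i = 0; i < 3; i++) out.append((char) sink.read());
            return out.toString();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the flush and the re-link share one critical section, a write from the upstream node lands either in the removed node's queue before the flush or in the downstream queue after it, never in between.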

    But the output of my test programs for the situation is wrong, and gives different results for the same input.

    I'm not sure where the error is.

    I tested it with a program that creates three threads, each with a PipeNode, and then passes a fixed number of bytes through them like this:

    (stdin)
    1. C===R=+1=W==R=+1=W==R=W==R=W=exit
    2. ==C=R=+1=W==R=+2=W=exit
    3. =C==R=+1=W==R=+3=W=exit
    (stdout)

    C: create
    R: read one byte into memory
    W: write the byte in memory
    +n: add n to the byte in memory

    With input "0000", the output should be "3700". The first two bytes should be processed by all three nodes. The final two should be processed only by the first node.
    The result is the threading fairy's choice of the following three outputs:
    1. "37"
    2. "37"
    3. "3700"
    '' is printed when a node writes -1 to stdout.

    Does anyone know what is causing this error?

    I can provide more information (eg source code) if it helps.

    Deschutron

  4. #4
    deschutron is offline Member
    Join Date
    Sep 2009
    Posts
    9
    Rep Power
    0

    [SOLVED] Dynamic Pipe

    My only complete test program was in fact a Forking SNUSP interpreter. And it had too many moving parts for me to find my error then or even to state how I solved it afterwards.

    But I have solved my problem. And it still uses the architecture I stated in my previous post.

    Deschutron
