0
votes

I have a "multiple writer threads, single reader thread" model. The ThreadMultipleDateReceiver class is designed to read the data written by several writer threads.

/**
 * Reader thread of the question: keeps a list of registered writer threads and reads from a
 * single {@link PipedInputStream}.
 *
 * <p>NOTE(review): this is the code exactly as posted, including the behaviour the question asks
 * about; only comments were added.
 */
public class ThreadMultipleDateReceiver extends Thread {

  // Upper bound on registered writers; enforced in addDateWriter().
  private static final int MAX_CLIENT_THREADS = 4;
  // Read buffer; re-allocated on every pass of the outer loop in run().
  private byte[] incomingBytes;
  // Loop flag: set to true at the top of run(), cleared by stopThread().
  private volatile boolean isRunning;
  // Registered writer threads; wrapped with Collections.synchronizedList in the constructor.
  private volatile List<ThreadStreamDateWriter> lThrdDate;

  // static: one field shared by every instance; the constructor re-assigns it on each call.
  private static PipedInputStream pipedInputStream;

  /** Creates the (empty) writer list and a new, not yet connected, piped input stream. */
  public ThreadMultipleDateReceiver() {
    lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS));
    pipedInputStream = new PipedInputStream();
    System.out.println("ThreadMultipleDateReceiver Created");
  }

  /**
   * Main loop. While {@code isRunning}: if writers are registered, drops the TERMINATED ones,
   * then reads from the pipe until {@code read} returns -1 and prints the accumulated text;
   * otherwise prints "Empty". Stops all remaining writers once the loop ends.
   */
  @Override public void run() {
    isRunning = true;
    while (isRunning) {
      if (!lThrdDate.isEmpty()) {
        System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size());
        // Walk backwards so remove(i - 1) does not shift the entries still to be visited.
        for (int i = lThrdDate.size(); i > 0; i--) {
          if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) {
            lThrdDate.remove(i - 1);
          } else {
            System.out.println("I ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter());
          }
        }
        incomingBytes = new byte[1024];
        try {
          String str = "";
          int iRd;
          System.out.println("ThreadMultipleDateReceiver waiting:" + str);
          // This loop ends only when read() returns -1 or throws, so the "Received" line below
          // is not reached while read() keeps returning data or keeps blocking.
          while ((iRd = pipedInputStream.read(incomingBytes)) != -1) {
            if (iRd > 0) {
              // NOTE(review): decodes the whole 1024-byte buffer, not only the iRd bytes read.
              str += new String(incomingBytes);
            }
          }
          System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str);
        } catch (IOException e) { } // NOTE(review): the exception is discarded without a report
      } else {
        // No writer registered: print and loop again immediately (there is no pause here).
        System.out.println("ThreadMultipleDateReceiver Empty");
      }
    }
    emptyDateWriters();
  }

  /**
   * Registers a writer thread; the request is silently ignored once MAX_CLIENT_THREADS writers
   * are registered.
   *
   * @param threadDateWriter the writer thread to track
   */
  public void addDateWriter(ThreadStreamDateWriter threadDateWriter) {
    if (lThrdDate.size() < MAX_CLIENT_THREADS) {
      lThrdDate.add(threadDateWriter);
    }
  }

  /** Calls stopThread() on every registered writer and removes it from the list. */
  private void emptyDateWriters() {
    if (!lThrdDate.isEmpty()) {
      for (int i = lThrdDate.size(); i > 0; i--) {
        ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1);
        threadDateWriter.stopThread();
        lThrdDate.remove(i - 1);
      }
    }
  }

  /** @return the (static) piped input stream that writers connect their output stream to */
  public PipedInputStream getPipedInputStream() {
    return pipedInputStream;
  }

  /**
   * Clears the loop flag checked by run(). The flag is only tested at the top of the outer loop,
   * so a run() that is inside the inner read loop does not observe it until that loop ends.
   */
  public void stopThread() {
    isRunning = false;
  }

}

And the single Writer Thread

/**
 * Writer thread of the question: repeatedly writes "name -> timestamp" into a
 * {@link PipedOutputStream} and then sleeps for 4000 ms.
 *
 * <p>NOTE(review): this is the code exactly as posted, including the behaviour the question asks
 * about; only comments were added.
 */
public class ThreadStreamDateWriter extends Thread {
  // Display name of this writer; returned by getNameDateWriter() and prefixed to every message.
  String Self;
  // Bytes of the message written most recently by run().
  private byte[] outgoingBytes;
  // Loop flag: set to true at the top of run(), cleared by stopThread() or after a failed write.
  private volatile boolean isRunning;
  // static: one field shared by every writer instance; each constructor call replaces it.
  private static PipedOutputStream pipedOutputStream;

  /**
   * Stores the name, creates a new output stream and tries to connect it to {@code snk}.
   *
   * @param name display name of this writer
   * @param snk  piped input stream to connect to
   */
  ThreadStreamDateWriter(String name, PipedInputStream snk) {
    Self = name;
    pipedOutputStream = new PipedOutputStream();
    try {
      pipedOutputStream.connect(snk);
    } catch (IOException e) { } // NOTE(review): a failed connect() is discarded without a report
  }

  /**
   * Loop: build the message, write it to the (static) output stream, sleep 4000 ms. Any of the
   * caught exceptions ends the loop by clearing {@code isRunning}.
   */
  @Override public void run() {
    isRunning = true;
    while (isRunning) {
      try {
        outgoingBytes = getInfo().getBytes();
        System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes));
        pipedOutputStream.write(outgoingBytes);
        System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes));
        // An interrupt during the pause is ignored; the loop simply continues.
        try { Thread.sleep(4000); } catch (InterruptedException ex) { }
      } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) {
        // The failure itself is not reported; the thread just stops looping.
        isRunning = false;
      }
    }
  }

  /**
   * @return this writer's name, " -> ", and the current time formatted as yyyyMMdd-hhmmss
   *         ("hh" is the 12-hour field of SimpleDateFormat; the pattern has no AM/PM marker)
   */
  String getInfo() {
      String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime());
      return Self + " -> " + sDtTm;
  }

  /** Clears the loop flag; run() sees it the next time it evaluates the loop condition. */
  public void stopThread() {
    isRunning = false;
  }

  /** @return the name given to the constructor */
  public String getNameDateWriter() {
    return Self;
  }
}

How I launch it (I'm using NetBeans):

// Thread references kept between toggle events; null means "not created yet".
ThreadMultipleDateReceiver thrdMDateReceiver = null;
ThreadStreamDateWriter thrdSDateWriter0 = null;
ThreadStreamDateWriter thrdSDateWriter1 = null;
  /**
   * Toggle-button handler. When the button is selected, creates and starts the receiver and the
   * two writers (each only if its field is still null), handing every writer the receiver's
   * piped input stream. When deselected, asks the receiver to stop.
   *
   * <p>The fields are never reset to null here, so threads are created at most once.
   *
   * @param evt the action event (not used in the body)
   */
  private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) {  
    if (jtbDateExchanger.isSelected()) {
      if (thrdMDateReceiver == null) {
        thrdMDateReceiver = new ThreadMultipleDateReceiver();
        thrdMDateReceiver.start();
      }
      if (thrdSDateWriter0 == null) {
        thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedInputStream());
        // The writer is started before it is registered with the receiver.
        thrdSDateWriter0.start();
        thrdMDateReceiver.addDateWriter(thrdSDateWriter0);
      }
      if (thrdSDateWriter1 == null) {
        // Second writer is given the same input stream as the first one.
        thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedInputStream());
        thrdSDateWriter1.start();
        thrdMDateReceiver.addDateWriter(thrdSDateWriter1);
      }
    } else {
      if (thrdMDateReceiver != null) {
        thrdMDateReceiver.stopThread();
      }
    }
  }                                                

The OUTPUT

    run:
ThreadMultipleDateReceiver Created
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
.....
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver Empty
ThreadMultipleDateReceiver has:1
I ThreadMultipleDateReceiver have:-0-
ThreadMultipleDateReceiver waiting:
ThreadStreamDateWriter -> write to pipedOutputStream:-0- -> 20170608-090003
ThreadStreamDateWriter -> write to pipedOutputStream:-1- -> 20170608-090003
BUILD SUCCESSFUL (total time: 1 minute 3 seconds)

The ThreadMultipleDateReceiver is blocked, and is not printing:

ThreadMultipleDateReceiver Received:
    -1- -> 20170608-090003

or

ThreadMultipleDateReceiver Received:
    -0- -> 20170608-090003

How can I solve it?

2

2 Answers

1
votes

It looks like your piped output stream is static, so every time you construct a ThreadStreamDateWriter you overwrite the previous value of the piped output stream.

Try making it an instance variable (or pass it into the constructor), so that each writer keeps its own stream.

Edit 1: I made the pipes instance variables and added some printouts. It seems to run longer now (see below).

Edit 2: your second pipedOutputStream.connect(snk) call is throwing an IOException. A PipedInputStream can be connected to only one PipedOutputStream at a time.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
/**
 * Self-contained demo of the question's setup: one reader thread draining a
 * {@link PipedInputStream} and two writer threads that each try to connect their own
 * {@link PipedOutputStream} to it.
 *
 * <p>A piped input stream accepts a single connection, so the second writer's {@code connect}
 * fails. That failure (and every other I/O failure) is reported on {@code System.err} instead of
 * being swallowed, which makes the diagnosis visible when the demo runs.
 */
public class So44438086 {
    // Thread references used by foo() and run(); null means "not created yet".
    ThreadMultipleDateReceiver thrdMDateReceiver = null;
    ThreadStreamDateWriter thrdSDateWriter0 = null;
    ThreadStreamDateWriter thrdSDateWriter1 = null;

    /**
     * Reader side: tracks the registered writer threads and reads whatever arrives on its own
     * piped input stream.
     */
    public static class ThreadMultipleDateReceiver extends Thread {
        private static final int MAX_CLIENT_THREADS = 4;
        private static final int BUFFER_SIZE = 1024;

        // Starts out true (instead of being set inside run()) so that a stopThread() issued
        // before the thread is scheduled is not overwritten.
        private volatile boolean isRunning = true;
        private final List<ThreadStreamDateWriter> lThrdDate;
        private final PipedInputStream pipedInputStream;

        /** Creates the empty writer list and this receiver's own, not yet connected, pipe end. */
        public ThreadMultipleDateReceiver() {
            lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS));
            pipedInputStream = new PipedInputStream();
            System.out.println("ctor setting pipedInputStream to: " + pipedInputStream);
            System.out.println("ThreadMultipleDateReceiver Created");
        }

        /**
         * Main loop: while running, drop finished writers and read from the pipe; when no writer
         * is registered just report "Empty". On exit every remaining writer is stopped.
         */
        @Override public void run() {
            while (isRunning) {
                if (lThrdDate.isEmpty()) {
                    System.out.println("ThreadMultipleDateReceiver Empty");
                    continue;
                }
                System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size());
                pruneTerminatedWriters();
                readUntilEndOfStream();
            }
            emptyDateWriters();
        }

        /** Removes writers whose thread has terminated and lists the ones still registered. */
        private void pruneTerminatedWriters() {
            // Walk backwards so a removal does not shift the entries still to be visited.
            for (int i = lThrdDate.size(); i > 0; i--) {
                ThreadStreamDateWriter writer = lThrdDate.get(i - 1);
                if (writer.getState() == Thread.State.TERMINATED) {
                    lThrdDate.remove(i - 1);
                } else {
                    System.out.println("I ThreadMultipleDateReceiver have:" + writer.getNameDateWriter());
                }
            }
        }

        /**
         * Reads from the pipe until end of stream, a stop request, or an I/O failure, then prints
         * the text collected so far. Text read before a failure is printed too rather than lost.
         */
        private void readUntilEndOfStream() {
            byte[] incomingBytes = new byte[BUFFER_SIZE];
            StringBuilder received = new StringBuilder();
            boolean endOfStream = false;
            System.out.println("ThreadMultipleDateReceiver waiting:");
            System.out.println("reading: " + pipedInputStream);
            try {
                int iRd = 0;
                while (isRunning && (iRd = pipedInputStream.read(incomingBytes)) != -1) {
                    // Decode only the bytes this read() delivered, not the whole buffer.
                    received.append(new String(incomingBytes, 0, iRd));
                }
                endOfStream = (iRd == -1);
            } catch (IOException e) {
                // stopThread() interrupts a blocked read(), which surfaces as an IOException;
                // that case is the expected shutdown path and is not reported as an error.
                if (isRunning) {
                    System.err.println("ThreadMultipleDateReceiver read failed: " + e);
                }
            }
            if (endOfStream || received.length() > 0) {
                System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + received);
            }
        }

        /**
         * Registers a writer thread; silently ignored once MAX_CLIENT_THREADS are registered.
         *
         * @param threadDateWriter the writer thread to track
         */
        public void addDateWriter(ThreadStreamDateWriter threadDateWriter) {
            if (lThrdDate.size() < MAX_CLIENT_THREADS) {
                lThrdDate.add(threadDateWriter);
            }
        }

        /** Stops every registered writer and empties the list. */
        private void emptyDateWriters() {
            for (int i = lThrdDate.size(); i > 0; i--) {
                lThrdDate.remove(i - 1).stopThread();
            }
        }

        /** @return the pipe end that a writer's output stream has to be connected to */
        public PipedInputStream getPipedInputStream() {
            return pipedInputStream;
        }

        /**
         * Requests termination. Besides clearing the flag, the thread is interrupted so that a
         * {@code read()} blocked on an idle pipe wakes up and run() can actually finish.
         */
        public void stopThread() {
            isRunning = false;
            interrupt();
        }
    }

    /**
     * Writer side: owns one piped output stream and periodically writes
     * "name -> timestamp" into it.
     */
    public static class ThreadStreamDateWriter extends Thread {
        private static final long WRITE_INTERVAL_MILLIS = 4000;

        // Display name of this writer.
        String Self;
        // Starts out true so that a stopThread() issued before run() begins is honoured.
        private volatile boolean isRunning = true;
        private final PipedOutputStream pipedOutputStream;

        /**
         * Creates this writer's own output stream and tries to connect it to {@code snk}.
         * A failed connect is reported on System.err; the writer then stops at its first write.
         *
         * @param name display name of this writer
         * @param snk  piped input stream to connect to
         */
        ThreadStreamDateWriter(String name, PipedInputStream snk) {
            Self = name;
            pipedOutputStream = new PipedOutputStream();
            System.out.println("ctor setting pipedOutputStream to: " + pipedOutputStream);
            try {
                pipedOutputStream.connect(snk);
                System.out.println(pipedOutputStream + " connectd to: " + snk);
            } catch (IOException e) {
                System.err.println("ThreadStreamDateWriter " + name + " could not connect to " + snk + ": " + e);
            }
        }

        /** Writes one message, pauses, and repeats until stopped, interrupted, or a write fails. */
        @Override public void run() {
            while (isRunning) {
                try {
                    String info = getInfo();
                    byte[] outgoingBytes = info.getBytes();
                    System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + info);
                    System.out.println("writing to: " + pipedOutputStream);
                    pipedOutputStream.write(outgoingBytes);
                    System.out.println("ThreadStreamDateWriter -> wrote:" + info);
                    Thread.sleep(WRITE_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Interrupted during the pause: keep the interrupt status and finish.
                    Thread.currentThread().interrupt();
                    isRunning = false;
                } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) {
                    // A failure caused by our own stop request is expected and stays quiet.
                    if (isRunning) {
                        System.err.println("ThreadStreamDateWriter " + Self + " write failed: " + e);
                    }
                    isRunning = false;
                }
            }
        }

        /**
         * @return this writer's name, " -> ", and the current time as yyyyMMdd-HHmmss
         *         (24-hour clock, so afternoon timestamps are unambiguous)
         */
        String getInfo() {
            String sDtTm = new SimpleDateFormat("yyyyMMdd-HHmmss").format(Calendar.getInstance().getTime());
            return Self + " -> " + sDtTm;
        }

        /** Requests termination and interrupts the thread so a pending pause ends at once. */
        public void stopThread() {
            isRunning = false;
            interrupt();
        }

        /** @return the name given to the constructor */
        public String getNameDateWriter() {
            return Self;
        }
    }

    /** Creates and starts the receiver and the two writers, each only if not created before. */
    private void foo() {
        if (thrdMDateReceiver == null) {
            thrdMDateReceiver = new ThreadMultipleDateReceiver();
            thrdMDateReceiver.start();
        }
        if (thrdSDateWriter0 == null) {
            thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedInputStream());
            thrdSDateWriter0.start();
            thrdMDateReceiver.addDateWriter(thrdSDateWriter0);
        }
        if (thrdSDateWriter1 == null) {
            thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedInputStream());
            thrdSDateWriter1.start();
            thrdMDateReceiver.addDateWriter(thrdSDateWriter1);
        }
    }

    /**
     * Runs the demo: start everything, let it work for ten seconds, then stop the receiver
     * (which in turn stops the writers).
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void run() throws InterruptedException {
        System.out.println("running");
        foo();
        System.out.println("sleeping");
        Thread.sleep(10000);
        System.out.println("stopping");
        if (thrdMDateReceiver != null) {
            thrdMDateReceiver.stopThread();
        }
    }

    /**
     * Entry point of the demo.
     *
     * @param args not used
     * @throws InterruptedException if the main thread is interrupted while the demo runs
     */
    public static void main(String[] args) throws InterruptedException {
        new So44438086().run();
    }
}
1
votes

Test this code...

/**
 * Reader thread that owns both ends of one pipe: it reads from the input end itself and hands
 * the single, already connected output end to every writer via {@link #getPipedOutputStream()}.
 *
 * <p>NOTE(review): java.io pipes are documented for one writing thread and one reading thread.
 * Sharing the output end between several writer threads can make {@code read()} fail once the
 * thread that wrote last has terminated - confirm this is acceptable, or consider a
 * {@code java.util.concurrent.BlockingQueue} instead.
 */
public class ThreadMultipleDateReceiver extends Thread {

    private static final int MAX_CLIENT_THREADS = 4;
    private static final int BUFFER_SIZE = 1024;

    // Starts out true (instead of being set inside run()) so that a stopThread() issued before
    // the thread is scheduled is not overwritten.
    private volatile boolean isRunning = true;
    private final List<ThreadStreamDateWriter> lThrdDate;

    private final PipedInputStream pipedInputStream;
    private final PipedOutputStream pipedOutputStream;

    /**
     * Creates the writer list and the connected pipe pair.
     *
     * @throws IllegalStateException if the two pipe ends cannot be connected
     */
    public ThreadMultipleDateReceiver() {
        lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS));
        pipedOutputStream = new PipedOutputStream();
        try {
            // Connecting declares a checked IOException, which has to be handled here.
            pipedInputStream = new PipedInputStream(pipedOutputStream);
        } catch (IOException e) {
            throw new IllegalStateException("Could not connect the receiver's pipe", e);
        }
        System.out.println("ThreadMultipleDateReceiver Created");
    }

    /**
     * Main loop: while running, drop finished writers and print every chunk that arrives on the
     * pipe; when no writer is registered just report "Empty". On exit every remaining writer is
     * stopped.
     */
    @Override public void run() {
        while (isRunning) {
            if (lThrdDate.isEmpty()) {
                System.out.println("ThreadMultipleDateReceiver Empty");
                continue;
            }
            System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size());
            pruneTerminatedWriters();
            printIncomingChunks();
        }
        emptyDateWriters();
    }

    /** Removes writers whose thread has terminated and lists the ones still registered. */
    private void pruneTerminatedWriters() {
        // Walk backwards so a removal does not shift the entries still to be visited.
        for (int i = lThrdDate.size(); i > 0; i--) {
            ThreadStreamDateWriter writer = lThrdDate.get(i - 1);
            if (writer.getState() == Thread.State.TERMINATED) {
                lThrdDate.remove(i - 1);
            } else {
                System.out.println("ThreadMultipleDateReceiver have:" + writer.getNameDateWriter());
            }
        }
    }

    /** Prints each chunk as soon as it is read, until end of stream, stop request, or failure. */
    private void printIncomingChunks() {
        byte[] incomingBytes = new byte[BUFFER_SIZE];
        System.out.println("ThreadMultipleDateReceiver waiting:");
        try {
            int iRd;
            while (isRunning && (iRd = pipedInputStream.read(incomingBytes)) != -1) {
                // Decode only the bytes this read() delivered.
                String r = new String(incomingBytes, 0, iRd);
                System.out.println("ThreadMultipleDateReceiver Received:\t" + r);
            }
        } catch (IOException e) {
            // stopThread() interrupts a blocked read(), which surfaces as an IOException;
            // that case is the expected shutdown path and is not reported as an error.
            if (isRunning) {
                System.err.println("ThreadMultipleDateReceiver read failed: " + e);
            }
        }
    }

    /**
     * Registers a writer thread; silently ignored once MAX_CLIENT_THREADS are registered.
     *
     * @param threadDateWriter the writer thread to track
     */
    public void addDateWriter(ThreadStreamDateWriter threadDateWriter) {
        if (lThrdDate.size() < MAX_CLIENT_THREADS) {
            lThrdDate.add(threadDateWriter);
        }
    }

    /** Stops every registered writer and empties the list. */
    private void emptyDateWriters() {
        for (int i = lThrdDate.size(); i > 0; i--) {
            lThrdDate.remove(i - 1).stopThread();
        }
    }

    /** @return the connected output end of the pipe, shared by all writers */
    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    /**
     * Requests termination. Besides clearing the flag, the thread is interrupted so that a
     * {@code read()} blocked on an idle pipe wakes up and run() can actually finish.
     */
    public void stopThread() {
        isRunning = false;
        interrupt();
    }

}

the ThreadStreamDateWriter class

/**
 * Writer thread: periodically writes "name -> timestamp" into the piped output stream it was
 * given. The stream is supplied by the caller and is expected to be connected already.
 */
public class ThreadStreamDateWriter extends Thread {
    private static final long WRITE_INTERVAL_MILLIS = 4000;

    // Display name of this writer.
    String Self;
    // Starts out true (instead of being set inside run()) so that a stopThread() issued before
    // the thread is scheduled is not overwritten.
    private volatile boolean isRunning = true;
    private final PipedOutputStream pipedOutputStream;

    /**
     * @param name display name of this writer, prefixed to every message
     * @param src  output stream to write to
     */
    ThreadStreamDateWriter(String name, PipedOutputStream src) {
        Self = name;
        pipedOutputStream = src;
    }

    /** Writes one message, pauses, and repeats until stopped, interrupted, or a write fails. */
    @Override public void run() {
        while (isRunning) {
            try {
                String info = getInfo();
                byte[] outgoingBytes = info.getBytes();
                System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + info);
                pipedOutputStream.write(outgoingBytes);
                System.out.println("ThreadStreamDateWriter -> wrote:" + info);
                Thread.sleep(WRITE_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Interrupted during the pause: keep the interrupt status and finish.
                Thread.currentThread().interrupt();
                isRunning = false;
            } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) {
                // A failure caused by our own stop request is expected and stays quiet.
                if (isRunning) {
                    System.err.println("ThreadStreamDateWriter " + Self + " write failed: " + e);
                }
                isRunning = false;
            }
        }
    }

    /**
     * @return this writer's name, " -> ", and the current time as yyyyMMdd-HHmmss
     *         (24-hour clock, so afternoon timestamps are unambiguous)
     */
    String getInfo() {
        String sDtTm = new SimpleDateFormat("yyyyMMdd-HHmmss").format(Calendar.getInstance().getTime());
        return Self + " -> " + sDtTm;
    }

    /** Requests termination and interrupts the thread so a pending pause ends at once. */
    public void stopThread() {
        isRunning = false;
        interrupt();
    }

    /** @return the name given to the constructor */
    public String getNameDateWriter() {
        return Self;
    }
}

using...

  // Thread references kept between toggle events; null means "not created yet".
  ThreadMultipleDateReceiver thrdMDateReceiver = null;
  ThreadStreamDateWriter thrdSDateWriter0 = null;
  ThreadStreamDateWriter thrdSDateWriter1 = null;

  /**
   * Toggle-button handler. Deselected: asks the receiver (if one exists) to stop. Selected:
   * creates and starts the receiver and both writers, each only if its field is still null;
   * every writer is given the receiver's piped output stream and is registered with the
   * receiver after it has been started.
   *
   * @param evt the action event (not used in the body)
   */
  private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) {
    final boolean switchedOn = jtbDateExchanger.isSelected();

    if (!switchedOn) {
      if (thrdMDateReceiver != null) {
        thrdMDateReceiver.stopThread();
      }
      return;
    }

    if (thrdMDateReceiver == null) {
      thrdMDateReceiver = new ThreadMultipleDateReceiver();
      thrdMDateReceiver.start();
    }

    if (thrdSDateWriter0 == null) {
      thrdSDateWriter0 =
          new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedOutputStream());
      thrdSDateWriter0.start();
      thrdMDateReceiver.addDateWriter(thrdSDateWriter0);
    }

    if (thrdSDateWriter1 == null) {
      thrdSDateWriter1 =
          new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedOutputStream());
      thrdSDateWriter1.start();
      thrdMDateReceiver.addDateWriter(thrdSDateWriter1);
    }
  }