I am receiving a live audio stream over the network as RTP packets, and I need to write code to capture, buffer, and play it.

Problem

To solve this, I have written two threads: one to capture the audio and another to play it. But when I start both threads, my capture thread runs slower than the playing thread :(

Buffer Requirement

  • RTP Audio Packets.
  • 8kHz, 16-bit Linear Samples (Linear PCM).
  • 4 frames of 20ms audio will be sent in each RTP Packet.
  • Do not play until AudioStart=24 (# of 20ms frames) have arrived.
  • While playing ... if the # of 20ms frames in buffer reaches 0 ... stop playing until AudioStart frames are buffered then restart.
  • While playing ... if the # of 20ms frames in buffer exceeds AudioBufferHigh=50 then delete 24 frames (in easiest manner -- delete from buffer or just drop next 6 RTP messages).
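
The rules above can be sketched as a small state machine that counts whole 20ms frames. This is only a minimal illustration, not the code from this question: the class name `JitterBuffer` and its methods are hypothetical, it assumes the RTP header has already been stripped, and it assumes 320 bytes per frame (8000 Hz x 0.020 s x 2 bytes).

```java
import java.util.ArrayDeque;

/** Hypothetical jitter buffer; all counts are in 20 ms frames. */
final class JitterBuffer {
    static final int FRAME_BYTES = 320;       // 8000 Hz * 0.020 s * 2 bytes per sample
    static final int FRAMES_PER_PACKET = 4;
    static final int AUDIO_START = 24;        // frames required before playback (re)starts
    static final int AUDIO_BUFFER_HIGH = 50;  // above this, discard AUDIO_START frames

    private final ArrayDeque<byte[]> frames = new ArrayDeque<>();
    private boolean playing = false;

    /** Called by the capture thread with one RTP payload (header already removed). */
    synchronized void addPacket(byte[] payload) {
        // Split the payload into whole frames; a trailing partial frame is ignored.
        for (int off = 0; off + FRAME_BYTES <= payload.length; off += FRAME_BYTES) {
            byte[] frame = new byte[FRAME_BYTES];
            System.arraycopy(payload, off, frame, 0, FRAME_BYTES);
            frames.addLast(frame);
        }
        if (frames.size() > AUDIO_BUFFER_HIGH) {
            for (int i = 0; i < AUDIO_START; i++) {
                frames.pollFirst();           // drop the 24 oldest frames
            }
        }
        if (!playing && frames.size() >= AUDIO_START) {
            playing = true;                   // enough audio buffered to start
        }
    }

    /** Called by the player thread; returns null while (re)buffering. */
    synchronized byte[] nextFrame() {
        if (!playing) {
            return null;
        }
        byte[] frame = frames.pollFirst();
        if (frames.isEmpty()) {
            playing = false;                  // underrun: wait for AUDIO_START frames again
        }
        return frame;
    }

    synchronized int size() { return frames.size(); }

    synchronized boolean isPlaying() { return playing; }
}
```

The player thread would pass each non-null frame to `SourceDataLine.write`, which blocks while the line's internal buffer is full, so the sound card paces playback rather than `Thread.sleep`. When `nextFrame()` returns null, the player should wait briefly instead of spinning.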

What I have done so far:

Code

BufferManager.java

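import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.sound.sampled.AudioFormat;

public abstract class BufferManager {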
    protected static final Integer ONE = new Integer(1);
    protected static final Integer TWO = new Integer(2);
    protected static final Integer THREE = new Integer(3);
    protected static final Integer BUFFER_SIZE = 5334;//5.334KB
    protected static volatile Map<Integer, ByteArrayOutputStream> bufferPool = new ConcurrentHashMap<>(3, 0.9f, 2);
    protected static volatile Integer captureBufferKey = ONE;
    protected static volatile Integer playingBufferKey = ONE;
    protected static Boolean running; 
    protected static volatile Integer noOfFrames = 0;

    public BufferManager() {
        //captureBufferKey = ONE;
        //playingBufferKey = ONE;
        //noOfFrames = new Integer(0);
    }

    protected void switchCaptureBufferKey() {
        if(ONE.intValue() == captureBufferKey.intValue()) 
            captureBufferKey = TWO;
        else if(TWO.intValue() == captureBufferKey.intValue())
            captureBufferKey = THREE;
        else 
            captureBufferKey = ONE;
        //printBufferState("SWITCHCAPTURE");
    }//End of switchCaptureBufferKey() Method.

    protected void switchPlayingBufferKey() {
        if(ONE.intValue() == playingBufferKey.intValue()) 
            playingBufferKey = TWO;
        else if(TWO.intValue() == playingBufferKey.intValue())
            playingBufferKey = THREE;
        else 
            playingBufferKey = ONE;
    }//End of switchPlayingBufferKey() Method.

    protected static AudioFormat getFormat() {
        float sampleRate = 8000;
        int sampleSizeInBits = 16;
        int channels = 1;
        boolean signed = true;
        boolean bigEndian = true;
        return new AudioFormat(sampleRate, sampleSizeInBits, channels, signed, bigEndian);
    }

    protected int getByfferSize() {
        return bufferPool.get(ONE).size() 
                + bufferPool.get(TWO).size() 
                + bufferPool.get(THREE).size();
    }

    protected static void printBufferState(String flag) {
        int a = bufferPool.get(ONE).size();
        int b = bufferPool.get(TWO).size();
        int c = bufferPool.get(THREE).size();
        System.out.println(flag + " == TOTAL : [" + (a + b +c) + "bytes] ");
//      int a,b,c;
//      System.out.println(flag + "1 : [" + (a = bufferPool.get(ONE).size()) + "bytes], 2 : [" + (b = bufferPool.get(TWO).size())
//              + "bytes] 3 : [" + (c = bufferPool.get(THREE).size()) + "bytes], TOTAL : [" + (a + b +c) + "bytes] ");
    }
}//End of BufferManager Class.

AudioCapture.java

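import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Arrays;

public class AudioCapture extends BufferManager implements Runnable {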
    private static final Integer RTP_HEADER_SIZE = 12;
    private InetAddress ipAddress; 
    private DatagramSocket serverSocket;
    long lStartTime = 0;

    public AudioCapture(Integer port) throws UnknownHostException, SocketException {
        super();
        running = Boolean.TRUE;
        bufferPool.put(ONE, new ByteArrayOutputStream(BUFFER_SIZE));
        bufferPool.put(TWO, new ByteArrayOutputStream(BUFFER_SIZE));
        bufferPool.put(THREE, new ByteArrayOutputStream(BUFFER_SIZE));
        this.ipAddress = InetAddress.getByName("0.0.0.0");
        serverSocket = new DatagramSocket(port, ipAddress);
    }

    @Override
    public void run() {
        System.out.println();
        byte[] receiveData = new byte[1300];
        DatagramPacket receivePacket = null;
        lStartTime = System.currentTimeMillis();
        receivePacket = new DatagramPacket(receiveData, receiveData.length);
        byte[] packet = new byte[receivePacket.getLength() - RTP_HEADER_SIZE];
        ByteArrayOutputStream buff = bufferPool.get(captureBufferKey);
        while (running) {
            if(noOfFrames <= 50) {
                try {
                    serverSocket.receive(receivePacket);
                    packet = Arrays.copyOfRange(receivePacket.getData(), RTP_HEADER_SIZE, receivePacket.getLength());
                    if((buff.size() + packet.length) > BUFFER_SIZE) {
                        switchCaptureBufferKey();
                        buff = bufferPool.get(captureBufferKey);
                    }
                    buff.write(packet);
                    noOfFrames += 4;
                } catch (SocketException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                } // End of try-catch block.
            } else {
                //System.out.println("Packet Ignored, Buffer reached to its maximum limit ");
            }//End of if-else block.
        } // End of while loop. 
    }//End of run() Method.
}

AudioPlayer.java

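import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioInputStream;
import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;

public class AudioPlayer extends BufferManager implements Runnable {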
    long lStartTime = 0;

    public AudioPlayer() {
        super();
    }

    @Override
    public void run() {
        AudioFormat format = getFormat();
        DataLine.Info info = new DataLine.Info(SourceDataLine.class, format);
        SourceDataLine line = null;
        try {
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(format);
            line.start();
        } catch (LineUnavailableException e1) {
            e1.printStackTrace();
        }

        while (running) {
            if (noOfFrames >= 24) {
                ByteArrayOutputStream out = null;
                try {
                    out = bufferPool.get(playingBufferKey);
                    InputStream input = new ByteArrayInputStream(out.toByteArray());
                    byte buffer[] = new byte[640]; // 640 bytes = two 20ms frames at 8kHz, 16-bit mono
                    int count;
                    while ((count = input.read(buffer, 0, buffer.length)) != -1) {
                        // Write only the bytes actually read, rounded down to whole sample frames.
                        int usable = count - (count % format.getFrameSize());
                        if (usable > 0) {
                            line.write(buffer, 0, usable);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                /*byte buffer[] = new byte[1280];
                try {
                    int count;
                    while ((count = ais.read(buffer, 0, buffer.length)) != -1) {
                        if (count > 0) {
                            line.write(buffer, 0, count);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }*/
                out.reset();
                noOfFrames -= 4;
                try {
                    if (getByfferSize() >= 10240) {
                        Thread.sleep(15);
                    } else if (getByfferSize() >= 5120) {
                        Thread.sleep(25);
                    } else if (getByfferSize() >= 0) {
                        Thread.sleep(30);
                    } 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // System.out.println("Number of frames :- " + noOfFrames);
            }
        }
    }// End of run() method.
}// End of AudioPlayer Class.

Any help or a pointer to a helpful link would be appreciated. Thanks...

1 Answer

This answer explains a few challenges with streaming.

In a nutshell, your client needs to deal with two issues:

1) The clocks (crystals) on the client and server are not perfectly in sync. The server may run a fraction of a Hz faster or slower than the client. The client continuously infers the server's clock rate by examining the rate at which RTP packets are delivered, and then adjusts the playback rate via sample rate conversion. So instead of playing back at 48k, it may play back at 48000.0001 Hz.

2) Packet loss, out-of-order arrivals, etc. must be dealt with. If you lose packets, you still need to keep a placeholder for them in your buffer stream; otherwise your audio will skip, sound crackly, and become unaligned. The simplest method is to replace the missing packets with silence, but the volume of adjacent packets should be adjusted to avoid sharp envelope changes snapping to 0.
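
Both points can be sketched roughly in Java. Treat this as an illustration under assumptions rather than a finished design: `DriftEstimator` and `LossConcealment` are hypothetical names, arrival times are assumed to come from `System.nanoTime()`, and the audio is assumed to be 16-bit big-endian PCM. Network jitter makes short-term rate estimates noisy, so a real client would average over a long window before touching the resampler.

```java
/** Hypothetical: estimates the sender's sample rate as measured by the local clock (point 1). */
final class DriftEstimator {
    private final double nominalRate;   // e.g. 8000.0 Hz
    private boolean started = false;
    private long firstNanos, lastNanos, samples;

    DriftEstimator(double nominalRate) { this.nominalRate = nominalRate; }

    /** Record a packet of `count` samples that arrived at local time `nanos`. */
    void onPacket(long nanos, int count) {
        if (!started) {                 // the first packet only fixes the time origin
            started = true;
            firstNanos = lastNanos = nanos;
            return;
        }
        samples += count;
        lastNanos = nanos;
    }

    /** Samples per second delivered since the first packet; nominal rate until data exists. */
    double estimatedRate() {
        long elapsed = lastNanos - firstNanos;
        return (samples == 0 || elapsed <= 0) ? nominalRate : samples * 1e9 / elapsed;
    }

    /** Ratio for a sample rate converter: above 1.0 means the sender runs fast. */
    double ratio() { return estimatedRate() / nominalRate; }
}

/** Hypothetical helpers for filling gaps left by lost packets (point 2). */
final class LossConcealment {
    /** Packets missing between two 16-bit RTP sequence numbers; late or duplicate packets count as 0. */
    static int missingPackets(int prevSeq, int seq) {
        int delta = (seq - prevSeq) & 0xFFFF;        // sequence numbers wrap at 65536
        return (delta == 0 || delta >= 0x8000) ? 0 : delta - 1;
    }

    /** Placeholder audio that keeps the stream aligned: all-zero PCM is silence. */
    static byte[] silence(int packets, int bytesPerPacket) {
        return new byte[packets * bytesPerPacket];
    }

    /** Linearly fades the last `fadeSamples` samples to zero to avoid a click before the gap. */
    static void fadeOut(byte[] pcm, int fadeSamples) {
        int total = pcm.length / 2;
        int n = Math.min(fadeSamples, total);
        for (int i = 0; i < n; i++) {
            int idx = (total - n + i) * 2;
            short s = (short) ((pcm[idx] << 8) | (pcm[idx + 1] & 0xFF));
            double gain = (double) (n - 1 - i) / n;  // falls to 0 on the final sample
            short out = (short) Math.round(s * gain);
            pcm[idx] = (byte) (out >> 8);
            pcm[idx + 1] = (byte) out;
        }
    }
}
```

A matching fade-in on the first packet after the gap would complete the envelope smoothing. The resampler that consumes `ratio()` is not shown.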

Your design seems a bit unorthodox. I have had success using a ring buffer instead. You will have to deal with edge cases as well.
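
For reference, a ring buffer for this purpose can be quite small. The following is a minimal sketch, assuming a single fixed-capacity byte array shared by the capture and player threads; the class name `ByteRingBuffer` is hypothetical, and the plain `synchronized` methods are chosen for clarity rather than performance.

```java
/** Hypothetical fixed-capacity byte ring buffer shared by one writer and one reader. */
final class ByteRingBuffer {
    private final byte[] data;
    private int readPos = 0;   // index of the next byte to read
    private int writePos = 0;  // index of the next byte to write
    private int size = 0;      // number of bytes currently stored

    ByteRingBuffer(int capacity) {
        data = new byte[capacity];
    }

    /** Copies as many bytes as fit and returns how many were written. */
    synchronized int write(byte[] src, int off, int len) {
        int n = Math.min(len, data.length - size);
        for (int i = 0; i < n; i++) {
            data[writePos] = src[off + i];
            writePos = (writePos + 1) % data.length;  // wrap around at the end
        }
        size += n;
        return n;
    }

    /** Copies up to len bytes into dst and returns how many were read. */
    synchronized int read(byte[] dst, int off, int len) {
        int n = Math.min(len, size);
        for (int i = 0; i < n; i++) {
            dst[off + i] = data[readPos];
            readPos = (readPos + 1) % data.length;    // wrap around at the end
        }
        size -= n;
        return n;
    }

    /** Bytes available to read; divide by the frame size to get a frame count. */
    synchronized int available() {
        return size;
    }
}
```

Because the fill level is a single counter updated under the same lock as the data, the start and high-water thresholds can be checked against `available()` without a separately maintained frame count. The edge cases mentioned above (a full buffer on write, an empty one on read) show up here as short return values that the caller has to handle.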

I always state that streaming media is not a trivial task.