posts - 56,  comments - 12,  trackbacks - 0

          習慣了TCP編程,認為UDP可以包辦這些問題是錯誤的。一個UDP應用程序要承擔可靠性方面的全部工作,包括報文的丟失、重復、時延、亂序以及連接失效等問題。

          通常我們在可靠性好,傳輸時延小的局域網上開發測試,一些問題不容易暴露,但在大型互聯網上卻會出現錯誤。

          UDP協議把遞送的可靠性責任推到了上層即應用層,下面簡單編寫了幾個類來專門處理兩個問題:亂序和丟包。

          四個類:DataPacket 類,PacketHeader類,PacketBody類 ,DataEntry類,位于同一個文件DataPacket .java中。

          DataPacket 類相當于一個門面模式,提供給外部使用,通信數據也在這個類中處理。

          package com.skysoft.pcks;

          import java.io.*;
          import java.net.*;
          import java.util.*;

          public class  DataPacket {
            InputStream is;
            OutputStream os;
            PacketHeader header;
            PacketBody body;
            ArrayList al;
            public static final int DataSwapSize = 64532;

            /**
             * 在接收數據報使用
             */
            public DataPacket() {
              header = new PacketHeader();
              body = new PacketBody();
              al = new ArrayList();
            }
            /**
             * 在發送數據報時使用,它調用報文分割操作.
             * @param file String  硬盤文件
             */
            public DataPacket(String file) {
              this();
              try {
                is = new FileInputStream(file);
                header.CalcHeaderInfo(is.available());
                this.madeBody();
                is.close();
                //this.Gereratedata();
              }
              catch (FileNotFoundException ex) {
                ex.printStackTrace();
              }
              catch (IOException ex1) {
                ex1.printStackTrace();
              }
            }
            /**
             * 在發送數據報時使用,它調用報文分割操作.
             * @param url URL url地址
             */
            public DataPacket(URL url) {
              this();
              try {
                //is = url.openStream();
                URLConnection conn=url.openConnection();
                is=conn.getInputStream();
                int total=conn.getContentLength();
                header.CalcHeaderInfo(total);
                this.madeBody();
                //System.out.println(total+":"+total);
                is.close();
              }
              catch (IOException ex) {
                ex.printStackTrace();
              }
            }
            /**
             * 為發送構造分組,使用PackageHeader處理了報頭格式,并為分組編序號.
             */
            private void madeBody() {
              al.clear();
              byte[] buffer;
              DataEntry de;
              for (int i = 0; i < header.fragmentcounter; i++) {
                try {
                  ByteArrayOutputStream bos = new ByteArrayOutputStream();
                  //is.skip(i * body.BODY_BUFFER_SIZE);
                  header.ArrageSort(i);
                  de = new DataEntry(PacketBody.BODY_BUFFER_SIZE);
                  de.setSn(i);
                  de.setStreamsize(header.getStreamsize());
                  de.setFragmentcounter(header.getFragmentcounter());
                  if (header.isWTailFragment(i)) {
                    buffer = new byte[header.getMinfragment()];
                    is.read(buffer, 0, buffer.length);
                    header.setActByteSize(header.getMinfragment());
                    de.setActByteSize(header.getMinfragment());
                  }
                  else {
                    buffer = new byte[body.BODY_BUFFER_SIZE];
                    is.read(buffer, 0, buffer.length);
                  }
                  //System.out.println("length-------"+i+" "+body.getBody().length+" "+header.getMinfragment());
                  body.setBody(buffer);
                  //System.out.println("length:" + i + " " + header.toString());
                  bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
                  bos.write(body.getBody(), 0, body.getBody().length);
                  de.setBytes(bos.toByteArray());
                  al.add(de);
                }
                catch (IOException ex) {
                  ex.printStackTrace();
                }
              }
            }
            /**
             * 為發送構造分組,沒有考慮報頭格式,也沒有為分組編序號.
             */
            private void madeBody1() {
              al.clear();
              for (int i = 0; i < header.fragmentcounter; i++) {
                try {
                  if (header.isWTailFragment(i))
                    is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                            header.getMinfragment());
                  else
                    is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                            body.BODY_BUFFER_SIZE);
                  ByteArrayOutputStream bos = new ByteArrayOutputStream();
                  bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
                  bos.write(body.getBody(), header.HEADER_BUFFER_SIZE,
                            body.getBody().length);
                  al.add(bos);
                }
                catch (IOException ex) {
                  ex.printStackTrace();
                }
              }
            }
            /**
             * 在接收到報文后,對此報文執行組裝,并處理報文丟失和亂序情況.
             * @param b1 byte[]
             */
            public void Add(byte[] b1) {
              byte[] buffer = (byte[]) b1.clone();
              handlerText(buffer);
              DataEntry de = new DataEntry(buffer, header.getActByteSize());
              de.setSn(header.getSn());
              de.setStreamsize(header.getStreamsize());
              de.setFragmentcounter(header.getFragmentcounter());
              al.add(de);
            }
            private void handlerText(byte[] buffer) {
              ByteArrayOutputStream baos = new ByteArrayOutputStream();
              baos.write(buffer, 0, header.HEADER_BUFFER_SIZE);
              byte[] b=new byte[header.HEADER_BUFFER_SIZE];
              System.arraycopy(buffer,0,b,0,b.length);
              ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
              InputStreamReader isr = new InputStreamReader(bais);
              BufferedReader br = new BufferedReader(isr);
              try {
                header = new PacketHeader(br.readLine());
              }
              catch (Exception ex) {
                ex.printStackTrace();
              }
            }
           
            private String calFileSize(int size) {
              return size / 1024 + "K";
            }

            public ArrayList getDataPackets() {
              return al;
            }
          /**
           * 是否接收完畢,通過序號是否等于最大段數來判斷,這也許有問題,比如,正好是最后一個段丟失了,這樣
           * 這個包整個就丟失了.
           * @return
           */
            public boolean isFull() {
              return this.header.getSn() == this.header.getFragmentcounter() - 1 ? true : false;
            }
          /**
           * 判斷是否只有一個段.
           * @return
           */
            public boolean isZero() {
              return this.header.getSn() == 0 ? true : false;
            }
          /**
           * 該函數執行報文組裝,不考慮丟失的報文.
           * @return
           */
            private ByteArrayOutputStream fetchDataPackets() {
              ByteArrayOutputStream bos = new ByteArrayOutputStream();
              byte[] buffer = null;
              DataEntry de;
              for (int i = 0; i < al.size(); i++) {
                try {
                  de = this.getSnData(i);
                  buffer = de.getByte();
                  if (header.getStreamsize() == de.getStreamsize()) {
                    bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize());
                    System.out.println(de.toString() + " -- fetchDataPackets");
                  }
                }
                catch (Exception ex) {
                  ex.printStackTrace();
                }
              }
              return bos;
            }

            /**
             * 該函數執行報文組裝,對于丟失的報文,寫入空報文.
             * @return ByteArrayOutputStream
             */
            private ByteArrayOutputStream fetchDataPackets_sn() {
              ByteArrayOutputStream bos = new ByteArrayOutputStream();
              byte[] buffer;
              DataEntry de;
              for (int i = 0; i < header.getFragmentcounter(); i++) {
                try {
                  de = this.getSnData(i);
                  if (de == null) {
                    de = seachDeData(i);
                  }
                  buffer = de.getByte();
                  //System.out.println(de.getSn() + ":" + i);
                  //handlerText(buffer);
                  //bos.write(buffer, header.HEADER_BUFFER_SIZE,
                  //          buffer.length - header.HEADER_BUFFER_SIZE);
                  if (header.getStreamsize() == de.getStreamsize()) {
                    bos.write(de.getByte(), header.HEADER_BUFFER_SIZE,
                              de.getActByteSize());
                    //System.out.println(de.toString());
                  }
                }
                catch (Exception ex) {
                  ex.printStackTrace();
                }
              }
              return bos;
            }

            /**
             * 對緩沖的數據包進行排序處理,即按順序提取同一幀的數據,如果沒有找到該序號的幀,則返回空值.
             * @param sn int 要找的幀序號.
             * @return DataEntry
             */
            private DataEntry getSnData(int sn) {
              DataEntry de = null;
              for (int i = 0; i < al.size(); i++) {
                de = (DataEntry) al.get(i);
                if (header.getStreamsize() == de.getStreamsize()) {
                  if (sn == de.getSn())
                    break;
                  else
                    de = null;
                }
              }
              return de;
            }

            /**
             * 按序號開始向前或者是向后尋找最近的幀片段,日后可以增加請求重發功能,通過開一個通信連接.
             * @param sn int
             * @return DataEntry
             */
            private DataEntry seachDeData(int sn) {
              DataEntry de = null;
              int init, min = 10000;
              DataEntry back, fore = null;
              for (int i = 0; i < al.size(); i++) {
                de = (DataEntry) al.get(i);
                if (header.getStreamsize() == de.getStreamsize()) {
                  init = Math.abs(de.getSn() - sn);
                  if (de.getFragmentcounter() != de.getSn() && init < min) {
                    min = init;
                    fore = de;
                  }
                }
              }
              return fore;
            }

            /**
             * 除去最后一幀外,隨機抽取一幀.
             * @return DataEntry
             */
            private DataEntry seachDeData() {
              DataEntry de = null;
              for (int i = 0; i < al.size(); i++) {
                de = (DataEntry) al.get(i);
                System.out.println("sky ::::" + de.getFragmentcounter() + ":" + de.getSn() +
                                   ":" + i);
                if (header.getStreamsize() == de.getStreamsize()) {
                  if (de.getFragmentcounter() != de.getSn()) {
                    break;
                  }
                }
              }
              return de;
            }
            /**
             * 生成組裝完的結果數據.因為用圖像來做測試,所以令其返回圖像.
             * @return Image
             */
            public java.awt.Image Gereratedata() {
               ByteArrayInputStream bis;
               java.awt.image.BufferedImage bimage = null;
               try {
                 byte[] b = fetchDataPackets_sn().toByteArray();
                 //fetchDataPackets_old1()
                 bis = new ByteArrayInputStream(b);
                 bimage = javax.imageio.ImageIO.read(bis);

               }
               catch (Exception ex1) {
                 ex1.printStackTrace();
               }
               return bimage;
            }

            public static void main(String args[]) {
              DataPacket dp = new DataPacket("e:\\nature\\14.jpg");
            }
          }
          /**
           * 數據實體,充當臨時處理場所.
           * @author Administrator
           *
           */
          class DataEntry {
            byte[] bytes;
            int fragmentcounter, sn, actbytesize;
            long streamsize;
            int minfragment;

            public DataEntry() {

            }

            public DataEntry(int size) {
              this.actbytesize = size;
            }

            public DataEntry(byte[] b, int i) {
              this.bytes = b;
              this.actbytesize = i;
            }

            public byte[] getByte() {
              return this.bytes;
            }

            public void setBytes(byte[] b) {
              this.bytes = b;
            }

            public void setStreamsize(long size) {
              this.streamsize = size;
            }

            public long getStreamsize() {
              return this.streamsize;
            }

            public int getMinfragment() {
              return minfragment;
            }

            public synchronized void setSn(int i) {
              this.sn = i;
            }

            public synchronized int getSn() {
              return sn;
            }

            public synchronized int getFragmentcounter() {
              return fragmentcounter;
            }

            public synchronized void setFragmentcounter(int c) {
              this.fragmentcounter = c;
            }

            public void setActByteSize(int size) {
              actbytesize = size;
            }

            public int getActByteSize() {
              return actbytesize;
            }

            public String toString() {
              return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn +
                  "::" + this.actbytesize + " recv DataEntry";
            }
          }
          /**
           * 報頭,處理報頭格式
           * @author Administrator
           *
           */
          class PacketHeader implements Serializable{
            public static final int HEADER_BUFFER_SIZE = 1024;
            int fragmentcounter, sn;
            int actbytesize = PacketBody.BODY_BUFFER_SIZE;
            byte[] header; //= new byte[HEADER_BUFFER_SIZE];
            long streamsize;
            int minfragment;

            public PacketHeader() {

            }

            public PacketHeader(long l) {
              this.setStreamsize(l);

            }

            public PacketHeader(String s) {
              String[] tm = s.split("::");
              this.setActByteSize(Integer.parseInt(tm[3]));
              this.setSn(Integer.parseInt(tm[2]));
              this.setFragmentcounter(Integer.parseInt(tm[1]));
              this.setStreamsize(Long.parseLong(tm[0]));
            }

            /**
             * 根據文件的段的順序生成數據頭.
             * @param sn 文件序列
             */
            public void ArrageSort(int sn) {
              this.setSn(sn);
              this.setByte();
            }

            public void CalcHeaderInfo(long l) {
              this.setStreamsize(l);
              CalcHeaderInfo();
            }
            /**
             * 計算流要被分成的片段數量,并得出最小片段余量.
             */
            public void CalcHeaderInfo() {
              fragmentcounter = Math.round( (float) streamsize /
                                           PacketBody.BODY_BUFFER_SIZE);
              float critical = (float) streamsize / PacketBody.BODY_BUFFER_SIZE;
              if (critical - fragmentcounter < 0.5 && critical - fragmentcounter > 0)
                fragmentcounter++;
              minfragment = (int) (streamsize % PacketBody.BODY_BUFFER_SIZE);
            }

            public byte[] getHeader() {
              Long it = new Long(this.streamsize);
              return new byte[] {it.byte()};
            }

            public byte[] getByte() {
              return header; //this.toString().getBytes();
            }
            /**
             * 生成報頭字節,首先取得數據包頭 流尺寸::段片數::段順序::段實際尺寸 的字節形式,
             * 然后加入回車換行符號,對于1024字節中剩余的部分一律寫入元素為0的字節數組.
             */
            public void setByte() {
              ByteArrayOutputStream bos = new ByteArrayOutputStream();
              byte[] buffer = this.toByte();
              try {
                bos.write(buffer);
                bos.write("\r\n".getBytes());
                bos.write(new byte[PacketHeader.HEADER_BUFFER_SIZE - buffer.length], 0,
                          PacketHeader.HEADER_BUFFER_SIZE - buffer.length);
                header = bos.toByteArray();
              }
              catch (IOException ex) {
                ex.printStackTrace();
              }
            }

            public void setStreamsize(long size) {
              this.streamsize = size;
            }

            public long getStreamsize() {
              return this.streamsize;
            }

            public int getMinfragment() {
              return minfragment;
            }

            public synchronized void setSn(int i) {
              this.sn = i;
            }

            public int getSn() {
              return sn;
            }

            public int getFragmentcounter() {
              return fragmentcounter;
            }

            public synchronized void setFragmentcounter(int c) {
              this.fragmentcounter = c;
            }

            public void setActByteSize(int size) {
              actbytesize = size;
              setByte();
            }

            public int getActByteSize() {
              return actbytesize;
            }
            /**
             * 數據包頭的格式為:流尺寸::段片數::段順序::段實際尺寸
             * 報頭字節長度是可變化的,比如,可以加入流的具體信息如:流所屬文件的名稱,文件類型以及一些其他信息.
             * @return String
             */
            public String toString() {
              return streamsize + "::" + this.fragmentcounter + "::" + this.getSn() +
                  "::" + this.getActByteSize();
            }

            public byte[] toByte() {
              return this.toString().getBytes();
            }
            /**
             * 是否為尾段
             * @param i int
             * @return boolean
             */
            public boolean isWTailFragment(int i) {
              return (i == fragmentcounter - 1) ? true : false;
            }

          }
          /**
           * 用戶數據區
           * @author Administrator
           *
           */
          class PacketBody implements Serializable{
            public static final int BODY_BUFFER_SIZE = 63508; //65508
            byte[] body;

            public PacketBody() {
            }

            public void setBody(byte[] b) {
              this.body = b;
            }

            public byte[] getBody() {
              return body;
            }
          }

          這個數據處理類,將在接下來使用。

          posted on 2007-01-19 00:08 苦笑枯 閱讀(272) 評論(0)  編輯  收藏 所屬分類: Java
          收藏來自互聯網,僅供學習。若有侵權,請與我聯系!

          <2007年1月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿(2)

          隨筆分類(56)

          隨筆檔案(56)

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 班戈县| 南乐县| 贵定县| 修水县| 五大连池市| 高邮市| 昌宁县| 页游| 开远市| 深水埗区| 芜湖县| 焦作市| 安宁市| 荔浦县| 若尔盖县| 仁怀市| 普格县| 凤山县| 西林县| 荣成市| 乳山市| 阳城县| 阿拉尔市| 安康市| 利川市| 红桥区| 桃源县| 宜兰县| 瑞金市| 同心县| 五莲县| 阿瓦提县| 厦门市| 弥渡县| 中阳县| 安义县| 长沙市| 咸宁市| 毕节市| 贡嘎县| 措勤县|