typescript

Typescript 2.0之后,tsd和typings都可以去掉了。要获得lodash的类型定义文件只需要

npm install @types/lodash

这样一来,typescript的工作流就和普通的Node.js项目没什么区别了。
更重要的是Typescript 2.1之后,async/await可以直接编译到ES5。babel什么的,再见吧

https://github.com/typings/typings

你真的了解volatile关键字吗?

一、Java内存模型

想要理解volatile为什么能确保可见性,就要先理解Java中的内存模型是什么样的。

Java内存模型规定了所有的变量都存储在主内存中。每条线程中还有自己的工作内存,线程的工作内存中保存了被该线程所使用到的变量(这些变量是从主内存中拷贝而来)。线程对变量的所有操作(读取,赋值)都必须在工作内存中进行。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成。

基于此种内存模型,便产生了多线程编程中的数据“脏读”等问题。

举个简单的例子:在java中,执行下面这个语句:

1
i  = 10 ;

执行线程必须先在自己的工作线程中对变量i所在的缓存行进行赋值操作,然后再写入主存当中。而不是直接将数值10写入主存当中。

比如同时有2个线程执行这段代码,假如初始时i的值为10,那么我们希望两个线程执行完之后i的值变为12。但是事实会是这样吗?

可能存在下面一种情况:初始时,两个线程分别读取i的值存入各自所在的工作内存当中,然后线程1进行加1操作,然后把i的最新值11写入到内存。此时线程2的工作内存当中i的值还是10,进行加1操作之后,i的值为11,然后线程2把i的值写入内存。

最终结果i的值是11,而不是12。这就是著名的缓存一致性问题。通常称这种被多个线程访问的变量为共享变量。

那么如何确保共享变量在多线程访问时能够正确输出结果呢?

在解决这个问题之前,我们要先了解并发编程的三大概念:原子性,有序性,可见性。

二、原子性

1.定义

原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

2.实例

一个很经典的例子就是银行账户转账问题:

比如从账户A向账户B转1000元,那么必然包括2个操作:从账户A减去1000元,往账户B加上1000元。

试想一下,如果这2个操作不具备原子性,会造成什么样的后果。假如从账户A减去1000元之后,操作突然中止。这样就会导致账户A虽然减去了1000元,但是账户B没有收到这个转过来的1000元。

所以这2个操作必须要具备原子性才能保证不出现一些意外的问题。

同样地反映到并发编程中会出现什么结果呢?

举个最简单的例子,大家想一下假如为一个32位的变量赋值过程不具备原子性的话,会发生什么后果?

1
i = 9;

假若一个线程执行到这个语句时,我暂且假设为一个32位的变量赋值包括两个过程:为低16位赋值,为高16位赋值。

那么就可能发生一种情况:当将低16位数值写入之后,突然被中断,而此时又有一个线程去读取i的值,那么读取到的就是错误的数据。

3.Java中的原子性

在Java中,对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。

上面一句话虽然看起来简单,但是理解起来并不是那么容易。看下面一个例子i:

请分析以下哪些操作是原子性操作:

1
2
3
4
x = 10;         //语句1
y = x;         //语句2
x ;           //语句3
x = x 1;     //语句4

咋一看,可能会说上面的4个语句中的操作都是原子性操作。其实只有语句1是原子性操作,其他三个语句都不是原子性操作。

语句1是直接将数值10赋值给x,也就是说线程执行这个语句的会直接将数值10写入到工作内存中。

语句2实际上包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及 将x的值写入工作内存 这2个操作都是原子性操作,但是合起来就不是原子性操作了。

同样的,x 和 x = x 1包括3个操作:读取x的值,进行加1操作,写入新的值。

所以上面4个语句只有语句1的操作具备原子性。

也就是说,只有简单的读取、赋值(而且必须是将数字赋值给某个变量,变量之间的相互赋值不是原子操作)才是原子操作。

从上面可以看出,Java内存模型只保证了基本读取和赋值是原子性操作,如果要实现更大范围操作的原子性,可以通过synchronized和Lock来实现。由于synchronized和Lock能够保证任一时刻只有一个线程执行该代码块,那么自然就不存在原子性问题了,从而保证了原子性。

关于synchronized和Lock的使用,参考:关于synchronized和ReentrantLock之多线程同步详解

三、可见性

1.定义

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

2.实例

举个简单的例子,看下面这段代码:

1
2
3
4
5
6
//线程1执行的代码
int i = 0;
i = 10;
//线程2执行的代码
j = i;

由上面的分析可知,当线程1执行 i =10这句时,会先把i的初始值加载到工作内存中,然后赋值为10,那么在线程1的工作内存当中i的值变为10了,却没有立即写入到主存当中。

此时线程2执行 j = i,它会先去主存读取i的值并加载到线程2的工作内存当中,注意此时内存当中i的值还是0,那么就会使得j的值为0,而不是10.

这就是可见性问题,线程1对变量i修改了之后,线程2没有立即看到线程1修改的值。

3.Java中的可见性

对于可见性,Java提供了volatile关键字来保证可见性。

当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。

而普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。

另外,通过synchronized和Lock也能够保证可见性,synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中。因此可以保证可见性。

四、有序性

1.定义

有序性:即程序执行的顺序按照代码的先后顺序执行。

2.实例

举个简单的例子,看下面这段代码:

1
2
3
4
5
6
int i = 0;             
boolean flag = false;
i = 1;                //语句1 
flag = true;          //语句2

上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序(Instruction Reorder)。

下面解释一下什么是指令重排序,一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。

但是要注意,虽然处理器会对指令进行重排序,但是它会保证程序最终结果会和代码顺序执行结果相同,那么它靠什么保证的呢?再看下面一个例子:

1
2
3
4
int a = 10;    //语句1
int r = 2;    //语句2
a = a 3;    //语句3
r = a*a;     //语句4

这段代码有4个语句,那么可能的一个执行顺序是:

那么可不可能是这个执行顺序呢: 语句2 语句1 语句4 语句3

不可能,因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。

虽然重排序不会影响单个线程内程序执行的结果,但是多线程呢?下面看一个例子:

1
2
3
4
5
6
7
8
9
10
//线程1:
context = loadContext();   //语句1
inited = true;             //语句2
 //线程2:
while(!inited ){
   sleep()
}
doSomethingwithconfig(context);

上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中先执行语句2,而此是线程2会以为初始化工作已经完成,那么就会跳出while循环,去执行doSomethingwithconfig(context)方法,而此时context并没有被初始化,就会导致程序出错。

从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。

也就是说,要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。只要有一个没有被保证,就有可能会导致程序运行不正确。

3.Java中的有序性

在Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

在Java里面,可以通过volatile关键字来保证一定的“有序性”。另外可以通过synchronized和Lock来保证有序性,很显然,synchronized和Lock保证每个时刻是有一个线程执行同步代码,相当于是让线程顺序执行同步代码,自然就保证了有序性。

另外,Java内存模型具备一些先天的“有序性”,即不需要通过任何手段就能够得到保证的有序性,这个通常也称为 happens-before 原则。如果两个操作的执行次序无法从happens-before原则推导出来,那么它们就不能保证它们的有序性,虚拟机可以随意地对它们进行重排序。

下面就来具体介绍下happens-before原则(先行发生原则):

①程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作

②锁定规则:一个unLock操作先行发生于后面对同一个锁的lock操作

③volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作

④传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C

⑤线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作

⑥线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生

⑦线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行

⑧对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始

这8条规则中,前4条规则是比较重要的,后4条规则都是显而易见的。

下面我们来解释一下前4条规则:

对于程序次序规则来说,就是一段程序代码的执行在单个线程中看起来是有序的。注意,虽然这条规则中提到“书写在前面的操作先行发生于书写在后面的操作”,这个应该是程序看起来执行的顺序是按照代码顺序执行的,但是虚拟机可能会对程序代码进行指令重排序。虽然进行重排序,但是最终执行的结果是与程序顺序执行的结果一致的,它只会对不存在数据依赖性的指令进行重排序。因此,在单个线程中,程序执行看起来是有序执行的,这一点要注意理解。事实上,这个规则是用来保证程序在单线程中执行结果的正确性,但无法保证程序在多线程中执行的正确性。

第二条规则也比较容易理解,也就是说无论在单线程中还是多线程中,同一个锁如果处于被锁定的状态,那么必须先对锁进行了释放操作,后面才能继续进行lock操作。

第三条规则是一条比较重要的规则。直观地解释就是,如果一个线程先去写一个变量,然后一个线程去进行读取,那么写入操作肯定会先行发生于读操作。

第四条规则实际上就是体现happens-before原则具备传递性。

五、深入理解volatile关键字

1.volatile保证可见性

一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:

1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

2)禁止进行指令重排序。

先看一段代码,假如线程1先执行,线程2后执行:

1
2
3
4
5
6
7
8
//线程1
boolean stop = false;
while(!stop){
    doSomething();
}
//线程2
stop = true;

这段代码是很典型的一段代码,很多人在中断线程时可能都会采用这种标记办法。但是事实上,这段代码会完全运行正确么?即一定会将线程中断么?不一定,也许在大多数时候,这个代码能够把线程中断,但是也有可能会导致无法中断线程(虽然这个可能性很小,但是只要一旦发生这种情况就会造成死循环了)。

下面解释一下这段代码为何有可能导致无法中断线程。在前面已经解释过,每个线程在运行过程中都有自己的工作内存,那么线程1在运行的时候,会将stop变量的值拷贝一份放在自己的工作内存当中。

那么当线程2更改了stop变量的值之后,但是还没来得及写入主存当中,线程2转去做其他事情了,那么线程1由于不知道线程2对stop变量的更改,因此还会一直循环下去。

但是用volatile修饰之后就变得不一样了:

第一:使用volatile关键字会强制将修改的值立即写入主存;

第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量stop的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);

第三:由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取变量stop的值时会去主存读取。

那么在线程2修改stop值时(当然这里包括2个操作,修改线程2工作内存中的值,然后将修改后的值写入内存),会使得线程1的工作内存中缓存变量stop的缓存行无效,然后线程1读取时,发现自己的缓存行无效,它会等待缓存行对应的主存地址被更新之后,然后去对应的主存读取最新的值。

那么线程1读取到的就是最新的正确的值。

2.volatile不能确保原子性

下面看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test {
    public volatile int inc = 0;
    public void increase() {
        inc ;
    }
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i ){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j )
                        test.increase();
                };
            }.start();
        }
        while(Thread.activeCount()>1//保证前面的线程都执行完
            Thread.yield();
        System.out.println(test.inc);
    }
}

大家想一下这段程序的输出结果是多少?也许有些朋友认为是10000。但是事实上运行它会发现每次运行结果都不一致,都是一个小于10000的数字。

可能有的朋友就会有疑问,不对啊,上面是对变量inc进行自增操作,由于volatile保证了可见性,那么在每个线程中对inc自增完之后,在其他线程中都能看到修改后的值啊,所以有10个线程分别进行了1000次操作,那么最终inc的值应该是1000*10=10000。

这里面就有一个误区了,volatile关键字能保证可见性没有错,但是上面的程序错在没能保证原子性。可见性只能保证每次读取的是最新的值,但是volatile没办法保证对变量的操作的原子性。

在前面已经提到过,自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行,就有可能导致下面这种情况出现:

假如某个时刻变量inc的值为10,

线程1对变量进行自增操作,线程1先读取了变量inc的原始值,然后线程1被阻塞了;

然后线程2对变量进行自增操作,线程2也去读取变量inc的原始值,由于线程1只是对变量inc进行读取操作,而没有对变量进行修改操作,所以不会导致线程2的工作内存中缓存变量inc的缓存行无效,也不会导致主存中的值刷新,所以线程2会直接去主存读取inc的值,发现inc的值时10,然后进行加1操作,并把11写入工作内存,最后写入主存。

然后线程1接着进行加1操作,由于已经读取了inc的值,注意此时在线程1的工作内存中inc的值仍然为10,所以线程1对inc进行加1操作后inc的值为11,然后将11写入工作内存,最后写入主存。

那么两个线程分别进行了一次自增操作后,inc只增加了1。

根源就在这里,自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。

解决方案:可以通过synchronized或lock,进行加锁,来保证操作的原子性。也可以通过AtomicInteger。

在java 1.5的java.util.concurrent.atomic包下提供了一些原子操作类,即对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。atomic是利用CAS来实现原子性操作的(Compare And Swap),CAS实际上是利用处理器提供的CMPXCHG指令实现的,而处理器执行CMPXCHG指令是一个原子性操作。

3.volatile保证有序性

在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。

volatile关键字禁止指令重排序有两层意思:

1)当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;

2)在进行指令优化时,不能将在对volatile变量的读操作或者写操作的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。

可能上面说的比较绕,举个简单的例子:

1
2
3
4
5
6
7
8
//x、y为非volatile变量
//flag为volatile变量
x = 2;        //语句1
y = 0;        //语句2
flag = true//语句3
x = 4;         //语句4
y = -1;       //语句5

由于flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。

并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。

那么我们回到前面举的一个例子:

1
2
3
4
5
6
7
8
9
//线程1:
context = loadContext();   //语句1
inited = true;             //语句2
//线程2:
while(!inited ){
  sleep()
}
doSomethingwithconfig(context);

前面举这个例子的时候,提到有可能语句2会在语句1之前执行,那么久可能导致context还没被初始化,而线程2中就使用未初始化的context去进行操作,导致程序出错。

这里如果用volatile关键字对inited变量进行修饰,就不会出现这种问题了,因为当执行到语句2时,必定能保证context已经初始化完毕。

六、volatile的实现原理

1.可见性

处理器为了提高处理速度,不直接和内存进行通讯,而是将系统内存的数据独到内部缓存后再进行操作,但操作完后不知什么时候会写到内存。

如果对声明了volatile变量进行写操作时,JVM会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写会到系统内存。 这一步确保了如果有其他线程对声明了volatile变量进行修改,则立即更新主内存中数据。

但这时候其他处理器的缓存还是旧的,所以在多处理器环境下,为了保证各个处理器缓存一致,每个处理会通过嗅探在总线上传播的数据来检查 自己的缓存是否过期,当处理器发现自己缓存行对应的内存地址被修改了,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作时,会强制重新从系统内存把数据读到处理器缓存里。 这一步确保了其他线程获得的声明了volatile变量都是从主内存中获取最新的。

2.有序性

Lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成。

七、volatile的应用场景

synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。通常来说,使用volatile必须具备以下2个条件:

1)对变量的写操作不依赖于当前值

2)该变量没有包含在具有其他变量的不变式中

下面列举几个Java中使用volatile的几个场景。

①.状态标记量

1
2
3
4
5
6
7
8
9
volatile boolean flag = false;
 //线程1
while(!flag){
    doSomething();
}
  //线程2
public void setFlag() {
    flag = true;
}

根据状态标记,终止线程。

②.单例模式中的double check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Singleton{
    private volatile static Singleton instance = null;
    private Singleton() {
    }
    public static Singleton getInstance() {
        if(instance==null) {
            synchronized (Singleton.class) {
                if(instance==null)
                    instance = new Singleton();
            }
        }
        return instance;
    }
}
为什么要使用volatile 修饰instance?

主要在于instance = new Singleton()这句,这并非是一个原子操作,事实上在 JVM 中这句话大概做了下面 3 件事情:

1.给 instance 分配内存

2.调用 Singleton 的构造函数来初始化成员变量

3.将instance对象指向分配的内存空间(执行完这步 instance 就为非 null 了)。

但是在 JVM 的即时编译器中存在指令重排序的优化。也就是说上面的第二步和第三步的顺序是不能保证的,最终的执行顺序可能是 1-2-3 也可能是 1-3-2。如果是后者,则在 3 执行完毕、2 未执行之前,被线程二抢占了,这时 instance 已经是非 null 了(但却没有初始化),所以线程二会直接返回 instance,然后使用,然后顺理成章地报错。

参考文章

Java并发编程:volatile关键字解析
【死磕Java并发】—–深入分析volatile的实现原理
Java并发机制的底层实现原理
Volatile的实现原理

关于udp的一些问题总结

 

socket 的端口号

客户端的 socket 很少调用 bind() 来指明 socket 的端口号。 相反通常是让操作系统自动分配一个端口号。

TCP客户端 socket 的端口号是在调用了 connect() 之后,系统 会自动分配端口号。

UDP客户端 socket 的端口号是在第一次调用 sendto() 之后, 系统会自动分配端口号。

如果 UDP 的端口是自动分配的话,那么系统不会再改变这个端 口号。如果 UDP 的IP 地址也是自动分配的话,那么每一次调用 sendto() 系统都可能会根据目的 IP 地址而改变源IP地址。

weak end system VS strong end system

网络接口会接收所有和本地 IP 地址一致的数据包,叫做 weak end system .

网络接口只会接收所有和本接口的 IP 地址一致的数据包,叫做 strong end system 。

不同点, 在一个 multihome 的主机有多个网络接口,那么 strong end system 的过滤检查会更强大一些。

UDP 发送数据的地址和接收数据的地址不一致的问题。

UDP 的客户端发送给服务器,如果服务器的用于接收的网络接口 有多个 IP 地址,那么服务器送响应的时候就会自动选择一个 primary IP 地址回送给客户端。primary IP 地址有可能和客户 端发送的 IP 地址不一样。

如果客户端根据接收的响应的 IP 地址来判断是否是服务器发送 的响应,那么就有可能出错。

解决办法,一个是改造客户端,不再用 IP 地址判断是否是一个 服务器发送的数据包,而是根据 DNS 得到的域名来判断。缺点 是系统一定要有域名服务器,而且查询域名会影响效率。 一个办法是服务器不是用 wildcard 来绑定 socket ,而是为每 一个 IP 地址绑定一个 socket 。 缺点是系统如果动态改变了 IP 地址就需要重新起动服务器,而且增加服务器必须 select() 检查所有的 socket 。

UDP ICMP

sendto() 成功返回后,意思是说主机的网络接口有足够的缓冲 队列空间,用以容纳下要发送的数据。

如果接收端回送 ICMP 消息,那么是 socket 不能够知道的,除 非 UDP socket 也调用了 connect() 变成了 connected UDP socket.

linux 的实现中,则会 unconnected socket 也会返回 ICMP 错 误,只要 SO_BSDCOMPAT socket option 没有置位。

ICMP 消息会在下一个 read() 中返回, 错误值是 ECONNREFUSED。

有些 System V 系统有 Bug ,不会为 connected socket 返回 ICMP 错误。

connected UDP socket VS unconnected UDP socket

调用了 connect() 函数的 UDP socket 就从 unconnected UDP socket 变成了 connected UDP socket 。

connect() 并不是真正建立连接,只是把 socket 和一个 peer name 联系在一起。

函数 connect() 函数的操作完全是本地操作, 不涉及网络。

系统会根据 connect() 函数指定的目的地址寻找网络接口,选 择该网络接口的 primary IP 地址作为本地地址。

不同点:

  1. connected socket 调用 send 而不是 sendto 。
  2. connected socket 调用 recv 而不是 recvfrom 。 只有目的地址 和 connect() 函数指定的地址一致才会被接收到。
  • 问题:如果系统也是根据 IP 地址来判断一个连接的话,那么也有可能出现 上面说的发送命令的目的地址和接收响应的源地址是不一样的问题。
  • 答案:只有改造服务器了。
  1. ICMP 错误会收到。
  2. 发送数据的时候,由于不用指定目的 IP 地址,在用户程序 和内核之间传递的数据少,所以效率稍微高一些。

connected UDP socket 是否可以再调用 connect()

和 TCP 不一样,connected socket 可以多次调用 connect()。

connect() 的目的地址中的 address family 如果是 AF_UNSPEC 那么 connected socket 会变成 unconnected socket 。

可能会返回 EAFNOSUPPORT ,但是没有关系。

 

 

 

 

WireShark如何抓取本地localhost的包

今天将自己的电脑既作为客户端又作为服务端进行一个程序的测试,想着用WireShark来抓包分析一下问题,但由于WireShark只能抓取经过电脑网卡的包,由于我是使用localhost或者127.0.0.1进行测试的,流量是不经过电脑网卡的,所以WireShark无法抓包,一番查找之下找到了解决方法。

1 . 以管理员身份打开命令提示符

2 . 输入 route add 本机ip mask 255.255.255.255 网关ip
如果不知道本机ip和网关ip,可以在命令行输入ipconfig查看
例如我的 : route add 192.168.0.106 mask 255.255.255.255 192.168.0.1

这句话的作用是将发给电脑的包转发给路由器,路由器再发给自己的电脑。。避免本地回环

3 . 将我们程序里面的localhost或者127.0.0.1替换成本机ip

4 . 使用WireShark即可抓到本地包

注:在测试完之后,使用route delete 本机ip mask 255.255.255.255 网关ip来删除我们上面的更改,不然我们本机的所有报文都会先经过网卡再回到本机,会比较消耗性能。

 

三、使用 RawCap

需要管理员权限运行 RawCap 。

进入终端(cmd),然后运行:

RawCap.exe 127.0.0.1 dumpfile.pcapRawCap.exe 本地IP dumpfile.pcap

抓好包后,按 Ctrl C,停止抓包。此时会在 RawCap 的同级目录下生成一个dumpfile.pcap文件。用 Wireshark 打开,就可以看到本地环回的数据包了。

四、使用 Npcap

Npcap 是对当前最流行的 WinPcap 工具包进行改进的一个项目。

安装前请先卸载 WinPcap(可以在Wireshark 的Help一栏查看是否在使用 Npcap) 。

安装时要勾选

Use DLT_NULL protocol sa Loopback ...

install npcap in winpcap api-compat mode(选这个,是要兼容 WinPcap)

npcap-0.78-r2

npcap-0.78-r2

安装完成启动 Wireshark, 可以看到在网络接口列表中,多了一项 Npcap Loopback adapter,这个就是来抓本地环回包的网络接口。

wireshark-npcap 捕获界面

wireshark-npcap 捕获界面

异步编程中的最佳做法

近日来,涌现了许多关于 Microsoft .NET Framework 4.5 中新增了对 async 和 await 支持的信息。 本文旨在作为学习异步编程的“第二步”;我假设您已阅读过有关这一方面的至少一篇介绍性文章。 本文不提供任何新内容,Stack Overflow、MSDN 论坛和 async/await FAQ 这类在线资源提供了同样的建议。 本文只重点介绍一些淹没在文档海洋中的最佳做法。

本文中的最佳做法更大程度上是“指导原则”,而不是实际规则。 其中每个指导原则都有一些例外情况。 我将解释每个指导原则背后的原因,以便可以清楚地了解何时适用以及何时不适用。 图 1 中总结了这些指导原则;我将在以下各节中逐一讨论。

“名称”

说明

异常

避免 Async Void

最好使用 async Task 方法而不是 async void 方法

事件处理程序

始终使用 Async

不要混合阻塞式代码和异步代码

控制台 main 方法

配置上下文

尽可能使用 ConfigureAwait(false)

需要上下文的方法

Async 方法有三种可能的返回类型: Task、Task<T> 和 void,但是 async 方法的固有返回类型只有 Task 和 Task<T>。 当从同步转换为异步代码时,任何返回类型 T 的方法都会成为返回 Task<T> 的 async 方法,任何返回 void 的方法都会成为返回 Task 的 async 方法。 下面的代码段演示了一个返回 void 的同步方法及其等效的异步方法:

  1.  
  2.           void MyMethod()
  3. {
  4.   // Do synchronous work.
  5.           Thread.Sleep(1000);
  6. }
  7. async Task MyMethodAsync()
  8. {
  9.   // Do asynchronous work.
  10.           await Task.Delay(1000);
  11. }
  12.         

返回 void 的 async 方法具有特定用途: 用于支持异步事件处理程序。 事件处理程序可以返回某些实际类型,但无法以相关语言正常工作;调用返回类型的事件处理程序非常困难,事件处理程序实际返回某些内容这一概念也没有太大意义。 事件处理程序本质上返回 void,因此 async 方法返回 void,以便可以使用异步事件处理程序。 但是,async void 方法的一些语义与 async Task 或 async Task<T> 方法的语义略有不同。

Async void 方法具有不同的错误处理语义。 当 async Task 或 async Task<T> 方法引发异常时,会捕获该异常并将其置于 Task 对象上。 对于 async void 方法,没有 Task 对象,因此 async void 方法引发的任何异常都会直接在 SynchronizationContext(在 async void 方法启动时处于活动状态)上引发。 图 2 演示本质上无法捕获从 async void 方法引发的异常。

图 2 无法使用 Catch 捕获来自 Async Void 方法的异常

  1.  
  2.           private async void ThrowExceptionAsync()
  3. {
  4.   throw new InvalidOperationException();
  5. }
  6. public void AsyncVoidExceptions_CannotBeCaughtByCatch()
  7. {
  8.   try
  9.   {
  10.     ThrowExceptionAsync();
  11.   }
  12.   catch (Exception)
  13.   {
  14.     // The exception is never caught here!
  15.           throw;
  16.   }
  17. }
  18.         

可以通过对 GUI/ASP.NET 应用程序使用 AppDomain.UnhandledException 或类似的全部捕获事件观察到这些异常,但是使用这些事件进行常规异常处理会导致无法维护。

Async void 方法具有不同的组合语义。 返回 Task 或 Task<T> 的 async 方法可以使用 await、Task.WhenAny、Task.WhenAll 等方便地组合而成。 返回 void 的 async 方法未提供一种简单方式,用于向调用代码通知它们已完成。 启动几个 async void 方法不难,但是确定它们何时结束却不易。 Async void 方法会在启动和结束时通知 SynchronizationContext,但是对于常规应用程序代码而言,自定义 SynchronizationContext 是一种复杂的解决方案。

Async void 方法难以测试。 由于错误处理和组合方面的差异,因此调用 async void 方法的单元测试不易编写。 MSTest 异步测试支持仅适用于返回 Task 或 Task<T> 的 async 方法。 可以安装 SynchronizationContext 来检测所有 async void 方法都已完成的时间并收集所有异常,不过只需使 async void 方法改为返回 Task,这会简单得多。

显然,async void 方法与 async Task 方法相比具有几个缺点,但是这些方法在一种特定情况下十分有用: 异步事件处理程序。 语义方面的差异对于异步事件处理程序十分有意义。 它们会直接在 SynchronizationContext 上引发异常,这类似于同步事件处理程序的行为方式。 同步事件处理程序通常是私有的,因此无法组合或直接测试。 我喜欢采用的一个方法是尽量减少异步事件处理程序中的代码(例如,让它等待包含实际逻辑的 async Task 方法)。 下面的代码演示了这一方法,该方法通过将 async void 方法用于事件处理程序而不牺牲可测试性:

  1.  
  2.           private async void button1_Click(object sender, EventArgs e)
  3. {
  4.   await Button1ClickAsync();
  5. }
  6. public async Task Button1ClickAsync()
  7. {
  8.   // Do asynchronous work.
  9.           await Task.Delay(1000);
  10. }
  11.         

如果调用方不希望 async void 方法是异步的,则这些方法可能会造成严重影响。 当返回类型是 Task 时,调用方知道它在处理将来的操作;当返回类型是 void 时,调用方可能假设方法在返回时完成。 此问题可能会以许多意外方式出现。 在接口(或基类)上提供返回 void 的方法的 async 实现(或重写)通常是错误的。某些事件也假设其处理程序在返回时完成。 一个不易察觉的陷阱是将 async lambda 传递到采用 Action 参数的方法;在这种情况下,async lambda 返回 void 并继承 async void 方法的所有问题。 一般而言,仅当 async lambda 转换为返回 Task 的委托类型(例如,Func<Task>)时,才应使用 async lambda。

总结这第一个指导原则便是,应首选 async Task 而不是 async void。 Async Task 方法更便于实现错误处理、可组合性和可测试性。 此指导原则的例外情况是异步事件处理程序,这类处理程序必须返回 void。 此例外情况包括逻辑上是事件处理程序的方法,即使它们字面上不是事件处理程序(例如 ICommand.Execute implementations)。

异步代码让我想起了一个故事,有个人提出世界是悬浮在太空中的,但是一个老妇人立即提出质疑,她声称世界位于一个巨大乌龟的背上。 当这个人问乌龟站在哪里时,老夫人回答:“很聪明,年轻人,下面是一连串的乌龟!”在将同步代码转换为异步代码时,您会发现,如果异步代码调用其他异步代码并且被其他异步代码所调用,则效果最好 — 一路向下(或者也可以说“向上”)。 其他人已注意到异步编程的传播行为,并将其称为“传染”或将其与僵尸病毒进行比较。 无论是乌龟还是僵尸,无可置疑的是,异步代码趋向于推动周围的代码也成为异步代码。 此行为是所有类型的异步编程中所固有的,而不仅仅是新 async/await 关键字。

“始终异步”表示,在未慎重考虑后果的情况下,不应混合使用同步和异步代码。 具体而言,通过调用 Task.Wait 或 Task.Result 在异步代码上进行阻塞通常很糟糕。 对于在异步编程方面“浅尝辄止”的程序员,这是个特别常见的问题,他们仅仅转换一小部分应用程序,并采用同步 API 包装它,以便代码更改与应用程序的其余部分隔离。 不幸的是,他们会遇到与死锁有关的问题。 在 MSDN 论坛、Stack Overflow 和电子邮件中回答了许多与异步相关的问题之后,我可以说,迄今为止,这是异步初学者在了解基础知识之后最常提问的问题: “为何我的部分异步代码死锁?”

图 3 演示一个简单示例,其中一个方法发生阻塞,等待 async 方法的结果。 此代码仅在控制台应用程序中工作良好,但是在从 GUI 或 ASP.NET 上下文调用时会死锁。 此行为可能会令人困惑,尤其是通过调试程序单步执行时,这意味着没完没了的等待。 在调用 Task.Wait 时,导致死锁的实际原因在调用堆栈中上移。

图 3 在异步代码上阻塞时的常见死锁问题

  1.  
  2.           public static class DeadlockDemo
  3. {
  4.   private static async Task DelayAsync()
  5.   {
  6.     await Task.Delay(1000);
  7.   }
  8.   // This method causes a deadlock when called in a GUI or ASP.NET context.
  9.           public static void Test()
  10.   {
  11.     // Start the delay.
  12.           var delayTask = DelayAsync();
  13.     // Wait for the delay to complete.
  14.           delayTask.Wait();
  15.   }
  16. }
  17.         

这种死锁的根本原因是 await 处理上下文的方式。 默认情况下,当等待未完成的 Task 时,会捕获当前“上下文”,在 Task 完成时使用该上下文恢复方法的执行。 此“上下文”是当前 SynchronizationContext(除非它是 null,这种情况下则为当前 TaskScheduler)。 GUI 和 ASP.NET 应用程序具有 SynchronizationContext,它每次仅允许一个代码区块运行。 当 await 完成时,它会尝试在捕获的上下文中执行 async 方法的剩余部分。但是该上下文已含有一个线程,该线程在(同步)等待 async 方法完成。 它们相互等待对方,从而导致死锁。

请注意,控制台应用程序不会形成这种死锁。 它们具有线程池 SynchronizationContext 而不是每次执行一个区块的 SynchronizationContext,因此当 await 完成时,它会在线程池线程上安排 async 方法的剩余部分。该方法能够完成,并完成其返回任务,因此不存在死锁。 当程序员编写测试控制台程序,观察到部分异步代码按预期方式工作,然后将相同代码移动到 GUI 或 ASP.NET 应用程序中会发生死锁,此行为差异可能会令人困惑。

此问题的最佳解决方案是允许异步代码通过基本代码自然扩展。 如果采用此解决方案,则会看到异步代码扩展到其入口点(通常是事件处理程序或控制器操作)。 控制台应用程序不能完全采用此解决方案,因为 Main 方法不能是 async。 如果 Main 方法是 async,则可能会在完成之前返回,从而导致程序结束。 图 4演示了指导原则的这一例外情况: 控制台应用程序的 Main 方法是代码可以在异步方法上阻塞为数不多的几种情况之一。

图 4 Main 方法可以调用 Task.Wait 或 Task.Result

  1.  
  2.           class Program
  3. {
  4.   static void Main()
  5.   {
  6.     MainAsync().Wait();
  7.   }
  8.   static async Task MainAsync()
  9.   {
  10.     try
  11.     {
  12.       // Asynchronous implementation.
  13.           await Task.Delay(1000);
  14.     }
  15.     catch (Exception ex)
  16.     {
  17.       // Handle exceptions.
  18.           }
  19.   }
  20. }
  21.         

允许异步代码通过基本代码扩展是最佳解决方案,但是这意味着需进行许多初始工作,该应用程序才能体现出异步代码的实际好处。 可通过几种方法逐渐将大量基本代码转换为异步代码,但是这超出了本文的范围。在某些情况下,使用 Task.Wait 或 Task.Result 可能有助于进行部分转换,但是需要了解死锁问题以及错误处理问题。 我现在说明错误处理问题,并在本文后面演示如何避免死锁问题。

每个 Task 都会存储一个异常列表。 等待 Task 时,会重新引发第一个异常,因此可以捕获特定异常类型(如 InvalidOperationException)。 但是,在 Task 上使用 Task.Wait 或 Task.Result 同步阻塞时,所有异常都会用 AggregateException 包装后引发。 请再次参阅图 4。 MainAsync 中的 try/catch 会捕获特定异常类型,但是如果将 try/catch 置于 Main 中,则它会始终捕获 AggregateException。 当没有 AggregateException 时,错误处理要容易处理得多,因此我将“全局”try/catch 置于 MainAsync 中。

至此,我演示了两个与异步代码上阻塞有关的问题: 可能的死锁和更复杂的错误处理。 对于在 async 方法中使用阻塞代码,也有一个问题。 请考虑此简单示例:

  1.  
  2.           public static class NotFullyAsynchronousDemo
  3. {
  4.   // This method synchronously blocks a thread.
  5.           public static async Task TestNotFullyAsync()
  6.   {
  7.     await Task.Yield();
  8.     Thread.Sleep(5000);
  9.   }
  10. }
  11.         

此方法不是完全异步的。 它会立即放弃,返回未完成的任务,但是当它恢复执行时,会同步阻塞线程正在运行的任何内容。 如果此方法是从 GUI 上下文调用,则它会阻塞 GUI 线程;如果是从 ASP.NET 请求上下文调用,则会阻塞当前 ASP.NET 请求线程。 如果异步代码不同步阻塞,则其工作效果最佳。 图 5 是将同步操作替换为异步替换的速查表。

图 5 执行操作的“异步方式”

执行以下操作…

替换以下方式…

使用以下方式

检索后台任务的结果

Task.Wait 或 Task.Result

await

等待任何任务完成

Task.WaitAny

await Task.WhenAny

检索多个任务的结果

Task.WaitAll

await Task.WhenAll

等待一段时间

Thread.Sleep

await Task.Delay

总结这第二个指导原则便是,应避免混合使用异步代码和阻塞代码。 混合异步代码和阻塞代码可能会导致死锁、更复杂的错误处理及上下文线程的意外阻塞。 此指导原则的例外情况是控制台应用程序的 Main 方法,或是(如果是高级用户)管理部分异步的基本代码。

在本文前面,我简要说明了当等待未完成 Task 时默认情况下如何捕获“上下文”,以及此捕获的上下文用于恢复 async 方法的执行。 图 3 中的示例演示在上下文上的恢复执行如何与同步阻塞发生冲突从而导致死锁。此上下文行为还可能会导致另一个问题 — 性能问题。 随着异步 GUI 应用程序在不断增长,可能会发现 async 方法的许多小部件都在使用 GUI 线程作为其上下文。 这可能会形成迟滞,因为会由于“成千上万的剪纸”而降低响应性。

若要缓解此问题,请尽可能等待 ConfigureAwait 的结果。 下面的代码段说明了默认上下文行为和 ConfigureAwait 的用法:

  1.  
  2.           async Task MyMethodAsync()
  3. {
  4.   // Code here runs in the original context.
  5.           await Task.Delay(1000);
  6.   // Code here runs in the original context.
  7.           await Task.Delay(1000).ConfigureAwait(
  8.     continueOnCapturedContext: false);
  9.   // Code here runs without the original
  10.   // context (in this case, on the thread pool).
  11.           }
  12.         

通过使用 ConfigureAwait,可以实现少量并行性: 某些异步代码可以与 GUI 线程并行运行,而不是不断塞入零碎的工作。

除了性能之外,ConfigureAwait 还具有另一个重要方面: 它可以避免死锁。 再次考虑图 3;如果向 DelayAsync 中的代码行添加“ConfigureAwait(false)”,则可避免死锁。 此时,当等待完成时,它会尝试在线程池上下文中执行 async 方法的剩余部分。 该方法能够完成,并完成其返回任务,因此不存在死锁。 如果需要逐渐将应用程序从同步转换为异步,则此方法会特别有用。

如果可以在方法中的某处使用 ConfigureAwait,则建议对该方法中此后的每个 await 都使用它。 前面曾提到,如果等待未完成的 Task,则会捕获上下文;如果 Task 已完成,则不会捕获上下文。 在不同硬件和网络情况下,某些任务的完成速度可能比预期速度更快,需要谨慎处理在等待之前完成的返回任务。 图 6 显示了一个修改后的示例。

图 6 处理在等待之前完成的返回任务

  1.  
  2.           async Task MyMethodAsync()
  3. {
  4.   // Code here runs in the original context.
  5.           await Task.FromResult(1);
  6.   // Code here runs in the original context.
  7.           await Task.FromResult(1).ConfigureAwait(continueOnCapturedContext: false);
  8.   // Code here runs in the original context.
  9.           var random = new Random();
  10.   int delay = random.Next(2); // Delay is either 0 or 1
  11.   await Task.Delay(delay).ConfigureAwait(continueOnCapturedContext: false);
  12.   // Code here might or might not run in the original context.
  13.           // The same is true when you await any Task
  14.   // that might complete very quickly.
  15.           }
  16.         

如果方法中在 await 之后具有需要上下文的代码,则不应使用 ConfigureAwait。 对于 GUI 应用程序,包括任何操作 GUI 元素、编写数据绑定属性或取决于特定于 GUI 的类型(如 Dispatcher/CoreDispatcher)的代码。 对于 ASP.NET 应用程序,这包括任何使用 HttpContext.Current 或构建 ASP.NET 响应的代码(包括控制器操作中的返回语句)。 图 7 演示 GUI 应用程序中的一个常见模式:让 async 事件处理程序在方法开始时禁用其控制,执行某些 await,然后在处理程序结束时重新启用其控制;因为这一点,事件处理程序不能放弃其上下文。

图 7 让 async 事件处理程序禁用并重新启用其控制

  1.  
  2.           private async void button1_Click(object sender, EventArgs e)
  3. {
  4.   button1.Enabled = false;
  5.   try
  6.   {
  7.     // Can’t use ConfigureAwait here …
  8.           await Task.Delay(1000);
  9.   }
  10.   finally
  11.   {
  12.     // Because we need the context here.
  13.           button1.Enabled = true;
  14.   }
  15. }
  16.         

每个 async 方法都具有自己的上下文,因此如果一个 async 方法调用另一个 async 方法,则其上下文是独立的。 图 8 演示的代码对图 7 进行了少量改动。

图 8 每个 async 方法都具有自己的上下文

  1.  
  2.           private async Task HandleClickAsync()
  3. {
  4.   // Can use ConfigureAwait here.
  5.           await Task.Delay(1000).ConfigureAwait(continueOnCapturedContext: false);
  6. }
  7. private async void button1_Click(object sender, EventArgs e)
  8. {
  9.   button1.Enabled = false;
  10.   try
  11.   {
  12.     // Can’t use ConfigureAwait here.
  13.           await HandleClickAsync();
  14.   }
  15.   finally
  16.   {
  17.     // We are back on the original context for this method.
  18.           button1.Enabled = true;
  19.   }
  20. }
  21.         

无上下文的代码可重用性更高。 尝试在代码中隔离上下文相关代码与无上下文的代码,并尽可能减少上下文相关代码。 在图 8 中,建议将事件处理程序的所有核心逻辑都置于一个可测试且无上下文的 async Task 方法中,仅在上下文相关事件处理程序中保留最少量的代码。 即使是编写 ASP.NET 应用程序,如果存在一个可能与桌面应用程序共享的核心库,请考虑在库代码中使用 ConfigureAwait。

总结这第三个指导原则便是,应尽可能使用 Configure­Await。 无上下文的代码对于 GUI 应用程序具有最佳性能,是一种可在使用部分 async 基本代码时避免死锁的方法。 此指导原则的例外情况是需要上下文的方法。

关于 async 和 await 有许多需要了解的内容,这自然会有点迷失方向。 图 9 是常见问题的解决方案的快速参考。

图 9 常见异步问题的解决方案

问题

解决方案

创建任务以执行代码

Task.Run 或 TaskFactory.StartNew(不是 Task 构造函数或 Task.Start)

为操作或事件创建任务包装

TaskFactory.FromAsync 或 TaskCompletionSource<T>

支持取消

CancellationTokenSource 和 CancellationToken

报告进度

IProgress<T> 和 Progress<T>

处理数据流

TPL 数据流或被动扩展

同步对共享资源的访问

SemaphoreSlim

异步初始化资源

AsyncLazy<T>

异步就绪生产者/使用者结构

TPL 数据流或 AsyncCollection<T>

第一个问题是任务创建。 显然,async 方法可以创建任务,这是最简单的选项。 如果需要在线程池上运行代码,请使用 Task.Run 如果要为现有异步操作或事件创建任务包装,请使用 TaskCompletionSource<T>。下一个常见问题是如何处理取消和进度报告。 基类库 (BCL) 包括专门用于解决这些问题的类型: CancellationTokenSource/CancellationToken IProgress<T>/Progress<T> 异步代码应使用基于任务的异步模式(或称为 TAPmsdn.microsoft.com/library/hh873175),该模式详细说明了任务创建、取消和进度报告。

出现的另一个问题是如何处理异步数据流。 任务很棒,但是只能返回一个对象并且只能完成一次。 对于异步流,可以使用 TPL 数据流或被动扩展 (Rx)。 TPL 数据流会创建类似于主角的“网格”。 Rx 更加强大和高效,不过也更加难以学习。 TPL 数据流和 Rx 都具有异步就绪方法,十分适用于异步代码。

仅仅因为代码是异步的,并不意味着就安全。 共享资源仍需要受到保护,由于无法在锁中等待,因此这比较复杂。 下面是一个异步代码示例,该代码如果执行两次,则可能会破坏共享状态,即使始终在同一个线程上运行也是如此:

int value;

Task<int> GetNextValueAsync(int current);

async Task UpdateValueAsync()

{

  value = await GetNextValueAsync(value);

}

问题在于,方法读取值并在等待时挂起自己,当方法恢复执行时,它假设值未更改。 为了解决此问题,使用异步就绪 WaitAsync 重载扩展了 SemaphoreSlim 类。 图 10 演示 SemaphoreSlim.WaitAsync。

图 10 SemaphoreSlim 允许异步同步

SemaphoreSlim mutex = new SemaphoreSlim(1);

int value;

Task<int> GetNextValueAsync(int current);

async Task UpdateValueAsync()

{

  await mutex.WaitAsync().ConfigureAwait(false);

  try

  {

    value = await GetNextValueAsync(value);

  }

  finally

  {

    mutex.Release();

  }

}

异步代码通常用于初始化随后会缓存并共享的资源。 没有用于此用途的内置类型,但是 Stephen Toub 开发了 AsyncLazy<T>,其行为相当于 Task<T> Lazy<T> 合二为一。 该原始类型在其博客 (bit.ly/dEN178) 上进行了介绍,并且在我的 AsyncEx (nitoasyncex.codeplex.com) 中提供了更新版本。

最后,有时需要某些异步就绪数据结构。 TPL 数据流提供了 BufferBlock<T>,其行为如同异步就绪生产者/使用者队列。 而 AsyncEx 提供了 AsyncCollection<T>,这是异步版本的 BlockingCollection<T>。

我希望本文中的指导原则和指示能有所帮助。 异步真的是非常棒的语言功能,现在正是开始使用它的好时机!

TCP粘包/拆包问题的解决办法

一、什么是TCP粘包/拆包

如图所示,假如客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4中情况:

  • 第一种情况:Server端分别读取到D1和D2,没有产生粘包和拆包的情况。
  • 第二种情况:Server端一次接收到两个数据包,D1和D2粘合在一起,被称为TCP粘包。
  • 第三种情况:Server端分2次读取到2个数据包,第一次读取到D1包和D2包的部分内容D2_1,第二次读取到D2包的剩余内容,被称为TCP拆包。
  • 第四中情况:Server端分2次读取到2个数据包,第一次读取到D1包的部分内容D1_1 ,第二次读取到D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑动窗非常小,而数据包D1和D2都很大,很有可能发送第五种可能,即服务端多次才能把D1和D2接收完全,期间多次发生拆包情况。(TCP接收滑动窗:是接收端的大小,随着流量大小而变化,如果我的解释还不明确,请读者自行百度,或者查阅《计算机网络》、《TCP/IP》中TCP的内容)

粘包问题的解决策略

由于底层的TCP无法理解上层的业务逻辑,所以在底层是无法确保数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,归纳如下:

  • 1.消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
  • 2.在包尾增加回车换行符进行分割,例如FTP协议;
  • 3.将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路是消息头的第一个字段用int来表示消息的总长度;(我之前linux C开发,就用的这种)。
  • 4.更复杂的应用层协议;

 

 

C# 高性能 TCP 服务的多种实现方式

本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容:

在 .NET/C# 中对于 Socket 的支持均是基于 Windows I/O Completion Ports 完成端口技术的封装,通过不同的 Non-Blocking 封装结构来满足不同的编程需求。以上方式均已在 Cowboy.Sockets 中有完整实现,并且 APM 和 TAP 方式已经在实际项目中应用。Cowboy.Sockets 还在不断的进化和完善中,如有任何问题请及时指正。

虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个 Loop 即可描述:Accept Loop和 Read Loop,如下图所示。(这里提及的 “Loop” 指的是一种循环方式,而非特指 while/for 等关键字。)

  • 在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。
  • 在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。

如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。

从实现细节的角度看,能够导致服务阻塞的位置可能在:

  1. Accept 到新的 Socket,构建新的 Connection 需要分配各种资源,分配资源慢;
  2. Accept 到新的 Socket,没有及时触发下一次 Accept;
  3. Read 到新的 Buffer,判定 Payload 消息长度,判定过程长;
  4. Read 到新的 Buffer,发现 Payload 还没有收全,继续 Read,则 “可能” 会导致一次 Buffer Copy;
  5. Payload 接收完毕,进行 De-Serialization 转成可识别的 Protocol Message,反序列化慢;
  6. 由 Business Module 来处理相应的 Protocol Message,处理过程慢;

1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer 的处理过程,5-6 涉及到应用逻辑侧的实现。

Java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer 部分做了全新的尝试,采用了名叫 ByteBuf的设计,实现 Buffer Zero Copy 以减少高并发条件下 Buffer 拷贝带来的性能损失和 GC 压力。DotNettyOrleans ,Helios 等项目正在尝试在 C# 中进行类似的 ByteBuf 的实现。

APM 方式:TcpSocketServer

TcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 APM 的 BeginXXX 和 EndXXX 接口实现。

TcpSocketServer 中的 Accept Loop 指的就是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> …

每一个建立成功的 Connection 由 TcpSocketSession 来处理,所以 TcpSocketSession 中会包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> …

TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。

  event EventHandler ClientConnected;
  event EventHandler ClientDisconnected;
  event EventHandler ClientDataReceived;

使用也是简单直接,直接订阅事件通知。

复制代码
  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected  = server_ClientConnected;
      _server.ClientDisconnected  = server_ClientDisconnected;
      _server.ClientDataReceived  = server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }
复制代码

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 TAP 的 async/await 的 XXXAsync 接口实现。

然而,实际上 XXXAsync 并没有创建什么神奇的效果,其内部实现只是将 APM 的方法转换成了 TAP 的调用方式。

复制代码
  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task AcceptSocketAsync()
  {
      return Task.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task AcceptTcpClientAsync()
  {
      return Task.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }
复制代码

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。

复制代码
  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }
复制代码

使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。

复制代码
  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。

复制代码
  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}
复制代码

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的简写。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是说,优点就是无需为每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推荐步骤如下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。

TcpSocketSaeaServer 即是对 SocketAsyncEventArgs 的应用和封装,并实现了 Pooling 技术。TcpSocketSaeaServer 中的重点是 SaeaAwaitable 类,SaeaAwaitable 中内置了一个 SocketAsyncEventArgs,并通过 GetAwaiter 返回 SaeaAwaiter 来支持 async/await 操作。同时,通过 SaeaExtensions 扩展方法对来扩展 SocketAsyncEventArgs 的 Awaitable 实现。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 则是一个 QueuedObjectPool 的衍生实现,用于池化 SaeaAwaitable 实例。同时,为了减少 TcpSocketSaeaSession 的构建过程,也实现了 SessionPool 即 QueuedObjectPool。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

复制代码
  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();
  
      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }
复制代码

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,

复制代码
  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);
  
      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;
  
      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }
复制代码

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。

复制代码
  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }
复制代码

使用起来也是简单直接:

复制代码
  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,

复制代码
_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};
复制代码

通过 TcpSocketRioSession 来处理 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

测试代码一如既往的类似:

复制代码
  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }
复制代码

参考资料

本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。

基于C#的通信协议封包

接上一篇《基于.NET技术的监控系统应用分析》中所描述的数据通信协议设计,我们来看一下在C#中是怎么对自定义协议进行封包的?我们知道基于流的数据协议的特点:发送和接收到的数据都是连续的流。每次网络I/O操作的流长度不确定,也就是无法知道每次接收到的数据是一个完整的数据包。同样,主机发送一个数据包也会根据网络的实际情况执行若干次。所以我们对这类消息的编解码过程需要进行一个统一的封装。

重新回顾一下每个消息的结构:消息头 + 消息体。每次先发送出去的是消息头,然后是消息体。消息头里描述了这个数据包的类型,长度,序列号等信息。消息头的长度是固定的,消息体的长度是根据每个消息类型会有所的区别。

消息头的定义:

字段 长度(字节) 类型 说明
Length 4 Int 消息的总长度(字节)
Command ID 4 Int 命令ID
NodeID 4 Int 结点ID
TimeID 4 Int 时间戳
SequenceID 4 Int 递增序列号

对应的封装代码:

1using System;
2using MonitorLib.Utility;
3
4namespace MonitorLib.Protocol
5{
6    /// <summary>
7    /// 消息头
8    /// </summary>
9    [Serializable]
10    public class Head
11    {
12        private byte[] initValue = new byte[Head.HeaderLength];
13
14        public Head(Command CommandID)
15        {
16            Converter.IntToBytes((uint)CommandID).CopyTo(initValue, 4);
17        }
18
19        public Head(byte[] bs)
20        {
21            uint length = Head.HeaderLength ;
22
23            for (int i = 0;i < length;i++)
24            {
25                initValue[i]=bs[i];
26            }
27        }
28
29
30        public Head(byte[] bs,int baseIndex)
31        {
32            uint length = Head.HeaderLength ;
33
34            for (int i = 0; i < length; i++)
35            {
36                initValue[i]=bs[baseIndex+i];
37            }
38        }
39
40        /// <summary>
41        /// 消息的整个长度
42        /// </summary>
43        public uint Length
44        {
45            get
46            {
47                return (Converter.BytesToUInt(initValue,0));
48            }
49            set
50            {
51                byte[] byt = Converter.IntToBytes(value);
52                for (int i = 0;i < 4;i++)
53                {
54                    initValue[i]= byt[i];
55                }
56            }
57        }
58
59        /// <summary>
60        /// 命令类型
61        /// </summary>
62        public uint CommandID
63        {
64            get
65            {
66                return (Converter.BytesToUInt(initValue, 4));
67            }
68            set
69            {
70                byte[] t=Converter.IntToBytes(value);
71                for (int i = 0; i < 4; i++)
72                {
73                    initValue[i + 4] = t[i];
74                }
75            }
76        }
77
78        /// <summary>
79        /// 源结点号
80        /// </summary>
81        public uint NodeID
82        {
83            get
84            {
85                return (Converter.BytesToUInt(initValue, 8));
86            }
87            set
88            {
89                byte[] t = Converter.IntToBytes(value);
90                for (int i = 0; i < 4; i++)
91                {
92                    initValue[i + 8] = t[i];
93                }
94            }
95        }
96
97        /// <summary>
98        /// 时间戳
99        /// </summary>
100        public uint TimeID
101        {
102            get
103            {
104                return (Converter.BytesToUInt(initValue,12));
105            }
106            set
107            {
108                byte[] t = Converter.IntToBytes(value);
109                for (int i = 0; i < 4; i++)
110                {
111                    initValue[i + 12] = t[i];
112                }
113            }
114        }
115
116        /// <summary>
117        /// 序列号
118        /// </summary>
119        public uint SequenceID
120        {
121            get
122            {
123                return (Converter.BytesToUInt(initValue,16));
124            }
125            set
126            {
127                byte[] t = Converter.IntToBytes(value);
128                for (int i = 0;i < 4;i++)
129                {
130                    initValue[i + 16] = t[i];
131                }
132            }
133        }
134
135
136        /// <summary>
137        /// 输出字节流
138        /// </summary>
139        /// <returns></returns>
140        public byte[] ToBytes()
141        {
142            return initValue;
143        }
144
145        /// <summary>
146        /// 从字节流中转换
147        /// </summary>
148        /// <param name=”bs”></param>
149        public void FromBytes(byte[] bs)
150        {
151            for (int i = 0; i < Head.HeaderLength; i++)
152            {
153                initValue[i] = bs[i];
154            }
155        }
156
157        /// <summary>
158        /// 消息头的长度
159        /// </summary>
160        public static uint HeaderLength
161        {
162            get
163            {
164                return (4 + 4 + 12);
165            }
166        }
167    }
168}
169

using System;
using MonitorLib.Utility;

namespace MonitorLib.Protocol
{
/// <summary>
/// Sequence 的摘要说明。
/// </summary>
[Serializable]
public class Sequence
{
private uint node;
private uint time;
private uint sequence ;
public Sequence()
{

}

public uint Node
{
get{ return this.node; }
set{ this.node = value ;}
}

public uint Time
{
get{ return this.time ; }
set{ this.time = value; }

}

public uint Value
{
get{ return sequence;}
set{this.sequence = value;}
}

public ulong ToUInt64()
{
string temp = String.Format(“{0}{1}{2}”,Node, Time, Value);
return Convert.ToUInt64(temp);
}
}

public class Seed
{
private uint sequence = uint.MinValue;

public uint GetSequence()
{
lock (this)
{
return this.sequence >= 90000 ? uint.MinValue : this.sequence++;
}
}

public uint GetTimeStamp()
{
lock (this)
{
return Convert.ToUInt32( DateTime.Now.ToString(“MMddHHmmss”) );
}
}

}
}

上面只是一个消息头,要成为一个完整的消息,一般还必须包含消息体(当然你也可以根据需要仅发送一个消息头的数据,作为特殊用途,例如自定义的心跳包)。举个例子:客户机与服务器连接上后,它通常会发送一个绑定(Bind) 消息给服务器端。例如:验证确认客户端的合法性。那么此时的Bind消息的格式是:

字段 长度(字节) 类型 说明
HEAD     上面的消息头部
loginName 16 string 用户名(固定16位,不足用空格填充)
LoginPassword 16 string 密码(固定16位,不足用空格填充)

对应的封装代码:

using System;
using MonitorLib.Utility;

namespace MonitorLib.Protocol
{
/// <summary>
/// AbstractBase 的摘要说明。
/// </summary>

[Serializable]
public abstract class AbstractBase
{
protected byte[] initValue;
public Head header;

public AbstractBase()
{

}

public virtual byte[] ToBytes()
{
return null;
}
}
}

using System;
using System.IO;

namespace MonitorLib.Utility
{
/// <summary>
///消息命令常量
/// </summary>
public enum Command : uint
{
/// <summary>
/// 对客户端验证
/// </summary>
MOT_BIND = 0x1,

/// <summary>
/// 服务端返回验证请求
/// </summary>
MOT_BIND_RESP = 0x80000001,

/// <summary>
/// 断开连接
/// </summary>
MOT_UNBIND  =0x2,

/// <summary>
/// 返回断开连接状态
/// </summary>
MOT_UNBIND_RESP=0x80000002,

/// <summary>
/// 上行提交内容
/// </summary>
MOT_SUBMIT = 0x3,

/// <summary>
/// submit 应答
/// </summary>
MOT_SUBMIT_RESP = 0x80000003,

/// <summary>
/// 设置命令
/// </summary>
MOT_REQUEST = 0x4,

MOT_REQUEST_RESP = 0x80000004,

/// <summary>
/// 连接命令
/// </summary>
MOT_CONNECT = 0x5,

MOT_CONNECT_RESP = 0x80000005,

/// <summary>
/// 更新程序命令
/// </summary>
MOT_UPDATE = 0x6,

MOT_UPDATE_RESP = 0x80000006,

/// <summary>
/// 返回结点的数据参数
/// </summary>
MOT_RESPONSE_PARAM = 0x7,

MOT_CLIENTINFO = 0x8,

MOT_CLIENTINFO_RESP = 0x80000008

}

/// <summary>
/// 错误定义
/// </summary>
public enum ErrorDefine : int
{
/// <summary>
///无错误,命令正确接收
/// </summary>
NO_ERROR = 0,

/// <summary>
/// 非法登录,如登录名、口令出错、登录名与口令不符等
/// </summary>
ILLEAGE_LOGIN = 1,

/// <summary>
/// 重复登录,如在同一TCP/IP连接中连续两次以上请求登录。
/// </summary>
REPEAT_LOGIN =2,

/// <summary>
/// 连接过多,指单个节点要求同时建立的连接数过多。
/// </summary>
MORE_CONNECT = 3,

/// <summary>
/// 不知道的用户
/// </summary>
UNKNOW_USER = 29,

/// <summary>
/// 不提供此功能
/// </summary>
UNSUPPORT_FUNCTION = 30,

/// <summary>
/// 系统失败
/// </summary>
SYSTEM_FAIL = 32,
}

/// <summary>
/// 字节 整形 转换类 网络格式转换为内存格式
/// </summary>
public class Converter
{
/// <summary>
/// 转换整形数据网络次序的字节数组
/// </summary>
/// <param name=”i”></param>
/// <returns></returns>
public static byte[] IntToBytes(uint i)
{
byte[] t = BitConverter.GetBytes(i) ;
byte b = t[0];
t[0] = t[3];
t[3] = b;
b = t[1];
t[1] = t[2];
t[2] = b;
return (t);
}

public static byte[] IntToBytes(uint source,int number)
{
byte[] t = new byte[number];
t = BitConverter.GetBytes(source);
byte temp;
for (int i = t.Length-1; i > t.Length/2; i–)
{
temp = t[i];
t[i] = t[t.Length-1-i];
t[t.Length-1-i] = temp;
}
return (t);
}

/// <summary>
/// 返回字节数组代表的整数数字,4个数组
/// </summary>
/// <param name=”bs”></param>
/// <param name=”startIndex”></param>
/// <returns></returns>
public static uint BytesToUInt(byte[] bs,int startIndex)
{
byte[] t=new byte[4];
for (int i = 0; i < 4 && i < bs.Length-startIndex; i++)
{
t[i]=bs[startIndex+i];
}

byte b=t[0];
t[0]=t[3];
t[3]=b;
b=t[1];
t[1]=t[2];
t[2]=b;

return BitConverter.ToUInt32(t,0);
}

public static uint BytesToUInt(byte[] b,int startIndex,int number)
{
byte[] t = new Byte[number];
for (int i = 0; i < number && i < b.Length-startIndex; i++)
{
t[i] = b[startIndex+i];
}

byte temp;
for (int i = t.Length-1; i > t.Length/2; i–)
{
temp = t[i];
t[i]=t[t.Length-1-i];
t[i] = temp;
}
return (BitConverter.ToUInt32(t,0));
}

/// <summary>
/// 没有指定起始索引
/// </summary>
/// <param name=”bs”></param>
/// <returns></returns>
public static uint BytesToUInt(byte[] bs)
{
return (BytesToUInt(bs,0));
}
}

/// <summary>
/// 缓冲区对象
/// </summary>
public class BufferObject
{
private byte[] buffer = null;
private int length = 0;

public BufferObject(byte[] bytes, int len)
{
if (buffer != null)
{
buffer = null;
GC.Collect();
}

length = len;
buffer = new byte[len];
for (int i = 0; i < len; i++)
{
buffer[i] = bytes[i];
}
}

public byte[] Buffer
{
get { return buffer;}
}

public int Length
{
get { return length; }
}
}
}

}

using System;
using System.Text;
using MonitorLib.Utility;

namespace MonitorLib.Protocol
{
/// <summary>
/// Bind消息
/// </summary>
[Serializable]
public class Bind : AbstractBase
{
private string loginName;
private string loginPassword;

/// <summary>
/// 初始Bind命令的消息头
/// </summary>
/// <param name=”Sequence”>序列号</param>
public Bind(Sequence seq)
{
header = new Head(Command.MOT_BIND);
header.NodeID = seq.Node;
header.TimeID = seq.Time ;
header.SequenceID = seq.Value ;
header.Length = Head.HeaderLength + 16 + 16;
}

public Bind(byte[] receive)
{
initValue = new byte[receive.Length];
receive.CopyTo(initValue,0);
}

/// <summary>
/// 登录名
/// </summary>
public string LoginName
{
get
{
return Encoding.ASCII.GetString(initValue,20,16);
}
set
{
loginName = value;
}
}

/// <summary>
/// 密码
/// </summary>
public string LoginPassword
{
get
{
return Encoding.ASCII.GetString(initValue,36,16);
}
set
{
loginPassword = value;
}
}

/// <summary>
/// 把消息结构转换成字节数组
/// </summary>
/// <returns>结果字节数组</returns>
public override byte[] ToBytes()
{
byte[] retValue = new byte[this.header.Length];
uint index = 0;

//填充消息头
header.ToBytes().CopyTo(retValue,index);

index += Head.HeaderLength;
Encoding.ASCII.GetBytes(loginName).CopyTo(retValue,index);

//移位16位, 填充密码
index += 16;
Encoding.ASCII.GetBytes(loginPassword).CopyTo(retValue,index);
return retValue;
}
}

/// <summary>
/// Bind应答结构
/// </summary>
[Serializable]
public class Bind_Resp : AbstractBase
{
private uint result;

/// <summary>
/// 构造函数,把接收的字节数组复制到initValue
/// </summary>
/// <param name=”recBytes”>从网络上接收到的字节数组</param>
public Bind_Resp(byte[] receive)
{
initValue = new byte[receive.Length];
receive.CopyTo(initValue,0);
}

public Bind_Resp(Sequence seq)
{
header = new Head(Command.MOT_BIND_RESP);
header.NodeID = seq.Node;
header.TimeID = seq.Time ;
header.SequenceID = seq.Value ;
header.Length = Head.HeaderLength + 4;
}

/// <summary>
/// bind 执行命令是否成功,0-成功。其它:错误码。
/// </summary>
public uint Result
{
get
{
return Convert.ToUInt32(initValue[20].ToString());
}
set
{
result = value;
}
}

public override byte[] ToBytes()
{
byte[] retValue =  new byte[header.Length];
header.ToBytes().CopyTo(retValue,0);
BitConverter.GetBytes(result).CopyTo(retValue,20);
return retValue;
}
}

}

除了这种协议封装方法外,还有一种直接利用 .NET 的字节流操作类来编解码,例如 ICMP 协议的封包代码:

1    public class ICMPHDR
2    {
3        private byte mType;
4        public byte Type
5        {
6            get{ return mType; }
7            set{ mType = value; }
8        }
9
10        private byte mCode = 0;
11        public byte Code
12        {
13            get{ return mCode; }
14            set{ mCode = value; }
15        }
16
17        private ushort mChecksum = 0;
18        public ushort Checksum
19        {
20            get{ return mChecksum; }
21            set{ mChecksum = value; }
22        }
23
24        private ushort mID;
25        public ushort ID
26        {
27            get{ return mID; }
28            set{ mID = value; }
29        }
30
31        private ushort mSeq;
32        public ushort Seq
33        {
34            get{ return mSeq; }
35            set{ mSeq = value; }
36        }
37
38        private ulong mtmSend;
39        public ulong tmSend
40        {
41            get{ return mtmSend; }
42            set{ mtmSend = value; }
43        }
44
45        private int mnTaskId;
46        public int nTaskId
47        {
48            get{ return mnTaskId; }
49            set{ mnTaskId = value; }
50        }
51
52        public void Encode(BinaryWriter writer)
53        {
54            writer.Write(Type);
55            writer.Write(Code);
56            writer.Write((UInt16)Checksum);
57            writer.Write((UInt16)ID);
58            writer.Write((UInt16)Seq);
59            writer.Write((UInt32)tmSend);
60            writer.Write(nTaskId);
61         }
62
63        public void Decode(BinaryReader reader)
64        {
65            Type = reader.ReadByte();
66            Code = reader.ReadByte();
67            Checksum = reader.ReadUInt16();
68            ID = reader.ReadUInt16();
69            Seq = reader.ReadUInt16();
70            tmSend = reader.ReadUInt32();
71            nTaskId = reader.ReadInt32();
72        }
73
74        public uint Sum()
75        {
76            uint sum = 0;
77            sum += (ushort)(Type + (Code << 8));
78            sum += (ushort)ID;
79            sum += (ushort)Seq;
80            sum += (ushort)tmSend;
81            sum += (ushort)(tmSend >> 16);
82            sum += (ushort)nTaskId;
83            sum += (ushort)(nTaskId >> 16);
84            return sum;
85        }
86    }

 以上介绍了用C#是如何对自定义的通信协议封装的过程。 如有不同的处理方法的朋友,欢迎评论,一起探讨一下。

关于tcp和udp的一些要点

关于TCP输出:
每个TCP套接口有一个发送缓冲区,当应用程序调用write时,内核将应用程序的缓冲区的数据拷贝到TCP的发送缓冲区,(如果write成功返回,仅仅表示应用程序的缓冲区已经全部拷贝到TCP的发送缓冲区,说明此时应用程序的缓冲区中的数据可以丢弃,并不表示TCP的数据已经成功发送)TCP以对方MSS(maximum segment size)大小或者更小发送数据块给IP,并给每个数据块加上TCP头部形成分节,IP给每个分节安上IP头部形成数据包(分组)(一般都会小于MTU,所以不用再分片),并寻找目标IP地址,及路由表项以确定外出接口,然后把数据包传给数据链路,加入链路输出队列,如果队列满,分组丢弃,并通过协议栈向上返回错误:链路层到IP层,IP层到TCP层,TCP记住这个错误并在某个时候重传这个分节。当对方(设为服务器)接受到数据时,会发送ACK,本机(客户端)接受ACK,并应答ACK,此时,发送缓冲区的数据废弃,此缓冲区可以再次写入其他数据。
关于UDP输出:
UDP因为是不可靠的,它不必保存应用进程的数据拷贝,所以没有发送缓冲区,UDP给每个数据块安上8个字节的UDP头部,形成数据报并传给IP,IP给数据报安上IP头部形成数据包(分组),如果数据包不合适MTU,则执行分片,然后把每个分片加入到数据链路层的输出队列。应用程序的write返回表示数据已经加入到输出队列。同时注意UDP没有MSS,所以关于数据包的分片就更多的发生在UDP而不是TCP。

基本介绍

在OSI七层模型中,上层的数据包都会作为下层数据包的数据部分(payload)。

也就是说,当构造TCP数据包的时候,会把应用层的数据包作为TCP包的数据部分,然后加上TCP头构成TCP数据包;同样,当构造IP数据包的时候,整个TCP包就会被当作数据部分,然后加上IP头构成IP数据包。

TCP头的数据格式如下,在不包括可选字段的情况下,一般TCP头会占用20个字节。

在TCP首部中,有几个字段是需要关注一下:

  • 在TCP首部中没有源和目标的IP、MAC地址(IP和MAC地址分别是网络层和链路层首部的信息),只有源和目标的端口
  • Sequence Number是包的序号,网络层(IP层)的传输是不可靠的,可能产生包乱序,所以这个需要可以解决网络包乱序的问题
  • Acknowledgement Number用来确认收到数据包的确认序号,为TCP的传输提供了可靠性保证
  • TCP Flags包括了8个bit,通过对这些bit的设置,可以代表不同类型的TCP数据包

 

 

 

TCP层的分片

在IP层往链路层传输数据的时候,往往会做一个分片的操作,对于大多数链路层来讲,它都有一个最大传输单元(MTU),表示能够发送数据量的大 小,它是由硬件决定的。比如以太网的MTU为1500字节。当IP层传输给链路层的数据量大于其MTU时,那么IP层就会将数据拆分为小于其链路层MTU 的数据片,再传输给链路层进行发送。

但是对于不同的传输层协议(TCP/UDP)来说,在IP层上,需不需要进行分片是不同的。

对于TCP来说,它是尽量避免分片的,为什么?因为如果在IP层进行分片了话,如果其中的某片的数据丢失了,对于保证可靠性的TCP协议来说,会增 大重传数据包的机率,而且只能重传整个TCP分组(进行IP分片前的数据包),因为TCP层是不知道IP层进行分片的细节的,也不关心

当TCP层进行TCP分组的重传后,还会直接影响到应用层程序的性能,特别是在应用程序使用阻塞IO进行读写的时候。要理解这点,首先我们要知道当应用层程序往TCPIP协议栈写数据的时候都做了些什么事。

在应用层程序中,我们可以有自己的发送缓冲区,而TCP层本身也有自己的一个发送缓冲区,在JAVA中,可以通过设置Socket的 SO_SNDBUF选项来设置,默认情况下一般是8k大小。 当我们在应用层往TCP层写数据(比如outputstream.write())的时候,实际上是将应用层发送缓冲区的数据拷贝到TCP层的发送缓冲区 中。当TCP层的发送缓冲区满或者网络空闲时,TCP层就会将其缓冲区中的数据通过IP层传到链路层的发送队列中。如果TCP层的发送缓冲区满而且应用层 的数据没有写完时,内核会将write系统调用挂起,并不返回给应用层程序,直到应用层的数据全部拷贝到TCP层的缓冲区中。而由于TCP层要保证数据包 的可靠性,即数据包丢失时要进行重传,那么TCP层在往网络发送TCP分组后,需要在其发送缓冲区中暂时保存发出的TCP分组数据用于后续可能的重传。

在这样的前提下,如果IP对来自TCP层的数据进行了分片, 那么就有可能使得应用层程序一直在write系统调用处挂起等待,引起性能的下降。

TCP层如何避免IP层的分片

首先,我们先回顾下TCP建立连接的3次握手:

 

在这3次握手中,除了确认SYN分节外,通信的两端还进行协商了一个值,MSS,这个值用来告诉对方,能够发送的TCP分节的大小。这个值一般是取其链路层的MTU大小减去TCP头部大小和IP头部的大小。MSS=MTU-TCP头部大小-IP头部大小. MTU的值可以通过询问链路层得知。

当两端确认好MSS后进行通信,当TCP层往IP层传输数据时,如果TCP层缓冲区的大小大于MSS,那么TCP层都会将其发送缓冲区中的数据切分 成MSS大小的分组进行传输,由于MSS是通过MTU减去TCP头部大小和IP头部的大小计算得出的,MSS肯定比MTU小,那么到IP层的时候就可以避 免IP层的分片。

UDP层的分片

如果我们采用的是UDP协议而不是TCP协议呢?在IP层会不会进行分片?由于UDP是不需要保证可靠性的,那么它就不会保存发送的数据包,TCP 之所以保存发送的数据包是因为要进行重传。所以UDP本身是没有像TCP一样的发送缓冲区的。这就导致了对UDP进行write系统调用的时候,实际上应 用层的数据是直接传输到IP层,由于IP层本身也不会有缓冲区,数据就会直接写到链路层的输出队列中。

在这种情况下,IP层会不会对来自UDP的数据进行分片呢?这个取决于UDP数据报的大小。如果UDP数据报的大小大于链路层的MTU,那么IP层就会直接进行分片,然后在发送到链路层的输出队列中,反之,则不会进行分片,直接加上IP头部发送到链路层的输出队列中。

 

附加图片,直观具体,帮助理解

数据的封装过程:
1.data
2.segment
3.packet
4.frame
5.bit

对应的协议:
1.application-session (Represen layer is in the middle) layer
2.transprot layer
3.network layer
4.data link layer
5.physical layer
——————————

二层的PDU叫做Frame;
IP的叫做Packet;
TCP的叫做Segment;
UDP的叫做Datagram。

 

 

 

 

 

 

 

http://www.cnblogs.com/wilber2013/p/4853674.html