zmq源代码分析 - ypipe

    xiaoxiao2021-04-15  73

    ypipe_t源代码续 //回滚已经写入的元素(对于单一元素是不能回滚的,对于多片段元素,在最后一片写入之前前n-1片是可以回滚的)。 //参数: // value_,被回滚的元素。 //返回值: // true,回滚成功返回true;false,回滚失败返回false。 inline bool unwrite (T *value_) { //如果f == &queue.back(),那么说明没有未flush的元素,则回滚失败。 //所以说,如果只write一个元素,那么必然write之后必然f==&queue.back() //如果write n个元素,那么在写入第n个元素之前,前n-1个元素都是可以回滚的。 //第n个元素被写入的同时,所有n个元素被flush了,然后也就不能回滚了。 if (f == &queue.back ()) return false; queue.unpush (); *value_ = queue.back (); return true; } //flush之后c,w,f就指向同一个位置了。 //实际的刷新操作其实只做了两件事,c=f,w=f。其中C是读写线程共享的指针。 //如果只有写线程,那么c和w是永远相等的,永远等于f。 //只有被读线程改写的情况下c!=w才成立,而地线程只在一种情况下改写c,那就是队列中没有数据的时候,读线程把c设置为null, //然后睡眠。所以说如果c不为w,那么肯定为null,则此时读线程睡眠。 //那么flush()函数的返回值其实是表明: // 返回true,读线程活跃,不需要唤醒。 // 返回false,读线程睡眠,在flush之后需要唤醒读线程读取数据。 inline bool flush () { // If there are no un-flushed items, do nothing. //如果所有元素都已经flush了,那么直接返回,什么都不做。 //当写完数据,f指针被write()更新,此时 w!=f,就需要刷新, //所以w,f配合起来表示是否需要执行刷新操作。 if (w == f) return true; // Try to set 'c' to 'f'. //如果c与w相同,则c设置为f,并返回c的ptr。 //如果c与w不同,则c值不变,返回c的ptr。 if (c.cas (w, f) != w) { //c要么为空,要么为w。 // Compare-and-swap was unseccessful because 'c' is NULL. // This means that the reader is asleep. Therefore we don't // care about thread-safeness and update c in non-atomic // manner. We'll return false to let the caller know // that reader is sleeping. c.set (f); //设置c为f w = f; //设置w为f。 return false; } // Reader is alive. Nothing special to do now. Just move // the 'first un-flushed item' pointer to 'f'. w = f; return true; //检查pipe是否可读。 //返回值: // true,可读;false,不可读。 //check_read()是通过指针r的位置来判断是否有数据可读的; //如果指针指向的是队列头或者r没有指向任何元素,则说明队列中没有可读数据,这是check_read去尝试预取。 //所谓预取就是令r=c,而c在write中被指向f。这时从queue_front()到f这个位置的数据都被预读出来了,然后调用read() //一块一块的读取。当c再次等于queue_front()的时候说明数据读取完了,这时c指针指向null,然后读线程睡眠。 //c是否为null标志读线程是否睡眠。 inline bool check_read () { // Was the value prefetched already? If so, return. //值已经被预取(所谓取,就是r被更新到了第一个不可读元素的位置,在r //之前的元素都是可以被读取的),返回true。 if (&queue.front () != r && r) return true; // There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the // pointer from c in atomic fashion. If there are no // items to prefetch, set c to NULL (using compare-and-swap). //在此函数(check_read)中实现了预读操作。 //最开始:在写入了n个数据并flush之后,c指向back_pos,与queue.front() //不相等,所有下面的一句话只返回c的当前值给r,c的值不变。 //由于写线程和读线程可能同时存在,因此在每次check_read()的时候 //c的位置可能又发生了向后移动,这样每次check_read()都会判断c的 //当前位置,每次都把r更新为c的最新位置。 //当所有元素都被读取完毕,此时begin_pos和back_pos位置相同, //这个时候c被设置为null,r的值不变。 r = c.cas (&queue.front (), NULL); // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, // it can happen during pipe shutdown when items // are being deallocated. //当所有元素都读完,返回false。 if (&queue.front () == r || !r) return false; // There was at least one value prefetched. return true; } // Reads an item from the pipe. Returns false if there is no value. // available. //从pipe中读取一个T元素。 //参数: // value_,出参。 //返回值: // true,读取成功;false,读取失败。 inline bool read (T *value_) { // Try to prefetch a value. if (!check_read ()) return false; // There was at least one value prefetched. // Return it to the caller. *value_ = queue.front (); queue.pop (); return true; }

      4. 过程分析      分析完了ypipe_t及其关联数据结构的源代码,下面通过图形展示一下各种操作的内存变动图。      4.1 yqueue_t初始化完成后的内存布局如下图所示。             4.2 ypipe_t初始化完成后内存布局如下图所示。             4.3 ypipe_t::write()写入一个元素,这个元素分为三个片段A,B, C写入,A,B的incomplete_为true,C的incomplete_为false导致整个元素被flush。             4.4 ypipe_t::flash()把元素刷新到队列中(也就是flash()之后此元素就被读线程可见了。)           4.5 ypipe_t::check_read()更新了r指针到新的第一个不可读位置。 4.6 调用三次ypipe_t::read()函数分三次读取了A,B,C。读取之后的内存布局如下图所示。

    转载请注明原文地址: https://ju.6miu.com/read-671147.html

    最新回复(0)