BTS 的 DPoS 共识之代码实现概述(上)

本文基于的BitShares源代码GitHub地址如下:https://github.com/bitshares/bitshares-core

目录

1. DPoS共识算法概述

任何共识机制都必须解决包括但不限于以下5个问题:

  1. 下一个添加到数据库的新区块应该由谁来生成?
  2. 下一个块应该何时产生?
  3. 该区块应包含哪些交易?
  4. 怎样对本协议进行修改?
  5. 该如何解决交易历史的竞争问题?

本文主要讨论前3个问题,第4个问题本文不做讨论;关于第5个问题,即交易数据分叉和双花等问题的研究分析可参考:DPOS共识算法 - 缺失的白皮书

总体来说,为了解决PoW和PoS共识其交易性能低下的问题,DPoS使用见证人机制(witness)解决该问题。通过去中心化的投票机制选出前N个证人来保证信任问题;通过轮流安排证人打包生产区块来提高出块速度以及减少能耗问题;通过证人对区块进行验证从而减少确认的次数,从而加快共识的过程和提高交易结算的速度。总之,DPoS共识通过间接民主的方式既保证了最大限度的去中心化又极大的提高了性能同时降低了电力等成本。

DPoS的伪代码实现如下(伪代码参考了用伪代码理解DPoS):

for round i //分成很多个round,round无限持续
    dlist_i = get N witnesses sort by votes //根据投票结果选出得票率最高的N个证人
    dlist_i = shuffle(dlist_i) //随机改变顺序
    loop //round完了,退出循环
        slot = global_time_offset / block_interval
        pos = slot % N
        if dlist_i[pos] exists in this node //witness在这个节点
            generateBlock(keypair of dlist_i[pos]) //生产block
        else
            skip

可以看到,在每一轮(round)循环里,系统会重新统计得票排名。在选出最高的N个证人里,系统采用先打乱顺序,然后证人轮流生产区块。一轮区块生产完毕后进入下一个周期。

在BitShares实际的代码中,过程比上述伪代码复杂得多,下面本文从3个方向讲解BTS中DPoS共识的实现过程:

  1. 打乱证人调度顺序的过程,即洗牌算法;
  2. 统计投票和选出证人集合的过程;
  3. 调度证人和生产区块的过程;

2. 洗牌算法

洗牌算法的代码流程图:

首先,为了存储证人的调度顺序,BTS定义了一个名叫witness_schedule_object的类,在该类中用了一个名为current_shuffled_witnessesvector集合来存储:

class witness_schedule_object : 
    public graphene::db::abstract_object<witness_schedule_object>
{
   public:
      static const uint8_t space_id = implementation_ids;
      static const uint8_t type_id = impl_witness_schedule_object_type;

      vector< witness_id_type > current_shuffled_witnesses;    //用于存储证人的调度顺序
};

有了上面的存储结构,下面来再看洗牌算法update_witness_schedule()

void database::update_witness_schedule()
{
   const witness_schedule_object& wso = witness_schedule_id_type()(*this);
   const global_property_object& gpo = get_global_properties();

   // 该条件控制了洗牌算法被调用的时机,必须是证人完整的轮转完一圈后
   if( head_block_num() % gpo.active_witnesses.size() == 0 )
   {
      modify( wso, [&]( witness_schedule_object& _wso )
      {
         // 清空证人集合
         _wso.current_shuffled_witnesses.clear();
         _wso.current_shuffled_witnesses.reserve( gpo.active_witnesses.size() );

         // 初始化证人集合数据
         for( const witness_id_type& w : gpo.active_witnesses )
            _wso.current_shuffled_witnesses.push_back( w );

         // 打乱证人的调度顺序
         auto now_hi = uint64_t(head_block_time().sec_since_epoch()) << 32;
         for( uint32_t i = 0; i < _wso.current_shuffled_witnesses.size(); ++i )
         {
            /// 此处使用了随机函数,具体原理请参看:http://xorshift.di.unimi.it/
            uint64_t k = now_hi + uint64_t(i)*2685821657736338717ULL;
            k ^= (k >> 12);
            k ^= (k << 25);
            k ^= (k >> 27);
            k *= 2685821657736338717ULL;

            uint32_t jmax = _wso.current_shuffled_witnesses.size() - i;
            uint32_t j = i + k%jmax;
            // 进行N次随机交换
            std::swap( _wso.current_shuffled_witnesses[i],
                       _wso.current_shuffled_witnesses[j] );
         }
      });
   }
}

那么洗牌算法是在什么时候由哪个节点调用呢?
实际上update_witness_schedule()方法只在_apply_block()中调用:

// 将一个签名后的区块添加到链上
void database::_apply_block( const signed_block& next_block )
{ 
    //其他处理
    ......

    update_witness_schedule();

    //其他处理
    ......
}

关于该方法的又是什么时候调用会在后文介绍,总之,证人节点每次将一个签名区块添加到链上之后调用洗牌算法,而在洗牌算法内部会在证人轮转完一圈时才真正洗牌。

这里要强调一下,洗牌算法只是打乱了证人的调度顺序,但可能还是原来那些证人。那么证人成员什么时候更新呢?现在回到洗牌算法update_witness_schedule()中,current_shuffled_witnesses是从gpo.active_witnesses中获得活跃证人的初始化数据的,只有当gpo.active_witnesses中的的成员发生变化时,current_shuffled_witnesses中的成员才发生变化,那么gpo.active_witnesses又是怎么更新的呢?

3. 统计投票和选出证人集合

统计投票和选出证人集合代码流程图:

首先介绍和活跃证人集合相关的类,全局属性类global_property_object,该类中使用了名为active_witnessesflat_set保存活跃证人:

// 全局属性类
class global_property_object : public graphene::db::abstract_object<global_property_object>
   {
      public:
         static const uint8_t space_id = implementation_ids;
         static const uint8_t type_id  = impl_global_property_object_type;

         chain_parameters           parameters;
         optional<chain_parameters> pending_parameters;

         uint32_t                           next_available_vote_id = 0;
         vector<committee_member_id_type>   active_committee_members;
         flat_set<witness_id_type>          active_witnesses; // 活跃证人集合,每个维护间隔更新一次
   };

有了上面类的铺垫,下来再来讲活跃证人集合更新的事情。在BitShares的源代码中,gpo.active_witnesses的更新是通过update_active_witnesses()方法实现的:

// 更新活跃证人
void database::update_active_witnesses()
{ 
    // 统计投票的证人个数
    assert( _witness_count_histogram_buffer.size() > 0 );
    share_type stake_target = (_total_voting_stake-_witness_count_histogram_buffer[0]) / 2;

    /// 为0或1个证人投票的账户不会就证人数量发表意见(他们弃权并且是无投票权的账户)

    share_type stake_tally = 0; 

    size_t witness_count = 0;
    if( stake_target > 0 )
    {
        while( (witness_count < _witness_count_histogram_buffer.size() - 1)
             && (stake_tally <= stake_target) )
        {
            stake_tally += _witness_count_histogram_buffer[++witness_count];
        }
    }

    const chain_property_object& cpo = get_chain_properties();
    // 排序选出前排名前N的证人
    auto wits = sort_votable_objects<witness_index>(std::max(witness_count*2+1, (size_t)cpo.immutable_parameters.min_witness_count));

    //其他处理
    ......

    // 将证人存入active_witnesses中
    modify(gpo, [&]( global_property_object& gp ){
        gp.active_witnesses.clear();
        gp.active_witnesses.reserve(wits.size());
        std::transform(wits.begin(), 
                       wits.end(),
                       std::inserter(gp.active_witnesses, gp.active_witnesses.end()),
                       [](const witness_object& w) {
                           return w.id;
                       });
    });

    //其他处理
    ...... 
}

update_active_witnesses()方法中首先统计出投票的证人个数N,然后排序选出得票率最高的N个证人,再更新到active_witnesses中去。下面是排序算法sort_votable_objects的介绍:

// 从高到低排列出前N个指定类型的可投票对象,并以vector的形式返回
template<class Index>
vector<std::reference_wrapper<const typename Index::object_type>> database::sort_votable_objects(size_t count) const
{
   using ObjectType = typename Index::object_type;
   // 获取所有对象
   const auto& all_objects = get_index_type<Index>().indices();
   // 计算实际个数
   count = std::min(count, all_objects.size());
   vector<std::reference_wrapper<const ObjectType>> refs;
   refs.reserve(all_objects.size());
   // 初始化refs
   std::transform(all_objects.begin(), all_objects.end(),
                  std::back_inserter(refs),
                  [](const ObjectType& o) { return std::cref(o); });
   // 排序(从高到低)
   std::partial_sort(refs.begin(), refs.begin() + count, refs.end(),
                   [this](const ObjectType& a, const ObjectType& b)->bool {
      share_type oa_vote = _vote_tally_buffer[a.vote_id];
      share_type ob_vote = _vote_tally_buffer[b.vote_id];
      if( oa_vote != ob_vote )
         return oa_vote > ob_vote;
      return a.vote_id < b.vote_id;
   });

   // 截取,返回
   refs.resize(count, refs.front());
   return refs;
}

再来看在update_active_witnesses()方法,可以看到证人的投票数据是从_witness_count_histogram_buffer中获取的,而该集合又是在什么时候更新呢?还有update_active_witnesses()方法又是什么时候调用呢?这些问题的答案都在perform_chain_maintenance()方法中:

// 区块链维护处理
void database::perform_chain_maintenance(const signed_block& next_block, const global_property_object& global_props)
{
   const auto& gpo = get_global_properties();

   //其他处理
   ......

   // 统计所有账户的投票,注意:此处是笔者为了便于理解,讲原代码中的泛型编程改写而成的
   const auto& idx = get_index_type<account_index>().indices().get<by_name>();
   for( const account_object& a : idx )
   {      
       database& d = *this;
       const global_property_object& props = gpo;

       //其他处理
       ......

       d._witness_count_histogram_buffer.resize(props.parameters.maximum_witness_count / 2 + 1);

       const account_object& stake_account = a;

       // 获取实际投票账户
       const account_object& opinion_account =
          (stake_account.options.voting_account ==
           GRAPHENE_PROXY_TO_SELF_ACCOUNT)? stake_account
             : d.get(stake_account.options.voting_account);

       // 获取实际投票数量
       const auto& stats = stake_account.statistics(d);
       uint64_t voting_stake = stats.total_core_in_orders.value
             + (stake_account.cashback_vb.valid() ? (*stake_account.cashback_vb)(d).balance.amount.value: 0)
             + d.get_balance(stake_account.get_id(), asset_id_type()).amount.value;

       //其他处理
       ......

       // 统计投票
       if( opinion_account.options.num_witness <= props.parameters.maximum_witness_count )
       {
          uint16_t offset = std::min(size_t(opinion_account.options.num_witness/2), d._witness_count_histogram_buffer.size() - 1);
          d._witness_count_histogram_buffer[offset] += voting_stake;
       }
   }

   //其他处理
   ......

   // 注意:在这里更新活跃证人
   update_active_witnesses();

   //其他处理
   ......

   // 以下计算出下一次区块链维护时间,并更新到全局数据库

   auto next_maintenance_time = get<dynamic_global_property_object>(dynamic_global_property_id_type()).next_maintenance_time;
   auto maintenance_interval = gpo.parameters.maintenance_interval;

   if( next_maintenance_time <= next_block.timestamp )
   {
      if( next_block.block_num() == 1 )
         next_maintenance_time = time_point_sec() +
               (((next_block.timestamp.sec_since_epoch() / maintenance_interval) + 1) * maintenance_interval);
      else
      {
         auto y = (head_block_time() - next_maintenance_time).to_seconds() / maintenance_interval;
         next_maintenance_time += (y+1) * maintenance_interval;
      }
   }

   //其他处理
   ......

   d.next_maintenance_time = next_maintenance_time;

   //其他处理
   ......

}

总结一下,在perform_chain_maintenance()方法中,首先统计出每个账户实际的投票数据,接着计算出投出的证人数量N和得票率前N的证人,再调用update_active_witnesses()更新活跃证人,最后计算出下一次的区块链维护时间。那么perform_chain_maintenance()方法又是什么时候调用呢?且看_apply_block()方法:

// 将一个签名后的区块添加到链上
void database::_apply_block( const signed_block& next_block )
{ 
    //其他处理
    ......

    bool maint_needed = (dynamic_global_props.next_maintenance_time <= next_block.timestamp);

    //其他处理
    ......

   // 当到了区块链维护时间时
   if( maint_needed )
      perform_chain_maintenance(next_block, global_props);

    //其他处理
    ......

    // 注意了,这里调用洗牌算法(前面讲过)
    update_witness_schedule();

    //其他处理
    ......
}

可以看到在_apply_block()方法中首先判断是否到了维护时间,如果到了维护时间就重新统计投票,得到新的活跃证人。这里就需要强调的是,在上一节讲的洗牌算法也是在该方法中调用,也就是说每次将一个区块添加的链上后,首先判断是否到了区块链维护时间,如果是就统计投票选出证人并更新,然后会调用洗牌算法,判断是否到了洗牌的时刻,再洗牌。

那么_apply_block()方法又是什么调用呢?或者说什么时候调度证人和生产区块呢?

4. 调度证人和生产区块

调度证人和生产区块的代码流程图:

这一次,我们从main()方法开始讲解:

// 应用程序入口
int main(int argc, char** argv) {
   app::application* node = new app::application();

    //其他处理
    ......

    // 启动所有插件
      node->startup_plugins();

    //其他处理
    ......
}

在main方法中,调用启动插件接口startup_plugins(),启动所有插件:

void application::startup_plugins()
{
    // 遍历map,启动所有有效的插件
   for( auto& entry : my->_active_plugins )
      entry.second->plugin_startup();
   return;
}

在该方法中,会启动所有有效的插件,如果节点要运行证人节点的话需要特殊配置,请参考BitShares官网说明: Howto Run a Block-producing Witness,本文不做介绍。在所有插件中,就包括witness_plugin插件,其启动实现如下:

// 证人插件启动
void witness_plugin::plugin_startup()
{ try {
   ilog("witness plugin:  plugin_startup() begin");
   chain::database& d = database();

   if( !_witnesses.empty() )
   {
      ilog("Launching block production for ${n} witnesses.", ("n", _witnesses.size()));
      app().set_block_production(true);
      if( _production_enabled )
      {
         if( d.head_block_num() == 0 )
            new_chain_banner(d);
         _production_skip_flags |= graphene::chain::database::skip_undo_history_check;
      }
      // 开启调度生产循环
      schedule_production_loop();
   } else
      elog("No witnesses configured! Please add witness IDs and private keys to configuration.");
   ilog("witness plugin:  plugin_startup() end");
} FC_CAPTURE_AND_RETHROW() }

在该方法中,进行若干判断和初始化,然后调用schedule_production_loop()方法,开启调度生产循环,其实现如下:

// 调度生产循环
void witness_plugin::schedule_production_loop()
{
   // 计算下一秒的滴答时刻,不少于50毫秒
   fc::time_point now = fc::time_point::now();
   int64_t time_to_next_second = 1000000 - (now.time_since_epoch().count() % 1000000);
   if( time_to_next_second < 50000 )
       time_to_next_second += 1000000;

   fc::time_point next_wakeup( now + fc::microseconds( time_to_next_second ) );

   // 调度区块生产循环
   _block_production_task = fc::schedule([this]{block_production_loop();},
                                         next_wakeup, "Witness Block Production");
}

schedule_production_loop()方法里,计算时钟,调度区块生产循环方法block_production_loop()

// 区块生产循环
block_production_condition::block_production_condition_enum witness_plugin::block_production_loop()
{
    //其他处理
    ......

    // 判断区块生产的条件是否满足,并生产区块
    result = maybe_produce_block(capture);

    //其他处理
    ......

    // 循环调度生产
    schedule_production_loop();

    //其他处理
    ......
}

在该方法中,调用生产区块的入口方法,然后继续执行循环,其maybe_produce_block()方法实现如下:

// 判断区块生产的条件是否满足,并生产区块
block_production_condition::block_production_condition_enum witness_plugin::maybe_produce_block( fc::limited_mutable_variant_object& capture )
{
   chain::database& db = database();
   fc::time_point now_fine = fc::time_point::now();
   fc::time_point_sec now = now_fine + fc::microseconds( 500000 );

   // 判断生产时钟是否同步
   if( !_production_enabled )
   {
      if( db.get_slot_time(1) >= now )
         _production_enabled = true;
      else
         return block_production_condition::not_synced;
   }

   // 判断生产时间槽已经开始
   uint32_t slot = db.get_slot_at_time( now );
   if( slot == 0 )
   {
      capture("next_time", db.get_slot_time(1));
      return block_production_condition::not_time_yet;
   }

   assert( now > db.head_block_time() );

   // 判断是否轮转到当前证人生产区块
   graphene::chain::witness_id_type scheduled_witness = db.get_scheduled_witness( slot );
   if( _witnesses.find( scheduled_witness ) == _witnesses.end() )
   {
      capture("scheduled_witness", scheduled_witness);
      return block_production_condition::not_my_turn;
   }

   fc::time_point_sec scheduled_time = db.get_slot_time( slot );
   graphene::chain::public_key_type scheduled_key = scheduled_witness( db ).signing_key;
   auto private_key_itr = _private_keys.find( scheduled_key );

    // 判断私钥是否有效
   if( private_key_itr == _private_keys.end() )
   {
      capture("scheduled_key", scheduled_key);
      return block_production_condition::no_private_key;
   }

   // 判断是否达到最低参与率
   uint32_t prate = db.witness_participation_rate();
   if( prate < _required_witness_participation )
   {
      capture("pct", uint32_t(100*uint64_t(prate) / GRAPHENE_1_PERCENT));
      return block_production_condition::low_participation;
   }

    // 判断时间间隔是否低于50毫秒
   if( llabs((scheduled_time - now).count()) > fc::milliseconds( 500 ).count() )
   {
      capture("scheduled_time", scheduled_time)("now", now);
      return block_production_condition::lag;
   }

    // 生产区块
   auto block = db.generate_block(
      scheduled_time,
      scheduled_witness,
      private_key_itr->second,
      _production_skip_flags
      );
   capture("n", block.block_num())("t", block.timestamp)("c", now);
   // 将生产的区块同步广播到网络中
   fc::async( [this,block](){ p2p_node().broadcast(net::block_message(block)); } );

   return block_production_condition::produced;
}

可以看到在maybe_produce_block()方法中,进行了各种条件判断,只有满足了所有条件才开始生产区块,并将生产后区块广播同步到网络中的其他节点。这里先介绍3个方法get_slot_time()get_slot_at_time()get_scheduled_witness()

get_slot_time()方法,获得未来的第n个区块生产时间槽的时刻:

fc::time_point_sec database::get_slot_time(uint32_t slot_num)const
{
   if( slot_num == 0 )
      return fc::time_point_sec();

    // 区块生产时间间隔
   auto interval = block_interval();
   const dynamic_global_property_object& dpo = get_dynamic_global_properties();

   if( head_block_num() == 0 )
   {
      // 特殊情况,生产创世区块的时刻
      fc::time_point_sec genesis_time = dpo.time;
      return genesis_time + slot_num * interval;
   }

    // 获取前一个区块生产时间槽时刻
   int64_t head_block_abs_slot = head_block_time().sec_since_epoch() / interval;
   fc::time_point_sec head_slot_time(head_block_abs_slot * interval);

   const global_property_object& gpo = get_global_properties();

    // 如果当前为区块链维护时刻,则跳过指定个时间槽
   if( dpo.dynamic_flags & dynamic_global_property_object::maintenance_flag )
      slot_num += gpo.parameters.maintenance_skip_slots;

  // 前一个生产时间+若干生产间隔
   return head_slot_time + (slot_num * interval);
}

get_slot_at_time()方法主要是获取指定时间属于未来哪个时间槽:

uint32_t database::get_slot_at_time(fc::time_point_sec when)const
{
   fc::time_point_sec first_slot_time = get_slot_time( 1 );
    // 判断指定的时间是否已经过时
   if( when < first_slot_time )
      return 0;
   return (when - first_slot_time).to_seconds() / block_interval() + 1;
}

get_scheduled_witness()方法实现了获取指定时间槽的区块生产调度证人:

witness_id_type database::get_scheduled_witness( uint32_t slot_num )const
{
   const dynamic_global_property_object& dpo = get_dynamic_global_properties();
   const witness_schedule_object& wso = witness_schedule_id_type()(*this);
   // 计算调度偏移量
   uint64_t current_aslot = dpo.current_aslot + slot_num;
   return wso.current_shuffled_witnesses[ current_aslot % wso.current_shuffled_witnesses.size() ];
}

这里就和前面讲的洗牌算法衔接上了,get_scheduled_witness()方法是从洗牌后的集合current_shuffled_witnesses中获取证人。

下面再回到maybe_produce_block()方法中来,在该方法中调用了generate_block()方法生产区块,其实现如下:

signed_block database::generate_block(
   fc::time_point_sec when,
   witness_id_type witness_id,
   const fc::ecc::private_key& block_signing_private_key,
   uint32_t skip /* = 0 */
   )
{ try {
   signed_block result;
   detail::with_skip_flags( *this, skip, [&]()
   {
       // 调用实际生产区块方法
      result = _generate_block( when, witness_id, block_signing_private_key );
   } );
   return result;
} FC_CAPTURE_AND_RETHROW() }

在该方法中才调用用实际生产区块的方法_generate_block()

signed_block database::_generate_block(
   fc::time_point_sec when,
   witness_id_type witness_id,
   const fc::ecc::private_key& block_signing_private_key
   )
{
   try {

    //其他处理
    ......

   static const size_t max_block_header_size = fc::raw::pack_size( signed_block_header() ) + 4;
   auto maximum_block_size = get_global_properties().parameters.maximum_block_size;
   size_t total_block_size = max_block_header_size;

   signed_block pending_block;

    //其他处理
    ......

   for( const processed_transaction& tx : _pending_tx )
   {
      size_t new_total_size = total_block_size + fc::raw::pack_size( tx );

      // 计算交易大小,防止超过最大限度
      if( new_total_size >= maximum_block_size )
      {
         postponed_tx_count++;
         continue;
      }

      try
      {
         auto temp_session = _undo_db.start_undo_session();
         processed_transaction ptx = _apply_transaction( tx );
         temp_session.merge();

         // 将交易添加到区块中
         total_block_size += fc::raw::pack_size( ptx );
         pending_block.transactions.push_back( ptx );
      }

    //其他处理
    ......
   }

    //其他处理
    ......

   pending_block.previous = head_block_id(); // 添加上一个区块id
   pending_block.timestamp = when; //添加时间戳
   pending_block.transaction_merkle_root = pending_block.calculate_merkle_root(); //添加交易树根
   pending_block.witness = witness_id; //更新证人id

    // 签名
   if( !(skip & skip_witness_signature) )
      pending_block.sign( block_signing_private_key );

   // 检查区块大小
   if( !(skip & skip_block_size_check) )
   {
      FC_ASSERT( fc::raw::pack_size(pending_block) <= get_global_properties().parameters.maximum_block_size );
   }

    // 调用推块方法
   push_block( pending_block, skip );

   return pending_block;
} FC_CAPTURE_AND_RETHROW( (witness_id) ) }

再来看将待定区块添加到链的实现过程push_block()

bool database::push_block(const signed_block& new_block, uint32_t skip)
{
   bool result;
   detail::with_skip_flags( *this, skip, [&]()
   {
      detail::without_pending_transactions( *this, std::move(_pending_tx),
      [&]()
      {
          // 调用实际的推块方法
         result = _push_block(new_block);
      });
   });
   return result;
}

在该方法中调用实际的推块的方法_push_block(),其实现如下:

bool database::_push_block(const signed_block& new_block)
{ try {
   uint32_t skip = get_node_properties().skip_flags;

   // 在此处理区块链竞争分叉问题,理论讲解请参看本人翻译的另一篇文章:
   // https://blog.csdn.net/ggq89/article/details/80072876

   try {
      auto session = _undo_db.start_undo_session();
      // 将新区块添加到链上
      apply_block(new_block, skip);
      // 存入数据库
      _block_id_to_block.store(new_block.id(), new_block);
      session.commit();
   } catch ( const fc::exception& e ) {
      elog("Failed to push new block:\n${e}", ("e", e.to_detail_string()));
      _fork_db.remove(new_block.id());
      throw;
   }

   return false;
} FC_CAPTURE_AND_RETHROW( (new_block) ) }

下面再看apply_block()的实现过程:

void database::apply_block( const signed_block& next_block, uint32_t skip )
{
    //其他处理
    ......

   detail::with_skip_flags( *this, skip, [&]()
   {
       // 在此调用实际的添加区块方法
      _apply_block( next_block );
   } );
   return;
}

到这里就和前面所讲的全部内容衔接在一起了,这个第三次代码跟踪到_apply_block()方法,至此整个dpos共识过程基本讲解完毕。最后只剩下整个应用程序的初始化过程了

未完待续…

原文链接:https://blog.csdn.net/ggq89/article/details/80068306