如何利用Pravega的状态同步器解决分布式一致性问题(33)

发布于2019-04-21 20:15:35

Pravega是一个开源的分布式流存储平台。其中,StateSynchronizer组件以stream为基础,对外提供一致性状态共享服务。StateSynchronizer允许一组进程同时读写同一共享状态而不必担心一致性问题。本文以实现一个简单的共享字典应用为示例,演示StateSynchronizer相关API的使用。

API示例

示例实现1:SharedConfig(共享配置)

在深入StateSynchronizer的使用细节之前,先让我们看一个使用StateSynchronizer的示例。本章节示例全部来自Pravega官方文档 [1]。该示例的所有源代码都可以从Pravega-Samples的GitHub仓库 [2]下载获取。这个示例程序使用StateSynchronizer实现了Java的Map(字典)数据结构,我们不妨将其称作“SharedMap(共享字典)”。我们以这个SharedMap为基础,实现了SharedConfig。SharedConfig允许一组进程一致性地读写一个共享的,由一组键/值对属性所构成的配置对象。除了示例代码之外,我们还提供了一个命令行小程序SharedConfigCLI,让读者可以方便地体验一下这个SharedConfig应用。该示例的整体架构如图 1所示。

image

命令行小程序SharedConfigCLI所支持的所有命令如下:

在安装好上述名为“Pravega-Samples”的项目之后,请使用相同的scope名字和stream名字启动两个SharedConfigCLI的实例,这让我们可以模拟两个不同的进程通过协调它们本地的SharedConfig副本来操作同一共享状态。读者可以按如下步骤进行操作,以便初步体会一下这个SharedConfig是如何被不同进程协调的。

步骤 进程1 进程2 讨论
1 GET_ALL GET_ALL 两个进程都看见一个空的SharedConfig。
2 PUT p1, v1 进程1添加名为p1的属性
3 GET p1 GET p1 进程1看见属性p1的值为v1
进程2此时并不能看见属性p1。为什么?因为进程2还没有从共享状态刷新它的本地副本。
4 REFRESH 将进程2的本地副本与共享状态同步。
5 GET p1 进程2现在可以看见进程1在步骤2中所做的改动了。
6 REPLACE p1, newVal, v1 进程2尝试更改p1的值,但使用的是一个条件更改,这意味着只有当p1的旧值为v1时该更改才能生效(此时确实如此)。
7 GET p1 p1的值现在变成newVal了。
8 REPLACE p1, anotherVal, v1 进程1尝试使用和进程2在步骤6中所用相同的条件更改。这次的更改会失败,因为p1此时的值已不再是v1了。
9 GET p1 步骤8中的失败更改会导致进程1的共享状态本地副本被更新。p1此时的值已是newVal了。

表 1 两个进程同时操作同一个SharedConfig [1]

读者可以重复类似的操作序列来探索PUT_IF_ABCENT和其它修改共享状态操作的语义。该示例背后蕴含的基本思想是:所有对SharedConfig的更改操作只有当它们作用于最新的状态值时才能成功。此处,我们使用乐观的并发控制在SharedConfig对象的不同消费者之间达到了高效的一致性。读者还可以创建多个SharedConfig状态对象同时运行,每个SharedConfig对象个体都使用基于不同stream的独立的StateSynchronizer。

示例实现2:SharedMap(共享字典)的基本结构

在上一个示例中,我们其实已经使用到了SharedMap。StateSynchronizer可以用于实现几乎所有数据结构的共享版本。或许你的应用只需要共享一个简单的整数计数器,那么我们可以用StateSynchronizer实现一个共享计数器。又或许你需要共享的数据是一个包含当前集群里所有服务器的一个集合,那么我们可以用StateSynchronizer实现一个共享集合。还有许许多多类似这样的可能性。现在,让我们详细讨论一下如何基于StateSynchronizer实现SharedMap以便进行对象共享。

创建StateSynchronizer

StateSynchronizer也是Pravega客户端的类型之一,类似EventStreamReader或者EventStreamWriterStateSynchronizer通过ClientFactory接口创建。每个StateSynchronizer在其scope内必须具有唯一的名字。SynchronizerConfig可用于定制StateSynchronizer的行为(尽管在当前版本中,StateSynchronizer尚不支持自定义行为)。StateSynchronizer利用Java的泛型机制,允许开发者指定基于特定类型的StateSynchronizer

共享/同步状态StateT

在设计一个使用StateSynchronizer的应用时,开发者必须确定需要被同步(共享)的状态类型。我们需要共享一个Map?一个Set?还是一个普通的POJO类?也就是说,我们需要共享什么样的数据结构。这将定义出StateSynchronizer的核心类型,即StateSynchronizer接口上的泛型类型StateTStateT对象可以是任何Java对象,只要它实现了Pravega的Revisioned接口。Revisioned是很简单的一个接口,它允许Pravega能够比较任意两个不同的StateT对象。

在我们的例子中,SharedMap才是StateSynchronizer的真正使用者。它定义了一个简单的Map接口,提供get(key)set(key, value)等操作,就像一个常见的Map对象那样。它同时也按StateSynchronizer的要求实现了Revisioned接口,并且使用一个简单的ConcurrentHashMap作为内部Map的实现。所以,在我们的这个例子中,StateT就是SharedStateMap<K, V>

状态变更操作Update与InitialUpdate

在一个StateSynchronizer应用中,除了类型StateT之外,还需要定义另外两个非常重要的类型:Update类型和InitialUpdate类型。Update代表了持久化在stream中的“增量”或者说是更新对象。InitialUpdate则是一个特殊的起始更新对象,用于启动StateSynchronizer。UpdateInitialUpdate都是基于泛型类型StateT定义的。

StateSynchronizer使用单segment的stream来存储对共享对象的更新操作。以UpdateInitialUpdate形式存在的更新对象,根据当前本地的共享状态副本是否相对于stream处于最新状态,被写入stream之中。如果检测到该更新操作将会作用于在共享状态的一个早期版本,则更新操作不会被应用。StateSynchronizer自身在本地内存中维护了一份共享状态的副本,并且保存了该副本相关的版本元信息。通过getState()方法可以获取该本地状态副本。本地的状态副本有可能已经过期,应用程序可以通过调用fetchUpdates()方法来刷新状态副本,该操作会从stream获取所有在该版本之后产生的更新操作。绝大多数来自应用程序的更新操作都通过updateState()方法进行。updateState()方法接受一个函数对象作为参数。该函数对象会被调用并传入最新的共享状态,而函数本身则负责确定有哪些更新将作用于该状态。

在我们的例子中,InitialUpdate实现如下:

/**
 * Create a Map. This is used by StateSynchronizer to initialize shared state.
 */
private static class CreateState<K, V> implements InitialUpdate<SharedStateMap<K,V>>, Serializable {
    private static final long serialVersionUID = 1L;
    private final ConcurrentHashMap<K, V> impl;

    public CreateState(ConcurrentHashMap<K, V> impl) {
        this.impl = impl;
    }

    @Override
    public SharedStateMap<K, V> create(String scopedStreamName, Revision revision) {
        return new SharedStateMap<K, V>(scopedStreamName, impl, revision);
    }
}

源代码 1 用于产生起始状态的特殊更新操作InitialUpdate [1]

如源码所示,CreateState对象用于初始化stream中的共享对象,它将会创建一个新的,空SharedStateMap对象。读者可以把其他例子中的InitialUpdate想象成把计数器置1,或者把一个集合初始化成仅包含数个固定元素。将诸如“initialize”和“update”的函数方法用类的形式来实现或许看起来有一些奇怪,但当你仔细思考其中的缘由时,你就会发现这其实非常合理。所有这些更新操作,例如initialize和update,都需要被保存到stream中,因此它们必须被实现为可序列化的对象。我们必须允许一个客户端在任何时间点启动,计算出当前共享状态,并且随着更新操作(可能来自其它客户端)不断被写入到stream中,还能够维持当前最新状态。如果我们仅仅在stream中保存“最新的状态值”,那么就无法使用并发控制来提供并发读写的一致性了。

StateUpdate抽象

Update类型看起来就更加怪异了。对于Map数据结构来说,并非只有一种更新操作,而是有各种各样的更新操作:更新一个键/值对,更新一组键/值对,删除一个键/值对,删除所有键/值对等等。每一类这样的更新操作都必须由一个对应的类实现。为此,我们定义了一个名为StateUpdate的抽象类,而所有其它更新操作的实现类都将继承自这个抽象类:

/**
 * A base class for all updates to the shared state. This allows for several different types of updates.
 */
private static abstract class StateUpdate<K,V> implements Update<SharedStateMap<K,V>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public SharedStateMap<K,V> applyTo(SharedStateMap<K,V> oldState, Revision newRevision) {
        ConcurrentHashMap<K, V> newState = new ConcurrentHashMap<K, V>(oldState.impl);
        process(newState);
        return new SharedStateMap<K,V>(oldState.getScopedStreamName(), newState, newRevision);
    }

    public abstract void process(ConcurrentHashMap<K, V> updatableList);
}

源代码 2 所有更新操作的抽象基类 [1]

通过定义一个抽象类StateUpdate,我们可以基于该抽象类来定义其它的Update实现。抽象类实现了“applyTo”方法。StateSynchronizer调用该方法将更新应用于当前的状态对象,并返回对应的更新后的状态对象。实际的更新工作是在旧状态的底层Map实现对象的一个拷贝上完成的:process()方法作用于实现对象上,并返回SharedStateMap的一个新版本,而这个新版本则使用process()方法处理后的实现对象作为内部状态。抽象类所定义的process()方法正是更新操作真正进行的位置。该方法由具体的更新操作实现类负责实现,例如SharedMap上的Put更新操作和PutAll更新操作。

基于StateUpdate抽象的Put操作

以下是SharedMap上的Put(key, value)操作的实现:

/**
 * Add a key/value pair to the State.
 */
private static class Put<K,V> extends StateUpdate<K,V> {
    private static final long serialVersionUID = 1L;
    private final K key;
    private final V value;

    public Put(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public void process(ConcurrentHashMap<K, V> impl) {
        impl.put(key, value);
    }
}

源代码 3 Put更新操作的实现 [1]

此处,process()方法用于将一个键/值对加入字典数据结构,如果对应的键已经存在,则更改对应的值。SharedMap上的所有操作都是通过创建StateUpdate子类实例的方式进行的。

示例实现3:在SharedMap上执行操作

SharedMap的例子展示了StateSynchronizer上的典型操作。SharedMap对外提供的API非常类似Java的Map<K, V>接口。通过使用StateUpdate的各个子类进行状态变更操作,SharedMap以操作StateSynchronizer的形式实现了Map上的各种操作。

创建与初始化

以下源码将展示如何创建SharedMap:

/**
  * Creates the shared state using a synchronizer based on the given stream name.
  *
  * @param clientFactory - the Pravega ClientFactory to use to create the StateSynchronizer.
  * @param streamManager - the Pravega StreamManager to use to create the Scope and the Stream used by the StateSynchronizer
  * @param scope - the Scope to use to create the Stream used by the StateSynchronizer.
  * @param name - the name of the Stream to be used by the StateSynchronizer.
  */
 public SharedMap(ClientFactory clientFactory, StreamManager streamManager, String scope, String name){
     streamManager.createScope(scope);

     StreamConfiguration streamConfig = StreamConfiguration.builder().scope(scope).streamName(name)
             .scalingPolicy(ScalingPolicy.fixed(1))
             .build();

     streamManager.createStream(scope, name, streamConfig);

     this.stateSynchronizer = clientFactory.createStateSynchronizer(name,
                                             new JavaSerializer<StateUpdate<K,V>>(),
                                             new JavaSerializer<CreateState<K,V>>(),
                                             SynchronizerConfig.builder().build());

     stateSynchronizer.initialize(new CreateState<K,V>(new ConcurrentHashMap<K,V>()));
 }

源代码 4 SharedMap的创建 [1]

一个SharedMap实例是以定义scope和stream的形式创建出来的(在大多数情况下,对应的scope和stream可能已经存在,那么第10-16行通常为一个空操作)。StateSynchronizer对象本身是在第18-21行中由ClientFactory创建的,非常类似reader和writer的创建。注意,可以为Update对象和InitialUpdate对象分别制定各自的序列化器。目前,SynchronizerConfig还仅仅只是一个空实现,因为StateSynchronizer在当前版本中尚无可用的配置项。

StateSynchronizer提供一个以InitialUpdate对象作为参数的initialize()方法。该方法在SharedMap的构造函数中被调用,以保证共享状态被正确地初始化。注意,在大多数情况下,SharedMap很可能是基于一个已经包含SharedMap共享状态的stream被创建。在这种情况下,调用initialize()方法也没有任何问题,因为initialize()方法不会修改stream中的共享状态。

读操作

所有的读操作,即不会改变共享状态的操作,例如:get(key)containsValue(value)等,都是基于StateSynchronizer的本地状态副本的。所有的这些操作都使用getState()方法获取本地共享状态副本,然后在该副本上的进行读取操作。StateSynchronizer的本地状态副本可能过期。在这种情况下,SharedMap的客户端可能需要使用refresh()方法来强制StateSynchronizer从stream上的最新共享状态刷新本地状态,而该刷新操作是通过StateSynchronizer对象的fetchUpdates()方法完成的。

注意,用存在过期状态的可能性来换取更快的响应,这是设计决策层面的一种权衡。当然,我们也可以很容易地将读操作实现为在读取本地共享状态副本之前总是先做一次刷新操作。如果开发者可以预见到共享状态上会存在频繁更新,那么这也不失为一种有效的策略。在我们的这个示例中,我们假设SharedMap对象会被频繁读取,但并不会被频繁更新,因此选择了直接读取本地状态副本。

写(更新)操作

正如我们先前讨论的那样,所有的写操作都是由StateUpdate类的各个具体子类实现的:clear()操作使用StateUpdate的子类Clear来进行键/值对的删除,而put()操作则使用Put子类,如此种种。现在,让我们深入put()操作的实现来更加详细地讨论StateSynchronizer的编程。以下是put()操作的源码:

/**
 * Associates the specified value with the specified key in this map.
 *
 * @param key - the key at which the value should be found.
 * @param value - the value to be entered into the map.
 * @return - the previous value (if it existed) for the given key or null if the key did not exist before this operation.
 */
public V put(K key, V value){
    final AtomicReference<V> oldValue = new AtomicReference<V>(null);
     stateSynchronizer.updateState((state, updates) -> {
        oldValue.set(state.get(key));
        updates.add(new Put<K,V>(key,value));
    });
    return oldValue.get();
}

源代码 5 SharedMap上put接口方法的实现 [1]

注意,传入给StateSynchronizerupdateState()方法的函数对象很可能被调用多次。将该函数作用于旧状态所得的结果只有当该状态是stream上的最新状态时才会被写出。如果由于竞争而导致乐观并发控制检查失败,那么操作会反复重试(从而导致传入的函数对象被调用多次)。在大多数情况下,只需要很少的几次重试就可以完成对应操作了。在某些情况下,开发者也可以选择在调用updateState()方法之前先手动调用一次fetchUpdates(),从stream上同步最新的共享状态到StateSynchronizer。这其实是一种优化手段,在更新操作的预期频繁程度和更新操作的高效性之间做出权衡。如果你能预见到会有大量更新,那么就在调用updateState()方法之前先调用一次fetchUpdates()。在我们的示例中,我们假设不会有太多更新,因此选择让StateSynchronizer自己处理状态刷新。

删除操作

我们选择在实现删除操作的同时,利用StateSynchronizer的compact特性。我们的策略是,每执行5次remove()操作后,以及每次clear()操作后,都自动进行一次compact()调用。我们甚至还可以选择在每执行5次update()操作后也进行一次compct()调用,但目前我们想仅把compact()操作运用在删除操作上。

读者可以把compact()操作想象成StateSynchronizer的某种形式的“垃圾清除”机制。在一定数量的更新操作被应用到共享状态之后,一种更有效的形式是再写入一个新的起始状态。这个新的起始状态可以看作是所有写入stream的更新操作的累积表示。通过这种方法,所有早于该compact()操作的数据现在都可以被忽略,并最终从stream中清除。

image

图 2 每个compact()操作都会创建一个新的起始状态 图片来自[1]

如图 2所示,compact()操作的结果就是将一个新的起始状态Initial2写入stream。现在,所有比Change3更早的数据,包括Change3自身,都已经无用了,可以从stream中被作为垃圾清除掉。

Pravega系列文章计划

Pravega根据Apache 2.0许可证开源,0.4版本已于近日发布。我们欢迎对流式存储感兴趣的大咖们加入Pravega社区,与Pravega共同成长。本篇文章为Pravega系列第六篇,系列文章标题如下(标题根据需求可能会有更新):

  1. 实时流处理(Streaming)统一批处理(Batch)的最后一块拼图:Pravega

  2. 开源Pravega架构解析:如何通过分层解决流存储的三大挑战?

  3. Pravega应用实战:为什么云原生特性对流存储至关重要

  4. “ToB” 产品必备特性: Pravega的动态弹性伸缩

  5. 取代ZooKeeper!高并发下的分布式一致性开源组件StateSynchronizer

  6. 分布式一致性解决方案-状态同步器(StateSynchronizer) API示例

  7. Pravega的仅一次语义及事务支持

  8. 与Apache Flink集成使用

作者简介

参考文献

[1] “Working with Pravega: State Synchronizer,” Dell EMC, [Online]. Available: https://github.com/pravega/pravega/blob/master/documentation/src/docs/state-synchronizer.md.

[2] “StateSynchronizer Related Samples in Pravega-Samples GitHub Repository,” [Online]. Available: https://github.com/pravega/pravega-samples/tree/master/pravega-client-examples/src/main/java/io/pravega/example/statesynchronizer.

更多内容,请关注AI前线

image