This is documentation on a preview feature.

状态存储

创建一个状态存储组件只需要几个基本步骤。

添加状态存储命名空间

添加 using 语句来引用与状态存储相关的命名空间。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.StateStore;

实现 IStateStore

创建一个实现IStateStore接口的类。

internal sealed class MyStateStore : IStateStore
{
    public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
    {
        // Delete the requested key from the state store...
    }

    public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
    {
        // Get the requested key value from from the state store, else return null...
    }

    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // Called to initialize the component with its configured metadata...
    }

    public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
    {
        // Set the requested key to the specified value in the state store...
    }
}

注册状态存储组件

在主程序文件中(例如,Program.cs),将状态存储注册到应用程序服务中。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

批量状态存储

打算支持批量操作的状态存储应该实现可选的 IBulkStateStore 接口。 它的方法与基础IStateStore接口的方法相同,但包括多个请求的值。

internal sealed class MyStateStore : IStateStore, IBulkStateStore
{
    // ...

    public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
    {
        // Delete all of the requested values from the state store...
    }

    public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // Return the values of all of the requested values from the state store...
    }

    public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // Set all of the values of the requested keys in the state store...
    }
}

事务性状态存储

打算支持事务的状态存储应该实现可选的ITransactionalStateStore接口。 它的TransactAsync()方法接收一个请求,其中包含要在事务中执行的一系列delete和/或set操作。 状态存储应该遍历序列并调用每个操作的Visit()方法,传递表示每种操作类型所采取的动作的回调函数。

internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
{
    // ...

    public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
    {
        // Start transaction...

        try
        {
            foreach (var operation in request.Operations)
            {
                await operation.Visit(
                    async deleteRequest =>
                    {
                        // Process delete request...

                    },
                    async setRequest =>
                    {
                        // Process set request...
                    });
            }
        }
        catch
        {
            // Rollback transaction...

            throw;
        }

        // Commit transaction...
    }
}

可查询的状态存储

打算支持查询的状态存储应该实现可选的IQueryableStateStore接口。 它的QueryAsync()方法会传递有关查询的详细信息,例如过滤器、结果限制和分页,以及结果的排序顺序。 状态存储应该使用这些详细信息来生成一组值,作为其响应的一部分返回。

internal sealed class MyStateStore : IStateStore, IQueryableStateStore
{
    // ...

    public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
    {
        // Generate and return results...
    }
}

ETag 和其他语义错误处理

Dapr 运行时对某些状态存储操作的特定错误条件有额外处理。 状态存储可以通过从其操作逻辑中抛出特定的异常来指示这些条件:

异常 适用操作 说明
ETagInvalidException Delete, Set, Bulk Delete, Bulk Set 当 ETag 无效时
ETagMismatchException Delete, Set, Bulk Delete, Bulk Set 当 ETag 与预期值不匹配时
BulkDeleteRowMismatchException Bulk Delete 当受影响的行数与预期的行数不匹配时