指南:如何保存和获取状态

使用键值对来持久化状态

介绍

状态管理是任何应用程序最常见的需求之一:无论是新是旧,是单体还是微服务。 与不同的数据库库打交道,进行测试,处理重试和故障是很费时费力的。

Dapr提供的状态管理功能包括一致性和并发选项。 在本指南中,我们将从基础知识开始。使用键/值状态API来允许应用程序保存,获取和删除状态。

前提

第一步:设置状态存储

状态存储组件代表Dapr用来与数据库进行通信的资源。

在本指南中,我们将使用 Redis 作为状态存储引擎,但在 支持列表中的任何状态存储引擎都是可以使用的。


当在单机模式下使用dapr init时,Dapr CLI会自动提供一个状态存储(Redis),并在components目录中创建相关的YAML,在Linux/MacOS上位于$HOME/.dapr/components,在Windows上位于%USERPROFILE%/.dapr/components

如果需要切换使用的状态存储引擎,用你选择的文件替换/components下的YAML文件statestore.yaml


要将其部署到 Kubernetes 集群中,请在下面的 yaml 中填写你的所需statestore 组件metadata 连接详情,保存为 statestore.yaml,并执行命令 kubectl apply -f statestore.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: default
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

请参阅 这里的说明,了解如何在 Kubernetes 上设置不同的状态存储引擎。

第二步:保存和检索单个状态

下面的例子显示了如何使用Dapr状态构件的单个键/值对。


首先启动一个Dapr sidecar:

dapr run --app-id myapp --dapr-http-port 3500

然后在一个单独的终端中保存一个键/值对到你的statestore中:

curl -X POST -H "Content-Type: application/json" -d '[{ "key": "key1", "value": "value1"}]' http://localhost:3500/v1.0/state/statestore

现在获取你刚才保存的状态:

curl http://localhost:3500/v1.0/state/statestore/key1

你也可以重启你的sidecar,然后再次尝试检索状态,看看存储的状态是否与应用状态保持一致。


首先启动一个Dapr sidecar:

dapr --app-id myapp --port 3500 run

然后在一个单独的终端中保存一个键/值对到你的statestore中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "key1", "value": "value1"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在获取你刚才保存的状态:

Invoke-RestMethod -Uri 'http://localhost:3500/v1.0/state/statestore/key1'

你也可以重启你的sidecar,然后再次尝试检索状态,看看存储的状态是否与应用状态保持一致。


将以下内容保存到名为pythonState.py的文件中:

from dapr.clients import DaprClient

with DaprClient() as d:
    d.save_state(store_name="statestore", key="myFirstKey", value="myFirstValue" )
    print("State has been stored")

    data = d.get_state(store_name="statestore", key="myFirstKey").data
    print(f"Got value: {data}")

保存后执行以下命令启动Dapr sidecar并运行Python应用程序:

dapr --app-id myapp run python pythonState.py

你应该会得到一个类似于下面的输出,它将同时显示Dapr和应用程序的日志:

== DAPR == time="2021-01-06T21:34:33.7970377-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:34:33.8040378-08:00" level=info msg="standalone mode configured" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:34:33.8040378-08:00" level=info msg="app id: Braidbald-Boot" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:34:33.9750400-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:34:33.9760387-08:00" level=info msg="API gRPC server is running on port 51656" app_id=Braidbald-Boot scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:34:33.9770372-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 172.9994ms" app_id=Braidbald-Boot scope=dapr.

Checking if Dapr sidecar is listening on GRPC port 51656
Dapr sidecar is up and running.
Updating metadata for app command: python pythonState.py
You are up and running! Both Dapr and your app logs will appear here.

== APP == State has been stored
== APP == Got value: b'myFirstValue'

state-example.php中保存以下内容:

<?php
require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
    $stateManager->save_state(store_name: 'statestore', item: new \Dapr\State\StateItem(
        key: 'myFirstKey',
        value: 'myFirstValue' 
    ));
    $logger->alert('State has been stored');

    $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
    $logger->alert("Got value: {data}", ['data' => $data]);
});

保存后,执行以下命令启动Dapr sidecar并运行PHP应用程序:

dapr --app-id myapp run -- php state-example.php

你应该会得到一个类似于下面的输出,它将同时显示Dapr和应用程序的日志:

✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == [2021-02-12T16:30:11.078777+01:00] APP.ALERT: State has been stored [] []

== APP == [2021-02-12T16:30:11.082620+01:00] APP.ALERT: Got value: myFirstValue {"data":"myFirstValue"} []

第三步:删除状态

下面的例子显示了如何通过给状态管理API传递一个键来删除一个对象:


用上面运行的同一个dapr实例执行:

curl -X DELETE 'http://localhost:3500/v1.0/state/statestore/key1'

再尝试获取状态,注意没有返回任何值。


用上面运行的同一个dapr实例执行:

Invoke-RestMethod -Method Delete -Uri 'http://localhost:3500/v1.0/state/statestore/key1'

再尝试获取状态,注意没有返回任何值。


修改pythonState.py如下:

from dapr.clients import DaprClient

with DaprClient() as d:
    d.save_state(store_name="statestore", key="key1", value="value1" )
    print("State has been stored")

    data = d.get_state(store_name="statestore", key="key1").data
    print(f"Got value: {data}")

    d.delete_state(store_name="statestore", key="key1")

    data = d.get_state(store_name="statestore", key="key1").data
    print(f"Got value after delete: {data}")

现在通过以下命令运行你的程序:

dapr --app-id myapp run python pythonState.py

你应该会看到一个类似于下面的输出:

Starting Dapr with id Yakchocolate-Lord. HTTP Port: 59457. gRPC Port: 59458

== DAPR == time="2021-01-06T22:55:36.5570696-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:55:36.5690367-08:00" level=info msg="standalone mode configured" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:55:36.7220140-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:55:36.7230148-08:00" level=info msg="API gRPC server is running on port 59458" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:55:36.7240207-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 154.984ms" app_id=Yakchocolate-Lord scope=dapr.runtime type=log ver=0.11.3

Checking if Dapr sidecar is listening on GRPC port 59458
Dapr sidecar is up and running.
Updating metadata for app command: python pythonState.py
You're up and running! Both Dapr and your app logs will appear here.

== APP == State has been stored
== APP == Got value: b'value1'
== APP == Got value after delete: b''

修改state-example.php,内容如下:

<?php
require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
    $stateManager->save_state(store_name: 'statestore', item: new \Dapr\State\StateItem(
        key: 'myFirstKey',
        value: 'myFirstValue' 
    ));
    $logger->alert('State has been stored');

    $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
    $logger->alert("Got value: {data}", ['data' => $data]);

    $stateManager->delete_keys(store_name: 'statestore', keys: ['myFirstKey']);
    $data = $stateManager->load_state(store_name: 'statestore', key: 'myFirstKey')->value;
    $logger->alert("Got value after delete: {data}", ['data' => $data]);
});

现在使用以下命令运行它:

dapr --app-id myapp run -- php state-example.php

你应该会看到类似下面的输出:

✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == [2021-02-12T16:38:00.839201+01:00] APP.ALERT: State has been stored [] []

== APP == [2021-02-12T16:38:00.841997+01:00] APP.ALERT: Got value: myFirstValue {"data":"myFirstValue"} []

== APP == [2021-02-12T16:38:00.845721+01:00] APP.ALERT: Got value after delete:  {"data":null} []

第四步:保存和检索多个状态

Dapr还允许你在同一个调用中保存和检索多个状态:


在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

curl -X POST -H "Content-Type: application/json" -d '[{ "key": "key1", "value": "value1"}, { "key": "key2", "value": "value2"}]' http://localhost:3500/v1.0/state/statestore

现在获取你刚才保存的状态:

curl -X POST -H "Content-Type: application/json" -d '{"keys":["key1", "key2"]}' http://localhost:3500/v1.0/state/statestore/bulk

在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "key1", "value": "value1"}, { "key": "key2", "value": "value2"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在获取你刚才保存的状态:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["key1", "key2"]}' -Uri 'http://localhost:3500/v1.0/state/statestore/bulk'

StateItem对象可以使用save_statesget_states方法来存储多个Dapr状态。

用以下代码更新你的pythonState.py文件:

from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem

with DaprClient() as d:
    s1 = StateItem(key="key1", value="value1")
    s2 = StateItem(key="key2", value="value2")

    d.save_bulk_state(store_name="statestore", states=[s1,s2])
    print("States have been stored")

    items = d.get_bulk_state(store_name="statestore", keys=["key1", "key2"]).items
    print(f"Got items: {[i.data for i in items]}")

现在通过以下命令运行你的程序:

dapr --app-id myapp run python pythonState.py

你应该会看到一个类似于下面的输出:

== DAPR == time="2021-01-06T21:54:56.7262358-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:54:56.7401933-08:00" level=info msg="standalone mode configured" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:54:56.8754240-08:00" level=info msg="Initialized name resolution to standalone" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:54:56.8844248-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:54:56.8854273-08:00" level=info msg="API gRPC server is running on port 60614" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T21:54:56.8854273-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 145.234ms" app_id=Musesequoia-Sprite scope=dapr.runtime type=log ver=0.11.3

Checking if Dapr sidecar is listening on GRPC port 60614
Dapr sidecar is up and running.
Updating metadata for app command: python pythonState.py
You're up and running! Both Dapr and your app logs will appear here.

== APP == States have been stored
== APP == Got items: [b'value1', b'value2']

要用PHP批量加载和保存状态,只需创建一个 “Plain Ole’ PHP对象”(POPO),并用 StateStore注解进行声明。

更新state-example.php文件:

<?php

require_once __DIR__.'/vendor/autoload.php';

#[\Dapr\State\Attributes\StateStore('statestore', \Dapr\consistency\EventualLastWrite::class)]
class MyState {
    public string $key1 = 'value1';
    public string $key2 = 'value2';
}

$app = \Dapr\App::create();
$app->run(function(\Dapr\State\StateManager $stateManager, \Psr\Log\LoggerInterface $logger) {
    $obj = new MyState();
    $stateManager->save_object(item: $obj);
    $logger->alert('States have been stored');

    $stateManager->load_object(into: $obj);
    $logger->alert("Got value: {data}", ['data' => $obj]);
});

运行该应用:

dapr --app-id myapp run -- php state-example.php

并看到以下输出:

✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == [2021-02-12T16:55:02.913801+01:00] APP.ALERT: States have been stored [] []

== APP == [2021-02-12T16:55:02.917850+01:00] APP.ALERT: Got value: [object MyState] {"data":{"MyState":{"key1":"value1","key2":"value2"}}} []

第五步:执行状态事务性操作


用上面运行的同一个dapr实例执行两个状态事务操作:

curl -X POST -H "Content-Type: application/json" -d '{"operations": [{"operation":"upsert", "request": {"key": "key1", "value": "newValue1"}}, {"operation":"delete", "request": {"key": "key2"}}]}' http://localhost:3500/v1.0/state/statestore/transaction

现在可以看到你的状态事务操作的结果:

curl -X POST -H "Content-Type: application/json" -d '{"keys":["key1", "key2"]}' http://localhost:3500/v1.0/state/statestore/bulk

在上面运行的同一个dapr实例中,将两个键/值对保存到你的statetore中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"operations": [{"operation":"upsert", "request": {"key": "key1", "value": "newValue1"}}, {"operation":"delete", "request": {"key": "key2"}}]}' -Uri 'http://localhost:3500/v1.0/state/statestore'

现在可以看到你的状态事务操作的结果:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["key1", "key2"]}' -Uri 'http://localhost:3500/v1.0/state/statestore/bulk'

如果你的状态存储需要事务支持,可以考虑使用TransactionalStateOperation

用以下代码更新你的pythonState.py文件:

from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType

with DaprClient() as d:
    s1 = StateItem(key="key1", value="value1")
    s2 = StateItem(key="key2", value="value2")

    d.save_bulk_state(store_name="statestore", states=[s1,s2])
    print("States have been stored")

    d.execute_state_transaction(
        store_name="statestore",
        operations=[
            TransactionalStateOperation(key="key1", data="newValue1", operation_type=TransactionOperationType.upsert),
            TransactionalStateOperation(key="key2", data="value2", operation_type=TransactionOperationType.delete)
        ]
    )
    print("State transactions have been completed")

    items = d.get_bulk_state(store_name="statestore", keys=["key1", "key2"]).items
    print(f"Got items: {[i.data for i in items]}")

现在通过以下命令运行你的程序:

dapr run python pythonState.py

你应该会看到一个类似于下面的输出:

Starting Dapr with id Singerchecker-Player. HTTP Port: 59533. gRPC Port: 59534
== DAPR == time="2021-01-06T22:18:14.1246721-08:00" level=info msg="starting Dapr Runtime -- version 0.11.3 -- commit a1a8e11" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:18:14.1346254-08:00" level=info msg="standalone mode configured" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:18:14.2747063-08:00" level=info msg="component loaded. name: statestore, type: state.redis" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:18:14.2757062-08:00" level=info msg="API gRPC server is running on port 59534" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3
== DAPR == time="2021-01-06T22:18:14.2767059-08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 142.0805ms" app_id=Singerchecker-Player scope=dapr.runtime type=log ver=0.11.3

Checking if Dapr sidecar is listening on GRPC port 59534
Dapr sidecar is up and running.
Updating metadata for app command: python pythonState.py
You're up and running! Both Dapr and your app logs will appear here.

== APP == State transactions have been completed
== APP == Got items: [b'value1', b'']

事务性状态通过扩展TransactionalState基础对象来支持,它挂接到你的 对象,然后通过setters和getters来提供事务。 而你可能会希望依赖注入框架来替你创建一个事务对象:

再次修改state-example.php文件:

<?php

require_once __DIR__.'/vendor/autoload.php';

#[\Dapr\State\Attributes\StateStore('statestore', \Dapr\consistency\EventualLastWrite::class)]
class MyState extends \Dapr\State\TransactionalState {
    public string $key1 = 'value1';
    public string $key2 = 'value2';
}

$app = \Dapr\App::create();
$app->run(function(MyState $obj, \Psr\Log\LoggerInterface $logger, \Dapr\State\StateManager $stateManager) {
    $obj->begin();
    $obj->key1 = 'hello world';
    $obj->key2 = 'value3';
    $obj->commit();
    $logger->alert('Transaction committed!');

    // begin a new transaction which reloads from the store
    $obj->begin();
    $logger->alert("Got value: {key1}, {key2}", ['key1' => $obj->key1, 'key2' => $obj->key2]);
});

运行程序:

dapr --app-id myapp run -- php state-example.php

观察到以下输出:

✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == [2021-02-12T17:10:06.837110+01:00] APP.ALERT: Transaction committed! [] []

== APP == [2021-02-12T17:10:06.840857+01:00] APP.ALERT: Got value: hello world, value3 {"key1":"hello world","key2":"value3"} []

下一步

Last modified January 1, 0001