操作方法:使用输入绑定来触发应用程序
通过输入绑定,您可以在外部资源发生事件时触发应用程序。 一个外部资源可以是队列、消息管道、云服务、文件系统等。 可选择随请求发送有效载荷和元数据。
输入绑定对于事件驱动的处理,数据管道或通常对事件作出反应并执行进一步处理非常理想。 Dapr 输入绑定允许您:
- 接收不包含特定 SDK 或库的事件
- 在不更改代码的情况下替换绑定
- 关注业务逻辑而不是事件资源实现
本指南以 Kafka 绑定为例。 您可以从绑定组件列表} 中找到自己喜欢的绑定规范。 在本指南中
- 该示例调用了
/binding
端点,其中checkout
,即要调用的绑定名称。 - 有效载荷位于必需的
data
字段中,并且可以是任何 JSON 可序列化的值。 operation
字段告诉绑定需要采取什么操作。 例如,Kafka绑定支持create
操作。- 您可以查看每个输出绑定支持的操作(针对每个组件)。
注意
如果你还没有,请尝试使用绑定快速入门快速了解如何使用绑定 API。创建绑定
创建一个 binding.yaml
文件,并将其保存到应用程序目录中的 components
子文件夹中。
创建一个名称为 checkout
的新绑定组件。 在metadata
部分,配置以下与Kafka相关的属性:
- 您要发布信息的主题
- Broker
在创建绑定组件时,请指定支持的绑定direction
(方向)。
使用 --resources-path
标志与 dapr run
命令一起使用,指向您的自定义资源目录。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: checkout
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: false
- name: direction
value: input
要将部署到Kubernetes集群中,请运行 kubectl apply -f binding.yaml
。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: checkout
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: false
- name: direction
value: input
监听传入事件 (输入绑定)
现在配置您的应用程序来接收传入事件。 如果您正在使用HTTP,您需要:
- 在
binding.yaml
文件中,按照metadata.name
指定的绑定名称,监听一个POST
终结点。 - 验证您的应用程序允许 Dapr 为此端点进行
OPTIONS
请求。
下面是利用 Dapr SDK 展示输出绑定的代码示例。
//dependencies
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
//code
namespace CheckoutService.controller
{
[ApiController]
public class CheckoutServiceController : Controller
{
[HttpPost("/checkout")]
public ActionResult<string> getCheckout([FromBody] int orderId)
{
Console.WriteLine("Received Message: " + orderId);
return "CID" + orderId;
}
}
}
//dependencies
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
//code
@RestController
@RequestMapping("/")
public class CheckoutServiceController {
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
@PostMapping(path = "/checkout")
public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
return Mono.fromRunnable(() ->
log.info("Received Message: " + new String(body)));
}
}
#dependencies
import logging
from dapr.ext.grpc import App, BindingRequest
#code
app = App()
@app.binding('checkout')
def getCheckout(request: BindingRequest):
logging.basicConfig(level = logging.INFO)
logging.info('Received Message : ' + request.text())
app.run(6002)
//dependencies
import (
"encoding/json"
"log"
"net/http"
"github.com/gorilla/mux"
)
//code
func getCheckout(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var orderId int
err := json.NewDecoder(r.Body).Decode(&orderId)
log.Println("Received Message: ", orderId)
if err != nil {
log.Printf("error parsing checkout input binding payload: %s", err)
w.WriteHeader(http.StatusOK)
return
}
}
func main() {
r := mux.NewRouter()
r.HandleFunc("/checkout", getCheckout).Methods("POST", "OPTIONS")
http.ListenAndServe(":6002", r)
}
//dependencies
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
//code
const daprHost = "127.0.0.1";
const serverHost = "127.0.0.1";
const serverPort = "6002";
const daprPort = "3602";
start().catch((e) => {
console.error(e);
process.exit(1);
});
async function start() {
const server = new DaprServer({
serverHost,
serverPort,
communicationProtocol: CommunicationProtocolEnum.HTTP,
clientOptions: {
daprHost,
daprPort,
}
});
await server.binding.receive('checkout', async (orderId) => console.log(`Received Message: ${JSON.stringify(orderId)}`));
await server.start();
}
ACK一个事件
通过从您的HTTP处理程序返回一个200 OK
响应,告诉Dapr您已成功处理了应用程序中的事件。
拒绝事件
告诉 Dapr 在您的应用程序中事件未被正确处理,并通过返回除 200 OK
以外的任何响应来安排重新传递。 例如,一个 500 Error
。
指定自定义路由
默认情况下,传入事件将发送到与输入绑定的名称对应的 HTTP 端点。 您可以通过在 binding.yaml
中设置以下元数据属性来覆盖此属性:
name: mybinding
spec:
type: binding.rabbitmq
metadata:
- name: route
value: /onevent
事件传递保证
事件传递保证由绑定实现控制。 根据绑定实现,事件传递可以正好一次或至少一次。
参考资料
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.