使用 AWS Lambda 持续将 Amazon DynamoDB 更改复制到 Amazon Aur
实时将 Amazon DynamoDB 更改同步到 Amazon Aurora PostgreSQL
使用 AWS Lambda
关键要点
在本篇文章中,我们介绍了一种解决方案,通过使用 Amazon DynamoDB Streams 和 AWS Lambda,将数据从 DynamoDB 实时复制到 Amazon Aurora PostgreSQL 兼容版。这可以帮助客户在进行数据仓库现代化的过程中,支持其传统的报告应用和业务智能工具。
为什么选择这种解决方案? 通过 DynamoDB Streams 来捕获数据更改。 利用 Lambda 函数实现自动化数据同步。 针对企业级应用提供了快速的、可靠的解决方案。
背景介绍
Amazon DynamoDB 是一种完全托管的无服务器键值 NoSQL 数据库,旨在以任何规模运行高性能应用。 Amazon Aurora 是一个与 MySQL 和 PostgreSQL 兼容的云原生关系数据库。Aurora 结合了传统企业数据库的性能和可用性,并且具有开源数据库的简单性和成本效益。使用无服务器技术可以消除诸如容量规划和补丁管理的基础设施管理任务,并在整个应用堆栈中增强敏捷性。
在这篇文章中,我们提出了一种解决方案,通过从 DynamoDB 复制数据到 Amazon Aurora PostgreSQL 兼容版,处理规模的实时数据更改。
用例场景
客户拥有遗留的报告应用程序、业务智能BI工具,以及运行在其本地环境中的数据仓库。作为长期计划,客户希望将数据仓库现代化到 Amazon Redshift。在此过程中,为了支持下游的遗留报告环境,他们将数据从 DynamoDB 复制到 Amazon Aurora PostgreSQL 兼容版,以便用户能够运行一次性和聚合查询。尽管将数据从 DynamoDB 复制到 Amazon Aurora PostgreSQL 并不常见,但他们更倾向于将数据导入到 Amazon Aurora PostgreSQL,以便在实现 Amazon Redshift 迁移前能够持续运行现有的遗留应用程序。
解决方案概述
在 DynamoDB 中,您只需定义一个分区键Partition Key和可选的排序键Sort Key来处理数据。相比之下,对于像 Amazon Aurora 这样的关系数据库,则需要为每个属性定义表模式。为了从 DynamoDB 表复制更改,我们使用 DynamoDB Streams 捕获按时间顺序排列的项级修改序列,并将这些信息存储在日志中,最长可达 24 小时。启用 DynamoDB Streams 后,复制才会开始。这意味着,如果您的 DynamoDB 表中已有数据需要复制到 Aurora,则必须通过一次性加载方式来处理,例如 DynamoDB 数据导出到 Amazon S3 或者通过 AWS 数据管道导入导出 DynamoDB 数据。
DynamoDB 是无模式的,您需要负责确保关系数据库结构的更新,以避免因添加新的属性到 DynamoDB 而导致复制中断。尽管 Aurora 可以处理每秒数十万次的事务TPS,但如果 DynamoDB 中的传入 TPS 超过这一限度,Aurora 可能会出现延迟。在实施解决方案之前,了解 TPS 要求并与延迟 SLA 对齐非常重要。
在与客户的讨论中,我们还考虑了通过 Amazon Data Firehose 将数据从 DynamoDB 流式传输到 Aurora。然而,客户更倾向于使用 DynamoDB Streams,因为这是一个开箱即用的解决方案,没有额外成本,并且符合客户对数据保留的服务水平协议SLA。

以下图示展示了该解决方案的架构与工作流程:
要启用两个数据库之间的数据复制,可以完成以下高层步骤:
创建一个 DynamoDB 表。配置 DynamoDB 到 SQL 的表映射。在 DynamoDB 表中启用 DynamoDB Streams。创建一个 Lambda 函数,并使用 Powertools for AWS Lambda 进行 Amazon CloudWatch Logs 和 AWS Secrets Manager 参数设置。为 DynamoDB 流配置 Lambda 触发器。验证在 Amazon RDS 中的 DynamoDB 更改。前提条件
要按照本篇文章进行操作,您需要满足以下前提条件:
安装并配置 AWS 命令行接口AWS CLI拥有适当权限的 AWS 账号在 AWS 上创建一个虚拟专用云VPC一台 Aurora Serverless v2 实例,数据将在其中复制配置 DynamoDB 到 SQL 的表映射
以下表格展示了 DynamoDB 表与 SQL 数据库之间的映射。在我们的用例中,我们将一个 DynamoDB 表映射到一个 Aurora PostgreSQL 表。
DynamoDB 表员工SQL 表员工Id主键,类型:NumberId主键,类型:NumberempName,类型:StringName,类型:Varchar(20)empDepartment,类型:StringDepartment,类型:Varchar(10)我们在两个表中使用 Id 作为主键。
DynamoDBSQLINSERTINSERTMODIFYUPDATEREMOVEDELETE创建一个 DynamoDB 表
以下 AWS CLI 命令创建了一个名为 Employees 的 DynamoDB 表,主键为 Id,类型为数字:
bashaws dynamodb createtable tablename Employees attributedefinitions AttributeName=IdAttributeType=N keyschema AttributeName=IdKeyType=HASH provisionedthroughput ReadCapacityUnits=5WriteCapacityUnits=5 tableclass STANDARD
该表配置了 5 个读取容量单位RCU和 5 个写入容量单位WCU的预配置吞吐量。有关预配置容量定价的更多信息,请参考 预配置容量的定价
在 DynamoDB 表上启用 DynamoDB Streams
要 启用 DynamoDB Streams,请完成以下步骤:
在 DynamoDB 控制台中,导航至您创建的表并选择 导出和流 选项卡。在 DynamoDB 流详细信息 中,选择 开启。当 DynamoDB Streams 被启用时,所有在 DynamoDB 表中的数据操作语言DML操作将被捕获为流中的一项。
在 查看类型 中选择 新图像 以捕获更新后的新值,我们使用新值替代目标中的数据。选择 打开流。同样也可以通过 CLI 完成,以下命令在员工表上启用新图像视图类型。
bashaws dynamodb updatetable tablename Employees streamspecification StreamEnabled=trueStreamViewType=NEWIMAGE
创建用于 CloudWatch Logs 和 Secrets Manager 参数的 Lambda 函数
在本用例中,我们使用名为 SQL Replicator 的 Lambda 函数。当 DynamoDB 表中发生数据修改时,该函数会被 DynamoDB Streams 触发。此函数将更改复制到 Aurora PostgreSQL Serverless,其日志使用 Powertools for Lambda 捕获到 CloudWatch Logs。Lambda 代码使用 Python 编写,使用 Psycopg 数据库适配器连接 PostgreSQL,并使用 Powertools for LambdaPython进行日志记录和 secrets 存储。
Lambda 权限策略
Lambda 角色包含以下 AWS 托管策略:
AWSLambdaInvocationDynamoDB 提供对 DynamoDB Streams 的读取权限AWSLambdaBasicExecutionRole 提供对 CloudWatch Logs 的写入权限AWSLambdaVPCAccessExecutionRole 为 Lambda 函数提供在访问 VPC 中资源时所需的最小权限,包括创建、描述、删除网络接口以及对 CloudWatch Logs 的写入权限SecretsManagerReadWrite 提供通过 AWS 管理控制台 的 Secrets Manager 的读/写访问权限您可以在 Secrets Manager 中 创建目标 RDS 数据库的 secrets 以供 Lambda 函数使用。有关直接集成的信息,请参阅 通过 AWS Secrets Manager 改善 Amazon RDS 主数据库凭据的安全性。
银河加速器下载苹果ios以下是同步从 DynamoDB 到 PostgreSQL 数据的 Lambda 函数的 Python 代码,包含以下操作:
导入必要的库,如 json、psycopg2 和 awslambdapowertools初始化来自 awslambdapowertools 的日志记录器从 Secrets Manager 检索 RDS 数据库凭据使用 psycopg2 连接 PostgreSQL 数据库对于 DynamoDB 事件中的每条记录,根据事件类型 INSERT、MODIFY、REMOVE在 PostgreSQL 中执行 CRUD 操作。使用 psycopg2 游标执行 SQL 查询,插入、更新和删除 PostgreSQL 数据库中的记录在每个步骤中使用记录器记录相关信息通过从 PostgreSQL 中选择记录来验证数据同步在最后提交事务python
导入库
import jsonimport psycopg2from awslambdapowertools import Loggerfrom awslambdapowertoolsutilities import parameters
初始化 Powertools Logger
logger = Logger()
指示 Logger 记录传入事件
@loggerinjectlambdacontext(logevent=True)
Lambda 处理函数
def lambdahandler(event context) try # 从 AWS Secrets Manager 获取秘密值 secretvalue = jsonloads(parametersgetsecret(name=dev/rds))
# 数据库连接 mydb = psycopg2connect( user=secretvalue[username] password=secretvalue[password] host=secretvalue[host] dbname=secretvalue[engine] port= secretvalue[port] ) mycursor = mydbcursor() # 对于事件中的每条记录 for record in event[Records] eventname = record[eventName] # 根据记录事件进行处理 if eventname == INSERT loggerinfo(插入记录 extra={record record}) sqlscript = INSERT INTO publicEmployees VALUES (sss) sqlvalue = ( int(recordget(dynamodb {})get(Keys {})get(Id {})get(N)) recordget(dynamodb {})get(NewImage {})get(empName {})get(S NA) recordget(dynamodb {})get(NewImage {})get(empDepartment {})get(S NA) ) mycursorexecute(sqlscript sqlvalue) loggerinfo(成功插入记录到 Employees 表) elif eventname == MODIFY loggerinfo(修改记录 extra={record record}) sqlscript = UPDATE publicEmployees SET Name = s Department = s WHERE Id = s sqlvalue = ( recordget(dynamodb {})get(NewImage {})get(empName {})get(S recordget(dynamodb {})get(NewImage {})get(empName {})get(S NA)) recordget(dynamodb {})get(NewImage {})get(empDepartment {})get(S recordget(dynamodb {})get(NewImage {})get(empDepartment {})get(S NA)) int(recordget(dynamodb {})get(Keys {})get(Id {})get(N)) ) mycursorexecute(sqlscript sqlvalue) loggerinfo(成功修改 Employees 表中的记录) elif eventname == REMOVE loggerinfo(删除记录 extra={record record}) sqlscript = DELETE FROM publicEmployees WHERE Id = s sqlvalue = ( int(recordget(dynamodb {})get(Keys {})get(Id {})get(N)) ) mycursorexecute(sqlscript sqlvalue) loggerinfo(成功从 Employees 表中删除记录) # 验证选择语句 mycursorexecute(SELECT FROM publicEmployees) records = mycursorfetchall() mycursorclose() mydbcommit() loggerinfo(接收到的事件 extra={records records})except (Exception psycopg2Error) as error loggerexception(从 PostgreSQL 获取数据时发生错误 extra={error error})在函数代码中,请根据您的需要将 “dev/rds” 替换为您要用于数据库身份验证的 secret 的名称。有关创建 RDS 数据库 secret 的更多信息,请参阅 在 AWS Secrets Manager 中创建数据库 secret。
Secrets Manager secrets 值
以下是供您参考的 secret 值:
json{usernameadminpasswordxxxxxxenginepostgreshostdatabaseclusterabcdefghjklmnuseast1rdsamazonawscomport5432dbClusterIdentifierdatabase3}
由于 Amazon Aurora 部署在 VPC 中,因此我们将 Lambda 附加到同一 VPC。我们在 Lambda 和 Amazon Aurora 上配置了安全组规则,以允许它们之间的连接。有关更多信息,请参考 使用 Lambda 函数访问 Amazon RDS 数据库。您还可以参考 Amazon RDS 故障排除手册 以获取其他帮助。
配置 DynamoDB 流的 Lambda 触发器
通过触发器,您可以构建响应于 DynamoDB 表数据修改的应用程序。有关 DynamoDB 与 Lambda 集成的更多信息,请参阅 DynamoDB Streams 和 AWS Lambda 触发器。
触发器会运行 Lambda 函数,如果返回错误,它将重试批处理,直到处理成功或数据过期。您还可以配置 Lambda 函数以使用更小的批次重试,限制重试次数及其他选项。关于批处理的更多信息,请参见 轮询和批处理流。
函数触发器的批大小配置为 100,基于 DynamoDB DML 的数据量。
验证 DynamoDB 在 Amazon RDS 中的更改
通过设置 DynamoDB 流和 Lambda 函数,您现在可以验证数据的交付。您可以使用 AWS CLI 或控制台执行 DynamoDB 的插入、更新和删除操作。以下为 AWS CLI 示例代码。您可以使用 PostgreSQL 客户端在此示例中使用 pgAdmin来连接并验证数据。
插入
使用以下代码在 AWS CLI 中执行插入操作:
bashaws dynamodb putitem tablename Employees item {Id {N
使用 Amazon EMR 针对 Apache Spark 的运行时将 Apache Spark 3
用 Amazon EMR 运行 Apache Spark 351 工作负载的性能提升关键要点Amazon EMR 提供了针对 Apache Spark 的优化运行时,性能比 Apache Spark 351 快 45 倍,并在成本效益方面提高了 28 倍。自 2022 年底以来新增了 35 项优化,...
使用语言模型的文档自动摘要技术 机器学习博客
使用语言模型进行文档自动摘要的技术关键要点本文探讨了使用自然语言处理NLP和生成式人工智能AI进行文档摘要的各种技术。自动摘要在信息丰富的时代至关重要,可以帮助压缩冗长的文本,提高交流效率。文章涵盖了提取式和生成式摘要技术以及多层次摘要方法,包括具体的技术步骤和应用场景,提供实际的编码示例和实施建议...