我正在尝试扩充我的管道(将数据从RDS迁移到RedShift),以便它选择id大于RedShift中存在的最大ID的所有行.我在
Python中有一个脚本来计算这个值并将其返回到输出.我想获取此输出并将其保存到变量max_id,我稍后可以在我的RDS选择查询中引用该变量.例如,我的RDS选择部分目前看起来像这样:
{
"database": {
"ref": "rds_mysql"
},
"scheduleType": "TIMESERIES",
"name": "SrcRDSTable",
"id": "SrcRDSTable",
"type": "SqlDataNode",
"table": "#{myRDSTableName}",
"selectQuery": "select * from #{table} where #{myRDSTableLastModifiedCol} > '#{max_id}'"
},
然后我想在此之前添加一个执行bash脚本的部分,检索id字段并将其保存到变量max_id中,以便可以在上面的代码中引用它.到目前为止,我有:
{
"myComment": "Retrieves the maximum ID for a given table in RedShift",
"id": "ShellCommandActivity_Max_ID",
"workerGroup": "wg-12345",
"type": "ShellCommandActivity",
"command": "starting_point=$(/usr/bin/python /home/user/aws-taskrunner-docker/get_id.py --schema=schema_name --table=users --database=master)"
},
如何调整以上将max_id设置为starting_point的值?谢谢.
最佳答案 不幸的是,我认为在管道执行期间没有办法设置管道参数.以下是一些可能对您有所帮助的选项.
首先,如果您的数据表有一个修改日期列,那么您可以使用管道模板Incremental Copy of RDS MySQL to Redshift.如果您不使用MySQL,您仍然可以根据需要修改该模板.
或者,您可以创建一个ShellCommandActivity,使用python连接到您的RDS数据库并将相关记录集导出到S3,而不是使用SqlDataNode.然后,您可以使用RedshiftCopyActivity从S3导入记录.