bash – AWS Data Pipeline – 如何从ShellCommandActivity设置全局管道变量

我正在尝试扩充我的管道(将数据从RDS迁移到RedShift),以便它选择id大于RedShift中存在的最大ID的所有行.我在
Python中有一个脚本来计算这个值并将其返回到输出.我想获取此输出并将其保存到变量max_id,我稍后可以在我的RDS选择查询中引用该变量.例如,我的RDS选择部分目前看起来像这样:

{
  "database": {
    "ref": "rds_mysql"
  },
  "scheduleType": "TIMESERIES",
  "name": "SrcRDSTable",
  "id": "SrcRDSTable",
  "type": "SqlDataNode",
  "table": "#{myRDSTableName}",
  "selectQuery": "select * from #{table} where #{myRDSTableLastModifiedCol} > '#{max_id}'"
},

然后我想在此之前添加一个执行bash脚本的部分,检索id字段并将其保存到变量max_id中,以便可以在上面的代码中引用它.到目前为止,我有:

{
 "myComment": "Retrieves the maximum ID for a given table in RedShift",
  "id": "ShellCommandActivity_Max_ID",
  "workerGroup": "wg-12345",
  "type": "ShellCommandActivity",
  "command": "starting_point=$(/usr/bin/python /home/user/aws-taskrunner-docker/get_id.py --schema=schema_name --table=users --database=master)"
},

如何调整以上将max_id设置为starting_point的值?谢谢.

最佳答案 不幸的是,我认为在管道执行期间没有办法设置管道参数.以下是一些可能对您有所帮助的选项.

首先,如果您的数据表有一个修改日期列,那么您可以使用管道模板Incremental Copy of RDS MySQL to Redshift.如果您不使用MySQL,您仍然可以根据需要修改该模板.

或者,您可以创建一个ShellCommandActivity,使用python连接到您的RDS数据库并将相关记录集导出到S3,而不是使用SqlDataNode.然后,您可以使用RedshiftCopyActivity从S3导入记录.

点赞