我有一个Dataflow作业要写入BigQuery.它适用于非嵌套模式,但嵌套模式失败.
这是我的Dataflow管道:
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
schema = 'url: STRING,' \
'ua: STRING,' \
'method: STRING,' \
'man: RECORD,' \
'man.ip: RECORD,' \
'man.ip.cc: STRING,' \
'man.ip.city: STRING,' \
'man.ip.as: INTEGER,' \
'man.ip.country: STRING,' \
'man.res: RECORD,' \
'man.res.ip_dom: STRING'
first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
| 'process' >> (beam.ParDo(processFunction()))
| 'write' >> beam.io.WriteToBigQuery(
'myBucket:tableFolder.test_table',
schema=schema)
)
我使用以下Schema创建了BigQuery Table:
[
{
"mode": "NULLABLE",
"name": "url",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "ua",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "method",
"type": "STRING"
},
{
"mode": "REPEATED",
"name": "man",
"type": "RECORD",
"fields":
[
{
"mode": "REPEATED",
"name": "ip",
"type": "RECORD",
"fields":
[
{
"mode": "NULLABLE",
"name": "cc",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "city",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "as",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "country",
"type": "STRING"
}
]
},
{
"mode": "REPEATED",
"name": "res",
"type": "RECORD",
"fields":
[
{
"mode": "NULLABLE",
"name": "ip_dom",
"type": "STRING"
}
]
}
]
}
]
我收到以下错误:
BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400
题
有人可以指导我吗?我究竟做错了什么?
另外,如果有更好的方法来遍历所有嵌套模式并写入BigQuery,请建议?
附加信息
我的数据文件:
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
最佳答案 您的代码的问题是您尝试使用嵌套字段,同时将BigQuery表架构指定为字符串,这是不受支持的.
为了将嵌套记录从Apache Beam推送到BigQuery,您需要创建TableSchema对象,即使用内置解析器:
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
您需要在那里将模式作为JSON字符串传递,您可以在终端中使用以下命令获取它(我假设您已安装了gcloud工具):
bq --project=your-gcp-project-name --format=json show your.table.name > schema.json
并在Python中使用它如下:
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))
然后在你的管道中:
beam.io.WriteToBigQuery(
'myBucket:tableFolder.test_table',
schema=table_schema)
您还可以查看显示手动创建TableSchema对象的示例:
https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
这是(来自链接的例子):
from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)
# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)
table_schema.fields.append(phone_number_schema)
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)
table_schema.fields.append(phone_number_schema)
然后在beam.io.WriteToBigQuery中使用table_schema变量.