如何在AWS Glue创建作业并调用python脚本
关于AWS Glue
AWS Glue是一个完全托管的ETL(Extract, Transform, Load)服务,可用于准备和加载数据以进行分析、机器学习和应用程序开发。它可以自动发现和分类数据,为你提供ETL作业的自动编排和扩展能力,并在AWS各种服务之间提供无缝连接,例如Amazon S3,Amazon RDS和Amazon Redshift。
AWS Glue提供了一种可扩展的服务来编写和执行ETL作业,可以使用Python或Scala编写ETL脚本,也可以使用可视化工具来创建和管理作业。AWS Glue还提供了一个数据目录,可用于发现、搜索和查询各种数据资源和元数据,包括数据库表、S3对象和Kafka主题等。
AWS Glue还提供了许多与安全性、可扩展性、可靠性和成本效益有关的特性,例如自动扩展、负载均衡、安全数据访问、数据加密、监控和日志记录等。
如何在aws glue中创建一个job,并在其中调用python脚本去读取S3里的csv文件?
-
登录 AWS Glue 控制台:https://console.aws.amazon.com/glue/
-
在导航菜单中,选择“作业”,然后选择“添加作业”。
-
在“添加作业”页面,输入作业名称,并选择要使用的 IAM 角色。此角色应具有访问 S3 和 Glue 的权限。
-
在“数据源”选项卡下,选择“S3”作为数据源类型,然后输入 S3 桶和前缀的路径。
-
在“转换”选项卡下,选择“Python Shell”作为作业类型,并输入要执行的 Python 脚本的路径。例如:s3://your-bucket-name/path-to-your-script.py
-
在“目标”选项卡下,选择“创建表”。
-
点击“提交作业”按钮。
AWS Glue 会自动创建一个 Spark 环境,并在其中执行你的 Python 脚本。你的脚本可以使用 AWS Glue 的 PySpark API 读取和处理数据。
在aws gule中的job中如何向被调用的python脚本中传入参数,又如何获取python脚本的返回值呢?
- 向 Python 脚本传递参数
在 AWS Glue 的作业中向 Python 脚本传递参数,可以使用脚本参数功能。具体步骤如下:
-
在作业编辑器中,选择“转换”选项卡,在“脚本参数”部分中添加参数,例如:
--input_path
和--output_path
。 -
在 Python 脚本中,可以使用 argparse 模块解析这些参数。例如:
import argparse parser = argparse.ArgumentParser() parser.add_argument("--input_path", help="input S3 path") parser.add_argument("--output_path", help="output S3 path") args = parser.parse_args()
- 创建工作流
在 AWS Glue 中获取 Python 脚本的返回值,可以使用 JobRun
对象。具体步骤如下:
- 在 Python 脚本中,使用
print()
函数输出需要返回的结果。例如:
print("result: " + str(result))
- 在作业编辑器中,选择“目标”选项卡,勾选“收集作业输出”,并设置 S3 输出路径。
- 在 Python 脚本执行完成后,可以使用以下代码获取输出结果:
pythonCopy code
import sys
import json
for line in sys.stdin:
result = json.loads(line)
print("result: " + str(result))
其中,sys.stdin
表示从 S3 中读取输出结果。输出结果是一个 JSON 字符串,需要使用 json.loads()
函数解析成 Python 对象。
AWS Glue的一个job中,具体如何调用多个python shell?
- 在 AWS Glue 控制台上,创建一个新的 Job,或编辑一个已有的 Job。
- 在 Job 参数中,指定要执行的 Python Shell Job 脚本。你可以在“脚本路径”参数中指定一个或多个 Python 脚本的路径,以逗号分隔,如下所示:
s3://your-bucket/path-to-script1.py, s3://your-bucket/path-to-script2.py
- 你还可以指定其他 Job 参数,例如输入输出路径、连接、安全配置等。在 Python Shell Job 脚本中,可以使用这些参数来访问和处理数据。
- 运行 Job。AWS Glue 将按顺序执行指定的 Python Shell Job 脚本,并将输出传递给下一个脚本。如果任何一个脚本失败,整个 Job 将被中止。
需要注意的是,AWS Glue Job 中每个 Python Shell Job 脚本都是独立运行的,它们之间不共享变量和状态。如果需要在不同的 Python 脚本之间传递数据和状态,你可以使用 S3、DynamoDB、SQS 等 AWS 服务来实现。
举例说明同一个job中如何调用多个python shell:
假设我的S3里有4个python shell,分别执行简单的加减乘除4种运算。我要在同一个job中调用这4个python shell,它们按顺序分别被调用,它们输出的结果要储存到S3中。从第2个shell开始,每个shell都会接受上一个shell的运算结果作为参数,去进行它的运算。如何创建job,编写及调用python shell呢?
方法一:
-
在 S3 中创建 4 个 Python 脚本,分别实现加、减、乘、除运算,并将它们上传到 S3 桶中。
-
在 AWS Glue 控制台上创建一个新的 Job。在 Job 参数中指定以下参数:
a.
--input_path
:输入路径,指向包含输入数据的 S3 文件路径。b.
--output_path
:输出路径,指向存储输出数据的 S3 文件路径。c.
--operator
:运算符,指定要执行的运算类型。在第一个 Python 脚本中,将该参数设置为默认值add
。d.
--result
:结果,指定上一个 Python 脚本的输出结果。在第一个 Python 脚本中,将该参数设置为空字符串。 -
在 Job 定义中指定 4 个 Python 脚本的路径和名称,以按顺序执行它们。
-
在每个 Python 脚本中,读取输入数据,并根据指定的运算符和上一个 Python 脚本的输出结果,计算出当前 Python 脚本的输出结果,并将其保存到 S3 中指定的输出路径中。
示例代码 :
add.py :
import sys import boto3 import json input_path = sys.argv[1] output_path = sys.argv[2] operator = sys.argv[3] result = sys.argv[4] s3 = boto3.client('s3') if result: result = int(result) else: result = 0 if operator == "add": data = s3.get_object(Bucket='my-bucket', Key=input_path)['Body'].read().decode('utf-8') data = json.loads(data) result += sum(data) s3.put_object(Bucket='my-bucket', Key=output_path, Body=str(result).encode('utf-8'))
sub.py :
import sys import boto3 import json input_path = sys.argv[1] output_path = sys.argv[2] operator = sys.argv[3] result = sys.argv[4] s3 = boto3.client('s3') if result: result = int(result) else: result = 0 if operator == "sub": data = s3.get_object(Bucket='my-bucket', Key=input_path)['Body'].read().decode('utf-8') data = json.loads(data) result -= sum(data) s3.put_object(Bucket='my-bucket', Key=output_path, Body=str(result).encode('utf-8'))
mul.py :
import sys import boto3 import json input_path = sys.argv[1] output_path = sys.argv[2] operator = sys.argv[3] result = sys.argv[4] s3 = boto3.client('s3') if result: result = int(result) else: result = 1 if operator == "mul": data = s3.get_object(Bucket='my-bucket', Key=input_path)['Body'].read().decode('utf-8') data = json.loads(data) for d in data: result *= d s3.put_object(Bucket='my-bucket', Key=output_path, Body=str(result).encode('utf-8'))
del.py :
### 根据以上类推
在AWS Glue的job中,我们需要按照以下步骤调用这四个Python脚本:
- 在job中添加四个Python Shell作业节点。
- 将第一个Python脚本设置为作业节点的入口点,并将S3输入文件路径作为其输入参数。同时将S3输出文件路径传递给第二个Python脚本。
- 将第二个Python脚本设置为第一个Python脚本的后继节点,并将第一个Python脚本的输出文件路径作为其输入参数。同时将S3输出文件路径传递给第三个Python脚本。
- 将第三个Python脚本设置为第二个Python脚本的后继节点,并将第二个Python脚本的输出文件路径作为其输入参数。同时将S3输出文件路径传递给第四个Python脚本。
- 将第四个Python脚本设置为第三个Python脚本的后继节点,并将第三个Python脚本的输出文件路径作为其输入参数。
以上步骤可以在AWS Glue Console中完成。具体而言,需要在job编辑器中按照上述步骤添加作业节点,设置输入输出路径和参数,以及将作业节点连接起来。最后保存并运行job即可。
方法二:
可以使用AWS Glue的Python Shell作业类型。下面是详细步骤:
-
将4个python shell脚本上传到S3中,并记住它们的S3路径。
-
创建一个Python Shell作业,选择要使用的Python版本,并将作业类型设置为“Spark”。
-
在作业参数中添加一个名为“s3_output_path”的参数,用于指定输出结果要存储到的S3路径。
-
在“脚本路径”下添加以下代码,以调用四个Python脚本:
import subprocess # 调用第一个python脚本,不传递参数 subprocess.call(['python', '/path/to/first/script.py']) # 调用第二个python脚本,传递第一个脚本的输出结果作为参数 output_of_script1 = # 从S3获取第一个脚本的输出结果 subprocess.call(['python', '/path/to/second/script.py', str(output_of_script1)]) # 调用第三个python脚本,传递第二个脚本的输出结果作为参数 output_of_script2 = # 从S3获取第二个脚本的输出结果 subprocess.call(['python', '/path/to/third/script.py', str(output_of_script2)]) # 调用第四个python脚本,传递第三个脚本的输出结果作为参数 output_of_script3 = # 从S3获取第三个脚本的输出结果 subprocess.call(['python', '/path/to/fourth/script.py', str(output_of_script3)])
-
在“Python依赖项”下添加任何需要的依赖项。
-
单击“完成”以保存作业。
-
运行作业并等待作业完成。在作业完成后,可以在S3中找到输出结果。
- 注意:在第2个Python脚本及其后续脚本中,需要编写代码来接收前一个脚本的输出结果,并在进行下一步运算时将其用作参数。
以上第4步中具体如何获第一个脚本的输出结果并将其存储到变量output_of_script1
中,示例:
import boto3
import os
# 创建S3客户端
s3 = boto3.client('s3')
# 指定存储第一个脚本输出结果的S3对象的路径
s3_output_path = 's3://my-bucket/path/to/output_of_script1'
# 在本地创建一个文件,用于存储从S3下载的输出结果
local_output_path = os.path.join(os.environ['GLUE_TMP_DIR'], 'output_of_script1')
s3.download_file(Bucket='my-bucket', Key=s3_output_path, Filename=local_output_path)
# 读取文件内容,并存储到变量output_of_script1中
with open(local_output_path, 'r') as f:
output_of_script1 = f.read().strip()
在上面的代码中,假设输出文件已经存储在名为my-bucket
的S3存储桶中,并在路径/path/to/output_of_script1
下。os
模块用于操作本地文件路径。GLUE_TMP_DIR
是AWS Glue在执行过程中提供的环境变量,指向本地临时文件夹的路径。strip()
函数用于删除字符串开头和结尾的空白字符。