中英文双语网站 滑动切换,k5wordpress主题,定制头像软件,西安学网站开发哪边好设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序#xff0c;实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上#xff0c;标签和多个参数#xff08;以“_”分割#xff09;为组成导出数据文件名#xff0c;文件已…设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上标签和多个参数以“_”分割为组成导出数据文件名文件已经存在则覆盖原始文件。 代码如下
import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, r) as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession配置S3和Excel支持spark SparkSession.builder \.appName(DataExportJob) \.config(spark.jars.packages, com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1) \.getOrCreate()# 配置S3访问根据实际环境配置spark.sparkContext._jsc.hadoopConfiguration().set(fs.s3a.access.key, YOUR_ACCESS_KEY)spark.sparkContext._jsc.hadoopConfiguration().set(fs.s3a.secret.key, YOUR_SECRET_KEY)spark.sparkContext._jsc.hadoopConfiguration().set(fs.s3a.endpoint, s3.amazonaws.com)config load_config(config_path)for template in config[templates]:label template[label]sql_template template[sql_template]parameters_list template[parameters]for params in parameters_list:# 验证参数数量是否匹配placeholders sql_template.count({)if len(params) ! placeholders:raise ValueError(f参数数量不匹配模板需要{placeholders}个参数但当前参数为{len(params)}个)# 替换SQL中的占位符formatted_sql sql_template.format(*params)df spark.sql(formatted_sql)# 生成文件名参数部分param_str _.join(params)base_filename f{label}_{param_str}# 定义输出路径output_paths {parquet: f{base_s3_path}/parquet/{base_filename},csv: f{base_s3_path}/csv/{base_filename},excel: f{base_s3_path}/excel/{base_filename}.xlsx}# 写入Parquetdf.write.mode(overwrite).parquet(output_paths[parquet])# 写入CSV自动生成headerdf.write.mode(overwrite) \.option(header, true) \.csv(output_paths[csv])# 写入Excel使用spark-excel包df.write.format(com.crealytics.spark.excel) \.option(header, true) \.option(inferSchema, true) \.mode(overwrite) \.save(output_paths[excel])spark.stop()if __name__ __main__:import argparseparser argparse.ArgumentParser()parser.add_argument(--config, typestr, requiredTrue, helpPath to config JSON file)parser.add_argument(--s3-path, typestr, requiredTrue, helpBase S3 path (e.g., s3a://your-bucket/data))args parser.parse_args()main(args.config, args.s3_path)配置文件示例config.json
{templates: [{label: sales_report,sql_template: SELECT * FROM sales WHERE date {0} AND region {1},parameters: [[202301, north],[202302, south]]},{label: user_activity,sql_template: SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day {0} GROUP BY user_id,parameters: [[2023-01-01],[2023-01-02]]}]
}使用说明 依赖管理 确保Spark集群已安装Hadoop AWS和Spark Excel依赖spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.pyS3配置 替换代码中的YOUR_ACCESS_KEY和YOUR_SECRET_KEY为实际AWS凭证根据S3兼容存储调整endpoint如使用MinIO需特殊配置 执行命令 spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
data_export.py --config config.json --s3-path s3a://your-bucket/exports输出结构
s3a://your-bucket/exports
├── parquet
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
├── csv
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx