博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkSQLJDBC数据源实例
阅读量:5998 次
发布时间:2019-06-20

本文共 4150 字,大约阅读时间需要 13 分钟。

hot3.png

package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Spark SQL JDBC data-source example (Spark 1.x DataFrame API).
 *
 * <p>Loads two MySQL tables ({@code student_infos}, {@code student_scores})
 * as DataFrames, joins them on student name, keeps students whose score is
 * greater than 80, prints the result, and writes it back to the
 * {@code good_student_infos} table over plain JDBC.
 *
 * Created by dell on 2017/7/27.
 */
public class JDBCDataSource {

    public static void main(String[] args) {
        // "spark.testing.memory" is raised so the local-mode driver passes
        // Spark's minimum-memory sanity check on small machines.
        SparkConf conf = new SparkConf()
                .setAppName("HiveDataSource").setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Load both MySQL tables as DataFrames through the JDBC data source.
        // The same options map is reused; only "dbtable" changes.
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://spark1:3306/testdb");
        options.put("dbtable", "student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();

        options.put("dbtable", "student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();

        // Convert each DataFrame to a (name, value) pair RDD and join:
        //   student_infos  -> (name, age)
        //   student_scores -> (name, score)
        //   joined         -> (name, (age, score))
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD =
                studentInfosDF.javaRDD().mapToPair(
                        new PairFunction<Row, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                // JDBC surfaces the numeric column as long;
                                // narrow it to int for the join value.
                                return new Tuple2<String, Integer>(
                                        row.getString(0),
                                        Integer.valueOf(String.valueOf(row.getLong(1))));
                            }
                        })
                .join(studentScoresDF.javaRDD().mapToPair(
                        new PairFunction<Row, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(
                                        String.valueOf(row.get(0)),
                                        Integer.valueOf(String.valueOf(row.get(1))));
                            }
                        }));

        // Flatten (name, (age, score)) tuples into Rows: (name, age, score).
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(
                new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                    @Override
                    public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple)
                            throws Exception {
                        return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
                    }
                });

        // Keep only rows whose score (column 2) is greater than 80.
        // BUG FIX: the original returned null for matching rows; a Spark
        // filter predicate must return true/false — null would NPE when
        // Spark unboxes the Boolean.
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(
                new Function<Row, Boolean>() {
                    @Override
                    public Boolean call(Row row) throws Exception {
                        return row.getInt(2) > 80;
                    }
                });

        // Build the schema dynamically. Column order must match
        // RowFactory.create(name, age, score) above — the original labelled
        // columns 1/2 as (score, age), contradicting the join order and the
        // score filter on index 2.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Convert the filtered RDD back to a DataFrame and print it.
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
        for (Row row : studentsDF.collect()) {
            System.out.println(row);
        }

        // Persist the qualifying students back to MySQL.
        // NOTE(review): one connection per row is kept from the original for
        // simplicity; foreachPartition with a shared connection would be the
        // efficient form. Credentials were empty in the original — fill in a
        // real user/password before running.
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                PreparedStatement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://spark1:3306/testdb", "", "");
                    // BUG FIX: the original concatenated values into the SQL
                    // string (injection-prone), inserted column 1 twice, and
                    // read Integer-typed cells with getLong (would throw
                    // ClassCastException). Use a parameterized insert with
                    // the correct cell accessors instead.
                    stmt = conn.prepareStatement(
                            "insert into good_student_infos values(?, ?, ?)");
                    stmt.setString(1, row.getString(0));
                    stmt.setInt(2, row.getInt(1));
                    stmt.setInt(3, row.getInt(2));
                    stmt.executeUpdate();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (stmt != null) {
                        stmt.close();
                    }
                    if (conn != null) {
                        conn.close();
                    }
                }
            }
        });

        sc.close();
    }
}

转载于:https://my.oschina.net/hehongbo/blog/1490549

你可能感兴趣的文章
网站故障排查常用命令
查看>>
Python setdaemon守护进程
查看>>
ubuntu10.04下安装LAMP
查看>>
sendmail+tls+java
查看>>
wget 用法
查看>>
改善 ASP.NET MVC 代码库的 5 点建议
查看>>
Git配置以及命令总结
查看>>
cacti基础配置,附带软件包
查看>>
Centos 7 Saltstack自动化部署weblogic 12c
查看>>
ORACLE学习笔记--SQL查询语句
查看>>
自学sql之路,SQL 是用于访问和处理数据库的标准的计算机语言!
查看>>
Nginx基本配置
查看>>
[Windows Azure] How to use the Windows Azure Blob Storage Service in .NET
查看>>
LNAMP第二版(nginx 1.2.0+apache 2.4.2+php 5.4)
查看>>
MongoDB repl set权限认证配置步骤
查看>>
java学习笔记(1)
查看>>
jQuery 如何获取何设置 redio标签的值
查看>>
禁止Mysql默认端口访问Internet - MySQL - IT技术网
查看>>
java高并发设计(十五)-- netty通信之全部
查看>>
zend studio 9 字体,颜色,快捷键等相关设置
查看>>