wordcount (run in spark-shell)
scala
// Split on any whitespace run and drop empty tokens so repeated/leading spaces are not counted as the word "".
sc.textFile("../data/data.txt").flatMap(_.split("\\s+")).filter(_.nonEmpty).map((_, 1)).reduceByKey(_ + _).collect()
spark-sql
java
/*
 * Read two JSON files, register them as the temp views "user" and "dept",
 * LEFT JOIN them with Spark SQL and write the joined rows as CSV to data/result2.
 */
public static void main(String[] args) {
    // try-with-resources closes (stops) the session even if a read, query or write fails.
    try (SparkSession session = SparkSession.builder().master("local")
            .appName("master").getOrCreate()) {
        Dataset<Row> df = session.read().json("./data/user2.json");
        // df.show();  // uncomment to inspect the user data
        df.createOrReplaceTempView("user");
        Dataset<Row> df2 = session.read().json("./data/dept.json");
        df2.createOrReplaceTempView("dept");
        Dataset<Row> result = session.sql("select u.id,u.name,d.name dname from user u left join dept d"
                + " on u.deptno= d.id");
        // result.show();
        // result.collectAsList().forEach(System.out::println);
        // The default save mode fails when data/result2 already exists; overwrite makes the example re-runnable.
        result.write().mode("overwrite").csv("data/result2");
    }
}
java
/*
 * Export a JDBC table to CSV: read the MySQL table idms_user, keep rows whose
 * createDate is not null, write them with a header row to data/idms_user
 * (overwriting previous output) and print the elapsed time in milliseconds.
 */
public static void main(String[] args) {
    // try-with-resources closes (stops) the session even if the JDBC read or CSV write fails.
    try (SparkSession session = SparkSession.builder().master("local")
            .appName("master").getOrCreate()) {
        Properties connectionProperties = new Properties();
        // Override with -Djdbc.user / -Djdbc.password; defaults keep the original values.
        connectionProperties.put("user", System.getProperty("jdbc.user", "root"));
        connectionProperties.put("password", System.getProperty("jdbc.password", "root"));
        // nanoTime is monotonic, so the measurement is unaffected by wall-clock adjustments.
        long start = System.nanoTime();
        Dataset<Row> report = session.read().jdbc("jdbc:mysql://localhost:3306/test",
                "idms_user", connectionProperties).filter("createDate is not null");
        report.write().option("header", true).mode("overwrite").csv("data/idms_user");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("cost:" + elapsedMs + "ms");
    }
}
java
/*
 * Query a JDBC table through Spark SQL using built-in functions. The query:
 * - reads the MySQL table "report" and registers it as a temp view;
 * - selects id and title;
 * - formats created_at and the current time as yyyy-MM-dd;
 * - computes the number of days since created_at;
 * - keeps rows with age > 12 and prints the result.
 */
public static void main(String[] args) {
    // try-with-resources closes (stops) the session even if the read or query fails.
    try (SparkSession session = SparkSession.builder().master("local")
            .appName("master").getOrCreate()) {
        Properties connectionProperties = new Properties();
        // Override with -Djdbc.user / -Djdbc.password; defaults keep the original values.
        connectionProperties.put("user", System.getProperty("jdbc.user", "root"));
        connectionProperties.put("password", System.getProperty("jdbc.password", "root"));
        Dataset<Row> report = session.read().jdbc("jdbc:mysql://localhost:3306/test",
                "report", connectionProperties);
        report.createOrReplaceTempView("report");
        Dataset<Row> result = session.sql("select id,date_format(created_at,'yyyy-MM-dd') createDate ,title "
                + " , date_format(now(),'yyyy-MM-dd') nowDate"
                + " ,datediff(now(),created_at) days"
                + " from report where age>12");
        result.show();
    }
}