
[SPARK] Reading and Saving MySQL Data with Spark (JDBC)

2016-01-07 21:32


The original idea for this article came from http://www.sparkexpert.com, but the source code provided at https://github.com/sujee81/SparkApps turned out to be too old: starting with 1.4.0, Spark abandoned the original methods (including createJDBCTable, insertIntoJDBC, etc.) in favor of sqlContext.read().jdbc() and sqlContext.write().jdbc().
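
For orientation, the replacement API looks like this in compact form (a minimal sketch, not from the original post; the connection URL, credentials, and the users table are placeholders matching the examples below):

import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class JdbcApiSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("JdbcApiSketch").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);

        String url = "jdbc:mysql://localhost:3306/demo"; // placeholder URL
        Properties props = new Properties();
        props.put("user", "demo");     // placeholder credentials
        props.put("password", "demo");

        // New-style read (Spark 1.4+): table -> DataFrame
        DataFrame df = sqlContext.read().jdbc(url, "users", props);

        // New-style write: DataFrame -> table,
        // replacing the removed createJDBCTable/insertIntoJDBC
        df.write().mode(SaveMode.Append).jdbc(url, "users", props);
    }
}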

1. Source Code Download

git clone https://github.com/jiekechoo/spark-jdbc-apps.git

The source tree is laid out as follows; this article covers the first two modules:

spark-load-from-db: reading from the database
spark-save-to-db: saving to the database
spark-stats: covered in the next article
spark-jdbcrdd: covered in the next article

2. Source Code Analysis

Dependency analysis

The parent POM defines the shared pieces: slf4j, Spark version 1.5.1, MySQL connector 5.1.32, and so on:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sectong</groupId>
    <artifactId>spark-apps-parent</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-apps-parent</name>
    <packaging>pom</packaging>

    <modules>
        <module>spark-jdbcrdd</module>
        <module>spark-load-from-db</module>
        <module>spark-save-to-db</module>
        <module>spark-stats</module>
    </modules>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.13</version>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>1.5.1</spark.version>
        <mysql.version>5.1.32</mysql.version>
    </properties>

</project>

Saving to the database: spark-save-to-db

The dependencies are mainly spark-core and spark-sql, plus the MySQL driver:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.sectong</groupId>
        <artifactId>spark-apps-parent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>spark-save-to-db</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerArgument>-Xlint:all</compilerArgument>
                    <showWarnings>true</showWarnings>
                    <showDeprecation>true</showDeprecation>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The source code:

package com.sectong;

import java.io.Serializable;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class Main implements Serializable {

    private static final long serialVersionUID = -8513279306224995844L;
    private static final String MYSQL_USERNAME = "demo";
    private static final String MYSQL_PWD = "demo";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";

    private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local[*]"));

    private static final SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {
        // Sample data-frame loaded from a JSON file
        DataFrame usersDf = sqlContext.read().json("users.json");

        // Save data-frame to MySQL (or any other JDBC supported databases)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", MYSQL_USERNAME);
        connectionProperties.put("password", MYSQL_PWD);

        // write dataframe to jdbc mysql
        usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
    }
}

To have data to write during testing, we need a JSON file with one object per line (Spark's JSON reader expects line-delimited objects), similar to this:

{"id":994,"name":"Betty","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ip":"9.19.204.44"},
{"id":995,"name":"Anna","email":"alewisrm@canalblog.com","city":"Shangjing","country":"China","ip":"14.207.119.126"},
{"id":996,"name":"David","email":"dgarrettrn@japanpost.jp","city":"Tsarychanka","country":"Ukraine","ip":"111.252.63.159"},
{"id":997,"name":"Heather","email":"hgilbertro@skype.com","city":"Koilás","country":"Greece","ip":"29.57.181.250"},
{"id":998,"name":"Diane","email":"ddanielsrp@statcounter.com","city":"Mapiripán","country":"Colombia","ip":"19.205.181.99"},
{"id":999,"name":"Philip","email":"pfullerrq@reuters.com","city":"El Cairo","country":"Colombia","ip":"210.248.121.194"},
{"id":1000,"name":"Maria","email":"mfordrr@shop-pro.jp","city":"Karabash","country":"Russia","ip":"224.21.41.52"}

When reading the file, users.json must sit in the same directory as the jar; the test runs in local mode:

DataFrame usersDf = sqlContext.read().json("users.json");

Pay special attention to mode(SaveMode.Append) in the code: it makes each run append its rows to the table. Without it, you will keep hitting: Exception in thread "main" java.lang.RuntimeException: Table users already exists.

usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
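
SaveMode.Append is only one of the options; if you would rather replace the table on every run, the other SaveMode values cover that (a sketch reusing usersDf and the connection settings from the code above):

// Append: add rows to the existing table (used in this article)
usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);

// Overwrite: drop and recreate the table before writing
usersDf.write().mode(SaveMode.Overwrite).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);

// ErrorIfExists (the default): fails with "Table users already exists" on the second run
// Ignore: silently skips the write if the table already exists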

Package it, upload it, and run it with Spark:

/opt/spark/bin/spark-submit --class com.sectong.Main --driver-class-path mysql-connector-java-5.1.32.jar spark-save-to-db-1.0-SNAPSHOT.jar 

The result looks like this: [screenshot: the users table after the save]
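
To double-check that the rows actually landed, a plain JDBC count against the same database works (a minimal sketch, not part of the original post; host, credentials, and table name follow the examples above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifySave {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.91:3306/demo", "demo", "demo");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM users")) {
            if (rs.next()) {
                System.out.println("rows in users: " + rs.getLong(1));
            }
        }
    }
}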

Reading from the database: spark-load-from-db

The dependencies are essentially the same as for the save example, so they are not repeated here.

The source code:

package com.sectong;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main implements Serializable {

    private static final long serialVersionUID = -8513279306224995844L;

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    private static final String MYSQL_USERNAME = "demo";
    private static final String MYSQL_PWD = "demo";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";

    private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcFromDb").setMaster("local[*]"));

    private static final SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("user", MYSQL_USERNAME);
        properties.put("password", MYSQL_PWD);
        // Load MySQL query result as DataFrame
        DataFrame jdbcDF = sqlContext.read().jdbc(MYSQL_CONNECTION_URL, "users", properties);

        List<Row> employeeFullNameRows = jdbcDF.collectAsList();

        for (Row employeeFullNameRow : employeeFullNameRows) {
            LOGGER.info(employeeFullNameRow.toString());
        }
    }
}

For reading the MySQL data, this line is the key:

DataFrame jdbcDF = sqlContext.read().jdbc(MYSQL_CONNECTION_URL, "users", properties);
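
For larger tables, read().jdbc() also has a partitioned overload that splits the scan over a numeric column so the load runs in parallel (a sketch; the id column and the 1..1000 bounds are assumptions based on the sample data, not from the original post):

// Partitioned read: 4 partitions striped over the "id" column.
// The bounds only control the partition stride; rows outside them still load.
DataFrame jdbcDF = sqlContext.read().jdbc(
        MYSQL_CONNECTION_URL, "users",
        "id",  // numeric partition column
        1,     // lowerBound
        1000,  // upperBound
        4,     // numPartitions
        properties);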

Then print it out:

List<Row> employeeFullNameRows = jdbcDF.collectAsList();

for (Row employeeFullNameRow : employeeFullNameRows) {
    LOGGER.info(employeeFullNameRow.toString());
}
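
Note that collectAsList() pulls the whole table into the driver, which is fine for a demo but risky on large tables; filtering on the DataFrame first keeps the heavy lifting in the cluster (a sketch reusing jdbcDF and LOGGER from above; the country predicate is just an example):

// The filter can be pushed down to MySQL as a WHERE clause by the JDBC source
DataFrame chinaUsers = jdbcDF.filter("country = 'China'");
chinaUsers.show();  // print the first rows to stdout
LOGGER.info("users in China: {}", chinaUsers.count());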

When running the program on Spark, note the --driver-class-path mysql-connector-java-5.1.32.jar argument: the MySQL connector jar has to be attached:

/opt/spark/bin/spark-submit --class com.sectong.Main --driver-class-path mysql-connector-java-5.1.32.jar spark-load-from-db-1.0-SNAPSHOT.jar 

The intermediate run output is omitted; here are the results:

2016-01-06 08:14:01[main] INFO  Main:43 - [Matriz de Camaragibe,Brazil,sgarciadp@nifty.com,494,39.244.171.48,Steven]
2016-01-06 08:14:01[main] INFO  Main:43 - [Huarancante,Peru,njacksondq@si.edu,495,67.123.78.80,Nicholas]
2016-01-06 08:14:01[main] INFO  Main:43 - [Zandak,Russia,sjonesdr@nbcnews.com,496,167.69.237.11,Sarah]
2016-01-06 08:14:01[main] INFO  Main:43 - [Somovo,Russia,jgardnerds@nsw.gov.au,497,112.190.104.80,Judy]
2016-01-06 08:14:01[main] INFO  Main:43 - [Huaping,China,calexanderdt@blinklist.com,498,79.242.142.206,Christine]
2016-01-06 08:14:01[main] INFO  Main:43 - [Isulan,Philippines,wgomezdu@imdb.com,499,26.220.121.74,Wanda]
2016-01-06 08:14:01[main] INFO  Main:43 - [Wujiayao,China,wleedv@latimes.com,500,26.104.219.178,Walter]
2016-01-06 08:14:01[main] INFO  Main:43 - [Dongtou,China,hriveradw@skype.com,501,82.13.121.35,Henry]
2016-01-06 08:14:01[Thread-3] INFO  SparkContext:59 - Invoking stop() from shutdown hook
2016-01-06 08:14:01[Thread-3] INFO  ContextHandler:843 - stopped o.s.j.s.ServletContextHandler{/static/sql,null}
2016-01-06 08:14:01[Thread-3] INFO  ContextHandler:843 - stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}

WeChat official account: sectong

Original article: http://blog.sectong.com/blog/spark_jdbc_load_save.html
