ASP源码.NET源码PHP源码JSP源码JAVA源码DELPHI源码PB源码VC源码VB源码Android源码
当前位置:首页 >> 数据库 >> MySql >> Two ways to load mysql tables into hdfs via spark

Two ways to load mysql tables into hdfs via spark

来源:网络整理     时间:2015-08-25     关键词:

本篇文章主要介绍了"Two ways to load mysql tables into hdfs via spark",主要涉及到方面的内容,对于MySql感兴趣的同学可以参考一下: There are two ways to load mysql tables into hdfs via spark, then process these ...

There are two ways to load MySQL tables into HDFS via Spark, and then process the data.

  1. Load MySQL tables: use JdbcRDD directly
    package org.apache.spark.examples.sql
    import org.apache.spark.sql.SQLContext
    import java.sql.{ Connection, DriverManager, ResultSet }
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.util.HashMap
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.hive.HiveContext
    /**
     * Way 1: bulk-load a MySQL table into HDFS as tab-separated text by
     * reading it through a JdbcRDD, range-partitioned on a numeric id column.
     *
     * Expected arguments (6): url, username, password, table, id (partition
     * column name), output (HDFS path).
     *
     * NOTE(review): the code that computes `lower_bound`, `upper_bound` and
     * `partitions` was elided by the article ("......" below), so this
     * snippet does not compile as-is.
     */
    object LoadFromMysql {
      /**
       * Sanitizes a single column value for TSV output: tabs and newlines
       * become single spaces; a null input is returned unchanged.
       *
       * NOTE(review): replace("&", "&") is a no-op — almost certainly an
       * HTML-entity mangling artifact from the article scrape; confirm the
       * intended replacement against the original source.
       */
      def escape(ori: String) = {
        if(ori!=null){
         ori.replace("&", "&").replace("\t", " ").replace("\n", " ")
        }else{
         ori
        }  
      }
     
      /**
       * Entry point: validates the arguments, reads the whole table through
       * JDBC and writes it to `output` as text, one tab-separated row per line.
       */
      def main(args: Array[String]) {
        if (args.length != 6) {
          // NOTE(review): the usage string does not list the expected
          // parameters (url username password table id output).
          System.err.println("Usage LoadFromMysql ")
          System.exit(1)
        }
        val Array(url, username, password, table, id, output) = args
        val sparkConf = new SparkConf().setAppName("LoadFromMysql")
        val sc = new SparkContext(sparkConf)
        val lines_each_part = 2000000 // target row count per output part file
        // Register the MySQL JDBC driver with DriverManager.
        Class.forName("com.mysql.jdbc.Driver").newInstance
        // NOTE(review): this connection is opened but never used or closed in
        // the visible code — presumably the elided section below uses it to
        // query min/max of the id column; it should be closed afterwards.
        val connection = DriverManager.getConnection(url, username, password)
        // Elided by the article: compute lower_bound, upper_bound and
        // partitions (the latter presumably derived from lines_each_part).
    ......
        // Each Spark partition executes the bound query over its slice of
        // [lower_bound, upper_bound]; the factory closure opens a fresh JDBC
        // connection per partition on the executors.
        val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url,username, password),
          "select * from " + table + " where "+id+" >= ? and "+id+" <= ?", lower_bound, upper_bound, partitions, r => {
            // Concatenate all columns of the row with tab separators,
            // escaping each value.
            var result = escape(r.getString(1))
            var i = 2
            while (i <= r.getMetaData.getColumnCount) {
              result = result + "\t" + escape(r.getString(i))
              i += 1
            }
            result
          })
        myRDD.saveAsTextFile(output)
      }
    }
    Process data:  create hive external table and process with hive(hive-shell or spark-sql) command. 
  2. Load mysql tables: SQLContext.load and save table with parquet format
    The SQLContext way is also based on JdbcRDD; Spark just provides more Parquet support through SQLContext. 
    package org.apache.spark.examples.sql
    import org.apache.spark.sql.SQLContext
    import java.sql.{ Connection, DriverManager, ResultSet }
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.util.HashMap
    import org.apache.spark.api.java.JavaSparkContext
    /**
     * Way 2: load a MySQL table through SQLContext.load("jdbc", ...) and save
     * the resulting DataFrame (the article states the output is Parquet).
     * Built on the same partitioned JDBC read as Way 1.
     *
     * Expected arguments (6): url, username, password, table, id (partition
     * column name), output (HDFS path).
     *
     * NOTE(review): the code that computes `lower_bound`, `upper_bound` and
     * `partitions` was elided by the article ("......" below), so this
     * snippet does not compile as-is.
     *
     * @author ChenFangFang
     */
    object LoadFromMysql_SqlContext {
      def main(args: Array[String]) {
        if (args.length != 6) {
          // NOTE(review): the usage string does not list the expected
          // parameters (url username password table id output).
          System.err.println("Usage LoadFromMysql_SqlContext ")
          System.exit(1)
        }
        val Array(url, username, password, table, id, output) = args
        // NOTE(review): this SparkConf is never used — a second one (with a
        // different app name) is built inline for the JavaSparkContext below.
        val sparkConf = new SparkConf().setAppName("SqlKeywordCount")
        val lines_each_part = 2000000 // target row count per output part file
        // Register the MySQL JDBC driver with DriverManager.
        Class.forName("com.mysql.jdbc.Driver").newInstance
        // NOTE(review): opened but never used or closed in the visible code;
        // presumably the elided section uses it to find min/max of `id`.
        val connection = DriverManager.getConnection(url, username, password)
        // Elided by the article: compute lower_bound, upper_bound and
        // partitions.
    ...... 
        val sc = new JavaSparkContext(new SparkConf().setAppName("LoadFromMysql"));
        val sqlContext = new SQLContext(sc)
        // Credentials are appended to the JDBC URL as query parameters.
        val url_total = url + "?user=" + username + "&password=" + password;
        var options: HashMap[String, String] = new HashMap
        options.put("driver", "com.mysql.jdbc.Driver")
        options.put("url", url_total)
        options.put("dbtable", table) 
        // Partitioned read: Spark splits [lowerBound, upperBound] on
        // partitionColumn into numPartitions parallel JDBC queries.
        options.put("lowerBound", lower_bound.toString())
        options.put("upperBound", upper_bound.toString())
        options.put("numPartitions", partitions.toString());
        options.put("partitionColumn", id);
        val jdbcDF = sqlContext.load("jdbc", options)
        // Saves with the default data-source format — per the article this
        // produces Parquet files under `output`.
        jdbcDF.save(output)
      }
    }
    Process data: use spark-shell directly on the Parquet output
    // spark-shell session: query the Parquet files written by Way 2.
    // `sc` is the SparkContext the shell provides automatically.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    // Placeholder: substitute the HDFS path that was passed as `output`.
    val df = sqlContext.parquetFile(...).toDF()
    
    // Expose the DataFrame to SQL under the name "parquetTable".
    df.registerTempTable("parquetTable")
    
    // NOTE(review): collect() pulls every matching row to the driver —
    // fine for an example, risky on large result sets.
    sqlContext.sql("SELECT * FROM parquetTable where id=1").collect().foreach(println)

版权声明:本文为博主原创文章,未经博主允许不得转载。

以上就介绍了Two ways to load mysql tables into hdfs via spark,包括了方面的内容,希望对MySql有兴趣的朋友有所帮助。

本文网址链接:http://www.codes51.com/article/detail_165371.html

相关图片

相关文章