def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("spark-demo")
      .config("spark.master", "local")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("src/main/resources/people.json")
    df.show()
  }
}
Running this throws an error:
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
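This exception usually points to a classpath conflict around commons-lang3: Spark's JSON reader defaults to a timestamp pattern ending in "XXX", which older FastDateFormat versions cannot parse. A minimal workaround sketch, assuming that is indeed the cause here, is to pin org.apache.commons:commons-lang3 to 3.5 or newer in the build, or to override the reader's timestamp format so the "XXX" pattern is never used:

// Workaround sketch: reuse the SparkSession from the snippet above and
// supply a timestamp format that does not rely on the "XXX" zone pattern.
val df = spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .json("src/main/resources/people.json")
df.show()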
// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
case class Person(name: String, age: Long)

object SparkSqlTest {
  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.master", "local")
    .getOrCreate()
  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    // Create an RDD of Person objects from a text file, convert it to a DataFrame
    val peopleDF = sc.textFile("src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    // SQL statements can be run by using the sql methods provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // Columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    // or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // No pre-defined encoders exist for Dataset[Map[K,V]]; one must be declared explicitly
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // Primitive types and case classes can also be defined as
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
  }
}
Aggregations
The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. While these functions are designed for DataFrames, Spark SQL also offers type-safe versions of some of them in Scala and Java for use with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
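As a minimal sketch of the untyped built-in aggregates mentioned above (the employees data, column names, and object name are made up purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggregationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("aggregation-demo")
      .config("spark.master", "local")
      .getOrCreate()
    import spark.implicits._

    // A small, made-up (name, salary) DataFrame
    val employees = Seq(
      ("Michael", 3000L),
      ("Andy", 4500L),
      ("Justin", 3500L),
      ("Andy", 4000L)
    ).toDF("name", "salary")

    // Built-in untyped aggregate functions over the whole DataFrame
    employees.agg(
      count(lit(1)).as("rows"),
      countDistinct($"name").as("distinct_names"),
      avg($"salary").as("avg_salary"),
      max($"salary").as("max_salary"),
      min($"salary").as("min_salary")
    ).show()

    // The same functions also work per group
    employees.groupBy($"name").agg(avg($"salary").as("avg_salary")).show()

    spark.stop()
  }
}

A type-safe counterpart can be written by extending org.apache.spark.sql.expressions.Aggregator for a strongly typed Dataset.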