SparkSQL_"spark.sql(\"update usr_info set 'age_range'=-1 wh-程序员宅基地

技术标签: PySpark  Spark  Sql  

1. 预览

  • Spark SQL是Spark处理结构化数据的模块
  • Spark SQL的功能之一是执行SQL查询, 也支持Hive, 查询结果以Dataset/DataFrame的形式返回
  • 一个DataFrame是一个Dataset组成的指定列

2. 开始入门

2.1 起始点: SparkSession

Spark SQL中所有功能的入口是SparkSession类.

使用SparkSession.builder创建一个SparkSession

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName('appName').config('spark.some.config.option', 'some-value').getOrCreate()
>>> spark
SparkSession - in-memory

SparkContext

Spark UI

Version
v2.2.1
Master
local[*]
AppName
appName

2.2 创建数据框

可以从以下对象中创建DataFrame:
- 从一个已经存在的RDD
- 从Spark数据源
- 从Hive表

示例: 基于一个JSON文件创建一个DataFrame

>>> df = spark.read.json(r"C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json")
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.3 DataFrame相关操作

举例简单介绍一下DataFrame的操作, 完整列表请参考DataFrame函数指南

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select('name').show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.select(df['name'], df['age']+1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.filter(df['age']>12).show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

>>> df.groupBy('age').count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

2.4 运行SQL查询

SparkSession的sql函数可以运行SQL查询, 返回DataFrame

>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> df.createOrReplaceTempView('people')
>>> sqlDF = spark.sql('select * from people')
>>> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.5 全局临时视图

  • Spark SQL中的临时视图是session级别的, 会随着session的消失而消失.
  • 如果想让一个临时视图在所有session中相互传递并且可用, 直到Spark应用退出, 可以创建一个全局的临时视图.
  • 全局的临时视图存在于系统数据库global_temp中, 我们必须加上库名去引用
    select * from global_temp.table1
>>> df.createGlobalTempView('people')
>>> spark.sql('select * from global_temp.people').show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> spark.newSession().sql('select * from global_temp.people').show()  # 新建一个session结果还是一样, 说明是全局的
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2.6 创建Datasets(Java and Scala) pass

2.7 将RDD转化为DataFrame的两种方式

  • textfile => [Row1, Row2, …] => DataFrame
>>> from pyspark.sql import Row
>>> sc = spark.sparkContext

>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> people.collect()
[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]
>>> schemaPeople = spark.createDataFrame(people)
>>> schemaPeople.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+
>>> schemaPeople.createOrReplaceTempView('people')
>>> teenagers = spark.sql('select * from people where age between 13 and 19')
>>> teenagers.show()
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+
  • textfile => [data, schema] => DataFrame
    • RDD从原始的RDD穿件一个RDD的toples或者一个列表;
    • Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的结构.
    • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD .
>>> from pyspark.sql.types import StringType, IntegerType, StructType, StructField
>>> sc = spark.sparkContext
>>> lines = sc.textFile(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.txt')
>>> parts = lines.map(lambda x: x.split(','))
>>> people = parts.map(lambda p: (p[0], p[1].strip()))
>>> fields = [StructField(name, StringType(), True) for name in ['name', 'age']]
>>> schema = StructType(fields)
>>> schemaPeople = spark.createDataFrame(people, schema=schema)
>>> result = spark.sql('select name from people')
>>> result.show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

2.8 Aggregations(Java and Scala) pass

  • Untyped User-Defined Aggregate Functions
  • Type-Safe User-Defined Aggregate Functions

3. 数据源

  • spark.read.load(path): 读取parquet文件为DataFrame
  • spark.write.save(path): 保存DataFrame为parquet文件
  • spark.read.load(path, format="json"): 指定源数据类型为给定格式
    • json
    • parquet
    • jdbc
    • orc
    • libsvm
    • csv
    • text
  • spark.write.save(path, format="json")
  • spark.sql(“select * from parquet.examples/src/main/resources/users.parquet“): 直接在文件上运行SQL
>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.select('name', 'favorite_color').write.save('nameAndFavColors.parquet')
  • 保存模式(Scala and Java) pass

3.1.4 保存到持久表

>>> df = spark.read.load(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
>>> df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
>>> # df.write.option('path1', 'path2').saveAsTable()

DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option(“path”, “/some/path”).saveAsTable(“t”) . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

由于 metastore 只能返回查询的必要 partitions (分区), 因此不再需要将第一个查询上的所有 partitions discovering 到表中.
Hive DDLs 如 ALTER TABLE PARTITION … SET LOCATION 现在可用于使用 Datasource API 创建的表.
请注意, 创建 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认情况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 可以调用 MSCK REPAIR TABLE .

3.1.5 分桶, 排序和分区

对于基于文件的数据源, 也可以对输出进行bucket, sort和partition操作, 前二者仅适用于持久表

>>> df.write.bucketBy(42, 'name').sortBy('age').saveAsTable('pepple_buckted')

在使用Dataset API时, partitioning可以同时和save和saveAsTable一起使用

df.write.partitionBy('favorite_color').format('parquet').save('namesPartyByColor.parquet')

可以为单个表使用partitioning和bucketing

df = spark.read.parquet(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\users.parquet')
df.write.partitionBy('favorite_color').bucketBy(42, 'name').saveAsTable('people_partitioned_bucketed')

partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.

3.2 Parquet Files

3.2.1 以编程的方式加载数据

>>> peopleDF = spark.read.json(r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json')
>>> peopleDF.write.parquet('people.parquet')
>>> parquetFile = spark.read.parquet('people.parquet')
>>> parquetFile.createOrReplaceTempView('parquetFile')
>>> spark.sql('select name from parquetFile where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

3.2.2 分区发现

3.2.3 模式合并

3.2.4 Hive metastore Oarquet table conversion

3.2.5 配置

可以使用SparkSession上的setConf方法或使用SQL运行set key=value命令来完成parquet的设置

参数名称 默认值 含义
spark.sql.parquet.binaryAsString false 一些其他 Parquet-producing systems (Parquet 生产系统), 特别是 Impala, Hive 和旧版本的 Spark SQL , 在 writing out (写出) Parquet schema 时, 不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特别是 Impala 和 Hive , 将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性.
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存. 可以加快查询静态数据.
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时, Spark SQL 将使用 Hive SerDe 作为 parquet tables , 而不是内置的支持.
spark.sql.parquet.mergeSchema false 当为 true 时, Parquet data source (Parquet 数据源) merges (合并)从所有 data files (数据文件)收集的 schemas , 否则如果没有可用的 summary file , 则从 summary file 或 random data file 中挑选 schema .
spark.sql.optimizer.metadataOnly true 如果为 true , 则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的所有 columns (列)都是 partition columns (分区列)并且 query (查询)具有满足 distinct semantics (不同语义)的 aggregate operator (聚合运算符)时, 它将适用.

3.3 Json数据集

通过spark.read.json方法将数据集加载为DataFrame, 支持两种格式:
- 常规的多行json文件, 如\examples\src\main\resources\people.json
- 独立有效的json对象rdd, 如['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']

>>> sc = spark.sparkContext
>>> path = r'C:\Install\spark-2.2.1-bin-hadoop2.7\examples\src\main\resources\people.json'
>>> peopleDF = spark.read.json(path)
>>> peopleDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
>>> peopleDF.createOrReplaceTempView('people')
>>> spark.sql('select name from people where age between 13 and 19').show()
+------+
|  name|
+------+
|Justin|
+------+

>>> jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
>>> rdd = sc.parallelize(jsonStrings)
>>> people2 = spark.read.json(rdd)
>>> people2.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+

3.4 Hive表

  • Spark SQL还支持读取和写入Hive中的数据.
  • 但是, Hive中有大量的依赖关系, 因此这些依赖关系不包含在默认Spark分发中.
  • 如果在类路径中找到Hive依赖项, Spark将自动加载它们.
  • 请注意, 这些Hive依赖关系也必须存在于所有工作节点上, 因为它们将需要访问Hive序列化和反序列化库SerDes, 以访问存储在Hive中的数据
  • 通过修改hive-site.xml, core-site.xml, hdfs-site.xml文件来完成配置
  • 当使用Hive时, 必须用Hive支持实例化SparkSession, 包括:
    • 连接到持续的Hive转移
    • 支持Hive serdes
    • 支持Hive用户自定义功能
  • 没有现有Hive部署的用户仍可以启用Hive支持
  • 当hive-site.xml未配置时, 上下文会自动在当前目录创建metastore_db, 并创建有spark.sql.warehouse.dir配置的目录, 该目录默认为Sprk应用程序当前目录中的spark-warehouse目录
  • 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。 而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置
  • 您可能需要向启动 Spark 应用程序的用户授予写权限
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession, Row
warehouse_location=abspath('spark-warehouse')
spark = SparkSession.builder.config('spark.sql.warehouse.dir', warehouse_location).enableHiveSupport().getOrCreate()
spark.sql('CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive')
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

3.4.1 指定Hive表的存储格式

创建Hive表时
- 需要定义输入格式和输出格式
- 还需要定义该表如何将数据反序列化为行, 或将行序列化为数据, 即serde
- 以下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
- 默认情况下,我们将以纯文本形式读取表格文件
- 请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 “serde”,”input format” 和 “output format”。 目前我们支持6个文件格式:’sequencefile’,’rcfile’,’orc’,’parquet’,’textfile’和’avro’。
inputFormat, outputFormat 这两个选项将相应的 “InputFormat” 和 “OutputFormat” 类的名称指定为字符串文字,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果您已经指定了 “fileFormat” 选项,则无法指定它们。
serde 此选项指定 serde 类的名称。 当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 的信息,那么不要指定这个选项。 目前的 “sequencefile”, “textfile” 和 “rcfile” 不包含 serde 信息,你可以使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 “textfile” 文件格式一起使用。它们定义如何将分隔的文件读入行。

3.4.2 与不同版本的Hive Metastore进行交互

3.5 JDBC连接其他数据库

  • Spark SQL可以使用JDBC读取其他数据库的数据, 这个功能优于使用JdbcRDD(因为直接返回DataFrame结果)
  • 要开始使用, 你需要在Spark类路径中包含特定数据库的JDBC driver程序. 例如, 要从Spark Shell连接到postgres, 运行命令:spark-shell –driver-class-path postgresql-9.4.1207.jar –jars postgresql-9.4.1207.jar
  • Spark支持以下option
    • url: 要连接的JDBC URL, 如jdbc:postgresql://localhost/test?user=fred&password=secret
    • dbtable: 需读取的JDBC表
    • driver: 用于连接到此URL的JDBC driver程序的类名
    • partitionColumn, lowerBound, upperBound:如果指定了这些选项,则必须指定这些选项。 另外,必须指定 numPartitions. 他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。 请注意,lowerBound 和 upperBound 仅用于决定分区的大小,而不是用于过滤表中的行。 因此,表中的所有行将被分区并返回。此选项仅适用于读操作。
    • numPartitions: 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。 如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。
    • fetchsize:JDBC 抓取的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操作。
    • batchsize: JDBC 批处理的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能。 该选项仅适用于写操作。默认值为 1000.
    • isolationLevel: 事务隔离级别,适用于当前连接。 它可以是 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。 此选项仅适用于写操作。请参考 java.sql.Connection 中的文档。
    • truncate: 这是一个与 JDBC 相关的选项。 启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。 这可以更有效,并且防止表元数据(例如,索引)被移除。 但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。 它默认为 false。 此选项仅适用于写操作。
    • createTableOptions: 这是一个与JDBC相关的选项。 如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操作。
    • createTableColumnTypes: 使用数据库列数据类型而不是默认值,创建表时。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:”name CHAR(64), comments VARCHAR(1024)”)。 指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操作。
jdbcDF = spark.read.format('jdbc').option('url', 'jdbc:postgresql:dbserver').option('dbtable', 'schema.tablename').option('user', 'username').optioni('password', 'password').load()

jdbcDF2 = spark.read.jdbc('jdbc:postgresql:dbserver', 'schema.tablename', properise={
   'user': 'username', 'password': 'password'})

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={
   "user": "username", "password": "password"})

3.6 故障排除

JDBC driver 程序类必须对客户端会话和所有执行程序上的原始类加载器可见。 这是因为 Java 的 DriverManager 类执行安全检查,导致它忽略原始类加载器不可见的所有 driver 程序,当打开连接时。一个方便的方法是修改所有工作节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些数据库,例如 H2,将所有名称转换为大写。 您需要使用大写字母来引用 Spark SQL 中的这些名称。

4. 性能调优

对于某些工作负载,可以通过缓存内存中的数据或打开一些实验选项来提高性能。

4.1 在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable(“tableName”) 或 dataFrame.cache() 来使用内存中的列格式来缓存表。 然后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。 您可以调用 spark.catalog.uncacheTable(“tableName”) 从内存中删除该表

内存缓存的配置可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小可以提高内存利用率和压缩率,但是在缓存数据时会冒出 OOM 风险。

4.2 其他配置选项

以下选项也可用于调整查询执行的性能。这些选项可能会在将来的版本中被废弃,因为更多的优化是自动执行的

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用可以在同一时间进行扫描。 将多个文件放入分区时使用。最好过度估计,那么具有小文件的分区将比具有较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300 广播连接中的广播等待时间超时(秒)
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行连接时将广播给所有工作节点的表的最大大小(以字节为单位)。 通过将此值设置为-1可以禁用广播。 请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

5. 分布式SQL引擎

Spark SQL 也可以充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。 在这种模式下,最终用户或应用程序可以直接与 Spark SQL 交互运行 SQL 查询,而不需要编写任何代码。

5.1 运行Thrift JDBC/ODBC

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。 您可以使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行以下命令:

./sbin/start-thriftserver.sh

此脚本接受所有 bin/spark-submit 命令行选项,以及 –hiveconf 选项来指定 Hive 属性。 您可以运行 ./sbin/start-thriftserver.sh –help 查看所有可用选项的完整列表。 默认情况下,服务器监听 localhost:10000. 您可以通过环境变量覆盖此行为,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

or system properties:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

现在,您可以使用 beeline 来测试 Thrift JDBC/ODBC 服务器:./bin/beeline

使用 beeline 方式连接到 JDBC/ODBC 服务器:
beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。 在非安全模式下,只需输入机器上的用户名和空白密码即可。 对于安全模式,请按照 beeline 文档 中的说明进行操作。

配置Hive是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可以使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持通过 HTTP 传输发送 thrift RPC 消息。 使用以下设置启用 HTTP 模式作为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式连接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2 运行Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。 请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通信。

要启动 Spark SQL CLI,请在 Spark 目录中运行以下命令:

./bin/spark-sql
配置 Hive 是通过将 hive-site.xml, core-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您可以运行 ./bin/spark-sql –help 获取所有可用选项的完整列表。

6. 支持的Hive特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有 Hive 操作, 包括:
    • 关系运算符 (=, ⇔, ==, <>, <, >, >=, <=, 等等)
    • 算术运算符 (+, -, *, /, %, 等等)
    • 逻辑运算符 (AND, &&, OR, ||, 等等)
    • 复杂类型的构造
    • 数学函数 (sign, ln, cos, 等等)
    • String 函数 (instr, length, printf, 等等)
  • 用户定义函数 (UDF)
  • 用户定义聚合函数 (UDAF)
  • 用户定义 serialization formats (SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 所有的 Hive DDL 函数, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
      大部分的 Hive Data types(数据类型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

6. 参考

6.1 数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中的数据类型都在pyspark.sql.types的包中, 通过以下方式访问:

from pysparl.sql.types import *
Data type Value type in Python API to access or create a data type
ByteType int or long, Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. ByteType()
ShortType int or long, Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. ShortType()
IntegerType int or long IntegerType()
LongType long, Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. LongType()
FloatType float, Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple, or array ArrayType(elementType, [containsNull]), Note: The default value of containsNull is True.
MapType dict MapType(keyType, valueType, [valueContainsNull]), Note: The default value of valueContainsNull is True.
StructType list or tuple StructType(fields), Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Python of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, [nullable]), Note: The default value of nullable is True.

6.2 NaN Semantics

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 需要做一些特殊处理. 具体如下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操作中,所有的 NaN values 将被分到同一个组中.
  • 在 join key 中 NaN 可以当做一个普通的值.
  • NaN 值在升序排序中排到最后,比任何其他数值都大.
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xiligey1/article/details/80267873

智能推荐

oracle 12c 集群安装后的检查_12c查看crs状态-程序员宅基地

文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态

解决jupyter notebook无法找到虚拟环境的问题_jupyter没有pytorch环境-程序员宅基地

文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境

国内安装scoop的保姆教程_scoop-cn-程序员宅基地

文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn

Element ui colorpicker在Vue中的使用_vue el-color-picker-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker

迅为iTOP-4412精英版之烧写内核移植后的镜像_exynos 4412 刷机-程序员宅基地

文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机

Linux系统配置jdk_linux配置jdk-程序员宅基地

文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk

随便推点

matlab(4):特殊符号的输入_matlab微米怎么输入-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入

C语言程序设计-文件(打开与关闭、顺序、二进制读写)-程序员宅基地

文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。‍ Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。

Touchdesigner自学笔记之三_touchdesigner怎么让一个模型跟着鼠标移动-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动

【附源码】基于java的校园停车场管理系统的设计与实现61m0e9计算机毕设SSM_基于java技术的停车场管理系统实现与设计-程序员宅基地

文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计

Android系统播放器MediaPlayer源码分析_android多媒体播放源码分析 时序图-程序员宅基地

文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;amp;gt;Jni-&amp;amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图

java 数据结构与算法 ——快速排序法-程序员宅基地

文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法