How to view pyspark temporary tables on Thrift server?


I'm trying to make a temporary table I create in PySpark available via Thrift. My final goal is to be able to access it from a database client like DBeaver using JDBC.

I'm testing first using beeline.

This is what I'm doing:

  1. Started a cluster with one worker on my own machine using Docker and added spark.sql.hive.thriftServer.singleSession true to spark-defaults.conf
  2. Started a PySpark shell (for testing's sake) and ran the following code:

    from pyspark.sql import Row
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    people = people.toDF().cache()
    peebs = people.createOrReplaceTempView('peebs')
    result = sqlContext.sql('select * from peebs')

    So far so good, everything works fine.

  3. In a different terminal I start the Spark Thrift Server: ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077

    The server appears to start normally and I'm able to see both pyspark and thrift server jobs running on my spark cluster master UI.

  4. I then connect to the cluster using beeline

    ./bin/beeline
    beeline> !connect jdbc:hive2://172.18.0.2:10001

    This is what I got

    Connecting to jdbc:hive2://172.18.0.2:10001
    Enter username for jdbc:hive2://172.18.0.2:10001:
    Enter password for jdbc:hive2://172.18.0.2:10001:
    2019-06-29 20:14:25 INFO Utils:310 - Supplied authorities: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO Utils:397 - Resolved authority: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://172.18.0.2:10001
    Connected to: Spark SQL (version 2.3.3)
    Driver: Hive JDBC (version 1.2.1.spark2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ

    Seems to be ok.

  5. When I run show tables; I can't see any tables.

Two interesting things I'd like to highlight are:

  1. When I start pyspark I get these warnings:

    WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0

    WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException

    WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

  2. When I start the thrift server I get these:

    rsync from spark://172.18.0.2:7077
    ssh: Could not resolve hostname spark: Name or service not known
    rsync: connection unexpectedly closed (0 bytes received so far) [Receiver]
    rsync error: unexplained error (code 255) at io.c(235) [Receiver=3.1.2]
    starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to ...

I've been through several posts and discussions. I see people saying we can't have temporary tables exposed via Thrift unless the server is started from within the same code. If that's true, how can I do that in Python (PySpark)?

Thanks

In case someone needs to do this in Spark Streaming, I got it to work like this.

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('restlogs_qlik') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

#Order matters: start the Thrift server from this same driver before the streaming
#query, so it can see this application's in-memory tables
java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

#Define schema of json
schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")

#load data into spark-structured streaming
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "rest_logs") \
      .load() \
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

#Write the stream to an in-memory table named view_figures so it can be queried via JDBC
query = df.writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("view_figures") \
            .start()

query.awaitTermination()

After you start it up you can connect with beeline over JDBC to test. What I can't comprehend is why I have to start the Thrift server in the same script. This is how I start the script:

    spark-submit --master local[2] \
    --conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
    "C:\\Temp\\spark_kafka.py"

Hope this helps someone. By the way, I'm still in the preliminary research stage, so don't judge me.


createOrReplaceTempView creates an in-memory table. The Spark Thrift Server needs to be started on the same driver JVM where the in-memory table was created. In the example above, the driver on which the table is created and the driver running the STS (Spark Thrift Server) are different. There are two options:

  1. Create the table using createOrReplaceTempView in the same JVM where the STS is started.
  2. Use a backing metastore and create tables using org.apache.spark.sql.DataFrameWriter#saveAsTable, so that tables are accessible independently of the JVM (in fact, without any Spark driver at all). A rough sketch of this option follows below.
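
A minimal sketch of option 2, assuming Hive support is enabled and a metastore is configured (the table name people_persisted is just a placeholder):

from pyspark.sql import SparkSession

# Persistent tables written through saveAsTable are registered in the metastore,
# so a separately started Spark Thrift Server (or any other Spark application)
# can see them without sharing a JVM with this driver.
spark = SparkSession.builder \
    .appName('persist_example') \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.createDataFrame([('Ankit', 25), ('Bala', 26)], ['name', 'age'])
df.write.mode('overwrite').saveAsTable('people_persisted')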

Regarding the warnings: 1. The first relates to a client/server metastore version mismatch. 2. The second looks like some rsync script trying to resolve the spark:// URL. Neither seems to be related to the issue.


After doing several tests I was able to come up with a simple (no authentication) piece of code that works for me.

It's important to note that if you want to make temporary tables available via JDBC, you need to start the Thrift server in the same JVM (same Spark job) and ensure the code hangs so the application keeps running in the cluster.

The following is a working code sample I created for reference:

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

# Import the Thrift server class into the Py4J gateway (the call below uses the
# fully qualified name, so this mainly makes the dependency explicit)
java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")


from pyspark.sql import Row
l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server from this same driver so it can see the temp view above
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

# Keep the application alive so the Thrift server (and the temp view) stay available
while True:
    time.sleep(10)

I simply used the .py above in my spark-submit, and I was able to connect via JDBC through beeline and from DBeaver using the Hive JDBC driver.
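
For reference, a beeline session against the setup above should look roughly like this (host, port, and view name taken from the code and commands earlier in this post):

./bin/beeline
beeline> !connect jdbc:hive2://172.18.0.2:10001
0: jdbc:hive2://172.18.0.2:10001> show tables;
0: jdbc:hive2://172.18.0.2:10001> select * from peebs;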


Comments
  • Regarding #1, is there a way I could create the view in the same JVM without having to start the STS at the end of my code? I mean, is there a way to create it and still start the STS using start-thriftserver.sh? Would you have some sample code? About #2, the problem is that I'd like to make some transformations on the data before exposing it via JDBC, but I don't want to generate or handle one more table.
  • @BrunoFaria, you can use org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#start. In the driver code, create the temp tables, and in the final line of the driver code start the STS. Also, the Thrift properties need to be set correctly (check cwiki.apache.org/confluence/display/Hive/… for properties with the prefix hive.server2).