How to view pyspark temporary tables on Thrift server?
I'm trying to make a temporary table I create in PySpark available via Thrift. My final goal is to be able to access it from a database client like DBeaver using JDBC.
I'm testing first using beeline.
This is what I'm doing:
- Started a cluster with one worker on my own machine using Docker.
- Started a PySpark shell (for testing's sake) and ran the following code:
from pyspark.sql import Row

l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')
result = sqlContext.sql('select * from peebs')
So far so good, everything works fine.
On a different terminal I start the Spark Thrift server:
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077
The server appears to start normally and I'm able to see both pyspark and thrift server jobs running on my spark cluster master UI.
I then connect to the cluster using beeline:

./bin/beeline
beeline> !connect jdbc:hive2://172.18.0.2:10001
This is what I got:

Connecting to jdbc:hive2://172.18.0.2:10001
Enter username for jdbc:hive2://172.18.0.2:10001:
Enter password for jdbc:hive2://172.18.0.2:10001:
2019-06-29 20:14:25 INFO Utils:310 - Supplied authorities: 172.18.0.2:10001
2019-06-29 20:14:25 INFO Utils:397 - Resolved authority: 172.18.0.2:10001
2019-06-29 20:14:25 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://172.18.0.2:10001
Connected to: Spark SQL (version 2.3.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Seems to be ok.
But when I run show tables; I can't see any tables.
Two interesting things I'd like to highlight are:
When I start pyspark I get these warnings
WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
When I start the thrift server I get these:
rsync from spark://172.18.0.2:7077
ssh: Could not resolve hostname spark: Name or service not known
rsync: connection unexpectedly closed (0 bytes received so far) [Receiver]
rsync error: unexplained error (code 255) at io.c(235) [Receiver=3.1.2]
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to ...
I've been through several posts and discussions. I see people saying we can't have temporary tables exposed via Thrift unless we start the server from within the same code. If that's true, how can I do that in Python (PySpark)?
In case someone needs to do this in Spark Streaming I got it to work like this.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('restlogs_qlik') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

# Order matters!
java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

# Define the schema of the incoming JSON
schema = StructType() \
    .add("partnerid", "string") \
    .add("sessionid", "string") \
    .add("functionname", "string") \
    .add("functionreturnstatus", "string")

# Load data with Spark Structured Streaming
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "rest_logs") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

# Write the stream to an in-memory table named view_figures
query = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("view_figures") \
    .start()

query.awaitTermination()
After you start it up you can connect over JDBC with beeline to test. What I can't get around is that I have to start the Thrift server in the same script. This is how I launch the script:
spark-submit --master local \
  --conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
  "C:\\Temp\\spark_kafka.py"
Hope this helps someone. By the way, I'm still in the preliminary research stage, so don't judge me.
I'm learning PySpark and as an exercise I'm trying to do something that I assumed would be simple, but it's giving me a very hard time. So let's start with what I'm doing: I'm working on Windows and I'm trying to start a local Thrift server and connect via beeline to a database I created within my app in a Spark session.
createOrReplaceTempView creates an in-memory table. The Spark thrift server needs to be started on the same driver JVM where we created the in-memory table.
In the above example, the driver on which the table is created and the driver running the STS (Spark Thrift Server) are different. There are two options:
1. Create the table using createOrReplaceTempView in the same JVM where the STS is started.
2. Use a backing metastore and create tables using org.apache.spark.sql.DataFrameWriter#saveAsTable so that tables are accessible independent of the JVM (in fact, without any Spark driver).
Regarding the errors:
1. Relates to a mismatch between the client and server metastore versions.
2. Seems to come from the start script passing the master URL to rsync, which cannot resolve "spark" as a hostname.
Neither seems related to the issue.
By default, each JDBC/ODBC connection owns a copy of its own SQL configuration and temporary function registry. Cached tables are still shared, though. If you prefer to run the Thrift server in the old single-session mode, set the option spark.sql.hive.thriftServer.singleSession to true.
After doing several tests I was able to come up with a simple (no authentication) setup that works for me.
It's important to note that if you want to make temporary tables available via JDBC you need to start the Thrift server in the same JVM (same Spark job) and ensure the code blocks so the application keeps running in the cluster.
Following a working sample code I created for reference:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")

l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server from within this JVM so it can see the temp view
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

# Keep the application alive so the view stays available
while True:
    time.sleep(10)
I simply ran the .py above with spark-submit, and then I was able to connect via JDBC through beeline and from DBeaver using the Hive JDBC driver.
If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. The global view remains accessible as long as the application is alive; opening a new shell and giving it the same application name will just create a new, separate application.
From the Spark docs on running the Thrift JDBC/ODBC server: the Thrift JDBC/ODBC server implemented in Spark corresponds to HiveServer2 in built-in Hive. You can test the JDBC server with the beeline script that comes with either Spark or a compatible Hive, and you start the server with sbin/start-thriftserver.sh in the Spark directory.
- Regarding #1, is there a way I could create the view in the same JVM without having to start the STS at the end of my code? I mean, is there a way to create it and still start the STS using start-thriftserver.sh? Would you have some sample code? About #2, the problem is I'd like to make some transformations on the data before exposing it via JDBC, but I don't want to generate or handle one more table.
- @BrunoFaria, you can use org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#start. In the driver code, create the temp tables, and in the final line of the driver code start the STS. Also, the Thrift properties need to be set correctly (check cwiki.apache.org/confluence/display/Hive/… for properties with the relevant prefix