Elasticsearch and Spark: Updating existing entities


What is the correct way, when using Elasticsearch with Spark, to update existing entities?

I wanted to do something like the following:

  1. Get existing data as a map.
  2. Create a new map, and populate it with the updated fields.
  3. Persist the new map.

However, there are several issues:

  1. The list of returned fields cannot contain the _id, as it is not part of the source.
  2. If, for testing, I hardcode an existing _id in the map of new values, the following exception is thrown:

    org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest

How should the _id be retrieved, and how should it be passed back to Spark?

The following code illustrates what I was trying to do:

JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME + "/" + TYPE_NAME,
    "?_source=field1,field2").values();

Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
   Map<String, Object> map = iter.next();
   // Get existing values, and do transformation logic

   Map<String, Object> newMap = new HashMap<String, Object>();
   newMap.put("_id", ??????);
   newMap.put("field1", new_value);
   listToPersist.add(newMap);
}
JavaRDD<Map<String, Object>> javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME); 

Ideally, I would want to update the existing map in place, rather than create a new one.

Does anyone have example code showing the correct way to update existing entities in Elasticsearch when using Spark?

Thanks


This is how I've done it (Scala/Spark 2.3/Elastic-Hadoop v6.5).

To read (id or other metadata):

spark
    .read
    .format("org.elasticsearch.spark.sql")
    .option("es.read.metadata", "true") // expose document metadata such as _id
    .load("yourindex/yourtype")
    .select(col("_metadata._id").as("myId"), ...)

To update particular columns in ES:

myDataFrame
    .select("myId","columnToUpdate")
    .saveToEs(
        "yourindex/yourtype",
        Map(
            "es.mapping.id" -> "myId",
            "es.write.operation" -> "update", // important to change operation to partial update
            "es.mapping.exclude" -> "myId"
        )
    )


Try adding this upsert setting to your Spark configuration:

.config("es.write.operation", "upsert")

That will let you add new fields to existing documents, and create the document if it does not yet exist.
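For context, a minimal sketch of how that setting might be used end to end (the index name, host, and column names below are placeholders, and this assumes a running Elasticsearch cluster and the elasticsearch-spark connector on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._ // brings saveToEs into scope

val spark = SparkSession.builder()
  .appName("es-upsert-sketch")
  .config("es.nodes", "localhost:9200")
  .config("es.write.operation", "upsert") // update if the id exists, insert otherwise
  .config("es.mapping.id", "myId")        // column that carries the document id
  .getOrCreate()

import spark.implicits._

// Each row is merged into the document with the matching id,
// or indexed as a new document if no such id exists yet.
val updates = Seq(("doc-1", "new value")).toDF("myId", "field1")
updates.saveToEs("yourindex/yourtype")
```

Note that with upsert, fields not present in the DataFrame are left untouched on existing documents, which is what makes it suitable for partial updates.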


According to the Elasticsearch configuration documentation, you can get document metadata such as _id by setting the read metadata option to true:

 .config("es.read.metadata", "true")

Also, I think you cannot use '_id' as a field name, since Elasticsearch treats it as a reserved metadata field.

But you can create a new field with a different name, for example:

 newMap.put("idfield", yourId);

then set the name of that new field as the value of the mapping id option, to tell Elasticsearch that this field holds the document id:

 .config("es.mapping.id", "idfield")

And don't forget to set the write operation to update:

 .config("es.write.operation", "update")
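Putting the three settings together, a round trip might look like the sketch below (untested here, since it needs a live cluster; the index, host, and column names are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.elasticsearch.spark.sql._ // brings saveToEs into scope

val spark = SparkSession.builder()
  .appName("es-partial-update-sketch")
  .config("es.nodes", "localhost:9200")
  .config("es.read.metadata", "true")     // expose _metadata._id on read
  .config("es.mapping.id", "idfield")     // column that carries the document id
  .config("es.write.operation", "update") // partial update instead of full reindex
  .getOrCreate()

// Read: pull the id out of the metadata into a regular column.
val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("yourindex/yourtype")
  .select(col("_metadata._id").as("idfield"), col("field1"))

// Apply your transformation logic, then write back only the
// changed columns plus the id column.
val updated = df // ... transformations here ...
updated.saveToEs("yourindex/yourtype")
```

Because the write operation is "update" rather than "upsert", this will fail if a document with the given id does not already exist, which matches the original goal of updating existing entities only.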
