GCP Dataflow with Python: "AttributeError: Can't get attribute '_JsonSink' on module 'dataflow_worker.start'"

I am new to GCP Dataflow.

I am trying to read text files (each a one-line JSON string) from GCP Cloud Storage, parse them into JSON, split the records based on the value of a certain field, and write the output back to GCP Cloud Storage (as JSON-string text files).

Here is my code
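In outline, the pipeline does something like the following. This is only a minimal sketch, not the original listing: the bucket paths, the field name 'type', and the use of WriteToText in place of the custom _JsonSink are placeholder assumptions.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        records = (
            p
            | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input/*.txt')
            | 'Parse' >> beam.Map(json.loads))

        # Split the records into one PCollection per value of the chosen field.
        group_a, group_b = (
            records
            | 'Split' >> beam.Partition(
                lambda record, n: 0 if record.get('type') == 'a' else 1, 2))

        # The real pipeline writes through a custom _JsonSink; WriteToText
        # stands in for it in this sketch.
        (group_a
         | 'SerializeA' >> beam.Map(json.dumps)
         | 'WriteA' >> beam.io.WriteToText('gs://my-bucket/output/a', file_name_suffix='.json'))
        (group_b
         | 'SerializeB' >> beam.Map(json.dumps)
         | 'WriteB' >> beam.io.WriteToText('gs://my-bucket/output/b', file_name_suffix='.json'))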

However, I encounter the following error on GCP Dataflow:

Traceback (most recent call last):
  File "main.py", line 169, in <module>
    run()
  File "main.py", line 163, in run
    shard_name_template='')
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "C:\ProgramData\Miniconda3\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1346, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonSink' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py'>

I am able to run this script locally, but it fails when I try to use the DataflowRunner.

Please give me some suggestions.

PS. apache-beam version: 2.15.0

[Update1]

I tried @Yueyang Qiu's suggestion and added

pipeline_options.view_as(SetupOptions).save_main_session = True
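For context, that line sits where the pipeline options are built, roughly like this (pipeline_args here is a placeholder for the command-line arguments):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions(pipeline_args)  # pipeline_args: list of CLI flags (placeholder)
# Pickle the main session so that names defined at module level in the
# launching script are available on the Dataflow workers.
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    ...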

The provided link says:

DoFn's in this workflow relies on global context (e.g., a module imported at module level)

This link supports the suggestion above.

However, the same error occurred.

So I am wondering whether my implementation of _JsonSink (which inherits from filebasedsink.FileBasedSink) is wrong, or whether something else needs to be added.
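For reference, a sink of this kind typically looks roughly like the sketch below, written against the Beam 2.15 filebasedsink.FileBasedSink API. The coder choice and the write logic are assumptions, not the actual implementation.

import json

from apache_beam.coders import coders
from apache_beam.io import filebasedsink


class _JsonSink(filebasedsink.FileBasedSink):
    """Writes one JSON object per line to text files."""

    def __init__(self, file_path_prefix, file_name_suffix='.json',
                 num_shards=0, shard_name_template=None):
        # Note: zero-argument super() is Python 3-only syntax.
        super().__init__(
            file_path_prefix,
            coder=coders.ToStringCoder(),
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            mime_type='text/plain')

    def write_record(self, file_handle, value):
        # One JSON document per line, matching the input format.
        file_handle.write(json.dumps(value).encode('utf-8'))
        file_handle.write(b'\n')

A sink like this would be applied in the pipeline with something like: records | 'WriteJson' >> beam.io.Write(_JsonSink('gs://my-bucket/output/out', shard_name_template='')).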

Any opinion would be appreciated, thank you all!

You have encountered a known issue: currently (as of the 2.17.0 release), Beam does not support super() calls in the main module on Python 3. Please take a look at the possible solutions in BEAM-6158. Udi's answer is a good way to address this until BEAM-6158 is resolved; this way you don't have to run your pipeline on Python 2.

Using the guidelines from here, I managed to get your example to run.

Directory structure:

./setup.py
./dataflow_json
./dataflow_json/dataflow_json.py  (no change from your example)
./dataflow_json/__init__.py  (empty file)
./main.py

setup.py:

import setuptools

setuptools.setup(
  name='dataflow_json',
  version='1.0',
  install_requires=[],
  packages=setuptools.find_packages(),
)

main.py:

from __future__ import absolute_import

from dataflow_json import dataflow_json

if __name__ == '__main__':
    dataflow_json.run()

and you run the pipeline with python main.py.

Basically what's happening is that the '--setup_file=./setup.py' flag tells Beam to create a package and install it on the Dataflow remote worker. The __init__.py file is required for setuptools to identify the dataflow_json/ directory as a package.
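For example, the launch command might look something like this (the project, region, and bucket names are placeholders):

python main.py \
  --runner DataflowRunner \
  --project my-project \
  --region us-central1 \
  --temp_location gs://my-bucket/temp \
  --setup_file=./setup.py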

I finally found the problem:

the class '_JsonSink' I implemented uses some features from Python 3.

However, I was not aware of which Python version I was using with the 'DataflowRunner'. (Actually, I have not figured out how to specify the Python version for the Dataflow runner on GCP. Any suggestions?)

Hence, I rewrote my code in a Python 2-compatible way, and everything works fine!
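If the Python 3-only piece was the zero-argument super() call, as the answer about BEAM-6158 suggests, the rewrite likely amounts to a change like this (a sketch, not the actual diff):

from apache_beam.coders import coders
from apache_beam.io import filebasedsink


class _JsonSink(filebasedsink.FileBasedSink):
    def __init__(self, file_path_prefix, **kwargs):
        # Python 3-only form that trips pickling of the main session:
        #     super().__init__(file_path_prefix, coder=coders.ToStringCoder(), **kwargs)
        # Python 2/3-compatible form, naming the class and instance explicitly:
        super(_JsonSink, self).__init__(
            file_path_prefix, coder=coders.ToStringCoder(), **kwargs)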

Thanks to all of you!

Can you try setting the option save_main_session = True, as done here: https://github.com/apache/beam/blob/a2b0ad14f1525d1a645cb26f5b8ec45692d9d54e/sdks/python/apache_beam/examples/cookbook/coders.py#L88.

Comments
  • Hi! The migration of the Dataflow runner to Python 3 is currently taking place. You can see the updates on the supported features here: jira.apache.org/jira/browse/BEAM-1251?subTaskView=unresolved.
  • Thanks for your suggestion. I tried it; the error mentioned above disappeared, but another error showed up: "ModuleNotFoundError: No module named 'libs'", where 'libs' is one of my local modules. Does this mean I cannot use local modules with the DataflowRunner? (Everything is fine locally.)
  • Check if there is an indentation problem and follow this doc: cloud.google.com/dataflow/docs/resources/…
  • @MonicaPC I have read that article before, but I did not find a good solution for my case. Hence, I tried to simplify my problem: now it is a single file with a self-implemented class for data output, but that class still cannot be found by the DataflowRunner.