Automagically Shipping Custom Libraries in Spark Using OarphPy

Iterative development with simple user code in Spark is easy: just declare some functions, create an RDD or DataFrame, and go! (See Spark's own demos here). However, if you need to iterate on a whole library of user code, some careful setup and configuration is necessary.

oarphpy offers helpful automation: oarphpy-managed Spark sessions automagically include a Python Egg containing the entire library surrounding the code that instantiates the session. You can also just point oarphpy at a specific directory and go. For notebooks, oarphpy supports live library code updates, without having to restart the Spark session or notebook kernel.

This notebook demonstrates using oarphpy-managed Spark with a toy library of user code: a web scraper.

Setup

To run this notebook locally, try using the oarphpy/full dockerized environment:

docker run -it --rm --net=host oarphpy/full:0.0.2 jupyter notebook --allow-root

Google Colab: You can also run this notebook in Google Colab. In the Colab environment, you'll need to install oarphpy, Spark, and Java. Running the cell below will take care of that for you. You might need to restart the runtime (use the menu option Runtime > Restart runtime ...) in order for Colab to recognize the new modules.

In [1]:
import os
import sys
if 'google.colab' in sys.modules:
    !pip install oarphpy[spark]==0.0.3
    !pip install pyspark==2.4.4
    !apt-get update && apt-get install -y openjdk-8-jdk
    os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
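
If you'd like a quick sanity check of the environment (local Docker or Colab), the hedged snippet below just prints the pyspark version and JAVA_HOME; it assumes pyspark is already importable.

# Optional environment sanity check.
import pyspark
print("pyspark version:", pyspark.__version__)
print("JAVA_HOME:", os.environ.get('JAVA_HOME', '(not set)'))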

Our Custom Library: A Scraper

Our custom library will scrape images and 'tags' from simple HTML tables like this one:

[Example HTML table: three images captioned 'beach', 'bridge', and 'car']

We'll store scraped images in a table where each row stores a single image and its annotations. We'll use Spark here as a tool to Extract data from webpages, Transform it into our own data structure(s) / schemas, and Load the transformed rows into some data store (e.g. the local filesystem).

Source Code for the Library

The code for our custom library could live anywhere, but for this demo we're going to put the code in an arbitrary temporary directory. The cell below sets that up.

Aside: When running a Jupyter notebook, the current working directory of the Jupyter process is implicitly included in the PYTHONPATH. (This is a feature, not a bug!) We're going to put our custom library in a random temporary directory to isolate it from Jupyter and to simulate having one's own code in a separate directory (e.g. perhaps the repository keeps library code separate from a notebooks directory).
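
If you're curious, here's a minimal, environment-dependent check: on typical Jupyter setups, an empty-string entry on sys.path is what makes the kernel's working directory importable.

# Illustrative check only; the exact sys.path entries vary by setup.
import os
import sys
print("cwd:", os.getcwd())
print("cwd importable via sys.path:", '' in sys.path or os.getcwd() in sys.path)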

In [2]:
# Create a random temporary directory for our library
import os
import tempfile
old_cwd = os.getcwd()
tempdir = tempfile.TemporaryDirectory(suffix='_oarphpy_demo')
CUSTOM_LIB_SRC_DIR = tempdir.name
print("Putting demo assets in %s" % CUSTOM_LIB_SRC_DIR)
os.chdir(CUSTOM_LIB_SRC_DIR)
!mkdir -p mymodule
!touch mymodule/__init__.py
Putting demo assets in /tmp/tmpthfz06uk_oarphpy_demo

Now let's write the library. Here's our webpage-scraping code, which simply uses BeautifulSoup to parse image urls and tags from the pages:

In [3]:
!pip3 install bs4
Requirement already satisfied: bs4 in /usr/local/lib/python3.6/dist-packages (0.0.1)
Requirement already satisfied: beautifulsoup4 in /usr/local/lib/python3.6/dist-packages (from bs4) (4.8.2)
Requirement already satisfied: soupsieve>=1.2 in /usr/local/lib/python3.6/dist-packages (from beautifulsoup4->bs4) (1.9.5)
WARNING: You are using pip version 19.3.1; however, version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
In [4]:
%%writefile mymodule/scrape.py
def get_image_tag_pairs(url):
    import urllib.request
    from urllib.parse import urljoin
    from urllib.parse import urlparse
    from bs4 import BeautifulSoup
    
    raw_page = urllib.request.urlopen(url).read()
    soup = BeautifulSoup(raw_page, features="html.parser")
    for td in soup.find_all('td'):
        tag = td.text.strip()
        img_src = td.find('img')['src']
        if not urlparse(img_src).scheme:
            img_src = urljoin(url, img_src)
        yield tag, img_src
Writing mymodule/scrape.py

We'll abstract scraped image-tag pairs into a class ImageWithAnno, which will also represent a single row in our final table. In this simple demo, this class mainly helps us encapsulate the code for constructing a table row.

In [5]:
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
    def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
        self.url = url
        self.tag = tag
        self.image_bytes = image_bytes
        self.width = width
        self.height = height
    
    @staticmethod
    def create_from_url(url):
        import urllib.request
        image_bytes = bytearray(urllib.request.urlopen(url).read())

        # Read the image dimensions without actually decoding the jpeg
        from oarphpy.util import get_jpeg_size
        width, height = get_jpeg_size(image_bytes)
        return ImageWithAnno(
            url=url,
            image_bytes=image_bytes,
            width=width,
            height=height)
Writing mymodule/imagewithanno.py

Now we're done!

In [6]:
print("Changing working directory back to %s" % old_cwd)
os.chdir(old_cwd)
Changing working directory back to /opt/oarphpy/notebooks

Test the Library

Let's do a brief test of the library in this notebook. As is customary, we modify sys.path to make our library importable in the notebook's Python process.

In [7]:
sys.path.append(CUSTOM_LIB_SRC_DIR)

from mymodule.scrape import get_image_tag_pairs
test_pairs = list(get_image_tag_pairs('https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html'))
test_labels = [l for l, img in test_pairs]
assert set(['beach', 'bridge', 'car']) == set(test_labels), \
    "Got unexpected labels %s" % (test_labels,)

Scraping with Spark and OarphPy

Now we're ready to scrape! For this demo, we'll scrape these pages:

In [8]:
PAGE_URLS = (
    'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html',
    'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page2.html',
)

Magic Egg-ification

Now we'll start a Spark session using oarphpy, which will automagically Egg-ify our custom library and ship that Python Egg with our job. By default oarphpy tries to Egg-ify the library surrounding the calling code; this feature is ideal when running Spark jobs from scripts where the scripts and the library code live in the same Python module. For this demo, we just need to point oarphpy at our temp directory:

In [9]:
from oarphpy.spark import NBSpark

NBSpark.SRC_ROOT = os.path.join(CUSTOM_LIB_SRC_DIR, 'mymodule')
spark = NBSpark.getOrCreate()
2020-02-02 09:29:37,976	oarph 6004 : Using source root /tmp/tmpthfz06uk_oarphpy_demo/mymodule 
2020-02-02 09:29:38,005	oarph 6004 : Generating egg to /tmp/tmp5wzokki1_oarphpy_eggbuild ...
2020-02-02 09:29:38,073	oarph 6004 : ... done.  Egg at /tmp/tmp5wzokki1_oarphpy_eggbuild/mymodule-0.0.0-py3.6.egg

Running the cell above should log messages confirming that oarphpy Egg-ified our code and gave it to Spark. Let's now prove that step worked. We'll show that when we import code from our library on a Spark worker (which in this case is a local Python process distinct from the notebook's), the import works and the imported code comes from the oarphpy-generated Egg. (This feature even has an explicit unit test in oarphpy!)

In [10]:
def test_mymodule_is_included():
    import mymodule
    return mymodule.__file__

from oarphpy import spark as S
mod_paths = S.for_each_executor(spark, test_mymodule_is_included)
print("Loaded mymodule from %s" % (mod_paths,))
assert all('.egg' in p for p in mod_paths)
Loaded mymodule from ['/tmp/spark-16b3127a-6481-423e-8a3d-79180ec30192/userFiles-dc5890d3-9aac-4848-951d-a8d275e05e05/mymodule-0.0.0-py3.6.egg/mymodule/__init__.py']

Run a Scraping Job

We'll now run a scraping job using Spark's RDD API, which is the easiest way to leverage our custom library.

In [11]:
url_rdd = spark.sparkContext.parallelize(PAGE_URLS)
print("RDD of pages to scrape: %s" % (url_rdd.collect()))

from mymodule.scrape import get_image_tag_pairs
tag_img_url_rdd = url_rdd.flatMap(get_image_tag_pairs)
  # NB: `get_image_tag_pairs` gets sent to Spark workers via `cloudpickle`

def to_image_anno(tag, img_url):
    from mymodule.imagewithanno import ImageWithAnno
    imganno = ImageWithAnno.create_from_url(img_url)
    imganno.tag = tag
    return imganno
image_anno_rdd = tag_img_url_rdd.map(lambda pair: to_image_anno(*pair))
num_images = image_anno_rdd.count()

print("Scraped %s images" % num_images)
assert num_images == 5, "Unexpectedly saw %s images" % num_images
RDD of pages to scrape: ['https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html', 'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page2.html']
Scraped 5 images

View / Save the results

Now let's create and save that table! Spark can automatically convert some simple Python objects into DataFrame rows (oarphpy's RowAdapter offers tools for slotted classes, numpy arrays, and more). We'll leverage Spark's built-in type munging and Parquet support below:

In [12]:
df = spark.createDataFrame(image_anno_rdd)

# Save the results
df.write.parquet('/tmp/demo_results', mode='overwrite')

# Show the results using df.show() or as a Pandas Dataframe, which has pretty printing support in Jupyter.
df.toPandas()
Out[12]:
height image_bytes tag url width
0 53 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... beach https://pwais.github.io/oarphpy-demo-assets/im... 120
1 133 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... bridge https://pwais.github.io/oarphpy-demo-assets/im... 100
2 75 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... car https://pwais.github.io/oarphpy-demo-assets/im... 100
3 84 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... challah https://pwais.github.io/oarphpy-demo-assets/im... 80
4 90 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... pizza https://pwais.github.io/oarphpy-demo-assets/im... 120
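
As a quick sanity check, the Parquet we just wrote can be read back with Spark's built-in reader:

# Read the saved table back from /tmp/demo_results and peek at a few columns.
df_readback = spark.read.parquet('/tmp/demo_results')
print("Read back %s rows" % df_readback.count())
df_readback.select('tag', 'width', 'height').show()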

Support For Live Library Code Updates

Spark supports live updates for code in notebook cells. Every time a notebook cell executes, Spark serializes the latest code (via cloudpickle) and sends it to the workers for execution, with no need to restart the Spark session or the notebook kernel. Thus you can keep entire RDDs and/or DataFrames in cluster memory and iteratively update code and recompute things as needed.
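
For example, here's a tiny sketch (using a throwaway scale function, not part of the scraper) of cell-level updates: edit the function, re-run the cell, and the next action uses the new closure shipped via cloudpickle.

# `scale` is a hypothetical example function; change it and re-run this cell.
def scale(x):
    return x * 2   # e.g. change to x * 3; no session or kernel restart needed

nums = spark.sparkContext.parallelize([1, 2, 3])
print(nums.map(scale).collect())   # reflects whatever `scale` is right now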

Similarly, oarphpy.NBSpark supports live updates for library code! By default, NBSpark hooks into the notebook environment and automatically creates and ships a new Egg whenever there are updated files on disk. (There are some small potential performance hits; see NBSpark.MAYBE_REBUILD_EGG_EVERY_CELL_RUN for details and a toggle.) Note that in a non-notebook, script-oriented workflow, oarphpy-managed Spark sessions will Egg-ify user code every time a script / Spark session launches, so live updates are typically not needed.

To demonstrate this feature of oarphpy.NBSpark, we'll update our library code in-place and add a new image attribute / column:

In [13]:
os.chdir(CUSTOM_LIB_SRC_DIR)
In [14]:
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
    def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
        self.url = url
        self.tag = tag
        self.image_bytes = image_bytes
        self.width = width
        self.height = height
        
        from urllib.parse import urlparse
        self.domain = urlparse(url).netloc  # <------------------  A new attribute / column !!
    
    @staticmethod
    def create_from_url(url):
        import urllib.request
        image_bytes = bytearray(urllib.request.urlopen(url).read())
        
        # Read the image dimensions without actually decoding the jpeg
        from oarphpy.util import get_jpeg_size
        width, height = get_jpeg_size(image_bytes)
        return ImageWithAnno(
            url=url,
            image_bytes=image_bytes,
            width=width,
            height=height)
Overwriting mymodule/imagewithanno.py
In [15]:
os.chdir(old_cwd)
2020-02-02 09:29:45,348	oarph 6004 : Source has changed! Rebuilding Egg ...
2020-02-02 09:29:45,349	oarph 6004 : Using source root /tmp/tmpthfz06uk_oarphpy_demo/mymodule 
2020-02-02 09:29:45,350	oarph 6004 : Generating egg to /tmp/tmpffetxk5d_oarphpy_eggbuild ...
2020-02-02 09:29:45,360	oarph 6004 : ... done.  Egg at /tmp/tmpffetxk5d_oarphpy_eggbuild/mymodule-0.0.0-py3.6.egg

Now we just need to re-run the code that creates our DataFrame. Since Spark constructs RDDs and DataFrames lazily (unless they are cache()d or persist()ed), the code below makes Spark re-run the code that constructs image_anno_rdd, which in turn executes the code in the updated Egg with our local changes.

(Note that if you do cache() or persist() your RDD or DataFrame, Spark will not execute code in the new Egg until you do an unpersist()!)
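
For instance, a hedged sketch of that caching pitfall, using plain Spark RDD calls:

# If `image_anno_rdd` had been cached *before* the library edit, its cached
# partitions would still reflect the old code; unpersist() discards them so
# the next action recomputes with the freshly shipped Egg.
image_anno_rdd.unpersist()   # safe even if the RDD was never cached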

In [17]:
df = spark.createDataFrame(image_anno_rdd)

# Check that our new column is there!
assert 'domain' in df.columns

df.toPandas()
Out[17]:
domain height image_bytes tag url width
0 pwais.github.io 53 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... beach https://pwais.github.io/oarphpy-demo-assets/im... 120
1 pwais.github.io 133 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... bridge https://pwais.github.io/oarphpy-demo-assets/im... 100
2 pwais.github.io 75 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... car https://pwais.github.io/oarphpy-demo-assets/im... 100
3 pwais.github.io 84 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... challah https://pwais.github.io/oarphpy-demo-assets/im... 80
4 pwais.github.io 90 [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... pizza https://pwais.github.io/oarphpy-demo-assets/im... 120

Testing

The features above are tested extensively:

In [ ]: