Iterative development with simple user code in Spark is easy: just declare some functions, create an RDD or DataFrame, and go! (See Spark's own demos here.) However, if you need to iterate on a whole library of user code, some careful setup and configuration is necessary.
oarphpy offers helpful automation: oarphpy-managed Spark sessions automagically include a Python Egg containing the entire library wrapping the session instantiator. You can also just point oarphpy at a specific directory and go. For notebooks, oarphpy supports live library code updates without having to restart the Spark session or the notebook kernel.
This notebook demonstrates using oarphpy-managed Spark with a toy library of user code: a web scraper.
To run this notebook locally, try using the oarphpy/full dockerized environment:
docker run -it --rm --net=host oarphpy/full:0.0.2 jupyter notebook --allow-root
Google Colab: You can also run this notebook in Google Colab. In the Colab environment, you'll need to install oarphpy, Spark, and Java. Running the cell below will take care of that for you. You might need to restart the runtime (use the menu option Runtime > Restart runtime...) in order for Colab to recognize the new modules.
import os
import sys
if 'google.colab' in sys.modules:
  !pip install oarphpy[spark]==0.0.3
  !pip install pyspark==2.4.4
  !apt-get update && apt-get install -y openjdk-8-jdk
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
Our custom library will scrape images and 'tags' from simple HTML tables like this one:
[Rendered example table: a single row of three images captioned beach, bridge, and car.]
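To make the target concrete, here's a rough sketch of the kind of markup such a page contains, written as a Python string. The filenames and layout below are illustrative assumptions rather than the actual page source; the key point is that each td cell holds a tag caption plus an img element:
EXAMPLE_PAGE_HTML = """
<table>
  <tr>
    <td>beach <img src="images/beach.jpg"/></td>
    <td>bridge <img src="images/bridge.jpg"/></td>
    <td>car <img src="images/car.jpg"/></td>
  </tr>
</table>
"""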
We'll store scraped images in a table where each row stores a single image and its annotations. We'll use Spark here as a tool to Extract data from webpages, Transform it into our own data structure(s) / schemas, and Load the transformed rows into some data store (e.g. the local filesystem).
The code for our custom library could live anywhere, but for this demo we're going to put the code in an arbitrary temporary directory. The cell below sets that up.
Aside: When running a Jupyter notebook, the current working directory of the Jupyter process is implicitly included in the PYTHONPATH. (This is a feature, not a bug!) We're going to put our custom library in a random temporary directory to isolate it from Jupyter and to simulate having one's own code in a separate directory (e.g. perhaps the repository isolates library code from a notebooks directory).
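As an optional sanity check (not part of the demo proper), you can inspect sys.path yourself; the empty-string entry typically present in notebook kernels is what makes modules sitting next to the notebook importable:
import os, sys
# To Python's import system, '' means "the current working directory".
print([p for p in sys.path if p in ('', os.getcwd())])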
# Create a random temporary directory for our library
import os
import tempfile
old_cwd = os.getcwd()
tempdir = tempfile.TemporaryDirectory(suffix='_oarphpy_demo')
CUSTOM_LIB_SRC_DIR = tempdir.name
print("Putting demo assets in %s" % CUSTOM_LIB_SRC_DIR)
os.chdir(CUSTOM_LIB_SRC_DIR)
!mkdir -p mymodule
!touch mymodule/__init__.py
Now let's write the library. Here's our webpage-scraping code, which simply uses BeautifulSoup to parse image urls and tags from the pages:
!pip3 install bs4
%%writefile mymodule/scrape.py
def get_image_tag_pairs(url):
  import urllib.request
  from urllib.parse import urljoin
  from urllib.parse import urlparse
  from bs4 import BeautifulSoup

  raw_page = urllib.request.urlopen(url).read()
  soup = BeautifulSoup(raw_page, features="html.parser")
  for td in soup.find_all('td'):
    tag = td.text.strip()
    img_src = td.find('img')['src']
    if not urlparse(img_src).scheme:
      img_src = urljoin(url, img_src)
    yield tag, img_src
We'll abstract scraped image-tag pairs into a class ImageWithAnno, which will also represent a single row in our final table. In this simple demo, this class mainly helps us encapsulate the code for constructing a table row.
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
  def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
    self.url = url
    self.tag = tag
    self.image_bytes = image_bytes
    self.width = width
    self.height = height

  @staticmethod
  def create_from_url(url):
    import urllib.request
    image_bytes = bytearray(urllib.request.urlopen(url).read())

    # Read the image dimensions without actually decoding the jpeg
    from oarphpy.util import get_jpeg_size
    width, height = get_jpeg_size(image_bytes)

    return ImageWithAnno(
      url=url,
      image_bytes=image_bytes,
      width=width,
      height=height)
Now we're done!
print("Changing working directory back to %s" % old_cwd)
os.chdir(old_cwd)
Let's do a brief test of the library in this notebook. As is the custom, we modify sys.path to make our library importable in the notebook's Python process.
sys.path.append(CUSTOM_LIB_SRC_DIR)
from mymodule.scrape import get_image_tag_pairs
test_pairs = list(get_image_tag_pairs('https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html'))
test_labels = [l for l, img in test_pairs]
assert set(['beach', 'bridge', 'car']) == set(test_labels), \
  "Got unexpected labels %s" % (test_labels,)
Now we're ready to scrape! For this demo, we'll scrape these pages:
PAGE_URLS = (
  'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html',
  'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page2.html',
)
Now we'll start a Spark session using oarphpy, which will automagically Egg-ify our custom library and ship that Python Egg with our job. By default, oarphpy tries to Egg-ify the library surrounding the calling code; this feature is ideal when running Spark jobs from scripts where the scripts and the library code live in the same Python module. For this demo, we just need to point oarphpy at our temp directory:
from oarphpy.spark import NBSpark
NBSpark.SRC_ROOT = os.path.join(CUSTOM_LIB_SRC_DIR, 'mymodule')
spark = NBSpark.getOrCreate()
Running the cell above should log messages confirming that oarphpy Egg-ified our code and gave it to Spark. Let's now prove that step worked. We'll show that when we import code from our library on the Spark worker (which in this case is a local Python process distinct from the notebook's), the import works and the imported code comes from the oarphpy-generated Egg. (This feature even has an explicit unit test in oarphpy!)
def test_mymodule_is_included():
  import mymodule
  return mymodule.__file__
from oarphpy import spark as S
mod_paths = S.for_each_executor(spark, test_mymodule_is_included)
print("Loaded mymodule from %s" % (mod_paths,))
assert all('.egg' in p for p in mod_paths)
We'll now run a scraping job using Spark's RDD API, which is the easiest way to leverage our custom library.
url_rdd = spark.sparkContext.parallelize(PAGE_URLS)
print("RDD of pages to scrape: %s" % (url_rdd.collect()))
from mymodule.scrape import get_image_tag_pairs
tag_img_url_rdd = url_rdd.flatMap(get_image_tag_pairs)
# NB: `get_image_tag_pairs` gets sent to Spark workers via `cloudpickle`
def to_image_anno(tag, img_url):
  from mymodule.imagewithanno import ImageWithAnno
  imganno = ImageWithAnno.create_from_url(img_url)
  imganno.tag = tag
  return imganno
image_anno_rdd = tag_img_url_rdd.map(lambda pair: to_image_anno(*pair))
num_images = image_anno_rdd.count()
print("Scraped %s images" % num_images)
assert num_images == 5, "Unexpectedly saw %s images" % num_images
Now let's create and save that table! Spark can automatically convert some simple Python objects to DataFrame table rows. (oarphpy offers tools for slotted classes, numpy arrays, and more in RowAdapter.) We'll leverage Spark's built-in type munging and Parquet support below:
df = spark.createDataFrame(image_anno_rdd)
# Save the results
df.write.parquet('/tmp/demo_results', mode='overwrite')
# Show the results using df.show() or as a pandas DataFrame, which has pretty-printing support in Jupyter.
df.toPandas()
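As a quick check that the Extract-Transform-Load round trip worked, we can read the Parquet data back with stock Spark and query it like any other table. This is just a verification sketch; the column names come from the ImageWithAnno attributes above:
reloaded_df = spark.read.parquet('/tmp/demo_results')
reloaded_df.select('tag', 'url', 'width', 'height').show(truncate=False)
print("Reloaded %s rows" % reloaded_df.count())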
Spark supports live updates for code in notebook cells. Every time a notebook cell executes, Spark serializes the latest code (via cloudpickle) and sends it to the workers for execution. There's no need to restart the Spark session or the notebook kernel. Thus you can keep entire RDDs and/or DataFrames in cluster memory and iteratively update code and recompute things as needed.
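For example, here's a minimal (hypothetical) illustration using the tag_img_url_rdd defined above: define a helper in a cell and run a job with it; if you later edit the helper and simply re-run the cell, cloudpickle ships the newest version of the function to the workers.
def shout(pair):
  # Try editing this function and re-running the cell -- no restarts needed.
  tag, img_url = pair
  return tag.upper()

print(sorted(tag_img_url_rdd.map(shout).collect()))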
Similarly, oarphpy.NBSpark supports live updates for library code! By default, NBSpark hooks into the notebook environment and automatically creates and ships a new Egg whenever there are updated files on disk. (There are some potential small performance hits; see NBSpark.MAYBE_REBUILD_EGG_EVERY_CELL_RUN for details and a toggle.) Note that in a non-notebook, script-oriented workflow, oarphpy.SparkSessions will Egg-ify user code every time a script / Spark Session launches, so live updates are not typically useful there.
To demonstrate this feature of oarphpy.NBSpark, we'll update our library code in-place and add a new image attribute / column:
os.chdir(CUSTOM_LIB_SRC_DIR)
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
  def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
    self.url = url
    self.tag = tag
    self.image_bytes = image_bytes
    self.width = width
    self.height = height

    from urllib.parse import urlparse
    self.domain = urlparse(url).netloc  # <------------------ A new attribute / column !!

  @staticmethod
  def create_from_url(url):
    import urllib.request
    image_bytes = bytearray(urllib.request.urlopen(url).read())

    # Read the image dimensions without actually decoding the jpeg
    from oarphpy.util import get_jpeg_size
    width, height = get_jpeg_size(image_bytes)

    return ImageWithAnno(
      url=url,
      image_bytes=image_bytes,
      width=width,
      height=height)
os.chdir(old_cwd)
Now we just need to re-run the code that creates our DataFrame. Since Spark RDDs and DataFrames are constructed lazily (unless cache()ed or persist()ed), the code below makes Spark re-run the code that constructs image_anno_rdd, which in turn will execute code in the updated Egg with our local changes.
(Note that if you do cache() or persist() your RDD or DataFrame, Spark will not execute code in the new Egg until you do an unpersist()!)
df = spark.createDataFrame(image_anno_rdd)
# Check that our new column is there!
assert 'domain' in df.columns
df.toPandas()
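If you had cached or persisted image_anno_rdd, the sketch below shows the pattern (hypothetical, since this demo never caches anything): cached rows keep reflecting the old Egg until you drop them.
image_anno_rdd.cache()
df_cached = spark.createDataFrame(image_anno_rdd)  # rows computed (and cached) with the current Egg
# ... edit the library on disk here ...
image_anno_rdd.unpersist()                         # discard the cached rows
df_fresh = spark.createDataFrame(image_anno_rdd)   # recomputed, now using the new Egg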
The features above are tested extensively in oarphpy's custom full environment as well as in a vanilla Spark environment.