Model Training with Aerospike Feature Store
For an interactive Jupyter notebook experience
This notebook is the second in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with the Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using Aerospike Spark Connector. It is Part 2 of the Feature Store series of notebooks, and focuses on Model Training aspects concerning a Feature Store. The first notebook in the series discusses Feature Engineering, and the next one describes Model Serving.

This notebook is organized as follows:
- Summary of the prior (Data Engineering) notebook
- Exploring features and datasets
- Defining and saving a dataset
- Training and saving an AI/ML model
Prerequisites
This tutorial assumes familiarity with the following topics:
Setup
Set up the Aerospike Server, Spark Server, and Spark Connector.
Ensure Database Is Running
This notebook requires that the Aerospike Database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
Aerospike database is running!
Initialize Spark
We will be using Spark functionality in this notebook.
Initialize Paths and Env Variables
# directory where spark notebook requisites are installed
#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'

# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION = "3.2.0"
AS_PORT = 3000  # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":" + str(AS_PORT)

# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set
import findspark
findspark.init(SPARK_HOME)

# Aerospike Spark Connector related settings
import os
AEROSPIKE_JAR_PATH = "aerospike-spark-assembly-" + AEROSPIKE_SPARK_JAR_VERSION + ".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
Configure Spark Session
Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.
# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType

sc = SparkContext.getOrCreate()
conf = sc._conf.setAll([("aerospike.namespace", AS_NAMESPACE), ("aerospike.seedhost", AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
Access Shell Commands
You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
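For example, to list the sets in the test namespace, you could run a command like the following in the terminal (illustrative only; it is not required for this tutorial):

aql -c "show sets"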
Context from Part 1 (Feature Engineering Notebook)
In the previous notebook in the Feature Store series, we showed how features engineered using the Spark platform can be efficiently stored in Aerospike feature store. We implemented a simple example feature store interface that leverages the Aerospike Spark connector capabilities for this purpose. We implemented a simple object model to save and query features, and illustrated its use with two examples.
You are encouraged to review the Feature Engineering notebook as we will use the same object model, implementation (with some extensions), and data in this notebook.
The code from Part 1 is replicated below as we will be using it later.
Code: Feature Group, Feature, and Entity
Below, we have copied over the code for Feature Group, Feature, and Entity classes for use in the following sections. Please review the object model described in the Feature Engineering notebook.
import copy
# Feature Group
class FeatureGroup:
    schema = StructType([StructField("name", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("source", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, name, description, source, attrs, tags):
        self.name = name
        self.description = description
        self.source = source
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.name, self.description, self.source, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, FeatureGroup.schema)
        # Write the data frame to Aerospike, the name field is used as the primary key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "fg-metadata") \
            .option("aerospike.updateByKey", "name") \
            .save()
        return

    def load(name):
        fg = None
        schema = copy.deepcopy(FeatureGroup.schema)
        schema.add("__key", StringType(), False)
        fgdf = spark.read \
            .format("aerospike") \
            .option("aerospike.set", "fg-metadata") \
            .schema(schema) \
            .load().where("__key = \"" + name + "\"")
        if fgdf.count() > 0:
            fgtuple = fgdf.collect()[0]
            fg = FeatureGroup(*fgtuple[:-1])
        return fg

    def query(predicate):  # returns a dataframe
        fg_df = spark.read \
            .format("aerospike") \
            .schema(FeatureGroup.schema) \
            .option("aerospike.set", "fg-metadata") \
            .load().where(predicate)
        return fg_df
# Feature
class Feature:
    schema = StructType([StructField("fid", StringType(), False),
                         StructField("fgname", StringType(), False),
                         StructField("name", StringType(), False),
                         StructField("type", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, fgname, name, ftype, description, attrs, tags):
        self.fid = fgname + '_' + name
        self.fgname = fgname
        self.name = name
        self.ftype = ftype
        self.description = description
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.fid, self.fgname, self.name, self.ftype, self.description, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, Feature.schema)
        # Write the data frame to Aerospike, the fid field is used as the primary key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "feature-metadata") \
            .option("aerospike.updateByKey", "fid") \
            .save()
        return

    def load(fgname, name):
        f = None
        schema = copy.deepcopy(Feature.schema)
        schema.add("__key", StringType(), False)
        f_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", "feature-metadata") \
            .load().where("__key = \"" + fgname + '_' + name + "\"")
        if f_df.count() > 0:
            f_tuple = f_df.collect()[0]
            f = Feature(*f_tuple[1:-1])
        return f

    def query(predicate, pushdown_expr=None):  # returns a dataframe
        f_df = spark.read \
            .format("aerospike") \
            .schema(Feature.schema) \
            .option("aerospike.set", "feature-metadata")
        # see the section on pushdown expressions
        if pushdown_expr:
            f_df = f_df.option("aerospike.pushdown.expressions", pushdown_expr) \
                .load()
        else:
            f_df = f_df.load().where(predicate)
        return f_df
# Entity
class Entity:

    def __init__(self, etype, record, id_col):
        # record is an array of triples (name, type, value)
        self.etype = etype
        self.record = record
        self.id_col = id_col
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def get_schema(record):
        schema = StructType()
        for f in record:
            schema.add(f[0], f[1], True)
        return schema

    def get_id_type(schema, id_col):
        return schema[id_col].dataType.typeName()

    def save(self, schema):
        fvalues = [f[2] for f in self.record]
        inputBuf = [tuple(fvalues)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, schema)
        # Write the data frame to Aerospike, the id_col field is used as the primary key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", self.etype + '-features') \
            .option("aerospike.updateByKey", self.id_col) \
            .save()
        return

    def load(etype, eid, schema, id_col):
        ent = None
        schema = copy.deepcopy(schema)
        schema.add("__key", StringType(), False)
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype + '-features') \
            .load().where("__key = \"" + eid + "\"")
        if ent_df.count() > 0:
            ent_tuple = ent_df.collect()[0]
            record = [(schema[i].name, schema[i].dataType.typeName(), fv)
                      for i, fv in enumerate(ent_tuple[:-1])]
            ent = Entity(etype, record, id_col)
        return ent

    def saveDF(df, etype, id_col):  # save a dataframe
        # df: dataframe consisting of entity records
        # etype: entity type (such as user or sensor)
        # id_col: column name that holds the primary key
        # Write the data frame to Aerospike, the column in id_col is used as the key bin
        df.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", etype + '-features') \
            .option("aerospike.updateByKey", id_col) \
            .save()
        return

    def query(etype, predicate, schema, id_col):  # returns a dataframe
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype + '-features') \
            .load().where(predicate)
        return ent_df

    def get_feature_vector(etype, eid, feature_list):
        # elements in feature_list are in "fgname_name" form
        # deferred to Model Serving tutorial
        pass
# clear the database by truncating the namespace test
!aql -c "truncate test"

truncate test
OK
Create set indexes on all sets.
!asinfo -v "set-config:context=namespace;id=test;set=fg-metadata;enable-index=true"
!asinfo -v "set-config:context=namespace;id=test;set=feature-metadata;enable-index=true"
!asinfo -v "set-config:context=namespace;id=test;set=dataset-metadata;enable-index=true"
#!asinfo -v "set-config:context=namespace;id=test;set=cctxn-features;enable-index=true"

ok
ok
ok
# test feature group
# test save and load
# save
fg1 = FeatureGroup("fg_name1", "fg_desc1", "fg_source1", {"etype":"etype1", "key":"feature1"}, ["tag1", "tag2"])
fg1.save()
# load
fg2 = FeatureGroup.load("fg_name1")
print("Feature group with name fg_name1:")
print(fg2, '\n')
# test query
fg2 = FeatureGroup("fg_name2", "fg_desc2", "fg_source2", {"etype":"etype1", "key":"fname1"}, ["tag1", "tag3"])
fg2.save()
fg3 = FeatureGroup("fg_name3", "fg_desc3", "fg_source3", {"etype":"etype2", "key":"fname3"}, ["tag4", "tag5"])
fg3.save()
# query 1
print("Feature groups with a description containing 'desc':")
fg_df = FeatureGroup.query("description like '%desc%'")
fg_df.show()
# query 2
print("Feature groups with the source 'fg_source2':")
fg_df = FeatureGroup.query("source = 'fg_source2'")
fg_df.show()
# query 3
print("Feature groups with the attribute 'etype'='etype2':")
fg_df = FeatureGroup.query("attrs.etype = 'etype2'")
fg_df.show()
# query 4
print("Feature groups with a tag 'tag1':")
fg_df = FeatureGroup.query("array_contains(tags, 'tag1')")
fg_df.show()

Feature group with name fg_name1:
<class '__main__.FeatureGroup'>: {'name': 'fg_name1', 'description': 'fg_desc1', 'source': 'fg_source1', 'attrs': {'etype': 'etype1', 'key': 'feature1'}, 'tags': ['tag1', 'tag2']}

Feature groups with a description containing 'desc':
+--------+-----------+----------+--------------------+------------+
|    name|description|    source|               attrs|        tags|
+--------+-----------+----------+--------------------+------------+
|fg_name2|   fg_desc2|fg_source2|[etype -> etype1,...|[tag1, tag3]|
|fg_name3|   fg_desc3|fg_source3|[etype -> etype2,...|[tag4, tag5]|
|fg_name1|   fg_desc1|fg_source1|[etype -> etype1,...|[tag1, tag2]|
+--------+-----------+----------+--------------------+------------+

Feature groups with the source 'fg_source2':
+--------+-----------+----------+--------------------+------------+
|    name|description|    source|               attrs|        tags|
+--------+-----------+----------+--------------------+------------+
|fg_name2|   fg_desc2|fg_source2|[etype -> etype1,...|[tag1, tag3]|
+--------+-----------+----------+--------------------+------------+

Feature groups with the attribute 'etype'='etype2':
+--------+-----------+----------+--------------------+------------+
|    name|description|    source|               attrs|        tags|
+--------+-----------+----------+--------------------+------------+
|fg_name3|   fg_desc3|fg_source3|[etype -> etype2,...|[tag4, tag5]|
+--------+-----------+----------+--------------------+------------+

Feature groups with a tag 'tag1':
+--------+-----------+----------+--------------------+------------+
|    name|description|    source|               attrs|        tags|
+--------+-----------+----------+--------------------+------------+
|fg_name2|   fg_desc2|fg_source2|[etype -> etype1,...|[tag1, tag3]|
|fg_name1|   fg_desc1|fg_source1|[etype -> etype1,...|[tag1, tag2]|
+--------+-----------+----------+--------------------+------------+
# test feature
# test save and load
# save
feature1 = Feature("fgname1", "f_name1", "integer", "f_desc1", {"etype":"etype1", "f_attr1":"v1"}, ["f_tag1", "f_tag2"])
feature1.save()
# load
f1 = Feature.load("fgname1", "f_name1")
print("Feature with group 'fgname1' and name 'f_name1:")
print(f1, '\n')
# test query
feature2 = Feature("fgname1", "f_name2", "double", "f_desc2", {"etype":"etype1", "f_attr1":"v2"}, ["f_tag1", "f_tag3"])
feature2.save()
feature3 = Feature("fgname2", "f_name3", "double", "f_desc3", {"etype":"etype2", "f_attr2":"v3"}, ["f_tag2", "f_tag4"])
feature3.save()
# query 1
print("Features in feature group 'fg_name1':")
f_df = Feature.query("fgname = 'fgname1'")
f_df.show()
# query 2
print("Features of type 'integer':")
f_df = Feature.query("type = 'integer'")
f_df.show()
# query 3
print("Features with the attribute 'etype'='etype1':")
f_df = Feature.query("attrs.etype = 'etype1'")
f_df.show()
# query 4
print("Features with the tag 'f_tag2':")
f_df = Feature.query("array_contains(tags, 'f_tag2')")
f_df.show()

Feature with group 'fgname1' and name 'f_name1:
<class '__main__.Feature'>: {'fid': 'fgname1_f_name1', 'fgname': 'fgname1', 'name': 'f_name1', 'ftype': 'integer', 'description': 'f_desc1', 'attrs': {'etype': 'etype1', 'f_attr1': 'v1'}, 'tags': ['f_tag1', 'f_tag2']}

Features in feature group 'fg_name1':
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|            fid| fgname|   name|   type|description|               attrs|            tags|
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|fgname1_f_name1|fgname1|f_name1|integer|    f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
|fgname1_f_name2|fgname1|f_name2| double|    f_desc2|[etype -> etype1,...|[f_tag1, f_tag3]|
+---------------+-------+-------+-------+-----------+--------------------+----------------+

Features of type 'integer':
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|            fid| fgname|   name|   type|description|               attrs|            tags|
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|fgname1_f_name1|fgname1|f_name1|integer|    f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
+---------------+-------+-------+-------+-----------+--------------------+----------------+

Features with the attribute 'etype'='etype1':
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|            fid| fgname|   name|   type|description|               attrs|            tags|
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|fgname1_f_name1|fgname1|f_name1|integer|    f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
|fgname1_f_name2|fgname1|f_name2| double|    f_desc2|[etype -> etype1,...|[f_tag1, f_tag3]|
+---------------+-------+-------+-------+-----------+--------------------+----------------+

Features with the tag 'f_tag2':
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|            fid| fgname|   name|   type|description|               attrs|            tags|
+---------------+-------+-------+-------+-----------+--------------------+----------------+
|fgname1_f_name1|fgname1|f_name1|integer|    f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
|fgname2_f_name3|fgname2|f_name3| double|    f_desc3|[etype -> etype2,...|[f_tag2, f_tag4]|
+---------------+-------+-------+-------+-----------+--------------------+----------------+
# test Entity
# test save and load
# save
features1 = [('fg1_f_name1', IntegerType(), 1), ('fg1_f_name2', DoubleType(), 2.0), ('fg1_f_name3', StringType(), 'three')]
record1 = [('eid', StringType(), 'eid1')] + features1
ent1 = Entity('entity_type1', record1, 'eid')
schema = Entity.get_schema(record1)
ent1.save(schema);
# load
e1 = Entity.load('entity_type1', 'eid1', schema, 'eid')
print("Entity of type 'entity_type1' and id 'eid1':")
print(e1, '\n')
# test query
features2 = [('fg1_f_name1', IntegerType(), 10), ('fg1_f_name2', DoubleType(), 20.0), ('fg1_f_name3', StringType(), 'thirty')]
record2 = [('eid', StringType(), 'eid2')] + features2
ent2 = Entity('entity_type2', record2, 'eid')
ent2.save(schema);
# query 1
print("Instances of entity type entity_type1 with id ending in 1:")
instances = Entity.query('entity_type1', 'eid like "%1"', schema, 'eid')
instances.show()
# query 2
print("Instances of entity type entity_type2 meeting the specified condition:")
instances = Entity.query('entity_type2', 'eid in ("eid2")', schema, 'eid')
instances.show()

Entity of type 'entity_type1' and id 'eid1':
<class '__main__.Entity'>: {'etype': 'entity_type1', 'record': [('eid', 'string', 'eid1'), ('fg1_f_name1', 'integer', 1), ('fg1_f_name2', 'double', 2.0), ('fg1_f_name3', 'string', 'three')], 'id_col': 'eid'}

Instances of entity type entity_type1 with id ending in 1:
+----+-----------+-----------+-----------+
| eid|fg1_f_name1|fg1_f_name2|fg1_f_name3|
+----+-----------+-----------+-----------+
|eid1|          1|        2.0|      three|
+----+-----------+-----------+-----------+

Instances of entity type entity_type2 meeting the specified condition:
+----+-----------+-----------+-----------+
| eid|fg1_f_name1|fg1_f_name2|fg1_f_name3|
+----+-----------+-----------+-----------+
|eid2|         10|       20.0|     thirty|
+----+-----------+-----------+-----------+
Feature Data: Credit Card Transactions
The following cell populates the data from Part 1 in the database for use below.
Read and Transform Data
# read and transform the sample credit card transactions data from a csv file
from pyspark.sql.functions import expr
df = spark.read.options(header="True", inferSchema="True") \
    .csv("resources/creditcard_small.csv") \
    .orderBy(['_c0'], ascending=[True])
new_col_names = ['CC1_' + (c if c != '_c0' else 'OldIdx') for c in df.columns]
df = df.toDF(*new_col_names) \
    .withColumn('TxnId', expr('CC1_OldIdx+1').cast(StringType())) \
    .select(['TxnId','CC1_Class','CC1_Amount'] + ['CC1_V'+str(i) for i in range(1,29)])
df.toPandas().head()
TxnId | CC1_Class | CC1_Amount | CC1_V1 | CC1_V2 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | … | CC1_V19 | CC1_V20 | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 0 | 149.62 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | … | 0.403993 | 0.251412 | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 |
1 | 2 | 0 | 2.69 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | … | -0.145783 | -0.069083 | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 |
2 | 3 | 0 | 378.66 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | … | -2.261857 | 0.524980 | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 |
3 | 4 | 0 | 123.50 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | … | -1.232622 | -0.208038 | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 |
4 | 5 | 0 | 69.99 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | … | 0.803487 | 0.408542 | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 |
5 rows × 31 columns
Save Features
Insert the credit card transaction features in the feature store.
# 1. Create a feature group.
FG_NAME = 'CC1'
FG_DESCRIPTION = 'Credit card transaction data'
FG_SOURCE = 'European cardholder dataset from Kaggle'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
                  attrs={'entity':'cctxn', 'class':'fraud'}, tags=['kaggle', 'demo'])
fg.save()

# 2. Create feature metadata
FEATURE_AMOUNT = 'Amount'
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
            attrs={'entity':'cctxn'}, tags=['usd'])
f.save()
FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
            attrs={'entity':'cctxn'}, tags=['label'])
f.save()
FEATURE_PCA_XFORM = "V"
for i in range(1,29):
    f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', "Transformed version of PCA",
                attrs={'entity':'cctxn'}, tags=['pca'])
    f.save()

# 3. Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(df, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Features stored to Feature Store.
Implementing Dataset
We created example implementations of Feature Group, Feature, and Entity objects as above. Let us now create a similar implementation of Dataset.
Object Model
A dataset is a subset of features and entities selected for an ML model. A Dataset object holds the selected features and entity instances. The actual (materialized) copy of entity records is stored outside the feature store (for instance, in a file system).
Attributes
A dataset record has the following attributes.
- name: name of the data set, serves as the primary key for the record
- description: human readable description
- features: a list of the dataset features
- predicate: query predicate to enumerate the entity instances in the dataset
- location: external location where the dataset is stored
- attrs: other metadata
- tags: associated tags
Datasets are stored in the set “dataset-metadata”.
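For illustration only, a dataset metadata record with these attributes might look like the following sketch (all names and values here are hypothetical):

# Hypothetical example of a dataset metadata record (illustration only)
example_dataset = {
    "name": "fraud_ds_v1",                    # primary key of the record
    "description": "Example fraud training dataset",
    "features": ["CC1_Amount", "CC1_Class"],  # selected features
    "query": "CC1_Amount > 1500",             # predicate selecting entity instances
    "location": "resources/fraud_ds_v1.csv",  # where the materialized copy is stored
    "attrs": {"risk": "high"},                # other metadata
    "tags": ["example"]                       # associated tags
}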
Operations
Dataset is used during Model Training. The following operations are needed.
- create
- load (get)
- query (returns dataset metadata records)
- materialize (returns entity records as defined by a dataset)
Dataset Implementation
Below is an example implementation of Dataset as described above.
# Dataset
class Dataset:
    schema = StructType([StructField("name", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("entity", StringType(), False),
                         StructField("id_col", StringType(), False),
                         StructField("id_type", StringType(), False),
                         StructField("features", ArrayType(StringType()), True),
                         StructField("query", StringType(), True),
                         StructField("location", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, name, description, entity, id_col, id_type, features, query, location, attrs, tags):
        self.name = name
        self.description = description
        self.entity = entity
        self.id_col = id_col
        self.id_type = id_type
        self.features = features
        self.query = query
        self.location = location
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.name, self.description, self.entity, self.id_col, self.id_type,
                     self.features, self.query, self.location, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, Dataset.schema)
        # Write the data frame to Aerospike, the name field is used as the primary key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "dataset-metadata") \
            .option("aerospike.updateByKey", "name") \
            .save()
        return

    def load(name):
        dataset = None
        ds_df = spark.read \
            .format("aerospike") \
            .option("aerospike.set", "dataset-metadata") \
            .schema(Dataset.schema) \
            .option("aerospike.updateByKey", "name") \
            .load().where("name = \"" + name + "\"")
        if ds_df.count() > 0:
            dstuple = ds_df.collect()[0]
            dataset = Dataset(*dstuple)
        return dataset

    def query(predicate):  # returns a dataframe
        ds_df = spark.read \
            .format("aerospike") \
            .schema(Dataset.schema) \
            .option("aerospike.set", "dataset-metadata") \
            .load().where(predicate)
        return ds_df

    def features_to_schema(entity, id_col, id_type, features):
        def convert_field_type(ftype):
            return DoubleType() if ftype == 'double' \
                else (IntegerType() if ftype in ['integer', 'long'] else StringType())
        schema = StructType()
        schema.add(id_col, convert_field_type(id_type), False)
        for fid in features:
            sep = fid.find('_')
            f = Feature.load(fid[:sep] if sep != -1 else "", fid[sep+1:])
            if f:
                schema.add(f.fid, convert_field_type(f.ftype), True)
        return schema

    def materialize_to_df(self):
        df = Entity.query(self.entity, self.query,
                          Dataset.features_to_schema(self.entity, self.id_col, self.id_type, self.features),
                          self.id_col)
        return df
# test Dataset
# test save and load
# save
features = ["CC1_Amount", "CC1_Class", "CC1_V1"]
ds = Dataset("ds_test1", "Test dataset", "cctxn", "TxnId", "string", features,
             "CC1_Amount > 1500", "", {"risk":"high"}, ["test", "dataset"])
ds.save()
# load
ds = Dataset.load("ds_test1")
print("Dataset named 'ds_test1':")
print(ds, '\n')
# test query
print("Datasets with attribute 'risk'='high' and tag 'test':")
dsq_df = Dataset.query("attrs.risk == 'high' and array_contains(tags, 'test')")
dsq_df.show()
# test materialize_to_df
print("Materialize dataset ds_test1 as defined above:")
ds_df = ds.materialize_to_df()
print("Records in the dataset: ", ds_df.count())
ds_df.show(5)

Dataset named 'ds_test1':
<class '__main__.Dataset'>: {'name': 'ds_test1', 'description': 'Test dataset', 'entity': 'cctxn', 'id_col': 'TxnId', 'id_type': 'string', 'features': ['CC1_Amount', 'CC1_Class', 'CC1_V1'], 'query': 'CC1_Amount > 1500', 'location': '', 'attrs': {'risk': 'high'}, 'tags': ['test', 'dataset']}

Datasets with attribute 'risk'='high' and tag 'test':
+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+
|    name| description|entity|id_col|id_type|            features|            query|location|         attrs|           tags|
+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+
|ds_test1|Test dataset| cctxn| TxnId| string|[CC1_Amount, CC1_...|CC1_Amount > 1500|        |[risk -> high]|[test, dataset]|
+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+

Materialize dataset ds_test1 as defined above:
Records in the dataset:  4
+------+----------+---------+-----------------+
| TxnId|CC1_Amount|CC1_Class|           CC1_V1|
+------+----------+---------+-----------------+
|  6972|   1809.68|        1|-3.49910753739178|
|   165|   3828.04|        0|-6.09324780457494|
|249168|   1504.93|        1|-1.60021129907252|
|176050|   2125.87|        1|-2.00345953080582|
+------+----------+---------+-----------------+
Using Pushdown Expressions
In order to get the best performance from the Aerospike feature store, one important optimization is to push processing down to the database and minimize the amount of data transferred to Spark. This is especially important when querying large amounts of underlying data, such as when creating a dataset, and it is achieved by "pushing down" filters so that they are evaluated in the database.
Currently the Spark Connector allows two mutually exclusive ways of specifying filters in a dataframe load:
- the where clause
- the pushdown expressions option
Only one of the two may be used in a given load because the underlying Aerospike database mechanisms used to process them are different and mutually exclusive. If both are specified, the pushdown expressions option takes precedence.
The where clause filter may be pushed down in part or in full, depending on which parts of the filter the database supports and the Spark Connector takes advantage of. A pushdown expression filter, however, is processed entirely in the database, which ensures the best performance.
Aerospike expressions provide some filtering capabilities that are not available in Spark, such as filtering based on record metadata, and expression-based filtering is processed more efficiently in the database. On the other hand, the where clause has many capabilities that are not available in Aerospike expressions. It may therefore be necessary to use both: use a pushdown expression to retrieve a dataframe, and then process it further using the Spark dataframe capabilities, as sketched below.
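Here is a minimal sketch of that combined pattern using the Feature.query interface defined above. The base64 string is the encoded tag-filter expression from the first example further below; the Spark-side filter chosen here is arbitrary.

# Sketch: filter in the database with a pushdown expression, then refine in Spark.
base64_expr = "kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA"  # encoded tag filter (see the examples below)
f_df = Feature.query(None, pushdown_expr=base64_expr)             # evaluated in the database
f_df = f_df.where("type = 'integer'")                             # refined by Spark
f_df.show()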
Creating Pushdown Expressions
The Spark Connector requires the expression in base64-encoded form. Producing the base64 encoding currently requires the Java client, which can be run in a parallel notebook, and entails the following steps:
- Write the expression in Java.
- Test the expression with the desired data.
- Obtain the base64 encoding.
- Use the base64 representation in this notebook as shown below.
You can run the adjunct notebook Pushdown Expressions for Spark Connector to follow the above recipe and obtain the base64 representation of an expression for use in the following examples.
Examples
We illustrate pushdown expressions with Feature class queries, but the query method implementation can be adapted for the other objects.
The examples below illustrate the capabilities and process of working with pushdown expressions. More details on expressions are explained in the Pushdown Expressions for Spark Connector notebook.
Records with Specific Tags
Examine the expression in Java:
Exp.gt(
    ListExp.getByValueList(ListReturnType.COUNT,
        Exp.val(new ArrayList<String>(Arrays.asList("label","f_tag1"))),
        Exp.listBin("tags")),
    Exp.val(0))
The outer expression checks that the value returned by the first argument is greater than 0. The first argument counts how many of the specified tags are present in the list bin tags.
Obtain the base64 representation from the Pushdown Expressions for Spark Connector notebook. It is “kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA”.
base64_expr = "kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA"f_df = Feature.query(None, pushdown_expr=base64_expr)f_df.toPandas()
fid | fgname | name | type | description | attrs | tags | |
---|---|---|---|---|---|---|---|
0 | fgname1_f_name1 | fgname1 | f_name1 | integer | f_desc1 | ’etype’: ‘etype1’, ‘f_attr1’: ‘v1’ | [f_tag1, f_tag2] |
1 | CC1_Class | CC1 | Class | integer | Label indicating fraud or not | ’entity’: ‘cctxn’ | [label] |
2 | fgname1_f_name2 | fgname1 | f_name2 | double | f_desc2 | ’etype’: ‘etype1’, ‘f_attr1’: ‘v2’ | [f_tag1, f_tag3] |
Records with Specific Attribute Value
Examine the expression in Java:
Exp.eq(
    MapExp.getByKey(MapReturnType.VALUE, Exp.Type.STRING,
        Exp.val("f_attr1"), Exp.mapBin("attrs")),
    Exp.val("v1"))
It would filter records having a key “f_attr1” with value “v1” in the map bin attrs.
Obtain the base64 representation from the Pushdown Expressions for Spark Connector notebook. It is “kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==”.
base64_expr = "kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ=="f_df = Feature.query(None, pushdown_expr=base64_expr)f_df.toPandas()
fid | fgname | name | type | description | attrs | tags | |
---|---|---|---|---|---|---|---|
0 | fgname1_f_name1 | fgname1 | f_name1 | integer | f_desc1 | ’etype’: ‘etype1’, ‘f_attr1’: ‘v1’ | [f_tag1, f_tag2] |
Records with String Matching Pattern
Examine the expression in Java:
Exp.regexCompare("^c.*2$", RegexFlag.ICASE, Exp.stringBin("fid"))
It would filter records with fid starting with “c” and ending in “2” (case insensitive).
Obtain the base64 representation from the Pushdown Expressions for Spark Connector notebook. It is “lAcCpl5DLioyJJNRA6NmaWQ=”.
base64_expr = "lAcCpl5DLioyJJNRA6NmaWQ="f_df = Feature.query(None, pushdown_expr=base64_expr)f_df.toPandas()
fid | fgname | name | type | description | attrs | tags | |
---|---|---|---|---|---|---|---|
0 | CC1_V2 | CC1 | V2 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
1 | CC1_V12 | CC1 | V12 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
2 | CC1_V22 | CC1 | V22 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
Exploring Features in Feature Store
Now let’s explore the features available in the Feature Store prior to using them to train a model. We will illustrate this with the querying functions on the metadata objects we have implemented above, as well as Spark functions.
Exploring Datasets
As we are interested in building a fraud detection model, let's see if there are any existing datasets that have “fraud” in their description. At present there should be no such datasets in the database; we will create and save one in later sections.
ds_df = Dataset.query("description like '%fraud%'")
ds_df.show()

+----+-----------+------+------+-------+--------+-----+--------+-----+----+
|name|description|entity|id_col|id_type|features|query|location|attrs|tags|
+----+-----------+------+------+-------+--------+-----+--------+-----+----+
+----+-----------+------+------+-------+--------+-----+--------+-----+----+
Exploring Feature Groups
Let's identify feature groups for the entity type “cctxn” (credit card transactions) that have an attribute “class”=“fraud”.
fg_df = FeatureGroup.query("attrs.entity == 'cctxn' and attrs.class == 'fraud'")
fg_df.toPandas().transpose().head()
0 | |
---|---|
name | CC1 |
description | Credit card transaction data |
source | European cardholder dataset from Kaggle |
attrs | ’class’: ‘fraud’, ‘entity’: ‘cctxn’ |
tags | [kaggle, demo] |
# View all available features in this feature group
f_df = Feature.query("fgname == 'CC1'")
f_df.toPandas()
fid | fgname | name | type | description | attrs | tags | |
---|---|---|---|---|---|---|---|
0 | CC1_V23 | CC1 | V23 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
1 | CC1_V10 | CC1 | V10 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
2 | CC1_Class | CC1 | Class | integer | Label indicating fraud or not | ’entity’: ‘cctxn’ | [label] |
3 | CC1_V20 | CC1 | V20 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
4 | CC1_V16 | CC1 | V16 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
5 | CC1_V1 | CC1 | V1 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
6 | CC1_V6 | CC1 | V6 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
7 | CC1_V25 | CC1 | V25 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
8 | CC1_V9 | CC1 | V9 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
9 | CC1_V2 | CC1 | V2 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
10 | CC1_V3 | CC1 | V3 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
11 | CC1_V12 | CC1 | V12 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
12 | CC1_V21 | CC1 | V21 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
13 | CC1_V27 | CC1 | V27 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
14 | CC1_Amount | CC1 | Amount | double | Transaction amount | ’entity’: ‘cctxn’ | [usd] |
15 | CC1_V24 | CC1 | V24 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
16 | CC1_V7 | CC1 | V7 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
17 | CC1_V28 | CC1 | V28 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
18 | CC1_V4 | CC1 | V4 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
19 | CC1_V13 | CC1 | V13 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
20 | CC1_V17 | CC1 | V17 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
21 | CC1_V18 | CC1 | V18 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
22 | CC1_V26 | CC1 | V26 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
23 | CC1_V19 | CC1 | V19 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
24 | CC1_V14 | CC1 | V14 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
25 | CC1_V11 | CC1 | V11 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
26 | CC1_V8 | CC1 | V8 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
27 | CC1_V5 | CC1 | V5 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
28 | CC1_V22 | CC1 | V22 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
29 | CC1_V15 | CC1 | V15 | double | Transformed version of PCA | ’entity’: ‘cctxn’ | [pca] |
The features look promising for a fraud prediction model. Let’s look at the actual feature data and its characteristics by querying the entity records.
Exploring Feature Data
We can further explore the feature data to determine what features should be part of the dataset. The feature data resides in Entity records and we can use the above info to form the schema and retrieve the records.
Defining Schema
In order to query using the Aerospike Spark Connector, we must define the schema for the record.
# define the schema for the record.
FG_NAME = 'CC1'
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
FEATURE_AMOUNT = 'Amount'
FEATURE_CLASS = 'Class'
FEATURE_PCA_XFORM = "V"

schema = StructType([StructField(ID_COLUMN, StringType(), False),
                     StructField(FG_NAME+'_'+FEATURE_CLASS, IntegerType(), False),
                     StructField(FG_NAME+'_'+FEATURE_AMOUNT, DoubleType(), False)])
for i in range(1,29):
    schema.add(FG_NAME+'_'+FEATURE_PCA_XFORM+str(i), DoubleType(), True)
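As an alternative sketch, the same schema could be assembled from the stored feature metadata using the Dataset.features_to_schema helper defined earlier (the nullability flags will differ slightly from the hand-built schema above):

# Build the record schema from feature metadata rather than hard-coding the types.
feature_ids = [FG_NAME+'_'+FEATURE_CLASS, FG_NAME+'_'+FEATURE_AMOUNT] + \
              [FG_NAME+'_'+FEATURE_PCA_XFORM+str(i) for i in range(1, 29)]
alt_schema = Dataset.features_to_schema(ENTITY_TYPE, ID_COLUMN, 'string', feature_ids)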
Retrieving Data
Here we get all records from the sample data in the database. A small subset of the data would suffice in practice.
# let's get the entity records to assess the data
txn_df = Entity.query(ENTITY_TYPE, "TxnId like '%'", schema, "TxnId")
print("Records retrieved: ", txn_df.count())
txn_df.printSchema()

Records retrieved:  984
root
 |-- TxnId: string (nullable = false)
 |-- CC1_Class: integer (nullable = false)
 |-- CC1_Amount: double (nullable = false)
 |-- CC1_V1: double (nullable = true)
 |-- CC1_V2: double (nullable = true)
 |-- CC1_V3: double (nullable = true)
 |-- CC1_V4: double (nullable = true)
 |-- CC1_V5: double (nullable = true)
 |-- CC1_V6: double (nullable = true)
 |-- CC1_V7: double (nullable = true)
 |-- CC1_V8: double (nullable = true)
 |-- CC1_V9: double (nullable = true)
 |-- CC1_V10: double (nullable = true)
 |-- CC1_V11: double (nullable = true)
 |-- CC1_V12: double (nullable = true)
 |-- CC1_V13: double (nullable = true)
 |-- CC1_V14: double (nullable = true)
 |-- CC1_V15: double (nullable = true)
 |-- CC1_V16: double (nullable = true)
 |-- CC1_V17: double (nullable = true)
 |-- CC1_V18: double (nullable = true)
 |-- CC1_V19: double (nullable = true)
 |-- CC1_V20: double (nullable = true)
 |-- CC1_V21: double (nullable = true)
 |-- CC1_V22: double (nullable = true)
 |-- CC1_V23: double (nullable = true)
 |-- CC1_V24: double (nullable = true)
 |-- CC1_V25: double (nullable = true)
 |-- CC1_V26: double (nullable = true)
 |-- CC1_V27: double (nullable = true)
 |-- CC1_V28: double (nullable = true)
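As noted above, a smaller sample would suffice for this kind of exploration. For example, something along these lines (a sketch; the fraction and seed are arbitrary) could be used instead of the full set:

# Explore a ~10% random sample of the retrieved records instead of all of them.
txn_sample_df = txn_df.sample(fraction=0.1, seed=42)
print("Sampled records: ", txn_sample_df.count())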
Examining Data
We will examine the statistical properties as well as null values of the feature columns. Note that the column CC1_Class is the label (fraud or not).
# examine the statistical properties
txn_df.describe().toPandas().transpose()
0 | 1 | 2 | 3 | 4 | |
---|---|---|---|---|---|
summary | count | mean | stddev | min | max |
TxnId | 984 | 59771.279471544716 | 83735.17714512876 | 1 | 99507 |
CC1_Class | 984 | 0.5 | 0.5002542588519272 | 0 | 1 |
CC1_Amount | 984 | 96.22459349593494 | 240.14239707065826 | 0.0 | 3828.04 |
CC1_V1 | 984 | -2.4674030372100715 | 5.40712231422648 | -30.552380043581 | 2.13238602134104 |
CC1_V2 | 984 | 1.9053035968231344 | 3.5961094277406076 | -12.1142127363483 | 22.0577289904909 |
CC1_V3 | 984 | -3.083884202829433 | 6.435904925385388 | -31.1036848245812 | 3.77285685226266 |
CC1_V4 | 984 | 2.456780057740528 | 3.0427216170397466 | -4.51582435488105 | 12.1146718424589 |
CC1_V5 | 984 | -1.5617259373325372 | 4.202691637741722 | -22.105531524316 | 11.0950886001596 |
CC1_V6 | 984 | -0.572583991041022 | 1.8036571668000605 | -6.40626663445964 | 6.47411462748849 |
CC1_V7 | 984 | -2.73090333834317 | 5.863241960076915 | -43.5572415712451 | 5.80253735302589 |
CC1_V8 | 984 | 0.26108185138806433 | 4.850081053008372 | -41.0442609210741 | 20.0072083651213 |
CC1_V9 | 984 | -1.301144796452937 | 2.266780102671618 | -13.4340663182301 | 5.43663339611854 |
CC1_V10 | 984 | -2.805194376398951 | 4.549492504413138 | -24.5882624372475 | 8.73745780611353 |
CC1_V11 | 984 | 1.9525351017305455 | 2.7369799649027207 | -2.33201137167952 | 12.0189131816199 |
CC1_V12 | 984 | -2.995316874600595 | 4.657383279424634 | -18.6837146333443 | 2.15205511590243 |
CC1_V13 | 984 | -0.09029142836357146 | 1.0102129366924129 | -3.12779501198771 | 2.81543981456255 |
CC1_V14 | 984 | -3.597226605511213 | 4.5682405087763325 | -19.2143254902614 | 3.44242199594215 |
CC1_V15 | 984 | 0.06275139057382163 | 1.0021871899317296 | -4.49894467676621 | 2.47135790380837 |
CC1_V16 | 984 | -2.1571248198091597 | 3.42439305003353 | -14.1298545174931 | 3.13965565883069 |
CC1_V17 | 984 | -3.36609535335953 | 5.953540928078054 | -25.1627993693248 | 6.73938438478335 |
CC1_V18 | 984 | -1.2187062731658431 | 2.3587681071910915 | -9.49874592104677 | 3.79031621184375 |
CC1_V19 | 984 | 0.3359445791509033 | 1.2843379816775733 | -3.68190355226504 | 5.2283417900513 |
CC1_V20 | 984 | 0.21117939872897198 | 1.0613528102262861 | -4.12818582871798 | 11.0590042933942 |
CC1_V21 | 984 | 0.3548982757919287 | 2.78726704784996 | -22.7976039055519 | 27.2028391573154 |
CC1_V22 | 984 | -0.04448149211405775 | 1.1450798238059015 | -8.88701714094871 | 8.36198519168435 |
CC1_V23 | 984 | -0.036528942589509734 | 1.148960101817997 | -19.2543276173719 | 5.46622995370963 |
CC1_V24 | 984 | -0.04738043011343529 | 0.5866834793500019 | -2.02802422921896 | 1.21527882183022 |
CC1_V25 | 984 | 0.08757054553217881 | 0.6404192414977025 | -4.78160552206407 | 2.20820917836653 |
CC1_V26 | 984 | 0.026120460105754934 | 0.4682991121957343 | -1.24392415371264 | 3.06557569653728 |
CC1_V27 | 984 | 0.09618165650018666 | 1.0037324673667467 | -7.26348214633855 | 3.05235768679424 |
CC1_V28 | 984 | 0.02786530375842634 | 0.4429545316584082 | -2.73388711897575 | 1.77936385243205 |
# check for null values
from pyspark.sql.functions import count, when, isnan
txn_df.select([count(when(isnan(c), c)).alias(c) for c in txn_df.columns]).toPandas().head()
TxnId | CC1_Class | CC1_Amount | CC1_V1 | CC1_V2 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | … | CC1_V19 | CC1_V20 | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | … | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
1 rows × 31 columns
Defining Dataset
Based on the above exploration, we will choose features V1-V28 for our training dataset, which we will define below.
In addition to the features, we also need to choose the data records for the dataset. We have only a small sample of the original data, so we will use all the available records by setting the dataset query predicate to “true”.
It is also possible to create a dataset from a random subset of records, for instance by performing an “aerolookup” of randomly selected key values.
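As a rough sketch of that idea, without using aerolookup, one could sample transaction ids from the records retrieved earlier and materialize only those (the fraction and seed are arbitrary):

# Sample a subset of TxnIds and retrieve just those entity records.
sampled_ids = [row.TxnId for row in
               txn_df.select("TxnId").sample(fraction=0.1, seed=42).collect()]
id_predicate = "TxnId in (" + ", ".join("'" + i + "'" for i in sampled_ids) + ")"
random_subset_df = Entity.query(ENTITY_TYPE, id_predicate, schema, ID_COLUMN)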
# Create a dataset with the V1-V28 features.
CC_FRAUD_DATASET = "CC_FRAUD_DETECTION"
features = ["CC1_V"+str(i) for i in range(1,29)]
features_and_label = ["CC1_Class"] + features
ds = Dataset(CC_FRAUD_DATASET, "Training dataset for fraud detection model", "cctxn", "TxnId", "string",
             features_and_label, "true", "", {"class":"fraud"}, ["test", "2017"])
ds_df = ds.materialize_to_df()
print("Records in the dataset: ", ds_df.count())
Records in the dataset: 984
Save Dataset
Save the dataset in Feature Store for future use.
# save the materialized dataset externally in a file
DATASET_PATH = 'resources/fs_part2_dataset_cctxn.csv'
ds_df.write.csv(path=DATASET_PATH, header="true", mode="overwrite", sep="\t")

# save the dataset metadata in the feature store
ds.location = DATASET_PATH
ds.save()
Query and Verify Dataset
Verify the saved dataset is in the feature store for future exploration and use.
dsq_df = Dataset.query("description like '%fraud%'")
dsq_df.toPandas().transpose()
0 | |
---|---|
name | CC_FRAUD_DETECTION |
description | Training dataset for fraud detection model |
entity | cctxn |
id_col | TxnId |
id_type | string |
features | [CC1_Class, CC1_V1, CC1_V2, CC1_V3, CC1_V4, CC… |
query | true |
location | resources/fs_part2_dataset_cctxn.csv |
attrs | ’class’: ‘fraud’ |
tags | [test, 2017] |
Verify the database through an AQL query on the set “dataset-metadata”.
!aql -c "select * from test.dataset-metadata"
select * from test.dataset-metadata
+--------------------------------------+----------------------------------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+
| attrs                                | description                                  | entity  | features                                                                                                                                                        | id_col  | id_type  | location                               | name                 | query               | tags                        |
+--------------------------------------+----------------------------------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+
| KEY_ORDERED_MAP('{"class":"fraud"}') | "Training dataset for fraud detection model" | "cctxn" | LIST('["CC1_Class", "CC1_V1", "CC1_V2", "CC1_V3", "CC1_V4", "CC1_V5", "CC1_V6", "CC1_V7", "CC1_V8", "CC1_V9", "CC1_V10", "CC1_V11", "CC1_V12", "CC1_V13", "CC1_V14", "CC1_V15", "CC1_V16", "CC1_V17", "CC1_V18", "CC1_V19", "CC1_V20", "CC1_V21", "CC1_V22", " | "TxnId" | "string" | "resources/fs_part2_dataset_cctxn.csv" | "CC_FRAUD_DETECTION" | "true"              | LIST('["test", "2017"]')    |
| KEY_ORDERED_MAP('{"risk":"high"}')   | "Test dataset"                               | "cctxn" | LIST('["CC1_Amount", "CC1_Class", "CC1_V1"]')                                                                                                                   | "TxnId" | "string" | ""                                     | "ds_test1"           | "CC1_Amount > 1500" | LIST('["test", "dataset"]') |
+--------------------------------------+----------------------------------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+
2 rows in set (0.212 secs)
OK
Create AI/ML Model
Below we will choose two algorithms to predict fraud in a credit card transaction: LogisticRegression and RandomForestClassifier.
Create Training and Test Sets
We first split the dataset into training and test sets to train and evaluate a model.
from pyspark.ml.feature import VectorAssembler
# create a feature vector from features
assembler = VectorAssembler(inputCols=features, outputCol="fvector")
ds_df2 = assembler.transform(ds_df)

# split the dataset into randomly selected training and test sets
train, test = ds_df2.randomSplit([0.8,0.2], seed=2021)
print('Training dataset records:', train.count())
print('Test dataset records:', test.count())

Training dataset records: 791
Test dataset records: 193

# examine the fraud cases in the training set
train.groupby('CC1_Class').count().show()

+---------+-----+
|CC1_Class|count|
+---------+-----+
|        1|  380|
|        0|  411|
+---------+-----+
Train Model
We choose two models to train: LogisticRegression and RandomForestClassifier.
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
lr_algo = LogisticRegression(featuresCol='fvector', labelCol='CC1_Class', maxIter=5)
lr_model = lr_algo.fit(train)

rf_algo = RandomForestClassifier(featuresCol='fvector', labelCol='CC1_Class')
rf_model = rf_algo.fit(train)
Evaluate Model
Run the trained models on the test set and evaluate their performance metrics.
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# rename label column
test = test.withColumnRenamed('CC1_Class', 'label')

# use the logistic regression model to predict test cases
lr_predictions = lr_model.transform(test)

# instantiate evaluator
evaluator = BinaryClassificationEvaluator()

# Logistic Regression performance metrics
print("Logistic Regression: Accuracy = {}".format(evaluator.evaluate(lr_predictions)))

lr_labels_and_predictions = test.rdd.map(lambda x: float(x.label)).zip(lr_predictions.rdd.map(lambda x: x.prediction))
lr_metrics = BinaryClassificationMetrics(lr_labels_and_predictions)
print("Logistic Regression: Area under ROC = %s" % lr_metrics.areaUnderROC)
print("Logistic Regression: Area under PR = %s" % lr_metrics.areaUnderPR)

Logistic Regression: Accuracy = 0.9853395061728388
Logistic Regression: Area under ROC = 0.9298321136461472
Logistic Regression: Area under PR = 0.8910277315666429
# use the random forest model to predict test cases
rf_predictions = rf_model.transform(test)

# RandomForestClassifier performance metrics
print("Random Forest Classifier: Accuracy = {}".format(evaluator.evaluate(rf_predictions)))

rf_labels_and_predictions = test.rdd.map(lambda x: float(x.label)).zip(rf_predictions.rdd.map(lambda x: x.prediction))
rf_metrics = BinaryClassificationMetrics(rf_labels_and_predictions)
print("Random Forest Classifier: Area under ROC = %s" % rf_metrics.areaUnderROC)
print("Random Forest Classifier: Area under PR = %s" % rf_metrics.areaUnderPR)

Random Forest Classifier: Accuracy = 0.9895282186948847
Random Forest Classifier: Area under ROC = 0.9251075268817205
Random Forest Classifier: Area under PR = 0.882099602146558
Save Model
Save the trained models.
# Save each model
lr_model.write().overwrite().save("resources/fs_model_lr")
rf_model.write().overwrite().save("resources/fs_model_rf")
Load and Test Model
Load the saved model and test it by predicting a test instance.
from pyspark.ml.classification import LogisticRegressionModel, RandomForestClassificationModel
lr_model2 = LogisticRegressionModel.load("resources/fs_model_lr")
print("Logistic Regression model save/load test:")
lr_predictions2 = lr_model2.transform(test.limit(5))
lr_predictions2['label', 'prediction'].show()

print("Random Forest model save/load test:")
rf_model2 = RandomForestClassificationModel.read().load("resources/fs_model_rf")
rf_predictions2 = rf_model2.transform(test.limit(5))
rf_predictions2['label', 'prediction'].show()

Logistic Regression model save/load test:
+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+

Random Forest model save/load test:
+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
Takeaways and Conclusion
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features and datasets stored in Aerospike can be explored and reused for model training. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector capabilities for this purpose, and used its APIs to create, save, and query features and datasets for model training.
This is the second notebook in the series of notebooks on how Aerospike can be used as a feature store. The first notebook discusses Feature Engineering aspects, whereas the third notebook explores the use of Aerospike Feature Store for Model Serving.
Cleaning Up
Close the spark session, and remove the tutorial data.
try:
    spark.stop()
except:
    pass  # ignore
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"
Further Exploration and Resources
Here are some links for further exploration.
Resources
- Related notebooks
- Related blog posts
- Github repos
Exploring Other Notebooks
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.