Scala - Recommender System using ALS from ML
I have done some research and I know there are two possible ways to create a recommender system using Apache Spark: one way is to use MLlib, which comes with a pretty nice example that I have tried and found easy to follow; on the other hand, you can use ALS from ML. I feel pretty comfortable working with RDDs, but nevertheless I'm trying to use DataFrames more regularly to gain more experience.
To practice, I started with some crazy data of normalized ratings, and I have over 4000 records with 5 possible products (as below). The first challenge was how to convert the DataFrame into the required structure; I only guessed the structure after several hours of reading the source code.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val df = sqlContext.createDataFrame(
  sc.parallelize(List(
    Row(0.0, 0.12, 0.1, 0.0, 0.16),
    Row(0.1, 0.0, 0.3, 0.52, 0.67))),
  StructType(
    StructField("product1", DoubleType, true) ::
    StructField("product2", DoubleType, true) ::
    StructField("product3", DoubleType, true) ::
    StructField("product4", DoubleType, true) ::
    StructField("product5", DoubleType, true) :: Nil))

df.show
+--------+--------+--------+--------+--------+
|product1|product2|product3|product4|product5|
+--------+--------+--------+--------+--------+
|     0.0|    0.12|     0.1|     0.0|    0.16|
|     0.1|     0.0|     0.3|    0.52|    0.67|
+--------+--------+--------+--------+--------+
I made several, somewhat complex transformations, and I want to see if there is a better way to obtain the required structure.
// one Row(user, product, rating) per cell of the original frame
val rdd = df.rdd.zipWithIndex.map { case (row, index) =>
  row.toSeq.zipWithIndex.map(x => Row(index.toInt, x._2, x._1))
}.flatMap(x => x)

// nonzero ratings go to training, zeros to testing
val train = rdd.filter(_.getDouble(2) != 0.0)
val testing = rdd.filter(_.getDouble(2) == 0.0)
val rdds = List(train, testing)
Then I convert the RDDs to DataFrames.
val dfs = rdds.map(sqlContext.createDataFrame(_,
  StructType(
    StructField("user", IntegerType, true) ::
    StructField("product", IntegerType, true) ::
    StructField("rating", DoubleType, true) :: Nil)))
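Just to double-check the structure before fitting, the schema of the training frame now looks like this:

dfs(0).printSchema
// root
//  |-- user: integer (nullable = true)
//  |-- product: integer (nullable = true)
//  |-- rating: double (nullable = true)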
And after these steps, I can finally use the ALS algorithm. When things get this verbose, it is usually because I am doing something wrong:

import org.apache.spark.ml.recommendation.ALS
val rec = new ALS()
  .setUserCol("user")
  .setItemCol("product")
  .setRatingCol("rating")
  .setPredictionCol("value")
  .setSeed(17)
  .setMaxIter(20)

val model = rec.fit(dfs(0))

model.transform(dfs(1)).collect
// Array([0,0,0.0,0.022231804], [1,1,0.0,0.102589644], [0,3,0.0,0.11560536])
Some remarks:
- user and rating are the default parameters for userCol and ratingCol. If you rename product to item, you can omit that one as well.
- You can replace Row with Rating and omit the schema later (the first three remarks are combined in the sketch after this list):

  case (row, u) => row.toSeq.zipWithIndex.map { case (r: Double, i: Int) => Rating(u, i, r) } ... .toDF

- Since the id seems irrelevant, you can use zipWithUniqueId.
- If any unique id is acceptable, you can use monotonically_increasing_id directly on the DataFrame.
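Putting the first three remarks together, a minimal sketch of the RDD-based variant could look more or less like this (assuming sqlContext.implicits._ is in scope for toDF; the split into train and test is left out for brevity):

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import sqlContext.implicits._

// one Rating per cell: the row id becomes the user, the column position the product
val ratings = df.rdd.zipWithUniqueId.flatMap { case (row, u) =>
  row.toSeq.zipWithIndex.map { case (r: Double, i: Int) => Rating(u.toInt, i, r) }
}.toDF

val als = new ALS()
  .setItemCol("product") // userCol and ratingCol already default to "user" and "rating"
  .setSeed(17)
  .setMaxIter(20)

val model = als.fit(ratings)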
It is possible to avoid passing the data through an RDD at all by wrapping the columns into an array and exploding:

import org.apache.spark.sql.functions.{array, col, explode, lit, struct, monotonically_increasing_id}
import sqlContext.implicits._
val exprs = explode(array(df.columns.map(c =>
  struct(lit(c).alias("item"), col(c).alias("rating"))
): _*))

df
  .withColumn("user", monotonically_increasing_id())
  .withColumn("tmp", exprs)
  .select($"user", $"tmp.item", $"tmp.rating")
and then replace the names with ids so the item column is numeric.
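ALS expects numeric user and item columns, so those string item names have to be mapped to ids one way or another. Continuing from the snippet above, one possible sketch that simply relies on the productN naming (a StringIndexer would do the job for arbitrary names):

import org.apache.spark.sql.functions.regexp_replace

val ratings = df
  .withColumn("user", monotonically_increasing_id())
  .withColumn("tmp", exprs)
  .select(
    // fine for toy data; monotonically_increasing_id is only unique, not guaranteed to fit in an Int on large partitioned data
    $"user".cast("int").alias("user"),
    // "product1" ... "product5" -> item ids 1 ... 5
    regexp_replace($"tmp.item", "product", "").cast("int").alias("item"),
    $"tmp.rating".alias("rating"))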
Nevertheless, I believe there is not much to gain from using DataFrames here. One way or another the data passed to the MLlib model requires an RDD[Rating].