Scala - Recommender System using ALS from Spark ML
I have done some research, and I know there are two possible ways to create a recommender system using Apache Spark. One way uses MLlib, which comes with a pretty nice example that I have tried and found easy. On the other hand, you can use ALS from ML. I feel pretty comfortable working with RDDs; nevertheless, I am trying to use DataFrames more regularly to gain more experience.

To practice, I started with some made-up data of normalized ratings, and I have over 4000 records for 5 possible products (as below). The first challenge was how to convert the DataFrame into the required structure, a structure I only guessed after several hours of reading source code.
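To be concrete, the structure ALS expects is one (user, product, rating) triple per cell of the wide table below. A minimal plain-Scala sketch of that reshaping, using hypothetical data and no Spark at all:

```scala
// Hypothetical sketch: reshape wide rows (one column per product)
// into the (user, product, rating) triples that ALS expects.
val wide = Vector(
  Vector(0.0, 0.12, 0.1, 0.0, 0.16),
  Vector(0.1, 0.0, 0.3, 0.52, 0.67))

val long = wide.zipWithIndex.flatMap { case (ratings, user) =>
  ratings.zipWithIndex.map { case (rating, product) =>
    (user, product, rating)
  }
}
// long contains (0,0,0.0), (0,1,0.12), ..., (1,4,0.67)
```

The Spark transformations below follow exactly this shape, only expressed over an RDD of Rows.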
```scala
val df = sqlContext.createDataFrame(sc.parallelize(List(
  Row(0.0, 0.12, 0.1, 0.0, 0.16),
  Row(0.1, 0.0, 0.3, 0.52, 0.67))),
  StructType(
    StructField("product1", DoubleType, true) ::
    StructField("product2", DoubleType, true) ::
    StructField("product3", DoubleType, true) ::
    StructField("product4", DoubleType, true) ::
    StructField("product5", DoubleType, true) :: Nil))

df.show
```

    +--------+--------+--------+--------+--------+
    |product1|product2|product3|product4|product5|
    +--------+--------+--------+--------+--------+
    |     0.0|    0.12|     0.1|     0.0|    0.16|
    |     0.1|     0.0|     0.3|    0.52|    0.67|
    +--------+--------+--------+--------+--------+

I made several, somewhat complex, transformations, and I want to see if there is a better way to obtain the required structure.
```scala
val rdd = df.rdd.zipWithIndex.map { case (row, index) =>
  row.toSeq.zipWithIndex.map(x => Row(index.toInt, x._2, x._1))
}.flatMap(x => x)

// RDDs have no partition method, so split with two filters instead
val train = rdd.filter(_.get(2) != 0.0)
val testing = rdd.filter(_.get(2) == 0.0)
val rdds = List(train, testing)
```

Then I convert the RDDs into DataFrames.
```scala
val dfs = rdds.map(sqlContext.createDataFrame(_, StructType(
  StructField("user", IntegerType, true) ::
  StructField("product", IntegerType, true) ::
  StructField("rating", DoubleType, true) :: Nil)))
```

And after these steps, I can use the ALS algorithm; when things get this verbose, it is usually because I am doing something wrong.
```scala
val rec = new ALS()
  .setUserCol("user")
  .setItemCol("product")
  .setRatingCol("rating")
  .setPredictionCol("value")
  .setSeed(17)
  .setMaxIter(20)

val model = rec.fit(dfs(0))

model.transform(dfs(1)).collect
// Array([0,0,0.0,0.022231804], [1,1,0.0,0.102589644], [0,3,0.0,0.11560536])
```
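As an aside (not part of the original question), one way to sanity-check the fitted model is to score the predictions with Spark ML's `RegressionEvaluator`; a sketch assuming the `model` and `dfs` values above:

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator

val predictions = model.transform(dfs(1))

val rmse = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("value")
  // drop rows where ALS could not produce a prediction (cold-start NaN)
  .evaluate(predictions.na.drop(Seq("value")))

println(s"RMSE = $rmse")
```

With only 0.0 ratings held out as the test set, as done above, the RMSE mostly measures how far predictions drift from zero, so treat the number as a smoke test rather than a real evaluation.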
Some remarks:

- `user` and `rating` are the default values for the `userCol` and `ratingCol` parameters, so you can omit those setters. If you rename `product` to `item`, you can omit `setItemCol` as well.
- You can map each row directly to a `Rating` instead of a `Row` and omit the schema later:

```scala
case (row, u) => row.toSeq.zipWithIndex.map {
  case (r: Double, i: Int) => Rating(u.toInt, i, r)
}
...
.toDF
```

- Since the exact id seems irrelevant, you can use `zipWithUniqueId` instead of `zipWithIndex`.
- If a merely unique (not necessarily consecutive) id is acceptable, you can use `monotonically_increasing_id` on the DataFrame itself. That makes it possible to avoid passing the data through an RDD at all, by wrapping the columns in an array and exploding it:

```scala
val exprs = explode(array(df.columns.map(
  c => struct(lit(c).alias("item"), col(c).alias("rating"))): _*))

df
  .withColumn("user", monotonically_increasing_id)
  .withColumn("tmp", exprs)
  .select($"user", $"tmp.item", $"tmp.rating")
```

  and then replace the names with ids.
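For that last step of replacing product names with numeric ids, a `StringIndexer` is one option; a sketch assuming the exploded frame above has been assigned to a value named `long` (a name not used in the original post):

```scala
import org.apache.spark.ml.feature.StringIndexer

// Hypothetical: `long` is the (user, item, rating) frame built above.
val indexed = new StringIndexer()
  .setInputCol("item")
  .setOutputCol("product")
  .fit(long)
  .transform(long)
  // StringIndexer emits Double indices; ALS wants integer ids
  .withColumn("product", $"product".cast("int"))
```

Note that `StringIndexer` orders labels by frequency, so the assigned ids need not match the `product1`..`product5` column order.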
Nevertheless, I believe there is not much to gain by using DataFrames here. One way or another, the data passed to the MLlib model ends up as an RDD[Rating].