国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

在pyspark中調(diào)用scala代碼

alanoddsoff / 890人閱讀

摘要:由于使用的是天河二號(hào),版本是,同樣,所以獲取主題時(shí)還不能使用在中才開放對(duì)的接口,只能使用的方法。本來做并行化就是希望效率更高,卻在調(diào)用代碼,同時(shí)進(jìn)行了很多數(shù)據(jù)轉(zhuǎn)換。

在pyspark中調(diào)用scala代碼 情境說明 問題

我們這邊是要使用Spark去并行一個(gè)自然語言處理的算法,其中使用到了LDA主題模型。由于使用的是天河二號(hào),Spark版本是1.5.1,pyspark同樣,所以獲取主題時(shí)還不能使用describeTopics(在spark1.6中才開放對(duì)python的接口),只能使用topicsMatrix的方法。

本來湊合用topicsMatrix也行,但我們發(fā)現(xiàn),這一個(gè)用來獲取主題模型的函數(shù),居然比Lda的訓(xùn)練還要慢!無論在我們自己的集群還是在天河二號(hào)的分區(qū)上,都是這一個(gè)情況。觀察topicsMatrix的源代碼,好像也沒有什么復(fù)雜操作,只是把數(shù)據(jù)匯總collect而已:

@Since("1.3.0")
override lazy val topicsMatrix: Matrix = {
  // Collect row-major topics
  val termTopicCounts: Array[(Int, TopicCounts)] =
    graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) =>
    (index2term(termIndex), cnts)}.collect()
  // Convert to Matrix
  val brzTopics = BDM.zeros[Double](vocabSize, k)
  termTopicCounts.foreach { case (term, cnts) =>
    var j = 0
    while (j < k) {
      brzTopics(term, j) = cnts(j)
      j += 1
    }
  }
  Matrices.fromBreeze(brzTopics)
}

由于并不是算法中有一些復(fù)雜運(yùn)算導(dǎo)致較慢,我們自然不希望在程序中有這樣的情況。發(fā)現(xiàn)到在Spark1.5.1中,mllib中LdaModel已經(jīng)實(shí)現(xiàn)了describeTopics,只是未在Python中開放,我們自然希望嘗試使用describeTopics看看效果。

describeTopics的源代碼探索

已知LDA.train()返回的是LdaModel的實(shí)例,于是乎,參考上篇博客,用以下方式去調(diào)用:

model = LDA.train(rdd_data, k=num_topics, maxIterations=20)
topics = model.call("describeTopics", _py2java(sc, 10))

執(zhí)行速度特別快,然而返回的結(jié)果卻不盡如人意,僅返回了一個(gè)長度k的列表,每個(gè)元素是一個(gè)key為"class",value為"scala.Tuple2"的單元素字典。從結(jié)果來看,scala的代碼應(yīng)該是被成功執(zhí)行了,然而返回結(jié)果卻出了問題。查看callJavaFunc的內(nèi)容,可以判斷出,是describeTopics的返回結(jié)果沒有被_java2py函數(shù)正常的轉(zhuǎn)換。

比對(duì)Spark1.5和Spark1.6的代碼,LdaModel.describeTopics函數(shù)的內(nèi)容是一致的,那么問題在哪兒呢?再去查看pyspark的LDA.train()調(diào)用的PythonMLLibAPI.trainLdaModel,發(fā)現(xiàn)在1.6中返回的不再是LdaModel而是它的子類LdaModelWrapper。查看這個(gè)類的方法,發(fā)現(xiàn)它重載了describeTopics來方便_java2py進(jìn)行數(shù)據(jù)轉(zhuǎn)換:

private[python] class LDAModelWrapper(model: LDAModel) {

  def topicsMatrix(): Matrix = model.topicsMatrix

  def vocabSize(): Int = model.vocabSize

  def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize)

  def describeTopics(maxTermsPerTopic: Int): Array[Byte] = {
    val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) =>
      val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
      val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
      Array[Any](jTerms, jTermWeights)
    }
    SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava)
  }

  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
}

找到這里,解決方法就油然而生了。只要我們把這一段scala代碼在python中調(diào)用,并將describeTopics的Java對(duì)象傳入,不就萬事大吉了嗎?

在pyspark中調(diào)用scala代碼

也許還有別的方法,不過這里使用的方法也足夠簡單。將.scala文件打包成jar后,啟動(dòng)spark時(shí)加入?yún)?shù)--driver-class-path /path/to/xxx.jar,便可以將你的scala代碼放入Spark運(yùn)行的虛擬機(jī)JVM中,從而讓python代碼在運(yùn)行中通過反射機(jī)制在SparkContext._jvm里動(dòng)態(tài)獲取到你的類與方法:

func = sc._jvm.com.example.YourObject.func
打包scala代碼

那么,現(xiàn)在的問題就是如何把scala代碼打包成jar了。scala雖然也是基于JVM運(yùn)行的語言,與java非常相似,但是其編譯選項(xiàng)中并沒有提供將其打包成jar的參數(shù)。這里我們用sbt打包它,sbt的下載與安裝請(qǐng)自行查閱其他教程,這里就不提供了,官方網(wǎng)站。

首先編寫好你的scala代碼,確認(rèn)沒有bug,并在文件開頭用package關(guān)鍵字將其封裝至包中。接著,請(qǐng)手動(dòng)建立你的項(xiàng)目目錄,并創(chuàng)建如下結(jié)構(gòu):

在build.sbt中,請(qǐng)至少進(jìn)行以下設(shè)置

//項(xiàng)目名
name := "Project"

//項(xiàng)目版本
version := "0.1"

//scala版本
scalaVersion := "2.10.5"

//jdk版本
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

//主函數(shù)
mainClass in Compile := Some("YourClass.func")

在plugins.sbt中,請(qǐng)加上這一句話,告訴sbt需要這個(gè)第三方插件,這是用來打包的

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

這些都準(zhǔn)備完成后,在terminal里進(jìn)入你的項(xiàng)目根目錄下,輸入

sbt package

等待打包完成,會(huì)有相應(yīng)提示。
更多的打包選項(xiàng),以及sbt的更多用法,感興趣可以自行查閱。

解決我們的問題

回到我們這里的問題,我們希望能在python中對(duì)describeTopics的返回值進(jìn)行轉(zhuǎn)換,那么我么只需要打包那一個(gè)重載的describeTopics就好了,這樣可以避免打包Spark的第三方包。更改一下函數(shù)的返回值,并注釋掉調(diào)用Spark的SerDe進(jìn)行序列化的語句,最終的代碼如下:

package com.sysu.sparkhelper

import java.util.List
import scala.collection.JavaConverters

object LdaHelper {
    def convert(topics: Array[(Array[Int], Array[Double])]): List[Array[Any]] = {
        val result = topics.map { case (terms, termWeights) =>
          val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
          val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
          Array[Any](jTerms, jTermWeights)
        }
        return JavaConverters.seqAsJavaListConverter(result).asJava
        // SerDe.dumps(JavaConverters.seqAsJavaListConverter(result).asJava)
    }
}

用sbt打包完成后,使用--driver-class-path添加jar包,在python中相應(yīng)代碼為:

lda_java_model = model._java_model
func = getattr(model._java_model, "describeTopics")
result = func(_py2java(sc, 10))
topics = _java2py(sc, sc._jvm.com.sysu.sparkhelper.LdaHelper.convert(result))
總結(jié)

這算是閱讀源碼的一次應(yīng)用,可以說還是解決了遇到的問題,同時(shí)也加深了對(duì)Spark的了解。
本來做并行化就是希望效率更高,pyspark卻在調(diào)用scala代碼,同時(shí)進(jìn)行了很多數(shù)據(jù)轉(zhuǎn)換。想要更好的使用Spark的話,使用scala去編程應(yīng)該才是最好的。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/40955.html

相關(guān)文章

  • pyspark底層淺析

    摘要:底層淺析簡介是官方提供的接口,同時(shí)也是中的一個(gè)程序。這里一提,對(duì)于大部分機(jī)器學(xué)習(xí)算法,你都會(huì)看到模塊與模塊都提供了接口,它們的區(qū)別在于模塊接受格式的數(shù)據(jù)而模塊接受格式的數(shù)據(jù)。 pyspark底層淺析 pyspark簡介 pyspark是Spark官方提供的API接口,同時(shí)pyspark也是Spark中的一個(gè)程序。 在terminal中輸入pyspark指令,可以打開python的she...

    FrozenMap 評(píng)論0 收藏0
  • CentOS7 install spark+ipython-nodebook

    摘要:使用瀏覽器作為界面,向后臺(tái)的服務(wù)器發(fā)送請(qǐng)求,并顯示結(jié)果。本文主要介紹在上安裝流程該文件是用戶登錄時(shí),操作系統(tǒng)定制用戶環(huán)境時(shí)使用的第一個(gè)文件,應(yīng)用于登錄到系統(tǒng)的每一個(gè)用戶。 ipython-nodebook IPython notebook 目前已經(jīng)成為用 Python 做教學(xué)、計(jì)算、科研的一個(gè)重要工具。 IPython Notebook 使用瀏覽器作為界面,向后臺(tái)的 IPython ...

    soasme 評(píng)論0 收藏0
  • Spark的安裝及配置

    摘要:本文作者本文鏈接安裝說明在安裝之前,需要安裝集群環(huán)境,如果沒有可以查看分布式集群的搭建用到的軟件軟件版本下載地址節(jié)點(diǎn)安排名稱主節(jié)點(diǎn)子節(jié)點(diǎn)子節(jié)點(diǎn)安裝解壓到安裝目錄修改配置文件配置文件位于目錄下。 本文作者:foochane?本文鏈接:https://foochane.cn/article/2019051904.html 1 安裝說明 在安裝spark之前,需要安裝hadoop集群環(huán)境,...

    lunaticf 評(píng)論0 收藏0
  • Spark入門階段一之掃盲筆記

    摘要:同時(shí)集成了機(jī)器學(xué)習(xí)類庫。基于計(jì)算框架,將的分布式計(jì)算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。提供了一個(gè)簡單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動(dòng)態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。宣稱其性能是的多倍。 介紹 spark是分布式并行數(shù)據(jù)處理框架 與mapreduce的區(qū)別: mapreduce通常將中間結(jié)果放在hdfs上,spark是基于內(nèi)存并行大數(shù)據(jù)框架,中間結(jié)果放在內(nèi)存,對(duì)于迭代數(shù)據(jù)spark效率更高,mapred...

    starsfun 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<