package com
import com.process_data.process_features
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Aggregates the raw order file (t_order.csv) into per-shop, per-month feature sums
 * and writes the result under <workFile>/shop.
 */
object App {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TestSpark").setMaster("local")
var sc = new SparkContext(conf)
val workFile = "/Users/wangsanpeng/Desktop/wsp/code/jdcomplete/data/"
process_features(workFile,sc,"1","t_order.csv")
}
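// Reads t_order.csv, keys every row by shop_id + "_" + month(ord_dt), and emits the
// sale/offer/return/order/user columns so merg_shop can sum them per key.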
def process_features(workFile:String, sc: SparkContext, types:String,files:String) = {
var readFile = workFile;
val index_work_file = readFile
readFile += files
println("readFile",types,readFile)
val datas = sc.textFile(readFile)
val header = datas.first()
val spilit_value = ","
var features_name = header.split(spilit_value)
println("map_data",datas.count())
val map_data = datas.filter(row=>row!=header).map(x1=>{
val features_length = features_name.length
val block = x1.replace("\n","").split(spilit_value,features_name.length)
val index_shop_id = block(features_name.indexOf("shop_id"))
val index_pid = block(features_name.indexOf("pid"))
// println(features_name.indexOf("create_dt"))
var index_month = block(features_name.indexOf("ord_dt")).substring(0,7)
var index_key = index_shop_id + "_" + index_month
var value = block(features_name.indexOf("shop_id")) + spilit_value + block(features_name.indexOf("sale_amt")) + spilit_value + block(features_name.indexOf("offer_amt")) + spilit_value + block(features_name.indexOf("offer_cnt")) + spilit_value + block(features_name.indexOf("rtn_cnt"))+ spilit_value + block(features_name.indexOf("rtn_amt"))
value = value + spilit_value + block(features_name.indexOf("ord_cnt")) + spilit_value + block(features_name.indexOf("user_cnt"))
(index_key.toString,value.toString)
})
val result = merg_shop(map_data,features_name)
result.saveAsTextFile(workFile + "/shop")
println(result.count())
}
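// Sums the comma-separated numeric fields of two records sharing a key; the first
// field (shop_id) is carried over from the left record instead of being summed.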
def merg_shop(orig_map:RDD[(String,String)],features_name:Array[String]):RDD[(String,String)] = {
val result = orig_map.reduceByKey((a,b) => {
val a_block = a.replace("\n","").split(",")
val b_block = b.replace("\n","").split(",")
var index_value = a_block(0) + ","
for (i <- 1 to (a_block.length - 1)){
// println(a_block(i),b_block(i))
// println()
index_value = index_value + (a_block(i).toFloat + b_block(i).toFloat).toString
if (i != (a_block.length - 1)){
index_value = index_value + ","
}
}
(index_value)
})
result.foreach(a => println(a._1, a._2)) // debug output; foreach is an action, so the prints actually run
return result
}
}
package com
import com.process_data.process_features
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Aggregates t_order.csv into per-shop, per-product, per-month features and writes
 * the result under <workFile>/deal; the trailing per-order amount is averaged when
 * records are merged.
 */
object deal {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TestSpark").setMaster("local")
var sc = new SparkContext(conf)
val workFile = "/Users/wangsanpeng/Desktop/wsp/code/jdcomplete/data/"
process_features(workFile,sc,"1","t_order.csv")
}
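// Same as App.process_features, but keyed by shop_id + "_" + pid + "_" + month, with an
// extra record counter ("1.0") and the per-order sale amount appended to each value.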
def process_features(workFile:String, sc: SparkContext, types:String,files:String) = {
var readFile = workFile;
val index_work_file = readFile
readFile += files
println("readFile",types,readFile)
val datas = sc.textFile(readFile)
val header = datas.first()
val spilit_value = ","
var features_name = header.split(spilit_value)
println("map_data",datas.count())
val features_length = features_name.length
val map_data = datas.filter(row=>row!=header).map(x1=>{
val block = x1.replace("\n","").split(spilit_value,features_name.length)
val index_shop_id = block(features_name.indexOf("shop_id"))
val index_pid = block(features_name.indexOf("pid"))
// println(features_name.indexOf("create_dt"))
var index_month = block(features_name.indexOf("ord_dt")).substring(0,7)
var index_key = index_shop_id + "_" + index_pid + "_" + index_month
var value = block(features_name.indexOf("sale_amt")) + spilit_value + block(features_name.indexOf("offer_amt")) + spilit_value + block(features_name.indexOf("offer_cnt")) + spilit_value + block(features_name.indexOf("rtn_cnt"))+ spilit_value + block(features_name.indexOf("rtn_amt"))
value = value + spilit_value + block(features_name.indexOf("ord_cnt")) + spilit_value + block(features_name.indexOf("user_cnt")) + spilit_value + "1.0" + spilit_value + (block(features_name.indexOf("sale_amt")).toFloat / block(features_name.indexOf("ord_cnt")).toFloat).toString
(index_key.toString,value.toString)
})
val result = merg_shop(map_data,features_name)
result.saveAsTextFile(workFile + "/deal")
println(result.count())
}
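// Sums the numeric fields of two records with the same key, except the last field
// (per-order sale amount), which is averaged.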
def merg_shop(orig_map:RDD[(String,String)],features_name:Array[String]):RDD[(String,String)] = {
val result = orig_map.reduceByKey((a,b) => {
val a_block = a.replace("\n","").split(",")
val b_block = b.replace("\n","").split(",")
var index_value = ""
for (i <- 0 to (a_block.length - 1)){
// println(a_block(i),b_block(i))
// println()
if (i != (a_block.length - 1)){
index_value = index_value + (a_block(i).toFloat + b_block(i).toFloat).toString + ','
}
else{
index_value = index_value + ((a_block(i).toFloat + b_block(i).toFloat) / 2.0).toString
}
}
(index_value)
})
result.foreach(a => println(a._1, a._2)) // debug output; foreach is an action, so the prints actually run
return result
}
}
package com
/**
* Created by wangsanpeng on 17/6/20.
*/
import java.io.Serializable
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.Iterator
import Array._
/**
* Created by wangsanpeng on 17/4/19.
*/
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.Iterator
import Array._
import scala.collection.mutable.ArrayBuffer
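// Computes ROC AUC for binary predictions: scores are grouped and sorted descending,
// cumulative negative/positive counts are built per partition (using broadcast start
// offsets), and the area under the curve is accumulated with the trapezoid rule.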
class BinaryAUC extends Serializable {
// input format: (prediction, label)
def auc( data: RDD[ (Double, Double) ] ) : Double =
{
//group same score result
val group_result = data.groupByKey().map(x => {
var r = new Array[Double](2)
for(item <- x._2) {
if(item > 0.0) r(1) += 1.0
else r(0) += 1.0
}
// println("data format",x._1, r(0),r(1))
(x._1, r) // score, [ FalseN, PositiveN ]
})
// the (FN, PN) counts need to be accumulated across partitions
val group_rank = group_result.sortByKey(false) //big first
// compute each partition's (FN, PN) totals, used below as cumulative start offsets
var step_sizes = group_rank.mapPartitions( x =>
{
var r = List[(Double, Double)]()
var fn_sum = 0.0
var pn_sum = 0.0
while( x.hasNext )
{
val cur = x.next
fn_sum += cur._2(0)
pn_sum += cur._2(1)
}
((fn_sum, pn_sum) :: r).toIterator
} ,true).collect
var debug_string = ""
var step_sizes_sum = ofDim[Double](step_sizes.size, 2) // 2-D array: cumulative (FN, PN) offsets at the start of each partition
for( i <- 0 to (step_sizes.size - 1) ) {
if(i == 0) {
step_sizes_sum(i)(0) = 0.0
step_sizes_sum(i)(1) = 0.0
} else {
step_sizes_sum(i)(0) = step_sizes_sum(i - 1)(0) + step_sizes(i - 1)._1
step_sizes_sum(i)(1) = step_sizes_sum(i - 1)(1) + step_sizes(i - 1)._2
}
debug_string += "\t" + step_sizes_sum(i)(0).toString + "\t" + step_sizes_sum(i)(1).toString
}
val sss_len = step_sizes_sum.size
val total_fn = step_sizes_sum(sss_len - 1)(0) + step_sizes(sss_len - 1)._1
val total_pn = step_sizes_sum(sss_len - 1)(1) + step_sizes(sss_len - 1)._2
System.out.println( "debug auc_step_size: " + debug_string)
val bc_step_sizes_sum = data.context.broadcast(step_sizes_sum)
val modified_group_rank = group_rank.mapPartitionsWithIndex( (index, x) =>
{
var sss = bc_step_sizes_sum.value
var r = List[(Double, Array[Double])]()
//var r = List[(Double, String)]()
var fn = sss(index)(0) //start point
var pn = sss(index)(1)
while( x.hasNext )
{
var p = new Array[Double](2)
val cur = x.next
p(0) = fn + cur._2(0)
p(1) = pn + cur._2(1)
fn += cur._2(0)
pn += cur._2(1)
//r.::= (cur._1, p(0).toString() + "\t" + p(1).toString())
r ::= ((cur._1, p))
}
r.reverse.toIterator
} ,true)
//output debug info
//modified_group_rank.map(l => l._1.toString + "\t" + l._2(0).toString + "\t" + l._2(1)).saveAsTextFile("/home/hdp_teu_dia/resultdata/wangben/debug_info")
val score = modified_group_rank.sliding(2).aggregate(0.0)(
seqOp = (auc: Double, points: Array[ (Double, Array[Double]) ]) => auc + TrapezoidArea(points),
combOp = _ + _
)
System.out.println( "debug auc_mid: " + score
+ "\t" + (total_fn*total_pn).toString()
+ "\t" + total_fn.toString()
+ "\t" + total_pn.toString() )
score/(total_fn*total_pn)
}
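// Area of one trapezoid between two consecutive cumulative (FN, PN) points on the curve.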
private def TrapezoidArea(points :Array[(Double, Array[Double])]):Double = {
val x1 = points(0)._2(0)
val y1 = points(0)._2(1)
val x2 = points(1)._2(0)
val y2 = points(1)._2(1)
val base = x2 - x1
val height = (y1 + y2)/2.0
return base*height
}
}
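/**
 * Minimal usage sketch (not part of the original pipeline): feeds a toy
 * (prediction, label) RDD to BinaryAUC. The object name and the sample values
 * are illustrative assumptions only.
 */
object BinaryAUCExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("BinaryAUCExample").setMaster("local")
    val sc = new SparkContext(conf)
    // scores that rank every positive above every negative should yield AUC = 1.0
    val sample = sc.parallelize(Seq((0.9, 1.0), (0.8, 1.0), (0.3, 0.0), (0.1, 0.0)))
    val auc = new BinaryAUC()
    println("toy auc: " + auc.auc(sample))
    sc.stop()
  }
}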
object gbdts {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TestSpark").setMaster("local")
// .setMaster("local")
var sc = new SparkContext(conf)
val trainFile = "/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/sdk_log_all/train_0701.txt"
val testFile = "/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/sdk_log_all/test_0701.txt"
val workFile = "/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/sdk_log_all/"
val trainingData = getData_new(trainFile,workFile,sc,"train")
val testData = getData_new(testFile,workFile,sc,"test")
cross_val(sc,trainingData,testData)
println("trainingData",trainingData.count())
println("testData",testData.count())
// val boostingStrategy = BoostingStrategy.defaultParams("Classification")
// boostingStrategy.numIterations = 200 // Note: Use more iterations in practice.
// boostingStrategy.treeStrategy.maxDepth = 5
// // Empty categoricalFeaturesInfo indicates all features are continuous.
// boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
//
// val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// model.save(sc, "/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/sdk_log_all/data1/gbrt_test_result")
// val model = GradientBoostedTreesModel.load(sc,"/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/sdk_log_all/data1/gbrt_test_result")
// // Evaluate model on test instances and compute test error
// var sumLossMoney = 0.0
// var sumGetMoney = 0.0
// val labelsAndPredictions = testData.map { point =>
// val prediction = model.predict(point.features)
// var prob = prediction
// println("prediction",point.label, prob)
// (prob,point.label)
// }
// val trainErr = labelsAndPredictions.filter(r => r._1 != r._2).count().toDouble / labelsAndPredictions.count
// val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
// println("Training Error = ",trainErr,testMSE)
// val trees = model.trees.toList.toString()
// val treeWeights = model.treeWeights.toList.toString()
// println("trees",trees)
// println("tree_weight",treeWeights)
// val auc = new BinaryAUC()
// val auc_score = auc.auc(labelsAndPredictions)
// println("debug auc_score: " + auc_score.toString())
}
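// Despite the name, this evaluates a single fold: the union of train and test is
// re-split by row index (index % 10 == 0 goes to test) and scored with BinaryAUC.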
def cross_val(sc: SparkContext, train:RDD[LabeledPoint],test:RDD[LabeledPoint]) = {
val all_data = train.union(test).zipWithIndex()
val test_index = 0
val (train_data,test_data) = get_split(all_data,test_index)
val index_result = train_test(train_data,test_data)
val auc = new BinaryAUC()
val auc_score = auc.auc(index_result)
println("debug auc_score: " + auc_score.toString())
}
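// Trains a GradientBoostedTrees regression model (200 iterations, depth 5, all features
// continuous) on the train split and returns (prediction, label) pairs for the test split.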
def train_test(train_data:RDD[LabeledPoint],test_data:RDD[LabeledPoint]):RDD[(Double,Double)] = {
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
// .defaultParams("Classification")
boostingStrategy.numIterations = 200 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(train_data, boostingStrategy)
val labelsAndPredictions = test_data.map { point =>
val prediction = model.predict(point.features)
println(prediction,point.label)
(prediction,point.label)
}
return labelsAndPredictions
}
def get_split(all_data:RDD[(LabeledPoint,Long)],test_index:Long):(RDD[LabeledPoint],RDD[LabeledPoint]) = {
val test_data = all_data.filter(f=>(f._2 % 10) == test_index).map(f=>f._1)
val train_data = all_data.filter(f=>(f._2 % 10) != test_index).map(f=>f._1)
return (train_data,test_data)
}
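// Parses the generated CSV (id, apply_id, label, feature...) directly: column 2 is the
// label, columns 3 onward are the dense features.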
def getData_new(readFile:String, workFile:String, sc: SparkContext, types:String):RDD[LabeledPoint] = {
val datas = sc.textFile(readFile)
val parsedData = datas.map { line =>
val parts = line.split(',')
val features = Vectors.dense(parts.tail.tail.tail.map(_.toDouble)) // label is column 2; features start at column 3
LabeledPoint(parts(2).toDouble, features)
}
return parsedData
}
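// Legacy loader for tab-separated "label\tidx:value,idx:value,..." lines: NULL values
// become 0.0, the data is rewritten as "label,f1 f2 ..." under workFile/data1<type>,
// and then re-read as LabeledPoints.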
def getData(readFile:String, workFile:String, sc: SparkContext, types:String):RDD[LabeledPoint] = {
val datas = sc.textFile(readFile)
val datalength = datas.count()
println ("dataleng : %s".format(datalength))
//test
val originalDataArray = datas.map(line => {
var arr = line.split("\t")
// println("alldata",arr(0))
var k = arr(0).toDouble
val features = arr(1).split(",")
var v = new Array[Double](features.length + 1)
v(0) = k
var i = 0
while (i < features.length) {
var num = features(i).split(':')(1)
if (num == "NULL") {
v(i + 1) = 0.0
} else {
v(i + 1) = num.toDouble
}
i += 1
}
v
})
println(originalDataArray.count())
val features_data = originalDataArray.map(f => {
val target = f.toArray
var features = ArrayBuffer[Double]()
features ++= target
var out = ""
var index = 1
out = out + features(0) + ","
while (index < features.length) {
out = out + features(index) + " "
index += 1
}
out
//features.toArray
}).coalesce(1, true).saveAsTextFile(workFile + "data1" + types)
val parsedData = sc.textFile(workFile + "data1" + types + "/part-00000").map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
} //.coalesce(1,true).saveAsTextFile("parsedData")
// println(parsedData.count())
return parsedData
}
}
package com
/**
* Created by wangsanpeng on 17/6/20.
*/
import java.io._
import scala.collection.mutable
import scala.collection.mutable.Map
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
//import org.apache.spark.mllib.linalg.Vectors
//import org.apache.spark.mllib.regression.LabeledPoint
//import org.apache.spark.mllib.tree.GradientBoostedTrees
//import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ArrayBuffer
object process_data {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TestSpark").setMaster("local")
var sc = new SparkContext(conf)
val workFile = "/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/"
process_features(workFile + "april_all/",sc,"1","all_data_info.txt")
}
// Data format processing and feature discretization
def process_features(workFile:String, sc: SparkContext, types:String,files:String) = {
var readFile = workFile;
val index_work_file = readFile
readFile += files
println("readFile",types,readFile)
val datas = sc.textFile(readFile)
val header = datas.first()
val spilit_value = ","
var features_name = header.split(spilit_value)
// Array("server_time","source_ip","appversion","longitude","latitude","deviceid","sdkversion","localip","tdid","id","number","operator","version","manufacturer","model","release","mocklocationenabled","usbdebugenabled","cellinfo","mac_bluetooth","memory","availablememory","cpunum","cpustat","screenpixel","screenlight","rommemroy","sdcardmemory","speakervolume","batteryinfo","channelid","sessionid","appkey","dt","os","xfilename")
// Features treated as numeric values
val save_features_number = Map("usbdebugenabled" -> "1", "version" -> "1", "mocklocationenabled" -> "1","memory" -> "1","availablememory" -> "1","cpunum" -> "1","cpustat" -> "1","screenlight" -> "1",
"rommemroy" -> "1","sdcardmemory" -> "1","speakervolume" -> "1", "batteryinfo" -> "1","channelid" -> "1","bank" -> "1","hotel" -> "1","hotel_rate" -> "1","hotel_order" -> "1", "hotel_price" -> "1","meishi" -> "1","meishi_rate" -> "1","meishi_price" -> "1",
"waiguo_meishi" -> "1","waiguo_rate" -> "1","waiguo_price" -> "1","manufacturer" -> "1","model" -> "1","ip_not_null_num" -> "1","ip_null_num" -> "1","divice_cnt" -> "1") //,"cellinfo" -> "1"
val save_features_disperse = Map("test" -> "1")
// var feature_name_map = get_disperse_map(datas,save_features_disperse)
println("map_data",datas.count())
// val map_data1 = datas.map(x1=>(x1))
// map_data1.foreach(x => println(x))
val map_data = datas.filter(row=>row!=header).map(x1=>{
val features_length = features_name.length
val block = x1.replace("\n","").split(spilit_value,features_name.length)
var content_index = block(features_name.indexOf("id")) + "," + block(features_name.indexOf("apply_id")) + ","
val index_label = if (block(features_name.indexOf("label")) == "0") "1" else "0" // invert the raw label: a source "0" becomes 1, anything else becomes 0
content_index += index_label + ","
for (a <- 0 to (features_length - 1)) {
val index_name = features_name(a)
var index_type = 0
var index_value = block(a)
if (save_features_number.contains(index_name)) {
if ((index_value == "true") || (index_value == "TRUE")) {
index_value = "0"
}
if ((index_value == "false") || (index_value == "FALSE")) {
index_value = "1"
}
if (index_value == "-") {
index_value = "-1"
}
if (index_value == "") {
index_value = "-2"
}
if (index_value == "appStore") {
index_value = "-3"
}
if (index_value == "t") {
index_value = "-4"
}
if ((index_value == "appStore-bailing") || (index_value == "appStoreHF")) {
index_value = "-5"
}
if (index_value.startsWith("PP")) {
index_value = "-6"
}
if ((index_value == "NULL") || (index_value == "null")) {
index_value = "-7"
}
content_index += index_value + ","
index_type = 1
}
if (save_features_disperse.contains(index_name)) {
// val index_value_map = feature_name_map(index_name)
// // println("map_index",index_value_map,block(a))
// index_value = index_value_map(block(a)).toString()
// content_index += index_value + ","
// index_type = 1
}
}
content_index += "\n"
content_index = content_index.replace(",\n","")
// println(content_index)
(block(features_name.indexOf("apply_date")).toString.split(" ")(0),content_index)
})
println(map_data.count())
val bad_label = get_bad_user(workFile + "black_label_remove_repeat.csv",sc)
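// Split rows into train ("1") and test ("0") by apply date: blacklisted applications go
// to test only from month 08 onward; otherwise everything from 2017-04 onward is test
// and earlier data is train.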
val map_all = map_data.map(row=>{
val block = row._2.replace("\n","").split(",")
var result = "0"
if ((bad_label.contains(block(1)))){
println("apply",block(1))
println(row._1.toString)
if (row._1.toString.split("-")(1) >= "08") {
result = "0"
}
else {
result = "1"
}
} else {
if ((row._1.toString.split("-")(0) == "2017") && ((row._1.toString.split("-")(1) >= "04"))) {
result = "0"
}
else {
result = "1"
}
}
(result,row._2)
})
val map_test = map_all.filter(row=>(row._1=="0")).map(row=>row._2)
val map_train = map_all.filter(row=>(row._1=="1")).map(row=>row._2)
// val map_train = map_data.filter(row=>((row._1.toString.split("-")(0) == "2016") || (row._1.toString.split("-")(1) == "01"))).map(row=>row._2)
// || (row._1.toString.split("-")(1) == "04") || (row._1.toString.split("-")(1) == "03")
// val map_all = map_data.map(row=>row._2)
map_test.saveAsTextFile(workFile + "/test_info")
map_train.saveAsTextFile(workFile + "/train_info")
// writer_test.write(content_test)
// writer_test.close()
// writer.write(content)
// writer.close()
// writer_train_test.write(content+content_test)
// writer_train_test.close()
}
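// Builds, for every feature listed in save_features_disperse, a map from each distinct
// raw value to a numeric index (distinct + zipWithIndex).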
def get_disperse_map(datas:RDD[String],save_features_disperse:Map[String,String]):Map[String, scala.collection.Map[String,Long]] = {
val header = datas.first()
val header_name = header.split("\t")
println("first row",header_name.length,header)
var feature_name_map:Map[String, scala.collection.Map[String,Long]] = Map()
// var feature_name_map:Map[Char,Char] = Map()
var A:Map[Char,Int] = Map()
save_features_disperse.keys.foreach{ name =>
val process_index = header_name.indexOf(name)
// val process_index = header_name.find("operator")
// println("process_index",process_index)
// datas.map(p => new String(p.getBytes, 0, p.getBytes.length, "GBK")).
var res_map = datas.filter(row=>row!=header).map(
row=>{
val index_block = row.split("\t",header_name.length)
// println(row)
// println(index_block.length)
if (index_block.length != header_name.length) {
println("not equal:",row)
}
// println(index_block(0),process_index)
(index_block(process_index))
}
).distinct().zipWithIndex().collectAsMap()
// .distinct().zipWithIndex().collectAsMap()
// println(res_map.count(),res_map.distinct().count(),res_map.distinct().zipWithIndex())
// println("res_map",res_map)
feature_name_map += (name ->res_map)
}
feature_name_map.foreach{case (name,value) => println(name,value)}
return feature_name_map
}
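// Loads the blacklist CSV and returns a map keyed on its 4th column (matched against
// apply_id upstream) with the 3rd column as value.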
def get_bad_user(file:String, sc: SparkContext):collection.Map[String, String] = {
val datas = sc.textFile(file)
var res:Map[String, String] = Map()
val map_data = datas.map(x1=>{
val block = x1.replace("\n","").split(",",5)
(block(3),block(2))
})
return map_data.collectAsMap()
}
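// Joins the de-duplicated SDK log (files) with a second log (files1) on the id column
// (",," separated), then joins the second log against m3.csv and prints the join sizes.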
def merg_data(workFile:String, sc: SparkContext, types:String,files:String,id_index:Int,files1:String,id_index1:Int) = {
var readFile = workFile;
if (types == "good") {
readFile += "sdk_log_good/"
}
else {
readFile += "sdk_log_bad/"
}
val index_work_file = readFile
readFile += files
println("readFile",types,readFile)
val datas = sc.textFile(readFile)
val map_data = datas.map(x1=>(x1.split(",,")(id_index),x1))
var readFile1 = workFile;
if (types == "good") {
readFile1 += "sdk_log_good/"
}
else {
readFile1 += "sdk_log_bad/"
}
val index_work_file1 = readFile1
readFile1 += files1
println("readFile1",types,readFile1)
val datas1 = sc.textFile(readFile1)
val map_data1 = datas1.map(x2=>(x2.split(",,")(id_index),x2));
println("map_data1.count:",map_data1.count())
val map_data2 = datas1.map(x=>(x.split(",,")(id_index),1));
val merg_result = map_data.join(map_data1).collect()
println("merg sdk info and input result:",types,merg_result.length)
val datas_m3 = sc.textFile("/Users/wangsanpeng/Desktop/wsp/code/yirendai/data/m3.csv")
val datas_m3_map = datas_m3.map(x=>(x.split(",")(0),x))
val merg_m3 = map_data1.join(datas_m3_map)
println("merg sdk info and m3 result:",merg_m3.count())
// merg_m3.foreach(x => println(x))
println(merg_m3.collect())
}
def max(a: (String,String),b: (String,String)):(String,String) ={
val sdf = new SimpleDateFormat("yyyy-MM-dd") // uppercase MM parses the month field
val num=sdf.parse(a._1).getTime-sdf.parse(b._1).getTime
if(num > 0)
a
else
b
}
def get_label(workFile:String, sc: SparkContext,files:String) = {
var readFile = workFile + files
val datas = sc.textFile(readFile)
val originalDataArray1 = datas.map(x=>(x.split("\001")(0),(x.split("\001")(0),x)))
}
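// De-duplicates the raw "\001"-separated export by id, keeping the most recent record
// (max on the date column), appends each id's occurrence count, and writes the result
// to <file>_remove_repeat.txt with ",," as the separator.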
def process_orig_data(workFile:String, sc: SparkContext,files:String,id_index:Int) = {
var readFile = workFile;
val index_work_file = workFile
readFile += files
val datas = sc.textFile(readFile)
val datalength = datas.count()
println ("dataleng : %s".format(datalength))
//test
var data_map: Map[String, Int] = Map()
val originalDataArray1 = datas.map(x=>(x.split("\001")(id_index),(x.split("\001")(0),x)))
println(originalDataArray1.count())
val originalDataArray = datas.map(x=>(x.split("\001")(id_index),(x.split("\001")(0),x))).reduceByKey((a,b)=>max(a,b)).collect();
// originalDataArray.foreach(println(_))
val countsMap = datas.map(x=>(x.split("\001")(id_index),1)).reduceByKey(_+_).collectAsMap();
println("originalDataArray.count()",originalDataArray.length)
val writer = new BufferedWriter(new FileWriter(index_work_file + files + "_remove_repeat.txt"))
println("countsMap.count()",countsMap.size)
var content = ""
originalDataArray.foreach{ case (id, (result_time,result_content)) =>
val number = countsMap(id).toString()
content = result_content.toString().replace("\001",",,").replace("\n","") + ",," + number + "\n"
// println(result._2.replace("\n","") + "\001" + number + "\n")
writer.write(content)
}
// println("originalDataArray.count()",originalDataArray.count())
println("countsMap.count()",countsMap.size)
println("all",content)
writer.close()
}
}
package com
import java.io._
import javax.validation.constraints.Max
import org.apache.spark.sql.hive.HiveContext
import com.process_data.get_disperse_map
import scala.collection.mutable
import scala.collection.mutable.Map
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ArrayBuffer
/**
* Created by wangsanpeng on 17/8/7.
*/
object rank {
val conf = new SparkConf().setAppName("TestSpark").setMaster("local")
var sc = new SparkContext(conf)
def main(args: Array[String]) {
// the SparkContext created at object level above is reused here
val hiveCtx = new HiveContext(sc)
for ( i <- 0 to (10 - 1)) {
println(i)
// process_part(i.toString)
println(i)
}
}
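// Pulls one 1/20 shard of labeled Android SDK records from Hive and keeps, for each
// passport_id, the record whose log date (dt) is closest to the application date.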
def process_part(index_part:String) = {
val hiveCtx = new HiveContext(sc)
val aa = hiveCtx.sql(s"select a.passport_id,b.tdid,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.system_phone,a.platform_name,to_date(a.apply_time) as apply_date from test.gt_mob4_label a join ml.sdk_info_log_nosensitive b on a.td_id = b.tdid where b.dt <= to_date(a.apply_time) and b.manufacturer not like '%Apple%' and b.id == '' and datediff(to_date(a.apply_time),b.dt) <= 5 and ascii(a.passport_id) % 20 = " + index_part)
val datas = aa.rdd
val header = "id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,label,system_phone,platform_name,apply_date"
var features_name = header.split(",")
val map_data = datas.map(x1=>{
(x1(0),x1)
})
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val result = map_data.reduceByKey((a,b) => {
val data_dis_a = sdf.parse(a(21).toString()).getTime - sdf.parse(a(17).toString()).getTime
val data_dis_b = sdf.parse(b(21).toString()).getTime - sdf.parse(b(17).toString()).getTime
if(data_dis_a < data_dis_b)
a
else
b
})
val result_write = result.map(a => a._2)
result_write.foreach(f => println(f)) // foreach is an action; a bare map would never print
result_write.saveAsTextFile("./ml/android_tdid_res_" + index_part)
}
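// Joins the label table with SDK records on passport id; per passport_id it keeps the
// record closest to the apply date and accumulates how many records carried GPS
// coordinates, how many did not, and how many distinct device ids were seen.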
def process_part_tmp(index_part:String) = {
val hiveCtx = new HiveContext(sc)
// id
val aa = hiveCtx.sql(s"select a.passport_id,a.apply_id,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.plat,substr(apply_time,1,10) as apply_date,b.deviceid from (select passport_id,apply_id,apply_time,label,'NULL' as plat from test.yrd_label_1101 where substr(apply_time,1,10) <= '2017-04-30') a join (select id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,deviceid from ml.sdk_info_log_nosensitive where (manufacturer not like '%Apple%' and manufacturer not like '%apple%') and dt <= '2017-04-30') b on a.passport_id = b.id ")
// val aa = hiveCtx.sql(s"select a.passport_id,a.apply_id,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.apply_platform,substr(apply_time,1,10) as apply_date,b.deviceid from (select passport_id,apply_id,tdid,apply_time,apply_platform,label from test.yrd_anti_main_table_tmp where substr(apply_time,1,10) <= '2017-03-01') a join (select distinct test_a.passport_id,test_b.deviceid from (select passport_id,apply_id,tdid from test.yrd_anti_main_table_tmp where substr(apply_time,1,10) <= '2017-03-01') test_a join (select tdid,deviceid from ml.sdk_input_log_nosensitive) test_b on test_a.tdid = test_b.tdid) ab on a.passport_id = ab.passport_id join (select id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,deviceid from ml.sdk_info_log_nosensitive where (manufacturer not like '%Apple%' and manufacturer not like '%apple%') and id = '' and dt <= '2017-03-01') b on ab.deviceid = b.deviceid")
// bad user
// val aa = hiveCtx.sql(s"select a.passport_id,a.apply_id,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.apply_platform,substr(apply_time,1,10) as apply_date,b.deviceid from (select apply_passport_id as passport_id,apply_id,apply_time,apply_platform,'1' as label from test.bad_user) a join (select distinct test_a.passport_id,test_b.deviceid from (select apply_passport_id as passport_id,apply_id,yrd_tdid from test.bad_user) test_a join (select tdid,deviceid from ml.sdk_input_log_nosensitive) test_b on test_a.yrd_tdid = test_b.tdid) ab on a.passport_id = ab.passport_id join (select id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,deviceid from ml.sdk_info_log_nosensitive where (manufacturer not like '%Apple%' and manufacturer not like '%apple%') and id = '') b on ab.deviceid = b.deviceid")
val datas = aa.rdd
val header = "id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,label,system_phone,platform_name,apply_date"
var features_name = header.split(",")
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val map_data = datas.map(x1=>{
var index_imei = x1(21).toString().split("\\|")(0)
if (index_imei == "")
index_imei = "null"
if((x1(3).toString() == "") || (x1(3).toString() == "0"))
(x1(0),(x1,(0,1,1,index_imei)))
else
(x1(0),(x1,(1,0,1,index_imei)))
})
val result = map_data.reduceByKey((a,b) => {
var data_dis_a = sdf.parse(a._1(20).toString()).getTime - sdf.parse(a._1(17).toString()).getTime
var data_dis_b = sdf.parse(b._1(20).toString()).getTime - sdf.parse(b._1(17).toString()).getTime // gap between b's apply date and its log date
if (data_dis_a < 0){
data_dis_a = data_dis_a * -10
}
if (data_dis_b < 0){
data_dis_b = data_dis_b * -10
}
var a_devi = a._2._3
var b_devi = b._2._3
var total_devi = if (a_devi >= b_devi) a_devi else b_devi
var big_str = if (a_devi >= b_devi) a._2._4 else b._2._4
var small_str = if (a_devi < b_devi) a._2._4 else b._2._4
var total_str = big_str
var data_block = small_str.split(";")
var not_same_num = 0
var add_char = ""
data_block.foreach(a => {
if (big_str.indexOf(a) == -1){
not_same_num += 1
add_char = add_char + ";" + a
}
})
if (not_same_num != 0) {
total_devi += not_same_num
total_str = big_str + add_char
}
if(data_dis_a < data_dis_b)
(a._1,(a._2._1 + b._2._1,a._2._2 + b._2._2,total_devi,total_str))
else
(b._1,(a._2._1 + b._2._1,a._2._2 + b._2._2,total_devi,total_str))
})
val result_write = result.map(a => (a._2._1,a._2._2._1,a._2._2._2,a._2._2._3))
// val result_write = result.map(a => a._2)
result_write.saveAsTextFile("./ml/android_tdid_res_" + index_part)
}
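// Same per-passport aggregation as process_part_tmp, driven by the yrd_label_1010 table
// and written to ios_tdid_res_<part> instead.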
def process_part_ios(index_part:String) = {
val hiveCtx = new HiveContext(sc)
// id
val aa = hiveCtx.sql(s"select a.passport_id,a.apply_id,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.plat,substr(apply_time,1,10) as apply_date,b.deviceid from (select passport_id,apply_id,apply_time,label,'NULL' as plat from test.yrd_label_1010 where substr(apply_time,1,10) <= '2017-04-10') a join (select id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,deviceid from ml.sdk_info_log_nosensitive where (manufacturer not like '%Apple%' and manufacturer not like '%apple%') and dt <= '2017-04-10') b on a.passport_id = b.id") // deviceid
// val aa = hiveCtx.sql(s"select a.passport_id,a.apply_id,b.source_ip,b.longitude,b.latitude,b.version,b.manufacturer,b.model,b.mocklocationenabled,b.usbdebugenabled,b.memory,b.availablememory,b.cpunum,b.cpustat,b.rommemroy,b.sdcardmemory,b.channelid,b.dt,a.label,a.apply_platform,substr(apply_time,1,10) as apply_date,b.deviceid from (select passport_id,apply_id,tdid,apply_time,apply_platform,label from test.yrd_anti_main_table_tmp where substr(apply_time,1,10) <= '2017-03-01') a join (select distinct test_a.passport_id,test_b.deviceid from (select passport_id,apply_id,tdid from test.yrd_anti_main_table_tmp where substr(apply_time,1,10) <= '2017-03-01') test_a join (select tdid,deviceid from ml.sdk_input_log_nosensitive) test_b on test_a.tdid = test_b.tdid) ab on a.passport_id = ab.passport_id join (select id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,deviceid from ml.sdk_info_log_nosensitive where (manufacturer not like '%Apple%' and manufacturer not like '%apple%') and id = '' and dt <= '2017-03-01') b on ab.deviceid = b.deviceid")
val datas = aa.rdd
val header = "id,tdid,source_ip,longitude,latitude,version,manufacturer,model,mocklocationenabled,usbdebugenabled,memory,availablememory,cpunum,cpustat,rommemroy,sdcardmemory,channelid,dt,label,system_phone,platform_name,apply_date"
var features_name = header.split(",")
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val map_data = datas.map(x1=>{
var index_imei = x1(21).toString().split("\\|")(0)
if (index_imei == "")
index_imei = "null"
if((x1(3).toString() == "") || (x1(3).toString() == "0"))
(x1(0),(x1,(0,1,1,index_imei)))
else
(x1(0),(x1,(1,0,1,index_imei)))
})
val result = map_data.reduceByKey((a,b) => {
var data_dis_a = sdf.parse(a._1(20).toString()).getTime - sdf.parse(a._1(17).toString()).getTime
var data_dis_b = sdf.parse(b._1(20).toString()).getTime - sdf.parse(b._1(17).toString()).getTime // gap between b's apply date and its log date
if (data_dis_a < 0){
data_dis_a = data_dis_a * -10
}
if (data_dis_b < 0){
data_dis_b = data_dis_b * -10
}
var a_devi = a._2._3
var b_devi = b._2._3
var total_devi = if (a_devi >= b_devi) a_devi else b_devi
var big_str = if (a_devi >= b_devi) a._2._4 else b._2._4
var small_str = if (a_devi < b_devi) a._2._4 else b._2._4
var total_str = big_str
var data_block = small_str.split(";")
var not_same_num = 0
var add_char = ""
data_block.foreach(a => {
if (big_str.indexOf(a) == -1){
not_same_num += 1
add_char = add_char + ";" + a
}
})
if (not_same_num != 0) {
total_devi += not_same_num
total_str = big_str + add_char
}
if(data_dis_a < data_dis_b)
(a._1,(a._2._1 + b._2._1,a._2._2 + b._2._2,total_devi,total_str))
else
(b._1,(a._2._1 + b._2._1,a._2._2 + b._2._2,total_devi,total_str))
})
val result_write = result.map(a => (a._2._1,a._2._2._1,a._2._2._2,a._2._2._3))
// val result_write = result.map(a => a._2)
result_write.saveAsTextFile("./ml/ios_tdid_res_" + index_part)
}
}