重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本文中详解介绍了 pandas 中 transform() 方法的使用
崇信网站建设公司创新互联公司,崇信网站设计制作,有大型网站制作公司丰富经验。已为崇信上1000+提供企业网站建设服务。企业网站搭建\成都外贸网站建设公司要多少钱,请找那个售后服务好的崇信做网站的公司定做!
Accepted combinations are:
{0 or ‘index’, 1 or ‘columns’}, default 0 If 0 or ‘index’: apply function to each column. If 1 or ‘columns’: apply function to each row.
Positional arguments to pass to func.
Keyword arguments to pass to func.
A DataFrame that must have the same length as self.
If the returned DataFrame has a different length than self.
transform方法通常是和groupby方法一起连用的
每个位置被均值取代
内建的聚合函数直接传递别名,max\min\sum\mean
向tranform中直接传递函数
在这个网站上有一个完整的实例,解释了transform方法的使用
You can see in the data that the file contains 3 different orders (10001, 10005 and 10006) and that each order consists has multiple products (aka skus).
The question we would like to answer is: “What percentage of the order total does each sku represent?”
For example, if we look at order 10001 with a total of $576.12, the break down would be:
求出不同商品在所在订单的价钱占比
先求出一列占比的值,再和原始数据进行合并merge
Transform + groupby连用:先分组再求和
1.RDD是PariRDD类型
def add1(line):
return line[0] + line[1]
def add2(x1,x2):
return x1 + x2
sc = SparkContext(appName="gridAnalyse")
rdd = sc.parallelize([1,2,3])
list1 = rdd.map(lambda line: (line,1)).map(lambda (x1,x2) : x1 + x2).collect() #只有一个参数,通过匹配来直接获取(赋值给里面对应位置的变量)
list1 = rdd.map(lambda line: (line,1)).map(lambda x1,x2 : x1 + x2).collect() #错误,相当于函数有两个参数
list2 = rdd.map(lambda line: (line,1)).map(lambda line : line[0] + line[1]).collect() #只有一个参数,参数是Tuple或List数据类型,再从集合的对应位置取出数据
list3 = rdd.map(lambda line: (line,1)).map(add1).collect() #传递函数,将Tuple或List类型数据传给形参
list4 = rdd.map(lambda line: (line,1)).map(add2).collect() #错误,因为输入只有一个,却有两个形参
当RDD是PairRDD时,map中可以写lambda表达式和传入一个函数。
a、写lambda表达式:
可以通过(x1,x2,x3)来匹配获取值;或者使用line获取集合,然后从集合中获取。
b、传入函数
根据spark具体的transaction OR action 操作来确定自定义函数参数的个数,此例子中只有一个参数,从形参(集合类型)中获取相应位置的数据。
很多朋友都想知道java怎么调python?下面就一起来了解一下吧~
java调python主要有两种方法:1.使用Runtime.getRuntime()执行脚本文件;2. 将python脚本写成进程为java提供服务,下面是具体的方法介绍:
第一种:使用Runtime.getRuntime()执行脚本文件
先建立python脚本文件 demo.py
import numpy as np a = np.arange(12).reshape(3,4)print(a)
java调用python程序并输出该结果
import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class Demo { public static void main(String[] args) { // TODO Auto-generated method stub Process proc; try { proc = Runtime.getRuntime().exec("python D:\\demo.py");// 执行py文件 //用输入输出流来截取结果 BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream())); String line = null; while ((line = in.readLine()) != null) { System.out.println(line); } in.close(); proc.waitFor(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }}
如若向python程序中函数传递参数并执行出结果,下面就举一例来说明一下。
同样建立python脚本文件demo2.py
import sys def func(a,b): return (a+b)if __name__ == '__main__': a = [] for i in range(1, len(sys.argv)): a.append((int(sys.argv[i]))) print(func(a[0],a[1]))
其中sys.argv用于获取参数url1,url2等。而sys.argv[0]代表python程序名,所以列表从1开始读取参数。
以上代码实现一个两个数做加法的程序,下面看看在java中怎么传递函数参数,代码如下:
int a = 18;int b = 23;try { String[] args = new String[] { "python", "D:\\demo2.py", String.valueOf(a), String.valueOf(b) }; Process proc = Runtime.getRuntime().exec(args);// 执行py文件 BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream())); String line = null; while ((line = in.readLine()) != null) { System.out.println(line); } in.close(); proc.waitFor();} catch (IOException e) { e.printStackTrace();} catch (InterruptedException e) { e.printStackTrace();}
其中args是String[] { “python”,path,url1,url2 }; ,path是python程序所在的路径,url1是参数1,url2是参数2,以此类推。
2. 将python脚本写成进程为java提供服务
python脚本文件如下:
import socketimport sysimport threadingimport numpy as npfrom PIL import Imagedef main(): # 创建服务器套接字 serversocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 获取本地主机名称 host = socket.gethostname() # 设置一个端口 port = 12345 # 将套接字与本地主机和端口绑定 serversocket.bind((host,port)) # 设置监听最大连接数 serversocket.listen(5) # 获取本地服务器的连接信息 myaddr = serversocket.getsockname() print("服务器地址:%s"%str(myaddr)) # 循环等待接受客户端信息 while True: # 获取一个客户端连接 clientsocket,addr = serversocket.accept() print("连接地址:%s" % str(addr)) try: t = ServerThreading(clientsocket)#为每一个请求开启一个处理线程 t.start() pass except Exception as identifier: print(identifier) pass pass serversocket.close() passclass ServerThreading(threading.Thread): # words = text2vec.load_lexicon() def __init__(self,clientsocket,recvsize=1024*1024,encoding="utf-8"): threading.Thread.__init__(self) self._socket = clientsocket self._recvsize = recvsize self._encoding = encoding pass def run(self): print("开启线程.....") try: #接受数据 msg = '' while True: # 读取recvsize个字节 rec = self._socket.recv(self._recvsize) # 解码 msg += rec.decode(self._encoding) # 文本接受是否完毕,因为python socket不能自己判断接收数据是否完毕, # 所以需要自定义协议标志数据接受完毕 if msg.strip().endswith('over'): msg=msg[:-4] break sendmsg = Image.open(msg) # 发送数据 self._socket.send(("%s"%sendmsg).encode(self._encoding)) pass except Exception as identifier: self._socket.send("500".encode(self._encoding)) print(identifier) pass finally: self._socket.close() print("任务结束.....") pass def __del__(self): passif __name__ == "__main__": main()
在java代码中访问python进程的代码: package hello;import java.lang.System;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.InetAddress;import java.net.Socket;import java.io.OutputStream;import java.io.PrintStream;import java.io.InputStream;public class hello { public static void main(String[] args){ //System.out.println("Hello World!"); // TODO Auto-generated method stub try { InetAddress addr = InetAddress.getLocalHost(); String host=addr.getHostName(); //String ip=addr.getHostAddress().toString(); //获取本机ip //log.info("调用远程接口:host="+ip+",port="+12345); // 初始化套接字,设置访问服务的主机和进程端口号,HOST是访问python进程的主机名称,可以是IP地址或者域名,PORT是python进程绑定的端口号 Socket socket = new Socket(host,12345); // 获取输出流对象 OutputStream os = socket.getOutputStream(); PrintStream out = new PrintStream(os); // 发送内容 out.print( "F:\\xxx\\0000.jpg"); // 告诉服务进程,内容发送完毕,可以开始处理 out.print("over"); // 获取服务进程的输入流 InputStream is = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(is,"utf-8")); String tmp = null; StringBuilder sb = new StringBuilder(); // 读取内容 while((tmp=br.readLine())!=null) sb.append(tmp).append('\n'); System.out.print(sb); // 解析结果 //JSONArray res = JSON.parseArray(sb.toString()); } catch (IOException e) { e.printStackTrace(); }finally { try {if(socket!=null) socket.close();} catch (IOException e) {} System.out.print("远程接口调用结束."); } }}