Kettle: REST large-file upload (RestUploadFile.ktr) and REST file download (FileDownload.ktr)

1. REST large-file upload (RestUploadFile.ktr)

Requirements

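  1. Files to upload are larger than 10 MB and smaller than 500 MB
  2. Uploaded files are split into chunks (5 MB chunks gave a shorter overall time than 1 MB chunks)
  3. Prototype the functionality in a plain Java class first, then port it to the .ktr
  4. Develop with Kettle plus a Java code snippet
  5. The required parameters can be set when the step is started
  6. Add a UserId (e.g. the testXiaoYu directory)
  7. Upload URL: http://**:8089/api/dlapiservice/v1/file/userdata
  8. Download URL: http://**:8089/api/dlapiservice/v1/file/userdata/
  9. HDFS URL: http://**:50070/explorer.html#/testXiaoYu (to view it, connect remotely from the 92 server to the 153 server)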
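The chunking rule in requirement 2 comes down to simple arithmetic: a file of length L split into parts of size P needs ceil(L / P) requests, the first sent with action=create and the rest with action=append (the two action values used by the upload code in this section). The standalone Java sketch below computes that plan offline; the class and method names are hypothetical and not part of the .ktr.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: how a file of a given length splits into fixed-size chunks,
// with the first chunk sent as "create" and every later one as "append".
public class ChunkPlan {

    // One planned request: byte offset, byte count and the "action" form field
    static final class Chunk {
        final long offset;
        final int length;
        final String action;

        Chunk(long offset, int length, String action) {
            this.offset = offset;
            this.length = length;
            this.action = action;
        }

        @Override
        public String toString() {
            return action + "@" + offset + "+" + length;
        }
    }

    // Number of requests = ceil(fileLength / partSize); an empty file still needs one "create"
    static long chunkCount(long fileLength, int partSize) {
        if (fileLength == 0) {
            return 1;
        }
        return (fileLength + partSize - 1) / partSize;
    }

    static List<Chunk> plan(long fileLength, int partSize) {
        List<Chunk> chunks = new ArrayList<>();
        long offset = 0;
        do {
            int len = (int) Math.min(partSize, fileLength - offset);
            chunks.add(new Chunk(offset, len, offset == 0 ? "create" : "append"));
            offset += len;
        } while (offset < fileLength);
        return chunks;
    }

    public static void main(String[] args) {
        int mb = 1024 * 1024;
        // 500 MB (the upper limit in the requirements) at 5 MB vs 1 MB per chunk
        System.out.println(chunkCount(500L * mb, 5 * mb));  // prints 100
        System.out.println(chunkCount(500L * mb, mb));      // prints 500
        // A 12 MB file at 5 MB per chunk: create, append, append (the last one 2 MB)
        System.out.println(plan(12L * mb, 5 * mb));
    }
}
```

With 5 MB parts a 500 MB file needs 100 requests instead of 500 with 1 MB parts. Paying the per-request overhead (connection setup, multipart framing, the server-side append) fewer times is one plausible explanation for the measured speed difference, but that is an inference, not something measured in this project.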

Screenshot walkthrough

  Specify the large file to upload

  [Screenshot]

  Code snippet

  [Screenshot]

  Successful run

  [Screenshot]

Upload code snippet (User Defined Java Class step)

    import java.io.*;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.mime.HttpMultipartMode;
    import org.apache.http.entity.mime.MultipartEntityBuilder;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    // Chunk size: 5 MB (5 MB chunks were faster overall than 1 MB chunks)
    private static final int PART_SIZE = 1024 * 1024 * 5;

    // Reads until the buffer is full or EOF is reached; returns the number of bytes read (0 at EOF).
    // A single read() may return fewer bytes than requested, so one call is not enough.
    private int readChunk(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int n = in.read(buffer, total, buffer.length - total);
            if (n == -1) {
                break;
            }
            total += n;
        }
        return total;
    }

    // Sends one chunk as a multipart POST and returns the HTTP status code.
    // action is "create" for the first (or only) chunk and "append" for every later one.
    private int postChunk(CloseableHttpClient httpClient, String urlString, String filepath, String userId,
                          String action, byte[] buffer, int length, String partName) throws IOException {
        // Copy only the bytes actually read; the last chunk is usually shorter than PART_SIZE
        byte[] chunk = new byte[length];
        System.arraycopy(buffer, 0, chunk, 0, length);

        MultipartEntityBuilder entity = MultipartEntityBuilder.create();
        entity.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
        entity.addTextBody("filepath", filepath);
        entity.addTextBody("userId", userId);
        entity.addTextBody("action", action);
        // Send the bytes directly instead of writing a temporary file to disk
        entity.addBinaryBody("file", chunk, ContentType.APPLICATION_OCTET_STREAM, partName);

        HttpPost post = new HttpPost(urlString);
        post.setEntity(entity.build());
        CloseableHttpResponse response = httpClient.execute(post);
        try {
            return response.getStatusLine().getStatusCode();
        } finally {
            // Drain and release the response so the connection can be reused
            EntityUtils.consume(response.getEntity());
            response.close();
        }
    }

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        Object[] r = getRow();
        if (r == null) {
            setOutputDone();
            return false;
        }
        r = createOutputRow(r, data.outputRowMeta.size());

        String urlString = get(Fields.In, "url").getString(r);
        String filename = get(Fields.In, "filename").getString(r);  // local path of the file to upload
        String filepath = get(Fields.In, "filepath").getString(r);  // target path on the server
        String userId = get(Fields.In, "userid").getString(r);

        File file = new File(filename);
        if (!file.isFile()) {
            get(Fields.Out, "http_statuscode").setValue(r, "-1");
            get(Fields.Out, "http_status").setValue(r, "failed: file not found: " + filename);
            putRow(data.outputRowMeta, r);
            return true;
        }

        CloseableHttpClient httpClient = HttpClients.createDefault();  // one client for all chunks
        FileInputStream in = null;
        try {
            in = new FileInputStream(file);
            byte[] buffer = new byte[PART_SIZE];
            int statusCode = -1;
            boolean first = true;
            // A file of up to 5 MB goes up in a single "create" request;
            // a larger one is sent as "create" followed by "append" chunks.
            int read = readChunk(in, buffer);
            do {
                statusCode = postChunk(httpClient, urlString, filepath, userId,
                        first ? "create" : "append", buffer, read, file.getName());
                if (statusCode != 200 && statusCode != 201) {
                    break;  // stop at the first rejected chunk
                }
                first = false;
                read = readChunk(in, buffer);
            } while (read > 0);

            get(Fields.Out, "http_statuscode").setValue(r, String.valueOf(statusCode));
            get(Fields.Out, "http_status").setValue(r,
                    (statusCode == 200 || statusCode == 201) ? "success" : "failed");
        } catch (Exception e) {
            // Network or I/O error: report -1 as the status code
            get(Fields.Out, "http_statuscode").setValue(r, "-1");
            get(Fields.Out, "http_status").setValue(r, "failed: " + e.getMessage());
        } finally {
            try { if (in != null) in.close(); } catch (IOException ignored) { }
            try { httpClient.close(); } catch (IOException ignored) { }
        }

        // Send the row on to the next step.
        putRow(data.outputRowMeta, r);
        return true;
    }
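Requirement 3 suggests prototyping in a plain Java class before moving to the .ktr. In that spirit, the sketch below runs the same create/append loop offline against FakeServer, a hypothetical in-memory stand-in for the REST endpoint (its 201/200 status codes and its rejection of an append without a preceding create are assumptions, not documented server behavior). It checks that the chunks reassemble into the original bytes, which is exactly what breaks if a zero-filled array of the right length is sent instead of the bytes that were read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Random;

// Offline sketch of the chunked upload loop.
// FakeServer is hypothetical: "create" starts a new file, "append" adds to it.
public class ChunkedUploadSim {

    static final class FakeServer {
        private ByteArrayOutputStream stored;

        int receive(String action, byte[] chunk) {
            if ("create".equals(action)) {
                stored = new ByteArrayOutputStream();
            } else if (!"append".equals(action) || stored == null) {
                return 400;  // unknown action, or append before create
            }
            stored.write(chunk, 0, chunk.length);
            return "create".equals(action) ? 201 : 200;
        }

        byte[] contents() {
            return stored == null ? new byte[0] : stored.toByteArray();
        }
    }

    // Fills the buffer unless EOF comes first; returns the bytes read (0 at EOF)
    static int readChunk(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int n = in.read(buffer, total, buffer.length - total);
            if (n == -1) {
                break;
            }
            total += n;
        }
        return total;
    }

    // Same control flow as the step: first chunk "create", the rest "append", stop on error
    static int upload(InputStream in, int partSize, FakeServer server) throws IOException {
        byte[] buffer = new byte[partSize];
        int read = readChunk(in, buffer);
        boolean first = true;
        int status = -1;
        do {
            // Send only the bytes actually read
            status = server.receive(first ? "create" : "append", Arrays.copyOf(buffer, read));
            if (status != 200 && status != 201) {
                break;
            }
            first = false;
            read = readChunk(in, buffer);
        } while (read > 0);
        return status;
    }

    public static void main(String[] args) throws IOException {
        byte[] original = new byte[12 * 1024 * 1024 + 123];
        new Random(42).nextBytes(original);
        FakeServer server = new FakeServer();
        int status = upload(new ByteArrayInputStream(original), 5 * 1024 * 1024, server);
        System.out.println(status + " " + Arrays.equals(original, server.contents()));  // prints "200 true"
    }
}
```

Once this passes, only the body of upload needs to change (a real HTTP call instead of server.receive) before it is pasted into the step.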

 

2. Rest文件下载(FileDownload.ktr)

Requirements

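  1. Download the uploaded file
  2. (5 MB chunks gave a shorter overall time than 1 MB chunks)
  3. Download URL: http://**:8089/api/dlapiservice/v1/file/userdata/
  4. HDFS URL: http://**:50070/explorer.html#/testXiaoYu (to view it, connect remotely from the 92 server to the 153 server)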
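The download step requests GET {url}/{userid}?filepath={filepath}. Because the download URL above ends in "/" and the step appends "/" + userId, the request path can end up with a double slash. The JDK-only sketch below builds the same request URL and trims the trailing slash. The host is a placeholder, and form-encoding filepath with URLEncoder (so a space becomes +) is an assumption about what the server accepts.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch: builds {base}/{userId}?filepath={filepath} with the JDK only.
// Assumes userId is a plain token (letters/digits) that needs no escaping in the path.
public class DownloadUrl {

    // Form-encodes a query value as UTF-8 ("/" becomes %2F, a space becomes +)
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);  // UTF-8 is always available
        }
    }

    static String build(String base, String userId, String filepath) {
        // Drop a trailing "/" so appending "/" + userId does not produce "//"
        String root = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        return root + "/" + userId + "?filepath=" + encode(filepath);
    }

    public static void main(String[] args) {
        // example.com stands in for the elided host
        System.out.println(build("http://example.com:8089/api/dlapiservice/v1/file/userdata/",
                "testXiaoYu", "/data/report 1.csv"));
        // prints http://example.com:8089/api/dlapiservice/v1/file/userdata/testXiaoYu?filepath=%2Fdata%2Freport+1.csv
    }
}
```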

 

《Kettle Rest大文件上传(RestUploadFile.ktr) Rest文件下载(FileDownload.ktr)》

Download code snippet (User Defined Java Class step)

    import org.apache.http.*;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.utils.URIBuilder;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URISyntaxException;

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        Object[] r = getRow();
        if (r == null) {
            setOutputDone();
            return false;
        }
        r = createOutputRow(r, data.outputRowMeta.size());

        String urlString = get(Fields.In, "url").getString(r);
        String filepath = get(Fields.In, "filepath").getString(r);  // path of the file on the server
        String folder = get(Fields.In, "folder").getString(r);      // local directory to save into
        String userId = get(Fields.In, "userid").getString(r);

        CloseableHttpClient httpClient = HttpClients.createDefault();
        CloseableHttpResponse response = null;
        try {
            // GET {url}/{userId}?filepath=...
            URIBuilder builder = new URIBuilder(urlString + "/" + userId);
            builder.addParameter("filepath", filepath);
            response = httpClient.execute(new HttpGet(builder.build()));

            int statusCode = response.getStatusLine().getStatusCode();
            HttpEntity entity = response.getEntity();
            if (statusCode != 200 || entity == null) {
                // Do not save an error page as if it were the file
                EntityUtils.consume(entity);
                get(Fields.Out, "http_status").setValue(r, "failed");
                get(Fields.Out, "exception").setValue(r, "failed: HTTP " + statusCode);
            } else {
                // Use the server-supplied name; fall back to the last segment of filepath
                String filename = getFileName(response);
                if (filename == null || filename.length() == 0) {
                    filename = new File(filepath).getName();
                }
                File file = new File(folder, filename);  // works whether or not folder ends with a separator
                File parent = file.getParentFile();
                if (parent != null) {
                    parent.mkdirs();
                }
                InputStream is = entity.getContent();
                OutputStream fileout = new FileOutputStream(file);
                try {
                    byte[] buffer = new byte[1024 * 1024];  // stream in 1 MB blocks
                    int n;
                    while ((n = is.read(buffer)) != -1) {
                        fileout.write(buffer, 0, n);
                    }
                } finally {
                    fileout.close();
                    is.close();
                }
                get(Fields.Out, "filename").setValue(r, filename);
                get(Fields.Out, "http_status").setValue(r, "success");
            }
        } catch (URISyntaxException e) {
            get(Fields.Out, "http_status").setValue(r, "failed");
            get(Fields.Out, "exception").setValue(r, "failed: " + e.getMessage());
        } catch (IOException e) {  // also covers ClientProtocolException
            get(Fields.Out, "http_status").setValue(r, "failed");
            get(Fields.Out, "exception").setValue(r, "failed: " + e.getMessage());
        } finally {
            try { if (response != null) response.close(); } catch (IOException ignored) { }
            try { httpClient.close(); } catch (IOException ignored) { }
        }

        // Send the row on to the next step.
        putRow(data.outputRowMeta, r);
        return true;
    }

    // Returns the "filename" parameter of the Content-Disposition header, or null if there is none
    public static String getFileName(HttpResponse response) {
        Header contentHeader = response.getFirstHeader("Content-Disposition");
        if (contentHeader == null) {
            return null;
        }
        HeaderElement[] values = contentHeader.getElements();
        if (values.length != 1) {
            return null;
        }
        NameValuePair param = values[0].getParameterByName("filename");
        return (param != null) ? param.getValue() : null;
    }
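getFileName reads only the plain filename parameter of Content-Disposition. Servers that return non-ASCII names (Chinese file names, for example) often use the RFC 6266 / RFC 5987 form filename*=UTF-8''<percent-encoded bytes> instead. Whether this service does so is not stated, so the JDK-only parser below is a hypothetical fallback: it prefers filename* when present and decodes it, otherwise it returns the unquoted filename. It splits on ';' naively, so a quoted name that itself contains ';' is not handled.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.Locale;

// Sketch: extracts a file name from a Content-Disposition value.
// Prefers filename*=charset'lang'percent-encoded (RFC 6266 / RFC 5987) over filename="...".
// Limitation: splits on ';' naively, so quoted names containing ';' are not supported.
public class DispositionName {

    static String parse(String header) {
        if (header == null) {
            return null;
        }
        String plain = null;
        for (String part : header.split(";")) {
            String p = part.trim();
            int eq = p.indexOf('=');
            if (eq < 0) {
                continue;  // e.g. the leading "attachment" token
            }
            String key = p.substring(0, eq).trim().toLowerCase(Locale.ROOT);
            String value = p.substring(eq + 1).trim();
            if (key.equals("filename*")) {
                int first = value.indexOf('\'');
                int second = first < 0 ? -1 : value.indexOf('\'', first + 1);
                if (first > 0 && second > first) {
                    String charset = value.substring(0, first);
                    // RFC 5987 has no "+ means space" rule, so protect literal '+' before decoding
                    String encoded = value.substring(second + 1).replace("+", "%2B");
                    try {
                        return URLDecoder.decode(encoded, charset);
                    } catch (UnsupportedEncodingException | IllegalArgumentException e) {
                        // unknown charset or bad %-escape: fall back to the plain filename
                    }
                }
            } else if (key.equals("filename")) {
                boolean quoted = value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"");
                plain = quoted ? value.substring(1, value.length() - 1) : value;
            }
        }
        return plain;
    }

    public static void main(String[] args) {
        System.out.println(parse("attachment; filename=\"report.csv\""));  // prints report.csv
        System.out.println(parse("attachment; filename=\"a.txt\"; filename*=UTF-8''%E6%B5%8B%E8%AF%95.txt"));
    }
}
```

Inside the step, the same logic could be applied to contentHeader.getValue() when getParameterByName("filename") returns null.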

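  1. Prerequisite for the KettleDemo prototype as a whole: the Java code snippets depend on the Apache HttpComponents jars (httpclient, httpcore, httpmime), which can be downloaded from http://hc.apache.org/downloads.cgi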

[Screenshot]

  2. Copy the jar files into Kettle's lib directory (data-integration/lib) and restart Spoon

[Screenshot]
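To confirm the jars are actually visible before running the transformation, the standalone check below tries to load one class from each HttpComponents jar that the two steps import (httpclient, httpcore, httpmime). Run it with the same directory on the classpath, for example java -cp "data-integration/lib/*:." JarCheck on Linux; that path is an assumption about a default Kettle layout, and JarCheck is a hypothetical helper, not part of Kettle.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: verifies that one class from each HttpComponents jar used by the two steps can be loaded.
// Run with the same jars on the classpath, e.g. java -cp "data-integration/lib/*:." JarCheck
// (the data-integration/lib location is an assumption about a default Kettle install).
public class JarCheck {

    static final String[] REQUIRED = {
        "org.apache.http.impl.client.HttpClients",            // httpclient
        "org.apache.http.util.EntityUtils",                   // httpcore
        "org.apache.http.entity.mime.MultipartEntityBuilder"  // httpmime
    };

    // Returns the names that could not be loaded (an empty list means everything was found)
    static List<String> missing(String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            try {
                Class.forName(name, false, JarCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> m = missing(REQUIRED);
        System.out.println(m.isEmpty() ? "all required classes found" : "missing: " + m);
    }
}
```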
