JAVA实现同步oracle数据到es
pom.xml
<!-- Elasticsearch core library and the high-level REST client; both artifacts are pinned to the same version (6.4.3). -->
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.4.3</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.4.3</version> </dependency>
工具类
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.http.HttpHost; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; public class RestHighLevelClientUtils { private static String ES_IP = "192.168.117.129"; private static int ES_PORT = 9200; private static RestHighLevelClient client; public void init() { client = new RestHighLevelClient(RestClient.builder(new HttpHost(ES_IP, ES_PORT, "http"))); } /* * 添加数据 */ public void create(String index, String type, HashMap dataMap) { try { IndexRequest indexRequest = new IndexRequest(index, type).source(dataMap); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } /* * 更新数据 */ public void update(String index, String type, String id, HashMap dataMap) { try { IndexRequest indexRequest = new IndexRequest(index, type).source(dataMap); UpdateRequest request = new UpdateRequest(index, type, id).doc(indexRequest); UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); } catch (Exception e) { e.printStackTrace(); } } /* * 删除 */ public void delete(String index, String type, String id) { try { DeleteRequest request = new DeleteRequest(index, type, id); DeleteResponse delateResponse = client.delete(request, 
RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } /* * 删除 */ public void deleteIndex(String index) { try { DeleteIndexRequest request = new DeleteIndexRequest(index); AcknowledgedResponse delateResponse = client.indices().delete(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } /* * 批量新增 */ public void bulk(String index, String type, ArrayList<Map> documents) throws IOException { BulkRequest bulkRequest = new BulkRequest(); for(Map map : documents) { bulkRequest.add(new IndexRequest(index, type).source(map)); } BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); } } /* * 读数据文件,解析成json,组装成list,批量写入es */ public static ArrayList<Map> read(File file) throws Exception { ArrayList<Map> list = new ArrayList<Map>(); FileInputStream fis = new FileInputStream(file); BufferedReader br = new BufferedReader(new InputStreamReader(fis)); StringBuffer sb = new StringBuffer(); String line = ""; int i = 0; while((line = br.readLine()) != null) { //System.out.println(i++); if(!"".equals(line)) { try { JSONObject jasonObject = JSONObject.parseObject(line); Map map = (Map) jasonObject; list.add(map); } catch (Exception e) { System.out.println(e.getMessage()); } } } return list; }
测试类
import java.io.File;
import java.util.ArrayList;
import java.util.Map;

/**
 * Imports the numbered export files {@code 1.txt}, {@code 2.txt}, ... from
 * {@code D:\java_test\es-test\} into the {@code dmfun} index (type {@code user}),
 * one bulk request per file, stopping at the first missing file number.
 */
public class ESTestMain {

    public static void main(String[] args) throws Exception {
        long t1 = System.currentTimeMillis();
        RestHighLevelClientUtils util = new RestHighLevelClientUtils();
        util.init();
        // NOTE(review): the Elasticsearch client created by init() is not released
        // before main returns — confirm whether the JVM exits cleanly.

        for (int name = 1; ; name++) {
            File file = new File("D:\\java_test\\es-test\\" + name + ".txt");
            if (!file.exists()) {
                break;
            }

            ArrayList<Map> list = ReadFileUtils.read(file);
            // An empty (or entirely unparsable) file yields no documents; sending an
            // empty bulk request would be rejected and abort the remaining files.
            if (!list.isEmpty()) {
                util.bulk("dmfun", "user", list);
            }

            // The reported cost is the elapsed time since the program started,
            // not the time spent on this file alone.
            long t2 = System.currentTimeMillis();
            System.out.println("import " + file.getName() + " sucess," + list.size()
                    + " total,cost" + ((t2 - t1) / 1000) + "s");
        }
    }
}
txt内容格式(每行一个JSON对象)
{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}
{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}
{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}
由数据调度工具将数据库的数据卸数成txt文件;Java项目读取txt文件,将每一行解析成JSON对象并组装成list,再调用bulk批量写入API,将数据同步到ES。