JAVA实现同步oracle数据到es

pom.xml

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.3</version>  
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>



工具类

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
public class RestHighLevelClientUtils {
private static String ES_IP = "192.168.117.129";
private static int ES_PORT = 9200;
private static RestHighLevelClient client;
public void init() {
    client = new RestHighLevelClient(RestClient.builder(new HttpHost(ES_IP, ES_PORT, "http")));
}
/*
 * 添加数据
 */
public void create(String index, String type, HashMap dataMap) {
    try {
        IndexRequest indexRequest = new IndexRequest(index, type).source(dataMap);
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/*
 * 更新数据
 */
public void update(String index, String type, String id, HashMap dataMap) {
    try {
        IndexRequest indexRequest = new IndexRequest(index, type).source(dataMap);
        UpdateRequest request = new UpdateRequest(index, type, id).doc(indexRequest);
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/*
 * 删除
 */
public void delete(String index, String type, String id) {
    try {
        DeleteRequest request = new DeleteRequest(index, type, id);
        DeleteResponse delateResponse = client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/*
 * 删除
 */
public void deleteIndex(String index) {
    try {
        DeleteIndexRequest request = new DeleteIndexRequest(index);
        AcknowledgedResponse delateResponse = client.indices().delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/*
 * 批量新增
 */
public void bulk(String index, String type, ArrayList<Map> documents) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    for(Map map : documents) {
        bulkRequest.add(new IndexRequest(index, type).source(map));
    }
    BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    }
}

/*
 * 读数据文件,解析成json,组装成list,批量写入es
 */
public static ArrayList<Map> read(File file) throws Exception {
		ArrayList<Map> list = new ArrayList<Map>();
		
		FileInputStream fis = new FileInputStream(file);
		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
		StringBuffer sb = new StringBuffer();
		String line = "";
		int i = 0;
		while((line = br.readLine()) != null) {
			//System.out.println(i++);
			if(!"".equals(line)) {
				try {
					JSONObject jasonObject = JSONObject.parseObject(line);
					Map map = (Map) jasonObject;
					list.add(map);
				} catch (Exception e) {
					System.out.println(e.getMessage());
				}
			}
		}
		return list;
	}


测试类

import java.io.File;
import java.util.ArrayList;
import java.util.Map;
public class ESTestMain {
public static void main(String[] args) throws Exception {
    long t1 = System.currentTimeMillis();
    RestHighLevelClientUtils util = new RestHighLevelClientUtils();
    util.init();
    ArrayList<Map> documents = new ArrayList<Map>();
    int uid = 20000;
    int name = 1;
    while(true) {
        File file = new File("D:\\java_test\\es-test\\"+name+".txt");
        if(file.exists()) {
            ArrayList<Map> list = ReadFileUtils.read(file);
            for(int i = 0; i < list.size(); i ++) {
                documents.add(list.get(i));
            }
            util.bulk("dmfun", "user", documents);
            long t2 = System.currentTimeMillis();
            System.out.println("import " +file.getName()+ " sucess,"+list.size()+" total,cost" + ((t2-t1)/1000)+"s");
            documents = new ArrayList<Map>();
            name ++;
            }else {
                break;
            }
        }
    }
}


txt内容格式

{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}
{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}
{"uid":"1","name":"赵钱孙里","address":"瞬板南路111号","age":20,"sex":1,"createdt":"2021-01-01"}


由数据调度工具,将数据库得数据卸数成txt文件,java项目读取txt文件,解析成一个json得list,调用bulk批量写入api,将数据同步到es

{context}