You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
webcrawler/core/src/main/java/core/thrift/ThriftClientPool.java

70 lines
2.4 KiB

package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
@Service
public class ThriftClientPool {
@Value("${thrift.ip}")
private String ip;
@Value("${thrift.port}")
private Integer port;
@Value("${thrift.timeout}")
private Integer timeout;
private static GenericObjectPool<TNonblockingSocket> pool;
private static final Map<Class<? extends TAsyncClient>, Constructor> asyncClient = new HashMap<>();
private static final Map<Class<? extends TAsyncClient>, Constructor> synchronizeClient = new HashMap<>();
@PostConstruct
public void init() {
pool = new GenericObjectPool<>(new ConnectionPoolFactory(ip, port, timeout));
}
public static <E extends TAsyncClient> void doExecute(Thrift<E> thrift,Class<E> c) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket socket = pool.borrowObject();
thrift.doExecute(client(c,socket));
//把一个Transport对象归还到池里
pool.returnObject(socket);
}
private static <E extends TAsyncClient> E client(Class<E> c, TNonblockingSocket socket) throws NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException, InstantiationException {
if (!asyncClient.containsKey(c)) {
asyncClient.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocol = new TBinaryProtocol.Factory();
return (E) asyncClient.get(c).newInstance(protocol,clientManager,socket);
}
public interface Thrift<E extends TAsyncClient> {
void doExecute(E client) throws Exception;
}
}