You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
webcrawler/core/src/main/java/core/thrift/FactoryPool.java

73 lines
2.8 KiB

package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.springframework.stereotype.Component;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
@Component
public class FactoryPool {
private GenericObjectPool<TNonblockingSocket> tNonblockingSocketPool;
private GenericObjectPool<TSocket> tSocketPool;
public FactoryPool(TNonblockingSocketPool pool1, TSocketPool pool2) {
this.tNonblockingSocketPool = new GenericObjectPool<>(pool1);
this.tSocketPool = new GenericObjectPool<>(pool2);
}
private final Map<Class, Constructor> clientMap = new HashMap<>();
public <C extends TAsyncClient> void doExecute(Class<C> c, AsyncClient<C> thrift) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket trans = tNonblockingSocketPool.borrowObject();
thrift.doExecute(client(c, trans));
//把一个Transport对象归还到池里
tNonblockingSocketPool.returnObject(trans);
}
public <C extends TServiceClient,T> void doExecute(Class<C> c, ServiceClient<C,T> thrift,T t) throws Exception{
TSocket trans=tSocketPool.borrowObject();
thrift.doExecute(client(c,trans),t);
tSocketPool.returnObject(trans);
}
public interface AsyncClient<C extends TAsyncClient>{
void doExecute(C client) throws Exception;
}
public interface ServiceClient<C extends TServiceClient,T>{
void doExecute(C client,T t) throws Exception;
}
public <T extends TTransport,C> C client(Class<C> c, T trans) throws Exception {
if (trans instanceof TNonblockingTransport) {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TProtocolFactory protocol = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
return (C) clientMap.get(c).newInstance(protocol, clientManager, trans);
} else {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocol.class));
}
TBinaryProtocol protocol = new TBinaryProtocol(trans);
return (C) clientMap.get(c).newInstance(protocol);
}
}
}