master
WuXianChaoPin 7 years ago
parent ab19a7e43c
commit 14e3171722
  1. 72
      core/src/main/java/core/thrift/FactoryPool.java
  2. 74
      core/src/main/java/core/thrift/SocketConfig.java
  3. 8
      core/src/main/java/core/thrift/TNonblockingSocketPool.java
  4. 8
      core/src/main/java/core/thrift/TSocketPool.java
  5. 15
      core/src/main/java/core/thrift/TTransportPool.java
  6. 2
      sql/new.sql
  7. 3
      web/src/main/resources/config.properties
  8. 12
      web/src/test/java/spring/methodreplace/Add.java
  9. 25
      web/src/test/java/spring/methodreplace/SpringTest.java
  10. 8
      web/src/test/java/thrift/ThriftTest.java

@ -0,0 +1,72 @@
package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.springframework.stereotype.Component;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
@Component
public class FactoryPool {
private GenericObjectPool<TNonblockingSocket> tNonblockingSocketPool;
private GenericObjectPool<TSocket> tSocketPool;
public FactoryPool(TNonblockingSocketPool pool1, TSocketPool pool2) {
this.tNonblockingSocketPool = new GenericObjectPool<>(pool1);
this.tSocketPool = new GenericObjectPool<>(pool2);
}
private final Map<Class, Constructor> clientMap = new HashMap<>();
public <C extends TAsyncClient> void doExecute(Class<C> c, AsyncClient<C> thrift) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket trans = tNonblockingSocketPool.borrowObject();
thrift.doExecute(client(c, trans));
//把一个Transport对象归还到池里
tNonblockingSocketPool.returnObject(trans);
}
public <C extends TServiceClient> void doExecute(Class<C> c, ServiceClient<C> thrift) throws Exception{
TSocket trans=tSocketPool.borrowObject();
thrift.doExecute(client(c,trans));
tSocketPool.returnObject(trans);
}
private interface Thrift<E> {
void doExecute(E client) throws Exception;
}
public interface AsyncClient<E extends TAsyncClient> extends Thrift<E>{}
public interface ServiceClient<E extends TServiceClient> extends Thrift<E>{}
public <T extends TTransport,C> C client(Class<C> c, T trans) throws Exception {
if (trans instanceof TNonblockingTransport) {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TProtocolFactory protocol = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
return (C) clientMap.get(c).newInstance(protocol, clientManager, trans);
} else {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocol.class));
}
TBinaryProtocol protocol = new TBinaryProtocol(trans);
return (C) clientMap.get(c).newInstance(protocol);
}
}
}

@ -1,85 +1,17 @@
package core.thrift;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
@Component
public class SocketConfig {
@Value("${thrift.ip}")
private String ip;
public String ip;
@Value("${thrift.port}")
private Integer port;
public Integer port;
@Value("${thrift.timeout}")
private Integer timeout;
private GenericObjectPool<TNonblockingSocket> tNonblockingSocketPool;
private GenericObjectPool<TSocket> tSocketPool;
private final Map<Class, Constructor> clientMap = new HashMap<>();
@PostConstruct
public void init(){
tNonblockingSocketPool=new GenericObjectPool<>(new TNonblockingSocketPool(ip,port,timeout));
tSocketPool=new GenericObjectPool<>(new TSocketPool(ip,port,timeout));
}
public <C extends TAsyncClient> void doExecute(Class<C> c, AsyncClient<C> thrift) throws Exception {
//从池里获取一个Transport对象
TNonblockingSocket trans = tNonblockingSocketPool.borrowObject();
thrift.doExecute(client(c, trans));
//把一个Transport对象归还到池里
tNonblockingSocketPool.returnObject(trans);
}
public <C extends TServiceClient> void doExecute(Class<C> c,ServiceClient<C> thrift) throws Exception{
TSocket trans=tSocketPool.borrowObject();
thrift.doExecute(client(c,trans));
tSocketPool.returnObject(trans);
}
private interface Thrift<E> {
void doExecute(E client) throws Exception;
}
public interface AsyncClient<E extends TAsyncClient> extends Thrift<E>{}
public interface ServiceClient<E extends TServiceClient> extends Thrift<E>{}
public Integer timeout;
public <T extends TTransport,C> C client(Class<C> c, T trans) throws Exception {
if (trans instanceof TNonblockingTransport) {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class));
}
TProtocolFactory protocol = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
return (C) clientMap.get(c).newInstance(protocol, clientManager, trans);
} else {
if (!clientMap.containsKey(c)) {
clientMap.put(c, c.getConstructor(TProtocol.class));
}
TBinaryProtocol protocol = new TBinaryProtocol(trans);
return (C) clientMap.get(c).newInstance(protocol);
}
}
}

@ -2,14 +2,16 @@ package core.thrift;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.transport.TNonblockingSocket;
import org.springframework.stereotype.Component;
import java.net.Socket;
@Component
public class TNonblockingSocketPool extends TTransportPool<TNonblockingSocket,TAsyncClient> {
public TNonblockingSocketPool(String ip, Integer port, Integer timeout) {
super(ip, port, timeout);
public TNonblockingSocketPool(SocketConfig config) {
super(config);
}
@Override
@ -19,7 +21,7 @@ public class TNonblockingSocketPool extends TTransportPool<TNonblockingSocket,TA
@Override
public TNonblockingSocket create() throws Exception {
return new TNonblockingSocket(ip,port,timeout);
return new TNonblockingSocket(config.ip,config.port,config.timeout);
}

@ -2,14 +2,16 @@ package core.thrift;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TSocket;
import org.springframework.stereotype.Component;
import java.net.Socket;
@Component
public class TSocketPool extends TTransportPool<TSocket,TServiceClient>{
public TSocketPool(String ip, Integer port, Integer timeout) {
super(ip, port, timeout);
public TSocketPool(SocketConfig config) {
super(config);
}
@Override
@ -19,7 +21,7 @@ public class TSocketPool extends TTransportPool<TSocket,TServiceClient>{
@Override
public TSocket create() throws Exception {
TSocket socket=new TSocket(ip,port,timeout);
TSocket socket=new TSocket(config.ip,config.port,config.timeout);
socket.open();
return socket;
}

@ -9,16 +9,10 @@ import java.net.Socket;
public abstract class TTransportPool<T extends TTransport, C> extends BasePooledObjectFactory<T> {
protected String ip;
protected SocketConfig config;
protected Integer port;
protected Integer timeout;
public TTransportPool(String ip, Integer port, Integer timeout) {
this.ip = ip;
this.port = port;
this.timeout = timeout;
public TTransportPool(SocketConfig config) {
this.config = config;
}
@Override
@ -61,9 +55,6 @@ public abstract class TTransportPool<T extends TTransport, C> extends BasePooled
public void destroyObject(PooledObject<T> p) {
p.getObject().close();
}
}

@ -14,5 +14,7 @@ create table menu
)
engine=InnoDB
;
--thrift服务器表
create

@ -9,7 +9,6 @@ hibernate.connection.driver_class=com.mysql.cj.jdbc.Driver
druid.maxActive=20
#thrift\u914D\u7F6E
thrift.ip=127.0.0.1
thrift.ip=sukura.top
thrift.port=2233
thrift.timeout=3000

@ -0,0 +1,12 @@
package spring.methodreplace;
import org.springframework.stereotype.Component;
@Component
public class Add {
public void add(){
System.out.println("Add");
}
}

@ -0,0 +1,25 @@
package spring.methodreplace;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitWebConfig
@ContextConfiguration()
public class SpringTest {
@Resource
private Add add;
@Test
public void test1(){
add.add();
}
}

@ -1,7 +1,7 @@
package thrift;
import core.thrift.FactoryPool;
import core.thrift.MyAsyncMethodCallback;
import core.thrift.SocketConfig;
import core.thrift.task.TSDM;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -20,21 +20,21 @@ import javax.annotation.Resource;
public class ThriftTest {
@Resource
private SocketConfig config;
private FactoryPool factoryPool;
private static Logger log = LogManager.getLogger();
@Test
public void test1() throws Exception {
MyAsyncMethodCallback<Boolean> call=new MyAsyncMethodCallback<>();
config.doExecute(TSDM.AsyncClient.class,client -> client.qiandao(call));
factoryPool.doExecute(TSDM.AsyncClient.class,client -> client.qiandao(call));
log.info(call.getResult());
}
@Test
public void test2() throws Exception{
config.doExecute(TSDM.Client.class,client -> log.info(client.qiandao()));
factoryPool.doExecute(TSDM.Client.class, client -> log.info(client.qiandao()));
}
}

Loading…
Cancel
Save