gRPC-Java Unit Test

采用In-Process作为Transport协议,直接生成一个本地的Server,将Server端mock,从而实现Clinet端的代码测试。gRPC的维护者们不允许Client去做mock

  1. 实现本地In-Process的Server端;

    1. 实现由Protocol Buffer生成的ServiceBase对象,覆盖它的具体方法;(该类的作用就是server执行的具体实现)
    2. 将上述实现注册到本地In-Process Server;(之后Client调用Server就能由我们自己实现的ServiceBase来完成数据的mock,官方生成的类里,实现是抛错……)
  2. 获取In-Process Server中的Channel,以提供Client生成Stub;
  3. 跑测试;

依照上述步骤,结合原力项目(代码服务)场景:
我们需要对某一个路径下的仓库检测其是否存,用gRPC协议调用远端的Server完成。

Projects.java(业务Service)

   /** 
     *{@inheritDoc}
     *
     * @param projectPathWithNamespace
     * @return
     */
    @Override
    public Optional<Projects> isExist(@NonNull String projectPathWithNamespace) {
        Optional<Projects> projectsOptional = projectsDao.findByNamespace(projectPathWithNamespace);
        if (projectsOptional.isPresent()) {
            RepositoryExistsRequest repositoryExistsRequest = RepositoryExistsRequest.newBuilder()
                .setRepository(GrpcUtil.generateRepository(projectPathWithNamespace)).build();
            RepositoryServiceBlockingStub stub = grpcStub.get(RepositoryServiceBlockingStub.class,
                GrpcUtil.getNamespace(projectPathWithNamespace));
            RepositoryExistsResponse response = stub.repositoryExists(repositoryExistsRequest);
            // todo 通知清理脏数据
            if (!response.getExists()) {
                return Optional.empty();
            }
        }
        return projectsOptional;
    }

系统在这里统一处理了stub的生成:grpcStub.get(...)

扩展RepositoryServiceGrpc下的RepositoryServiceImplBase(Protocol Buffer自动生成Service)

/**
 * Extend {@link RepositoryServiceGrpc.RepositoryServiceImplBase} in order to implement the mock method.
 *
 * @author van.yzt
 * @date 2017/09/27
 */
public class MockRepositoryServiceImplBase extends RepositoryServiceGrpc.RepositoryServiceImplBase {

    /**
     * Implement the exist check method.
     *
     * @param request
     * @param responseObserver
     */
    @Override
    public void repositoryExists(RepositoryExistsRequest request,
        StreamObserver<RepositoryExistsResponse> responseObserver) {
        responseObserver.onNext(RepositoryExistsResponse.newBuilder().setExists(true).build());
        responseObserver.onCompleted();
    }
}

ProjectsTest.java

@Rule
private GrpcStub grpcStub = new MockGrpcStub();
...

@Before

public void setUp() throws Exception {

    RepositoryServiceImplBase repositoryServiceImplBase = Mockito.spy(new MockRepositoryServiceImplBase() {});
    // 注册到In-Process中

    grpcStub.getServiceRegistry().addService(repositoryServiceImplBase);

}


 @Test
    public void isExistTest() throws Exception {
        Mockito.when(mockProjectsDao.findByNamespace(TestDataBuilder.ABSOLUTE_PATH)).thenReturn(
            Optional.of(TestDataBuilder.buildProjects()));
        Assert.assertNotNull(projects.isExist(TestDataBuilder.ABSOLUTE_PATH));
    }

MockGrpcStub.java(业务相关的封装,用于生成Stub的Service)

/**
 * This stub generate service is used for unit test.
 *
 * @author van.yzt
 * @date 2017/09/27
 */
public class MockGrpcStub extends ExternalResource implements AbstractMockGrpcStub {
    private ManagedChannel channel;
    private Server server;
    private String serverName;
    private MutableHandlerRegistry serviceRegistry;
    private boolean useDirectExecutor;

    public AbstractMockGrpcStub directExecutor() {
        useDirectExecutor = true;
        return this;
    }


    /**
     * 获取固定的headers
     *
     * @param namespace
     * @param timestamp
     * @return
     */
    private Map<String, String> headers(@NonNull String namespace, @NonNull Long timestamp) {
        final Map<String, String> headers = new HashMap<>(2);
        // Some operations
        return headers;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T extends AbstractStub<T>> T get(@NonNull Class<T> stubClass, @NonNull final String namespace) {
        try {
            Constructor constructor = stubClass.getDeclaredConstructor(Channel.class);
            constructor.setAccessible(true);
            AbstractStub stub = (AbstractStub)constructor.newInstance(channel);
            /**
             * 不需要关心并发
             */
            stub = attachHeaders(stub, headers(namespace, System.currentTimeMillis()));
            return (T)stub;
        } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
            throw new GrpcStubCreateException("gRPC create stub failed.", e);
        } catch (NoSuchMethodException e) {
            /**
             * This place means Google Protocol Buffers generate result format changed !!!!!
             *
             * When writing this code, the XXXBlockingStub class has private constructor like
             * 'XXXBlockingStub(io.grpc.Channel)'
             */
            throw new GrpcStubCreateException("gRPC create stub failed because of the format changed.", e);
        }
    }

    /**
     * Returns a {@link ManagedChannel} connected to this service.
     */
    public final ManagedChannel getChannel() {
        return channel;
    }

    /**
     * Returns the underlying gRPC {@link Server} for this service.
     */
    public final Server getServer() {
        return server;
    }

    /**
     * Returns the randomly generated server name for this service.
     */
    public final String getServerName() {
        return serverName;
    }

    /**
     * Returns the service registry for this service. The registry is used to add service instances
     * (e.g. {@link io.grpc.BindableService} or {@link io.grpc.ServerServiceDefinition} to the server.
     */
    public final MutableHandlerRegistry getServiceRegistry() {
        return serviceRegistry;
    }

    /**
     * After the test has completed, clean up the channel and server.
     */
    @Override
    protected void after() {
        serverName = null;
        serviceRegistry = null;

        channel.shutdown();
        server.shutdown();

        try {
            channel.awaitTermination(1, TimeUnit.MINUTES);
            server.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            channel.shutdownNow();
            channel = null;

            server.shutdownNow();
            server = null;
        }
    }

    /**
     * Before the test has started, create the server and channel.
     */
    @Override
    protected void before() throws Throwable {
        serverName = UUID.randomUUID().toString();

        serviceRegistry = new MutableHandlerRegistry();

        InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(serverName)
            .fallbackHandlerRegistry(serviceRegistry);

        if (useDirectExecutor) {
            serverBuilder.directExecutor();
        }

        server = serverBuilder.build().start();

        InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName(serverName);

        if (useDirectExecutor) {
            channelBuilder.directExecutor();
        }

        channel = channelBuilder.build();
    }