본문 바로가기

Spring/Spring

gRPC 사용법, gRPC 예제 코드 실행해보기, 원리는 몰라도 gRPC 입문은 가능하다 (grpc java example)

반응형

이 포스트는 springcamp2017에서 grpc발표를 하신 오명운님의 발표 자료 및 github소스를 참고해서 작성한 것입니다.

gRPC의 장점

  • service 정의가 단순하다
  • 여러 프로그래밍 언어나 플랫폼에서 사용이 가능하다
  • 양방향 스트리밍 데이터 처리가 가능하다
  • rpc의 장점으로 빠르다

grpc는 rpc프레임워크를 구글이 쉽게 사용할 수 있도록 만든 것이다.

rpc는 remote procedure call로 말 그대로 원격에 정의된 프로시져를 호출하는 것이다.

원격의 프로시져를 호출하는 클라이언트 입장에서는 그 방법이 내부의 메서드를 호출하는 것과 다르지 않아서 사용이 간편하다.

이번 포스트의 핵심은 예제 소스를 돌려보면서 grpc의 기능들을 맛보는 것이 핵심이다.

따라서 gRPC의 장점이나 내부에서 어떻게 돌아가는지, 원리는 다음에 자세히 알아보도록 한다.

gRPC 사용법

1. service definition

일종의 스키마 파일을 기본적으로 정의해야한다. 이것을 서버든 클라이언트든 grpc를 사용하는 프로그램은 이 스키마 파일을 공유해야 한다.

grpc는 기본적으로 protocol buffer를 스키마 파일로 사용한다. protobuf에 대해서는 지난 포스트에서 맛을 봤다.

syntax = "proto3";

option java_multiple_files = true;
option java_outer_classname = "EventProto";
option java_package = "com.example.demo.proto";

package com.tistory.jeongpro.event;

message EventRequest {
    string sourceId = 1;
    string eventId = 2;
}

message EventResponse {
    string result = 1;
}

service NewdataService {

    rpc unaryEvent(EventRequest) returns (EventResponse) {}

    rpc serverStreamingEvent(EventRequest) returns (stream EventResponse) {}

    rpc clientStreamingEvent(stream EventRequest) returns (EventResponse) {}

    rpc biStreamingEvent(stream EventRequest) returns (stream EventResponse) {}
}

[Event.proto]

간단히 설명하면 EventRequest, EventResponse라는 데이터 자료형을 정의했고, 그 데이터를 주고 받는 서비스들을 정의했다.

grpc에서 제공하는 서비스 형태로는 4가지가 있다.

  • 서버(1) : 클라이언트(1)
  • 서버(N) : 클라이언트(1)
  • 서버(1) : 클라이언트(N)
  • 서버(N) : 클라이언트(M)

그래서 위에 서비스 4개를 정의했다.

2. source generate

위의 스키마 파일(ex. event.proto)을 가지고 서버 프로그램이나 클라이언트 프로그램을 만들 때 똑같이 source-generate를 해야한다.

원하는 언어로 generate를 하면 원하는 언어의 소스코드가 자동으로 생성된다.

- maven 프로젝트 기준

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.5.0.Final</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.5.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.21.0:exe:${os.detected.classifier}</pluginArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

- gradle 프로젝트 기준

apply plugin: 'com.google.protobuf'

buildscript {
  repositories {
    mavenCentral()
  }
  dependencies {
    classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
  }
}

protobuf {
  protoc {
    artifact = "com.google.protobuf:protoc:3.7.1"
  }
  plugins {
    grpc {
      artifact = 'io.grpc:protoc-gen-grpc-java:1.21.0'
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc {}
    }
  }
}

 

스프링에서 위의 플러그인은 자동으로 "src/main/proto" 패키지 하위에 있는 .proto파일을 generate해준다.

위에서 작성한 Event.proto 파일도 마찬가지 위치에 두고 generate시키자. (generate된 소스는 target폴더 밑에 있다.)

maven을 이용할 때 이클립스에서는 "run as.." > "Maven generate-sources"를 클릭하면 된다.

아 빼먹은게 있는데 grpc를 사용하기 위한 라이브러리를 추가하도록 하자.

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.21.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.21.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.21.0</version>
</dependency>

위의 3가지를 추가하면 된다. generate된 소스에서 컴파일에러가 난다면 아래와 같이 protobuf 의존성도 추가해보자.

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.8.0</version>
</dependency>

3. generate된 소스를 활용한 구현

서버든 클라이언트든 어차피 같은 generate-source를 사용한다.

위 단계까지 작업은 클라이언트든 서버든 같이한다.

서버는 생성된 서비스를 상속받아 메서드를 재정의하면 되고, 클라이언트는 생성된 서비스의 stub라는 것을 생성해서 그것을 통해 서비스를 호출하기만 하면 된다.

서버부터 알아보자. 

- Server

package com.example.demo.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.example.demo.proto.EventRequest;
import com.example.demo.proto.EventResponse;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceImplBase;

import io.grpc.stub.StreamObserver;

@Service
public class NewdataServiceImpl extends NewdataServiceImplBase {
	private static final Logger logger = LoggerFactory.getLogger(NewdataServiceImpl.class);

	@Override
	public void unaryEvent(EventRequest request, StreamObserver<EventResponse> responseObserver) {
		logger.info("input= sourceId : " + request.getSourceId() + ", eventId : " + request.getEventId());
		//
		EventResponse eventResponse = EventResponse.newBuilder()
				.setResult(request.getSourceId() + request.getEventId()).build();
		//unary라 onNext 1회만 호출 가능
		//데이터 전송
		responseObserver.onNext(eventResponse);
		logger.info("onNext");
		//1초후 응답 스트림에 대한 완료 보낼 것임
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		responseObserver.onCompleted();
		logger.info("onCompleted");
	}

	@Override
	public void serverStreamingEvent(EventRequest request, StreamObserver<EventResponse> responseObserver) {
		logger.info("input= sourceId : " + request.getSourceId() + ", eventId : " + request.getEventId());
		EventResponse eventResponse = EventResponse.newBuilder()
				.setResult(request.getSourceId() + request.getEventId())
				.build();
		//클라이언트에 의해 데이터는 한 번 들어오지만 serverStreaming이므로 여러번 onNext 호출 가능
		responseObserver.onNext(eventResponse);
		responseObserver.onNext(eventResponse);
		responseObserver.onNext(eventResponse);
		responseObserver.onNext(eventResponse);
		//1초후 완료 응답
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		responseObserver.onCompleted();
	}

	@Override
	public StreamObserver<EventRequest> clientStreamingEvent(StreamObserver<EventResponse> responseObserver) {
		return new StreamObserver<EventRequest>() {
			@Override
			public void onNext(EventRequest value) {
				//클라이언트로부터 여러번의 onNext가 호출될 예정이다.
				logger.info("메세지 : " + value.getSourceId() + "/" + value.getEventId());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError호출");
			}
			@Override
			public void onCompleted() {
				//클라이언트로부터 스트림 완료 응답이 오면 1번만 next를 보낼 수 있다.
				//그래서 위의 onNext에서 같이 onNext하면 안된다.(여러번 응답해버리는 biStream형식이 되어버리기 때문.)
				responseObserver.onNext(EventResponse.newBuilder().setResult("response").build());
				responseObserver.onCompleted();
			}
		};
	}

	@Override
	public StreamObserver<EventRequest> biStreamingEvent(StreamObserver<EventResponse> responseObserver) {
		return new StreamObserver<EventRequest>() {
			@Override
			public void onNext(EventRequest value) {
				//클라이언트로부터 데이터가 올 때마다 onNext가 호출된다.
				//1개의 데이터가 올 때마다 3개의 응답을 던져줄 예정이다.
				logger.info("Bidirection" + value.getSourceId() + "/" + value.getEventId());
				responseObserver.onNext(EventResponse.newBuilder().setResult("응답1").build());
				responseObserver.onNext(EventResponse.newBuilder().setResult("응답2").build());
				responseObserver.onNext(EventResponse.newBuilder().setResult("응답3").build());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError");
			}
			@Override
			public void onCompleted() {
				logger.info("onCompleted");
				responseObserver.onCompleted();
			}
		};
	}
	
}

꽤 긴데, extends한 NewdataServiceImplBase는 generate된 소스에서 찾을 수 있다.

그러면 내가 정의했던 4가지의 서비스를 재정의해야한다.

나름대로 주석을 달아보았으니 참조하면 좋을 것 같고, 부연설명도 조금 하도록 하겠다.

unaryEvent는 1:1로 메세지를 주고 받기 때문에 쉽다.

Request를 원하는 대로 가공하고, Response해줄 객체를 생성한다음, StreamObserver객체에 Response객체를 넣어주면된다.

1:1 이기때문에 onNext는 1회만 호출할 수 있고, 그 후 잘 보냈다고 onCompleted 메서드를 호출하는 식으로 처리하면 된다.

serverStreaming의 경우, 클라이언트의 1회 요청에 대해 stream으로 응답이 가능하기 때문에 달라진것은 onNext를 여러번 호출해서 여러번 데이터를 보내는 것 뿐이다.

clientStreaming의 경우, 클라이언트에서 N개의 데이터를 보내줄 것인데 그 때 클라이언트도 onNext를 통해서 전달할 것이다.

onNext로 N개의 데이터 전달이 끝난다면, onCompleted메서드를 호출해서 다 보냈다고 알려줄 것이다.

그러면 서버도 onCompleted를 받으면 응답 1개를 해줘야한다.

biStreaming의 경우, 양방향 스트리밍이다. 따라서 서버도 onNext를 여러번 호출 가능하고, 클라이언트도 onNext를 통해서 여러번 데이터가 들어올 것이다.

서버도 onNext를 통해서 여러번 데이터 송신을 마쳤다면, onCompleted로 데이터 송신이 끝났다고 클라이언트에 알려줘야 한다.

이 4가지의 서비스 방식이 클라이언트에도 똑같이 적용되니까 한 번은 꼭 이해하도록 한다.

위의 서비스를 서버에 띄우는 방법은 아래와 같다.

@Component
public class GrpcRunner implements ApplicationRunner{
	private static final Logger logger = LoggerFactory.getLogger(GrpcRunner.class);
	private Server server;
	private int port;
	@Override
	public void run(ApplicationArguments args) throws Exception {
		port = 9595;
		server = ServerBuilder.forPort(port)
				.addService(new NewdataServiceImpl())
				.build();
		this.server.start();
		logger.info("gRPC Server Listening on port " + port);
		this.server.awaitTermination();
	}

}

grpc에서 제공하는 Server객체를 생성하는데 port와 service를 등록하면 된다.

- Client

클라이언트는 조금 복잡하다고 느낄 수 있다. 왜냐하면 stub, channel이라는 개념을 알아야 하는데 간단하게 알아두면, 클라이언트와 서버는 channel을 통해 통신하고, stub는 channel을 통해 서비스를 호출할 수 있는 추상화된 객체로 보면 된다.

* stub의 방식이 또 3가지가 있다. async, blocking, future다.

이 3가지가 아까 4개(1:1, 1:N, N:1, N:M)의 방식에 각각 적용되니까 서비스를 호출하는데 12개의 방법이 있는 것이다.

그러나 다행히도 client가 Streaming으로 보내는 방식은 async밖에 없고, 1Client-streamingServer의 방법도 future는 없다. 결국 7개의 방법이 있는 것이다...

package com.example.demo.configuration;


import com.example.demo.proto.NewdataServiceGrpc;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceBlockingStub;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceFutureStub;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceStub;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class NewdataClientStubFactory {
	private final ManagedChannel channel;
	private final NewdataServiceBlockingStub blockingStub;
	private final NewdataServiceStub asyncStub;
	private final NewdataServiceFutureStub futureStub;
	
	public NewdataClientStubFactory(String host, int port) {
		this.channel = ManagedChannelBuilder.forAddress(host, port)
				.usePlaintext()
				.build();
		this.blockingStub = NewdataServiceGrpc.newBlockingStub(channel);
		this.asyncStub = NewdataServiceGrpc.newStub(channel);
		this.futureStub = NewdataServiceGrpc.newFutureStub(channel);
	}

	public NewdataServiceBlockingStub getBlockingStub() {
		return blockingStub;
	}

	public NewdataServiceStub getAsyncStub() {
		return asyncStub;
	}

	public NewdataServiceFutureStub getFutureStub() {
		return futureStub;
	}
	
}

host, port를 받아 채널을 생성하고 채널을 통해 stub를 제공하는 factory 클래스를 만들었다.

channel을 만들 때 .usePlainText()를 호출했는데 이게 없으면 tls exception이 난다.

아무래도 http2를 기본으로 하기 때문에 그런 것 같다. 실사용에서는 보안 처리도 해야할 필요가 있다.(지금은 테스트지만....)

package com.example.demo.client;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.demo.proto.EventRequest;
import com.example.demo.proto.EventResponse;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceBlockingStub;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceFutureStub;
import com.example.demo.proto.NewdataServiceGrpc.NewdataServiceStub;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.stub.StreamObserver;

public class NewdataClient {
	private static final Logger logger = LoggerFactory.getLogger(NewdataClient.class);
	private final NewdataServiceBlockingStub blockingStub;
    private final NewdataServiceStub asyncStub;
    private final NewdataServiceFutureStub futureStub;
    
    public NewdataClient(NewdataServiceBlockingStub blockingStub, NewdataServiceStub asyncStub, NewdataServiceFutureStub futureStub) {
    	this.blockingStub = blockingStub;
    	this.asyncStub = asyncStub;
    	this.futureStub = futureStub;
    }
    
    public void sendBlockingUnaryMessage(EventRequest eventRequest) {
    	logger.info("request=" + eventRequest);
    	EventResponse eventResponse = blockingStub.unaryEvent(eventRequest);
    	logger.info("response=" + eventResponse);
    }
    
    public void sendAsyncUnaryMessage(EventRequest eventRequest) {
    	logger.info("request=" + eventRequest);
    	//비동기로 응답을 받기 위해 서버로 보내는 callback 객체(StreamObserver)도 같이 보낸다.
    	asyncStub.unaryEvent(eventRequest, new StreamObserver<EventResponse>() {
			@Override
			public void onNext(EventResponse value) {
				logger.info("response=" + value.getResult());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError 호출");
			}
			@Override
			public void onCompleted() {
				logger.info("서버로부터 응답 끝");
			}
		});
    	logger.info("nonblocking + async라 rpc한 후 바로 로그 찍힘");
    }
    
    public void sendFutureUnaryMessage(EventRequest eventRequest) {
    	logger.info("request=" + eventRequest);
    	EventResponse eventResponse = null;
    	ListenableFuture<EventResponse> future = futureStub.unaryEvent(eventRequest);
    	logger.info("future니까 nonblocking?");
    	try {
			eventResponse = future.get(2, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException | TimeoutException e) {
			e.printStackTrace();
		}
    	logger.info("response=" + eventResponse);
    }
    
    public void sendBlockingServerStreamingMessage(EventRequest eventRequest) {
    	logger.info("request=" + eventRequest);
    	Iterator<EventResponse> responseIter;
    	//streaming이 완료될 때까지 대기한다.
    	responseIter = blockingStub.serverStreamingEvent(eventRequest);
    	responseIter.forEachRemaining((response)->{
    		logger.info("response="+response);
    	});
    }
    
    public void sendAsyncServerStreamingMessage(EventRequest eventRequest) {
    	logger.info("request=" + eventRequest);
    	asyncStub.serverStreamingEvent(eventRequest, new StreamObserver<EventResponse>() {
			@Override
			public void onNext(EventResponse value) {
				logger.info("async repsonse=" + value.getResult());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError호출");	
			}
			@Override
			public void onCompleted() {
				logger.info("Async Server Streaming response 끝");
			}
		});
    	logger.info("서버 응답과 상관없이 다른 작업중...");
    }
    
    //client streaming은 AsyncStub만 가능.
    public void sendAsyncClientStreamingMessage(List<EventRequest> eventRequests) {
    	//비동기로 응답받을 객체 생성
    	StreamObserver<EventResponse> responseObserver = new StreamObserver<EventResponse>() {
			@Override
			public void onNext(EventResponse value) {
				logger.info("Async client Streaming response=" + value.getResult());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError호출");
			}
			@Override
			public void onCompleted() {
				logger.info("서버 응답 끝");
			}
		};
		//비동기로 요청을 보낼 객체 생성
		//응답 받을 객체와 연결
		StreamObserver<EventRequest> requestObserver = asyncStub.clientStreamingEvent(responseObserver);
		//원하는 만큼 데이터 보내기
		for(EventRequest eventRequest : eventRequests) {
			requestObserver.onNext(eventRequest);	
		}
		//데이터 보내기를 완료했다는 시그널 보내기
		requestObserver.onCompleted();
    }
    
    //bidiection Streaming
    public void sendBiDirectionalStreamingMessage(List<EventRequest> eventRequests) {
    	//응답 받을 객체 생성
    	StreamObserver<EventResponse> responseObserver = new StreamObserver<EventResponse>() {
			@Override
			public void onNext(EventResponse value) {
				logger.info("BidirectionResponse=" + value.getResult());
			}
			@Override
			public void onError(Throwable t) {
				logger.info("onError호출");
			}
			@Override
			public void onCompleted() {
				logger.info("Bidirection Streaming response 끝");
			}
		};
		//요청 보낼 객체 생성 및 응답객체와 연결
		StreamObserver<EventRequest> requestObserver = asyncStub.biStreamingEvent(responseObserver);
		for(EventRequest eventRequest : eventRequests) {
			requestObserver.onNext(eventRequest);	
		}
		logger.info("async니까 바로 로그 찍힘");
		requestObserver.onCompleted();
    }
}

7가지 방법을 통해 원격 서버의 메서드를 호출하는 방법을 구현했다.

주석을 참고하도록 하자... (서버에서 간단하게 설명했다)

package com.example.demo.service;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.example.demo.client.NewdataClient;
import com.example.demo.configuration.NewdataClientStubFactory;
import com.example.demo.proto.EventRequest;

@Service
public class ClientService {
	@PostConstruct
	private void init() {
		String host = "localhost";
		int port = 9595;
		
		NewdataClientStubFactory newdataClientStubFactory = new NewdataClientStubFactory(host, port);
		NewdataClient newdataClient = new NewdataClient(newdataClientStubFactory.getBlockingStub(),
				newdataClientStubFactory.getAsyncStub(), newdataClientStubFactory.getFutureStub());
		
		//Request 생성
		EventRequest eventRequest = EventRequest.newBuilder()
    			.setSourceId("sourceId1")
    			.setEventId("eventId1")
    			.build();
		EventRequest eventRequest2 = EventRequest.newBuilder()
    			.setSourceId("sourceId2")
    			.setEventId("eventId2")
    			.build();
		EventRequest eventRequest3 = EventRequest.newBuilder()
    			.setSourceId("sourceId3")
    			.setEventId("eventId3")
    			.build();
		List<EventRequest> eventRequests = new ArrayList<>();
		eventRequests.add(eventRequest);
		eventRequests.add(eventRequest2);
		eventRequests.add(eventRequest3);
		// Blocking Unary
		//newdataClient.sendBlockingUnaryMessage(eventRequest);
		//async Unary
		//newdataClient.sendAsyncUnaryMessage(eventRequest);
		//future Unary
		//newdataClient.sendFutureUnaryMessage(eventRequest);
		//blockingServerStream
		//newdataClient.sendBlockingServerStreamingMessage(eventRequest);		
		//biStream
		newdataClient.sendBiDirectionalStreamingMessage(eventRequests);
	}
}

여러가지를 테스트해서 조금 번잡하긴한데, 결국은 아까 만든 factory클래스에서 stub들을 받아오고, client클래스를 통해 서비스를 호출하는게 전부다.

아래 참고 사이트에서 오명운님 github의 소스를 받아 테스트해봐도 좋고 위의 코드를 테스트해봐도 좋다...

아름답게 코드를 사용하려면 내부 로직을 이해하면 좋을 것 같아서 다음에는 그것을 공부해볼 예정이다.

끝!

 

참고 사이트

https://github.com/HomoEfficio/springcamp2017-grpc-java-client

https://github.com/HomoEfficio/springcamp2017-grpc-java-server

https://github.com/grpc/grpc-java

반응형