摘要:轉載請注明出處翻譯下面的代碼展示了版的查看源碼的同等實現如下可以通過調用方法實現同步執行示例如下測試如下不提供同步執行方法但是如果確定其只會產生一個值那么也可以用如下方式實現如果實際上產生了多個值上述的代碼將會拋出可以通過調用方法實現異步
轉載請注明出處: 翻譯:Hystrix - How To Use
Hello World!下面的代碼展示了HystrixCommand版的Hello World:
public class CommandHelloWorld extends HystrixCommand{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return "Hello " + name + "!"; } }
查看源碼
HystrixObservableCommand的同等實現如下:
public class CommandHelloWorld extends HystrixObservableCommandSynchronous Execution{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable construct() { return Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
可以通過調用HystrixCommand.execute()方法實現同步執行, 示例如下:
String s = new CommandHelloWorld("World").execute();
測試如下:
@Test public void testSynchronous() { assertEquals("Hello World!", new CommandHelloWorld("World").execute()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute()); }
HystrixObservableCommand不提供同步執行方法, 但是如果確定其只會產生一個值, 那么也可以用如下方式實現:
HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()
如果實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements.
Asynchronous Execution可以通過調用HystrixCommand.queue()方法實現異步執行, 示例如下:
Futurefs = new CommandHelloWorld("World").queue();
此時可以通過Future.get()方法獲取command執行結果:
String s = fs.get();
測試代碼如下:
@Test public void testAsynchronous1() throws Exception { assertEquals("Hello World!", new CommandHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { FuturefWorld = new CommandHelloWorld("World").queue(); Future fBob = new CommandHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的兩種實現是等價的:
String s1 = new CommandHelloWorld("World").execute(); String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand不提供queue方法, 但是如果確定其只會產生一個值, 那么也可以用如下方式實現:
HystrixObservableCommand.observe().observe().toBlocking().toFuture()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()
如果實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements.
Reactive Execution你也可以將HystrixCommand當做一個可觀察對象(Observable)來觀察(Observe)其產生的結果, 可以使用以下任意一個方法實現:
observe(): 一旦調用該方法, 請求將立即開始執行, 其利用ReplaySubject特性可以保證不會丟失任何command產生的結果, 即使結果在你訂閱之前產生的也不會丟失.
toObservable(): 調用該方法后不會立即執行請求, 而是當有訂閱者訂閱時才會執行.
Observableho = new CommandHelloWorld("World").observe(); // or Observable co = new CommandHelloWorld("World").toObservable();
然后你可以通過訂閱到這個Observable來取得command產生的結果:
ho.subscribe(new Action1() { @Override public void call(String s) { // value emitted here } });
測試如下:
@Test public void testObservable() throws Exception { ObservablefWorld = new CommandHelloWorld("World").observe(); Observable fBob = new CommandHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlockingObservable().single()); assertEquals("Hello Bob!", fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn"t do assertions fWorld.subscribe(new Observer () { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1 () { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用Java 8的Lambda表達式可以使代碼更簡潔:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
關于Observable的信息可以在這里查閱
Reactive Commands相比將HystrixCommand使用上述方法轉換成一個Observable, 你也可以選擇創建一個HystrixObservableCommand對象. HystrixObservableCommand包裝的Observable允許產生多個結果(譯者注: Subscriber.onNext可以調用多次), 而HystrixCommand即使轉換成了Observable也只能產生一個結果.
使用HystrixObservableCommnad時, 你需要重載construct方法來實現你的業務邏輯, 而不是重載run方法, contruct方法將會返回你需要包裝的Observable.
使用下面任意一個方法可以從HystrixObservableCommand中獲取Observable對象:
observe(): 一旦調用該方法, 請求將立即開始執行, 其利用ReplaySubject特性可以保證不會丟失任何command產生的結果, 即使結果在你訂閱之前產生的也不會丟失.
toObservable(): 調用該方法后不會立即執行請求, 而是當有訂閱者訂閱時才會執行.
Fallback大多數情況下, 我們都希望command在執行失敗時能夠有一個候選方法來處理, 如: 返回一個默認值或執行其他失敗處理邏輯, 除了以下幾個情況:
執行寫操作的command: 當command的目標是執行寫操作而不是讀操作, 那么通常需要將寫操作失敗的錯誤交給調用者處理.
批處理系統/離線計算: 如果command的目標是做一些離線計算、生成報表、填充緩存等, 那么同樣應該將失敗交給調用者處理.
無論command是否實現了getFallback()方法, command執行失敗時, Hystrix的狀態和斷路器(circuit-breaker)的狀態/指標都會進行更新.
HystrixCommand可以通過實現getFallback()方法來實現降級處理, run()方法異常、執行超時、線程池或信號量已滿拒絕提供服務、斷路器短路時, 都會調用getFallback():
public class CommandHelloFailure extends HystrixCommand{ private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { throw new RuntimeException("this command always fails"); } @Override protected String getFallback() { return "Hello Failure " + name + "!"; } }
查看源碼
這個命令的run()方法總是會執行失敗, 但是調用者總是能收到getFallback()方法返回的值, 而不是收到一個異常:
@Test public void testSynchronous() { assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute()); assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute()); }
HystrixObservableCommand可以通過重載resumeWithFallback方法實現原Observable執行失敗時返回回另一個Observable, 需要注意的是, 原Observable有可能在發出多個結果之后才出現錯誤, 因此在fallback實現的邏輯中不應該假設訂閱者只會收到失敗邏輯中發出的結果.
Hystrix內部使用了RxJava的onErrorResumeNext操作符來實現Observable之間的無縫轉移.
Error Propagation除HystrixBadRequestException異常外, run方法中拋出的所有異常都會被認為是執行失敗且會觸發getFallback()方法和斷路器的邏輯.
你可以在HystrixBadRequestException中包裝想要拋出的異常, 然后通過getCause()方法獲取. HystrixBadRequestException使用在不應該被錯誤指標(failure metrics)統計和不應該觸發getFallback()方法的場景, 例如報告參數不合法或者非系統異常等.
對于HystrixObservableCommand, 不可恢復的錯誤都會在通過onError方法通知, 并通過獲取用戶實現的resumeWithFallback()方法返回的Observable來完成回退機制.
執行異常類型Failure Type | Exception class | Exception.cause |
---|---|---|
FAILURE | HystrixRuntimeException | underlying exception(user-controlled) |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException |
BAD_REQUEST | HystrixBadRequestException | underlying exception(user-controller) |
默認的command name是從類名中派生的:
getClass().getSimpleName()
可以通過HystrixCommand或HystrixObservableCommand的構造器來指定command name:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
可以通過如下方式來重用Setter:
private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; }
HystrixCommandKey是一個接口, 因此可以將其實現為一個枚舉或者常規的類, 但是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式如下:
HystrixCommandKey.Factory.asKey("Hello World");Command Group
Hystrix使用command group來為分組, 分組信息主要用于報告、警報、儀表盤上顯示, 或者是標識團隊/庫的擁有者.
默認情況下, 除非已經用這個名字定義了一個信號量, 否則 Hystrix將使用這個名稱來定義command的線程池.
HystrixCommandGroupKey是一個接口, 因此可以將其實現為一個枚舉或者常規的類, 但是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式如下:
HystrixCommandGroupKey.Factory.asKey("Example Group")Command Thread-pool
thread-pool key主要用于在監控、指標發布、緩存等類似場景中標識一個HystrixThreadPool, 一個HystrixCommand于其構造函數中傳入的HystrixThreadPoolKey指定的HystrixThreadPool相關聯, 如果未指定的話, 則使用HystrixCommandGroupKey來獲取/創建HystrixThreadPool.
可以通過HystrixCommand或HystrixObservableCommand的構造器來指定其值:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
HystrixCommandThreadPoolKey是一個接口, 因此可以將其實現為一個枚舉或者常規的類, 但是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式如下:
HystrixThreadPoolKey.Factory.asKey("Hello World Pool")
使用HystrixThreadPoolKey而不是使用不同的HystrixCommandGroupKey的原因是: 可能會有多條command在邏輯功能上屬于同一個組(group), 但是其中的某些command需要和其他command隔離開, 例如:
兩條用于訪問視頻元數據的command
兩條command的group name都是VideoMetadata
command A與資源#1互斥
command B與資源#2互斥
如果command A由于延遲等原因導致其所在的線程池資源耗盡, 不應該影響command B對#2的執行, 因為他們訪問的是不同的后端資源.
因此, 從邏輯上來說, 我們希望這兩條command應該被分到同一個分組, 但是我們同樣系統將這兩條命令的執行隔離開來, 因此我們使用HystrixThreadPoolKey將其分配到不同的線程池.
Request Cache可以通過實現HystrixCommand或HystrixObservableCommand的getCacheKey()方法開啟用對請求的緩存功能:
public class CommandUsingRequestCache extends HystrixCommand{ private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); } }
由于該功能依賴于請求的上下文信息, 因此我們必須初始化一個HystrixRequestContext, 使用方式如下:
@Test public void testWithoutCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache(2).execute()); assertFalse(new CommandUsingRequestCache(1).execute()); assertTrue(new CommandUsingRequestCache(0).execute()); assertTrue(new CommandUsingRequestCache(58672).execute()); } finally { context.shutdown(); } }
通常情況下, 上下文信息(HystrixRequestContext)應該在持有用戶請求的ServletFilter或者其他擁有生命周期管理功能的類來初始化和關閉.
下面的例子展示了command如何從緩存中獲取數據, 以及如何查詢一個數據是否是從緩存中獲取到的:
@Test public void testWithCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache(2); CommandUsingRequestCache command2b = new CommandUsingRequestCache(2); assertTrue(command2a.execute()); // this is the first time we"ve executed this command with // the value of "2" so it should not be from cache assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); // this is the second time we"ve executed this command with // the same value so it should return from cache assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } // start a new request context context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache(2); assertTrue(command3b.execute()); // this is a new request context so this // should not come from cache assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }Request Collapsing
請求合并可以用于將多條請求綁定到一起, 由同一個HystrixCommand實例執行.
collapser可以通過batch size和batch創建以來的耗時來自動將請求合并執行.
Hystrix支持兩個請求合并方式: 請求級的合并和全局級的合并. 默認是請求范圍的合并, 可以在構造collapser時指定值.
請求級(request-scoped)的collapser只會合并每一個HystrixRequestContext中的請求, 而全局級(globally-scoped)的collapser則可以跨HystrixRequestContext合并請求. 因此, 如果你下游的依賴者無法再一個command中處理多個HystrixRequestContext的話, 那么你應該使用請求級的合并.
在Netflix, 我們只會使用請求級的合并, 因為我們當前所有的系統都是基于一個command對應一個HystrixRequestContext的設想下構建的. 因此, 當一個command使用不同的參數在一個請求中并發執行時, 合并是有效的.
下面的代碼展示了如何實現請求級的HystrixCollapser:
public class CommandCollapserGetValueForKey extends HystrixCollapser, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand
> createCommand(final Collection
> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List batchResponse, Collection > requests) { int count = 0; for (CollapsedRequest request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand > { private final Collection
> requests; private BatchCommand(Collection > requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List run() { ArrayList response = new ArrayList (); for (CollapsedRequest request : requests) { // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } } }
下面的代碼展示了如果使用collapser自動合并4個CommandCollapserGetValueForKey到一個HystrixCommand中執行:
@Test public void testCollapser() throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { FutureRequest Context Setupf1 = new CommandCollapserGetValueForKey(1).queue(); Future f2 = new CommandCollapserGetValueForKey(2).queue(); Future f3 = new CommandCollapserGetValueForKey(3).queue(); Future f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); // assert that the batch command "GetValueForKey" was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand>[1])[0]; // assert the command is the one we"re expecting assertEquals("GetValueForKey", command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } }
使用請求級的特性時(如: 請求緩存、請求合并、請求日志)你必須管理HystrixRequestContext的生命周期(或者實現HystrixConcurrencyStategy).
這意味著你必須在請求之前執行如下代碼:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
并在請求結束后執行如下代碼:
context.shutdown();
在標準的Java web應用中, 你可以使用Setvlet Filter實現的如下的過濾器來管理:
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } }
可以在web.xml中加入如下代碼實現對所有的請求都使用該過濾器:
Common PatternsHystrixRequestContextServletFilter HystrixRequestContextServletFilter com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter HystrixRequestContextServletFilter /*
以下是HystrixCommand和HystrixObservableCommand的一般用法和使用模式.
Fail Fast最基本的使用是執行一條只做一件事情且沒有實現回退方法的command, 這樣的command在發生任何錯誤時都會拋出異常:
public class CommandThatFailsFast extends HystrixCommand{ private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } }
下面的代碼演示了上述行為:
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsFast(false).execute()); } @Test public void testFailure() { try { new CommandThatFailsFast(true).execute(); fail("we should have thrown an exception"); } catch (HystrixRuntimeException e) { assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage()); e.printStackTrace(); } }
HystrixObservableCommand需要重載resumeWithFallback()方法來實現同樣的行為:
@Override protected ObservableFail SilentresumeWithFallback() { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast")); } else { return Observable.just("success"); } }
靜默失敗等同于返回一個空的響應或者移除功能. 可以是返回null、空Map、空List, 或者其他類似的響應.
可以通過實現HystrixCommand.getFallback()方法實現該功能:
public class CommandThatFailsSilently extends HystrixCommand{ private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } @Override protected String getFallback() { return null; } }
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsSilently(false).execute()); } @Test public void testFailure() { try { assertEquals(null, new CommandThatFailsSilently(true).execute()); } catch (HystrixRuntimeException e) { fail("we should not get an exception as we fail silently with a fallback"); } }
或者返回一個空List的實現如下:
@Override protected ListgetFallback() { return Collections.emptyList(); }
HystrixObservableCommand可以通過重載resumeWithFallback()方法實現同樣的行為:
@Override protected ObservableFallback: StaticresumeWithFallback() { return Observable.empty(); }
Fallback可以返回代碼里設定的默認值, 這種方式可以通過默認行為來有效避免于靜默失敗帶來影響.
例如, 如果一個應返回true/false的用戶認證的command執行失敗了, 那么其默認行為可以如下:
@Override protected Boolean getFallback() { return true; }
對于HystrixObservableCommand可以通過重載resumeWithFallback()方法實現同樣的行為:
@Override protected ObservableFallback: StubbedresumeWithFallback() { return Observable.just( true ); }
當command返回的是一個包含多個字段的復合對象, 且該對象的一部分字段值可以通過其他請求狀態獲得, 另一部分狀態可以通過設置默認值獲得時, 你通常需要使用存根(stubbed)模式.
你可能可以從存根值(stubbed values)中得到適當的值的情況如下:
cookies
請求參數和請求頭
當前失敗請求的前一個服務請求的響應
在fallback代碼塊內可以靜態地獲取請求范圍內的存根(stubbed)值, 但是通常我們更推薦在構建command實例時注入這些值, 就像下面實例的代碼中的countryCodeFromGeoLookup一樣:
public class CommandWithStubbedFallback extends HystrixCommand{ private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. */ protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value "countryCodeFromGeoLookup" that we"ll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } } }
下面的代碼演示了上述行為:
@Test public void test() { CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca"); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234, account.customerId); assertEquals("ca", account.countryCode); assertEquals(true, account.isFeatureXPermitted); assertEquals(true, account.isFeatureYPermitted); assertEquals(false, account.isFeatureZPermitted); }
對于HystrixObservableCommand可以通過重載resumeWithFallback()方法實現同樣的行為:
@Override protected ObservableresumeWithFallback() { return Observable.just( new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false) ); }
如果你想要從Observable中發出多個值, 那么當失敗發生時, 原本的Observable可能已經發出的一部分值, 此時你或許更希望能夠只從fallback邏輯中發出另一部分未被發出的值, 下面的例子就展示了如何實現這一個目的: 它通過追蹤原Observable發出的最后一個值來實現fallback邏輯中的Observable應該從什么地方繼續發出存根值(stubbed value) :
@Override protected ObservableFallback: Cache via Networkconstruct() { return Observable.just(1, 2, 3) .concatWith(Observable. error(new RuntimeException("forced error"))) .doOnNext(new Action1 () { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation()); } @Override protected Observable resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); } }
有時后端的服務異常也會引起command執行失敗, 此時我們也可以從緩存中(如: memcached)取得相關的數據.
由于在fallback的邏輯代碼中訪問網絡可能會再次失敗, 因此必須構建新的HystrixCommand或HystrixObservableCommand來執行:
很重要的一點是執行fallback邏輯的command需要在一個不同的線程池中執行, 否則如果原command的延遲變高且其所在線程池已經滿了的話, 執行fallback邏輯的command將無法在同一個線程池中執行.
下面的代碼展示了CommandWithFallbackViaNetwork如何在getFallback()方法中執行FallbackViaNetwork.
注意, FallbackViaNetwork同樣也具有回退機制, 這里通過返回null來實現fail silent.
FallbackViaNetwork默認會從HystrixCommandGroupKey中繼承線程池的配置RemoteServiceX, 因此需要在其構造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")來使其在不同的線程池中執行.
這樣, CommandWithFallbackViaNetwork會在名為RemoteServiceX的線程池中執行, 而FallbackViaNetwork會在名為RemoteServiceXFallback的線程池中執行.
public class CommandWithFallbackViaNetwork extends HystrixCommandPrimary + Secondary with Fallback{ private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won"t prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } } }
有些系統可能具有是以雙系統模式搭建的 — 主從模式或主備模式.
有時從系統或備用系統會被認為是失敗狀態的一種, 僅在執行fallback邏輯是才使用它;這種場景和Cache via Network一節中描述的場景是一樣的.
然而, 如果切換到從系統是一個很正常時, 例如發布新代碼時(這是有狀態的系統發布代碼的一種方式), 此時每當切換到從系統使用時, 主系統都是處于不可用狀態,斷路器將會打開且發出警報.
這并不是我們期望發生的事, 這種狼來了式的警報可能會導致真正發生問題的時候我們卻把它當成正常的誤報而忽略了.
因此, 我們可以通過在其前面放置一個門面HystrixCommand(見下文), 將主/從系統的切換視為正常的、健康的狀態.
主從HystrixCommand都是需要訪問網絡且實現了特定的業務邏輯, 因此其實現上應該是線程隔離的. 它們可能具有顯著的性能差距(通常從系統是一個靜態緩存), 因此將兩個command隔離的另一個好處是可以針對性地調優.
你不需要將這兩個command都公開發布, 只需要將它們隱藏在另一個由信號量隔離的HystrixCommand中(稱之為門面HystrixCommand), 在這個command中去實現主系統還是從系統的調用選擇. 只有當主從系統都失敗了, 才會去執行這個門面command的fallback邏輯.
門面HystrixCommand可以使用信號量隔離的, 因為其業務邏輯僅僅是調用另外兩個線程隔離的HystrixCommand, 它不涉及任何的網絡訪問、重試等容易出錯的事, 因此沒必要將這部分代碼放到其他線程去執行.
public class CommandFacadeWithPrimarySecondary extends HystrixCommandClient Doesn"t Perform Network Access{ private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive "primary" service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast "secondary" service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
當你使用HystrixCommand實現的業務邏輯不涉及到網絡訪問、對延遲敏感且無法接受多線程帶來的開銷時, 你需要設置executionIsolationStrategy)屬性的值為ExecutionIsolationStrategy.SEMAPHORE, 此時Hystrix會使用信號量隔離代替線程隔離.
下面的代碼展示了如何為command設置該屬性(也可以在運行時動態改變這個屬性的值):
public class CommandUsingSemaphoreIsolation extends HystrixCommandGet-Set-Get with Request Cache Invalidation{ private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // since we"re doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return "ValueFromHashMap_" + id; } }
Get-Set-Get是指: Get請求的結果被緩存下來后, 另一個command對同一個資源發出了Set請求, 此時由Get請求緩存的結果應該失效, 避免隨后的Get請求獲取到過時的緩存結果, 此時可以通過調用HystrixRequestCache.clear())方法來使緩存失效.
public class CommandUsingRequestCacheInvalidation { /* represents a remote data store */ private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_"; public static class GetterCommand extends HystrixCommand{ private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand"); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /** * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand { private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey("GetSetGet")); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } } }
@Test public void getGetSetGet() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute()); GetterCommand commandAgainstCache = new GetterCommand(1); assertEquals("ValueBeforeSet_1", commandAgainstCache.execute()); // confirm it executed against cache the second time assertTrue(commandAgainstCache.isResponseFromCache()); // set the new value new SetterCommand(1, "ValueAfterSet_").execute(); // fetch it again GetterCommand commandAfterSet = new GetterCommand(1); // the getter should return with the new prefix, not the value from cache assertFalse(commandAfterSet.isResponseFromCache()); assertEquals("ValueAfterSet_1", commandAfterSet.execute()); } finally { context.shutdown(); } } }Migrating a Library to Hystrix
如果你要遷移一個已有的客戶端庫到Hystrix, 你應該將所有的服務方法(service methods)替換成HystrixCommand.
服務方法(service methods)轉而調用HystrixCommand且不在包含任何額外的業務邏輯.
因此, 在遷移之前, 一個服務庫可能是這樣的:
遷移完成之后, 服務庫的用戶要能直接訪問到HystrixCommand, 或者通過服務門面(service facade)的代理間接訪問到HystrixCommand.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/73416.html
摘要:使用線程池的好處通過線程在自己的線程池中隔離的好處是該應用程序完全可以不受失控的客戶端庫的威脅。簡而言之,由線程池提供的隔離功能可以使客戶端庫和子系統性能特性的不斷變化和動態組合得到優雅的處理,而不會造成中斷。 ? 工作流程圖 下面的流程圖展示了當使用Hystrix的依賴請求,Hystrix是如何工作的。showImg(https://segmentfault.com/img/bV0...
摘要:斷路器本身是一種開關裝置,用于在電路上保護線路過載,當線路中有電器發生短路時,斷路器能夠及時的切斷故障電路,防止發生過載發熱甚至起火等嚴重后果。具備擁有回退機制和斷路器功能的線程和信號隔離,請求緩存和請求打包,以及監控和配置等功能。 轉載請注明出處 http://www.paraller.com 代碼機制:熔斷 & Fallback & 資源隔離 熔斷 概念: 在微服務架構中,我們將系...
摘要:腳本位置依賴內采樣率,默認即如需測試時每次都看到則修改為,但對性能有影響,注意上線時修改為合理值運行查詢參考規范推薦推薦谷歌的大規模分布式跟蹤系統分布式服務的 zipkin-server pom io.zipkin zipkin-ui 1.39.3 or...
摘要:斷路器原理斷路器在和執行過程中起到至關重要的作用。其中通過來定義,每一個命令都需要有一個來標識,同時根據這個可以找到對應的斷路器實例。一個啥都不做的斷路器,它允許所有請求通過,并且斷路器始終處于閉合狀態斷路器的另一個實現類。 斷路器原理 斷路器在HystrixCommand和HystrixObservableCommand執行過程中起到至關重要的作用。查看一下核心組件HystrixCi...
閱讀 1968·2023-04-26 01:59
閱讀 3274·2021-10-11 11:07
閱讀 3305·2021-09-22 15:43
閱讀 3385·2021-09-02 15:21
閱讀 2563·2021-09-01 10:49
閱讀 910·2019-08-29 15:15
閱讀 3095·2019-08-29 13:59
閱讀 2837·2019-08-26 13:36