forked from ng-book/angular2-rxjs-chat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMessagesService.ts
105 lines (91 loc) · 3.72 KB
/
MessagesService.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/// <reference path="../../typings/app.d.ts" />
import {Injectable, bind} from "angular2/angular2";
import * as Rx from "rx";
import {User, Thread, Message} from "../models";
// Seed value for the `messages` stream: the list starts out empty. Declared
// `const` because the binding is never reassigned in this file; operations
// produce new arrays rather than replacing this one.
const initialMessages: Message[] = [];
/**
 * An operation on the message list: a function that receives the current
 * array of messages and returns the array that should replace it.
 *
 * The call signature alone makes this type callable (and assignable to
 * `Function`), so it does not need to extend `Function` explicitly.
 */
interface IMessagesOperation {
  (messages: Message[]): Message[];
}
/**
 * Stream-based store for chat messages.
 *
 * Every change is expressed as an operation (a function from the current
 * message list to the next one) pushed onto `updates`. `messages` folds those
 * operations into the latest list and shares it with all subscribers.
 */
@Injectable()
export class MessagesService {
  /** A stream that publishes each new message only once. */
  newMessages: Rx.Subject<Message> = new Rx.Subject<Message>();

  /** A stream that emits an array of the most up-to-date messages. */
  messages: Rx.Observable<Message[]>;

  /**
   * Receives the operations to be applied to the list held by `messages`; it
   * is the way to perform a change on *all* currently stored messages.
   *
   * Typed as `IMessagesOperation` rather than `any` because the accumulator
   * in the constructor invokes every value pushed here as a function.
   */
  updates: Rx.Subject<IMessagesOperation> =
    new Rx.Subject<IMessagesOperation>();

  // action streams

  /** Each Message pushed here is appended to the message list. */
  create: Rx.Subject<Message> = new Rx.Subject<Message>();

  /** Each Thread pushed here has all of its messages flagged as read. */
  markThreadAsRead: Rx.Subject<Thread> = new Rx.Subject<Thread>();

  constructor() {
    this.messages = this.updates
      // Watch the updates and accumulate operations on the messages,
      // starting from `initialMessages`.
      // NOTE(review): seed-first argument order follows the Rx typings this
      // file was written against; confirm the order before upgrading Rx.
      .scan(initialMessages, (messages: Message[],
                              operation: IMessagesOperation) => {
        return operation(messages);
      })
      // Share the most recent list of messages across everyone who
      // subscribes, and cache the last known list for late subscribers.
      .shareReplay(1);

    // `create` takes a Message and puts an operation (the inner function) on
    // the `updates` stream that adds the Message to the list of messages.
    //
    // Subscribing `this.updates` to the mapped stream means `updates`
    // receives each operation as it is created.
    //
    // `addMessage` could push the operation onto `updates` directly and drop
    // this extra action stream. That would arguably be clearer, but the
    // stream would no longer be composable.
    this.create
      .map((message: Message): IMessagesOperation => {
        return (messages: Message[]) => {
          return messages.concat(message);
        };
      })
      .subscribe(this.updates);

    // Every newly published message is also routed into `create`.
    this.newMessages
      .subscribe(this.create);

    // Similarly, `markThreadAsRead` takes a Thread and puts an operation on
    // the `updates` stream that marks that thread's Messages as read.
    this.markThreadAsRead
      .map((thread: Thread): IMessagesOperation => {
        return (messages: Message[]) => {
          return messages.map((message: Message) => {
            // Note that `message` is mutated in place here. Mutability can be
            // confusing; copying the Message (or using an immutable
            // structure) would be a reasonable alternative.
            if (message.thread.id === thread.id) {
              message.isRead = true;
            }
            return message;
          });
        };
      })
      .subscribe(this.updates);
  }

  /**
   * Imperative entry point to the action streams: publishes `message` on
   * `newMessages`, which feeds `create` and therefore `messages`.
   *
   * @param message the Message to add
   */
  addMessage(message: Message): void {
    this.newMessages.onNext(message);
  }

  /**
   * Builds a stream of newly published messages for one thread, excluding
   * the given user's own messages.
   *
   * @param thread only messages whose `thread.id` equals this thread's id pass
   * @param user messages whose `author.id` equals this user's id are dropped
   * @returns the filtered view of `newMessages`
   */
  messagesForThreadUser(thread: Thread, user: User): Rx.Observable<Message> {
    return this.newMessages
      .filter((message: Message) => {
        // belongs to this thread
        return (message.thread.id === thread.id) &&
               // and isn't authored by this user
               (message.author.id !== user.id);
      });
  }
}
/**
 * Injector bindings exported by this module: a single binding that maps the
 * `MessagesService` token to the `MessagesService` class.
 *
 * Exported as `const` so the binding itself cannot be reassigned.
 */
export const messagesServiceInjectables: Array<any> = [
  bind(MessagesService).toClass(MessagesService)
];