WO2011005232A1 - Mapping time series requests to member nodes - Google Patents
- Publication number
- WO2011005232A1 (PCT/TR2009/000085)
- Authority
- WO
- WIPO (PCT)
- Prior art keywords
- nodes
- requests
- messages
- membership
- time series
Classifications
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/10—Protocols in which an application is distributed across nodes in the network
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/50—Network services
- H04L67/60—Scheduling or organising the servicing of application requests, e.g. requests for application data transmissions using the analysis and optimisation of the required network resources
- H04L67/62—Establishing a time schedule for servicing the requests
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L69/00—Network arrangements, protocols or services independent of the application payload and not provided for in the other groups of this subclass
- H04L69/28—Timers or timing mechanisms used in protocols
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/10—Protocols in which an application is distributed across nodes in the network
- H04L67/1001—Protocols in which an application is distributed across nodes in the network for accessing one among a plurality of replicated servers
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L69/00—Network arrangements, protocols or services independent of the application payload and not provided for in the other groups of this subclass
- H04L69/40—Network arrangements, protocols or services independent of the application payload and not provided for in the other groups of this subclass for recovering from a failure of a protocol instance or entity, e.g. service redundancy protocols, protocol state redundancy or protocol service redirection
Abstract
The present invention relates to mapping time series requests to member nodes in a distributed system. Membership of nodes is determined by arrival and departure messages, where the messages contain node identifiers and anticipated arrival or departure times for the nodes. Since messages contain timestamps for membership, they can be ordered and merged to reconstruct membership lists for any time point. This capability, when combined with requests that have a time dimension, allows for more powerful methods in request mapping. In one embodiment of the invention, mapping requests to member nodes may be as simple as: use the timestamp in the request to determine the membership list for that time point, and choose the member nodes to map from the determined membership list. This embodiment provides a practical and deterministic method to map data, yet it does not rely on any central authority or require that any underlying data be migrated.
Description
MAPPING TIME SERIES REQUESTS TO MEMBER NODES
BACKGROUND OF THE INVENTION Distributed systems consist of multiple nodes that are connected by a network. These nodes contain processing and storage resources, and through connecting the nodes, distributed systems provide two main advantages: (1) higher and scalable performance achieved by using the aggregate of resources available to the nodes, and (2) higher availability achieved by serving one request from multiple independent nodes.
A key challenge involved in serving requests in distributed systems is mapping these requests to the appropriate nodes. This mapping needs to distribute the requests evenly among the nodes, and maintain a deterministic distribution even in the face of nodes arriving to and departing from the system. Accordingly, problems associated with request mapping usually involve establishing the set of active nodes in the distributed system. These active nodes are also referred to as member nodes or members. To determine the member nodes, distributed systems use one of several techniques: (1) an authoritative node monitors and decides upon the member nodes, (2) nodes follow a predetermined protocol to agree upon the set of active member nodes, or (3) each node keeps its local view on the system and communicates membership changes to other member nodes. Although widely employed, each approach has some drawbacks. The first approach suffers when the authoritative node fails or loses connectivity to the other nodes. The second approach requires that distributed consensus is established, which may not be possible in the presence of multiple node or network failures. The third approach overcomes previous shortcomings, and always maps the requests to a set of nodes. However, when the nodes have differing views about membership, deterministic mapping of requests requires additional protocols to be employed. These protocols increase overall complexity, and consequently reduce this approach's applicability.
At a higher level, all aforementioned approaches focus on determining a membership list for the present time. As nodes arrive and depart, node updates are reflected in the membership list, effectively creating a new list. These updates for keeping the membership list current are necessary, since associating a request with a time point is infeasible. Therefore, all requests are treated as "current."
Accordingly, prior methods can be improved upon for requests that have a time dimension. This family of requests is also referred to as event data, time series data, or time series requests; the requests for storing data include timestamps, and the requests for retrieving or processing data include time intervals. Some examples that belong to this family are: price changes for a stock item, actions of a website user modifying her shopping cart, and sensor data for a military vehicle. Some examples that do not fit into the family of time series requests are: catalog information for a product, and the account balance of a bank's customer.
SUMMARY OF THE INVENTION
The present invention relates to mapping time series requests to member nodes in a distributed system. The membership of nodes is determined by arrival and departure messages, where the messages contain node identifiers and anticipated arrival or departure times for the nodes. Since messages contain timestamps for membership, they can be ordered and merged to reconstruct membership lists for any time point.
For the nodes to have a consistent view of the distributed system, once a membership list for one time point is used to map requests, that membership list does not change; in other words, the membership list becomes immutable. The property of immutability may be achieved by sending membership messages with arrival and departure times that are always in the future.
The capability to reconstruct the membership list for any time point, when combined with requests that have a time dimension, allows for more powerful methods in request mapping. For example, previous methods map data storage requests to the current member nodes in the system. As member nodes arrive or depart, the stored data is migrated to other nodes so that a deterministic mapping between the data and the nodes is maintained.
Comparatively, mapping data storage requests to member nodes, in one embodiment of the invention, may be as simple as: use the timestamp in the request to determine the membership list for that time point, and choose the member nodes to map from that membership list. This embodiment provides a practical and deterministic method to map data, yet it does not rely on any central authority, or require that any data be migrated.
BRIEF DESCRIPTION OF THE DRAWINGS
The invention description given below refers to the accompanying drawings, of which:
FIG. 1 is a diagram displaying an outline of a system where nodes keep copies of membership related messages;
FIG. 2 is a diagram displaying example membership messages in more detail;
FIG. 3 is a flow chart of operations used by autonomous member nodes for detecting failures of other nodes;
FIG. 4 is a diagram displaying an example set of membership views constructed from membership messages;
FIG. 5a-5c are diagrams displaying the list of member nodes to contact for given keys and for a selected membership view of FIG. 4;
FIG. 6 is a flow chart of operations used for forming a replication group from a given membership list, replication factor, and request key;
FIG. 7a-7b are diagrams displaying example mappings from storage requests to member nodes for a shopping cart application; and
FIG. 8a-8b are diagrams displaying processing of an example request for a financial analysis application.
DETAILED DESCRIPTION OF THE INVENTION
One of the primary challenges involved in designing a distributed system is maintaining a deterministic mapping between requests and member nodes. A particular subset of the requests, also called time series requests, is different in that such requests include either timestamps or time intervals. Since node arrivals and departures also occur over a time dimension, adding timestamps to membership messages allows the system to capture all information required to map requests to nodes, at any time point. The following sections describe the details of mapping time series requests to member nodes. Then, two concrete examples are given to demonstrate the applications of the described methods, their advantages and potential shortcomings.
Membership Messages
FIG. 1 shows a plurality of nodes (102) connected through a network (106) to form a distributed system (100). The network connections (106) may include a combination of local area networks, wide area networks, intranets, extranets or the internet. The nodes (102) connected by the network may be Java-enabled devices, laptop or desktop computers, virtual machines, servers, mainframes, or a combination thereof. Each node keeps a copy of membership related messages (104) that it receives.
FIG. 2 displays five membership related messages that may appear in the history of received messages (104). Messages are identified with message types (212); in this example, the three message types are node arrival, node departure and replication factor. The messages also have an activation timestamp (214) that denotes the point in time the message becomes active. The activation timestamps are represented such that their definitions are exact across different time zones and daylight savings times. The remaining message fields differ depending on the message type. Two essential message types are node arrival and departure, and both messages include a node identifier (218) that uniquely represents one node in the system. This identifier may be a public, private or virtual IP address, a domain name, or a uniform resource locator. The time interval between a node arrival and departure message specifies the membership interval of the node. For example, the messages (204 and 205) indicate that the node (232) is a member of the distributed system for an interval of one month, between the two activation timestamps (230 and 234). As depicted in the drawing, node arrival messages may also include additional fields that may be used for more informed mappings between requests and nodes. These informational fields may also be generated and propagated independently of node arrival messages. One example field that provides additional node information is the network location (220); this field depicts the node's logical location in the network. In this example, the network location specifies the data center to which the node belongs. Using this information, data storage requests may be mapped such that data can be copied, or replicated, to nodes that live in different data centers. Another informational field included in arrival messages is processing resources (222) available to a node. Other fields may also be included, such as processor cache size, memory space, disk space, or network bandwidth. This additional information about node resources may be helpful when mapping requests across heterogeneous nodes in a distributed system.
For example, requests that primarily rely on processing power may be mapped such that the node with higher processing resources (236) receives two and a half times more requests than the node with lower processing resources (238).
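As a concrete illustration of these message fields, the following is a minimal sketch of a membership message structure in Python; the class and field names are illustrative assumptions, not taken from the patent.

```python
# A minimal sketch of the membership messages of FIG. 2. All names here are
# illustrative assumptions; timestamps are kept in UTC so their meaning is
# exact across time zones and daylight savings time.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass(frozen=True)
class MembershipMessage:
    msg_type: str                 # "arrival", "departure", or "replication_factor"
    activation: datetime          # point in time the message becomes active
    node_id: Optional[str] = None           # IP address, domain name, or URL
    network_location: Optional[str] = None  # e.g. the node's data center
    processing_resources: Optional[float] = None  # relative capacity for weighted mappings
    replication_factor: Optional[int] = None      # set only on replication messages

# An arrival message announcing a node in a particular data center.
arrival = MembershipMessage(
    msg_type="arrival",
    activation=datetime(2009, 7, 1, tzinfo=timezone.utc),
    node_id="node-1.example.org",
    network_location="datacenter-east",
    processing_resources=2.5,
)
```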
The last message type displayed in FIG. 2 relates to a replication message (201). This message specifies the number of nodes each request should be forwarded to, which in this example is R=2. Other variations of the message (201) are also possible; for example, one such message may signal the system to replicate and forward requests to R nodes, and if a node is unavailable, to keep contacting up to N nodes. These fields and the values they contain are interpreted by the mapping function, and are discussed in more detail below. As illustrated in the drawing, membership related messages may be propagated individually or in batches. Propagating messages in batches is more common in certain embodiments where the messages may be generated by sources that are logically external to the system. Two example external sources are: (i) a human operator who periodically decides upon the set of nodes to add to or remove from the system, and (ii) a name service that monitors pre-registered nodes, and removes failed ones from the system.
In other embodiments, the membership messages may be generated by the nodes themselves. The nodes that are activated within the distributed system may announce their arrival to other member nodes through a predetermined protocol. Further, member nodes may monitor each other to detect failures and may send departure messages for failed nodes.
The mechanisms for detecting failures are several; some example methods include: (i) counting the number of connection failures to a node, and reporting failure if the failure counter exceeds a predetermined value, (ii) receiving heartbeat messages from other nodes, and reporting failure if a node fails to send heartbeat messages for a predetermined time, or (iii) sending ping messages to other nodes, and reporting failure if replies to ping messages are not received for a predetermined number of times.
For the last method, FIG. 3 explains in greater detail an example ping method used for detecting failed nodes, and for propagating departure messages for these nodes. The means through which arrival and departure messages are propagated closely relate to the sources that generate these messages. When membership messages are generated by sources external to the system, it is more common for these messages to be propagated through configuration files or through point-to-point messaging. Conversely, when membership messages are generated autonomously by the member nodes, these messages are usually propagated through one-to-many messaging protocols such as broadcast, multicast, or gossip.
Consistent Views of Membership
Propagation of membership messages in the distributed system may be delayed due to network latencies, network failures or temporary member node failures. Therefore, the membership messages kept at one member node may differ, at any given time, from the membership messages kept at another node. In other words, member nodes may have different views of the membership messages generated in the system.
When member nodes have conflicting views of the membership of other nodes in the system, deterministic mapping of requests to member nodes becomes a challenging and complicated task. To avoid this additional complexity, membership messages as displayed in FIG. 2 have their activation timestamps (214) set such that member nodes have a consistent view of the membership of other nodes in the system.
Consistent views may be achieved simply by setting activation timestamps sufficiently in the future. To decide how far in the future, previously or heuristically determined values may be used. As an example, assume that nodes in a distributed system recover from temporary failures in time r, and further assume that the current time is T.
Then, when a node detects that another member node has failed, it sends a departure message for the failed node with the activation timestamp set to T + 2r. This gives sufficient time to the failed node: (i) to recover from a temporary failure, (ii) to generate an arrival message that effectively cancels the departure message, and (iii) for the arrival message to propagate to member nodes in the system.
The only exception to setting timestamps in the future occurs during system initialization. In this exceptional case, no previous membership lists are established, and the member nodes start with the same view of the system; therefore membership messages are allowed to contain timestamps in the past.
The combination of these two methods, allowing activation timestamps in the past during system initialization and otherwise setting activation timestamps in the future, ensures that nodes have consistent views about membership. In other words, once a request with a timestamp or time interval arrives to the system, the membership list for the requested time has already been established and viewed by all the member nodes.
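A minimal sketch of this timestamp rule follows; the helper name and the value of the recovery time r are illustrative assumptions.

```python
# A sketch of the activation-timestamp rule described above. RECOVERY_TIME
# stands in for the heuristically determined recovery time r; its value is
# an assumption for illustration.
from datetime import datetime, timedelta

RECOVERY_TIME = timedelta(minutes=5)  # r: assumed recovery time for temporary failures

def departure_activation(now: datetime, initializing: bool = False) -> datetime:
    """Pick the activation timestamp for a departure message."""
    if initializing:
        # System initialization: no membership lists are established yet, so
        # timestamps in the past (here, the present) are permitted.
        return now
    # Otherwise set the timestamp to T + 2r, giving the failed node time to
    # recover, send a cancelling arrival message, and have it propagate.
    return now + 2 * RECOVERY_TIME
```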
Membership Lists
For time series requests to be mapped, first a logical view of membership lists needs to be constructed from membership messages. Second, membership lists are selected for incoming requests. Finally, the requests are distributed to the nodes through a predetermined method that uses the selected membership lists. The first two steps are described here, and the final step is described in the following section. To construct a logical view of membership lists from membership messages, several methods may be applied. One simple method includes: (i) sorting membership messages by their activation timestamps, (ii) for each activation timestamp that was previously unseen, creating a separate membership list, and (iii) applying membership messages in time-order, where arrival messages insert nodes into the membership list and departure messages remove the nodes from it.
FIG. 4 illustrates one example view of membership lists. Once membership lists are constructed, the corresponding membership lists for time series requests need to be selected or determined. In one embodiment, each membership list is active between its activation timestamp and the next membership list's. For example in FIG. 4, the membership list (408) is active between timestamps (404) and (414). This activation interval forms a half-open range; the first timestamp (404) is inclusive and the second timestamp (414) is exclusive.
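A sketch of this construction method, reusing the illustrative MembershipMessage structure from earlier, could be:

```python
# A sketch of the simple construction method described above: messages are
# sorted by activation timestamp and applied in time order, and each
# previously unseen timestamp starts a separate membership list.
def build_membership_views(messages):
    """Return (activation_timestamp, sorted member list) pairs, oldest first."""
    views = []
    members = set()
    for msg in sorted(messages, key=lambda m: m.activation):
        if msg.msg_type == "arrival":
            members.add(msg.node_id)
        elif msg.msg_type == "departure":
            members.discard(msg.node_id)
        else:
            continue  # e.g. replication-factor messages do not change membership
        if views and views[-1][0] == msg.activation:
            # Same activation timestamp as the previous message: amend that list.
            views[-1] = (msg.activation, sorted(members))
        else:
            views.append((msg.activation, sorted(members)))
    return views
```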
Based on this definition of activation intervals, selecting membership lists for time series requests is simple. The activation timestamps and the corresponding membership views are inserted into a sorted map, with the activation timestamps forming the keys for the map. Then, for a time series request containing a timestamp, the greatest key that is less than or equal to the timestamp is found, and the corresponding membership view is selected. For a time series request that includes a time interval, first two timestamps are generated from the time interval. These timestamps denote the beginning and the end of the interval. Then, the previously generated sorted map is looked up, and a portion of the sorted map is selected whose keys include: (i) the greatest key that is less than or equal to the beginning timestamp, and (ii) the set of keys ranging from the beginning timestamp, exclusive, to the end timestamp, inclusive.
As an example, assume that there are two membership views with activation timestamps (120 and 250). In this setting, the first membership view represents the interval [120, 250[, and the second one represents [250, ∞[. Accordingly, a request for the time interval [244, 250] includes two membership views with activation timestamps (120 and 250).
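A sketch of this selection logic using Python's bisect module (standing in for the sorted map) follows; it relies on the half-open activation intervals defined above.

```python
# A sketch of membership-view selection. The views list produced by
# build_membership_views is already sorted, so its timestamps act as the
# keys of the sorted map described above.
import bisect

def select_view(views, timestamp):
    """Pick the view active at `timestamp`: greatest key <= timestamp."""
    keys = [activation for activation, _ in views]
    i = bisect.bisect_right(keys, timestamp) - 1
    if i < 0:
        raise LookupError(f"no membership view active at {timestamp}")
    return views[i]

def select_views_for_interval(views, begin, end):
    """Pick every view whose half-open activation interval overlaps [begin, end]."""
    keys = [activation for activation, _ in views]
    lo = max(bisect.bisect_right(keys, begin) - 1, 0)  # view active at `begin`
    hi = bisect.bisect_right(keys, end)                # last key <= `end`
    return views[lo:hi]
```

For the worked example above, select_views_for_interval over views with activation timestamps (120 and 250) and the interval [244, 250] returns both views.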
The described embodiment uses request timestamps and time intervals to select membership views; this approach does not require the underlying data to be migrated between nodes and greatly simplifies the mapping processing. However, if all nodes that contain copies of the data depart from the system, data may be lost.
To avoid data loss, an extension of the described embodiment migrates data for each departed node. In this extension, each member node groups its local data by activation timestamps and replica node identifiers, and stores the data in logical directories. For example in FIG. 4, node (407) may keep its local data in three logical directories; one of these logical directories may contain data for node (409) and activation timestamp (404).
When current time reaches the next activation timestamp, and the next membership view (412) becomes active, node (407) perceives that node (409) departed from the system. Node (407) then infers that its local copy of data, for node (409) and activation timestamp (404), is not replicated at the advised replication level (416). Therefore, node (407) activates additional protocols, and replicates its data to match the replication factor.
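The following sketch, with assumed helper names, illustrates this re-replication check; local data are keyed by (activation timestamp, replica node id) as just described.

```python
# A sketch of the re-replication step described above. When a new membership
# view becomes active, logical directories kept on behalf of departed
# replicas fall below the advised replication factor and are re-replicated.
def on_view_change(old_members, new_members, local_directories, replicate):
    """local_directories: {(activation_timestamp, replica_node_id): data}."""
    departed = set(old_members) - set(new_members)
    for (activation, replica_id), data in local_directories.items():
        if replica_id in departed:
            # This copy is no longer replicated at the advised level; copy it
            # to other member nodes. The target-selection policy must stay
            # deterministic over the system's lifetime (see below).
            replicate(activation, data)
```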
In this example, data for the departed node may be migrated to one specific member node, or it may be distributed among the five other member nodes in the system. The specific node selection policy for migrating data is not important; however, the policy needs to be deterministic throughout the lifetime of the distributed system. Keeping the migration policy deterministic allows for: (i) reconstructing the series of data migrations that occurred due to node departures, and consequently (ii) finding in one "hop" the current location of the data. Finally, both the described embodiment and its extension require every membership related message to be stored on the nodes. Usually, membership messages are only useful for some time interval, such as one or two years. After that interval passes, these messages are no longer used for request mapping, and keeping them may require excessive storage resources.
In such cases, the system purges old membership messages that are no longer used in determining membership lists.
Mapping Requests to Member Nodes
Once membership lists are selected, time series requests then need to be mapped, or partitioned, across nodes in these membership lists. Several methods for partitioning requests among the member nodes are possible. In one embodiment, requests are mapped by simply applying a partitioning function on a given key and membership list. The partitioning function determines the member nodes directly and uses known techniques such as hashing, consistent hashing, or balanced tree searches. If the replication factor is greater than one, the partitioning function is applied on the request key to determine the first member node to map. Then, the remaining nodes are determined by walking the previously sorted membership list in order, and picking nodes that follow the first node in the list.
FIG. 5a illustrates some example mappings from requests to member nodes. First, the membership view is selected based on requests' timestamps; in this example, membership view (412) from FIG. 4 is used. Then, the partitioning function is applied on request keys, and lists of member nodes to map are determined. We refer to these lists (502) as preference lists, and their sizes depend on the replication factor (416a).
FIG. 5b displays similar mappings between requests and member nodes; the only difference is that sizes for preference lists now depend on an "attempt" factor (506). This factor anticipates that some system components might have temporarily failed, and signals to the partitioning method to keep contacting member nodes in order until the replication factor (416b) is met. This approach may require more requests to be sent in the system; however, it also allows for higher availability in the face of temporary node or network failures.
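A sketch of this partitioning step is given below; the hash function (MD5 here) is an illustrative choice, and `size` may be either the replication factor or the larger "attempt" factor, in which case a caller contacts the nodes in this order until the replication factor is met.

```python
# A sketch of preference-list construction: hash the request key to select a
# first member node, then walk the sorted membership list in order to pick
# the remaining nodes.
import hashlib

def preference_list(members, request_key, size):
    """members: sorted membership list; size: replication or attempt factor."""
    if not members:
        return []
    digest = hashlib.md5(request_key.encode("utf-8")).digest()
    first = int.from_bytes(digest[:8], "big") % len(members)
    # Nodes that follow the first node in the (circular) sorted list.
    return [members[(first + i) % len(members)]
            for i in range(min(size, len(members)))]
```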
In another embodiment, member nodes act as exact replicas of each other, and form replication groups. The requests are then partitioned across these replication groups, and sent to all member nodes in one replication group. FIG. 6 illustrates an example process for forming replication groups from a membership list. First, the membership list is sorted in step 602; the sort order is established by comparing first data center names, and then node identifiers. Then, the membership list is "divided" into r sublists in step 604. Each of these sublists contains n/r nodes, where n represents the total number of nodes in the membership list and r represents the replication factor. In step 606, a hash function is applied on the provided request key (616), and the computed hash value is then passed through a modulus operation to calculate an index between 1 and n/r. Finally in step 608, nodes that correspond to the calculated index are picked from each sublist, and are merged together to form a replication group.
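The following sketch mirrors steps 602-608; it assumes, as in the figure, that the list size n is divisible by the replication factor r, and MD5 is again an illustrative hash.

```python
# A sketch of replication-group formation per FIG. 6.
import hashlib

def replication_group(members, request_key, r):
    """members: (data_center, node_id) pairs; r: replication factor."""
    ordered = sorted(members)   # step 602: sort by data center, then node id
    size = len(ordered) // r    # each sublist holds n/r nodes
    sublists = [ordered[i * size:(i + 1) * size] for i in range(r)]  # step 604
    digest = hashlib.md5(request_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % size  # step 606 (zero-based here)
    return [sublist[index] for sublist in sublists]   # step 608
```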
FIG. 5c displays some example replication groups; since the membership list was sorted first by data center name, each replication group contains member nodes from different data centers. One property of replication groups is that they form more strict mappings from requests to member nodes. This may result in lower data availability in the face of temporary failures, as there are only a fixed set of nodes to contact. Still, replication groups have the major advantage that multiple member nodes carry exactly the same state. This becomes beneficial for processing requests that need to operate over the same set of data; computed responses from multiple nodes can then be compared with each other, and the most accurate response be chosen.
Example: Shopping Cart
Previously described methods may be used by numerous applications in different contexts. One such example is a shopping cart application for an e-commerce website, where requests are primarily used to store and retrieve data.
In this example, node arrivals and departures are decided monthly by human operators, and changes are propagated in batches as configuration file deployments. Temporary node failures are handled through preference lists; in case one node is unavailable, the next node in the preference list is contacted. Finally, requests are mapped to member nodes by: (i) selecting membership lists based on request timestamps, and then (ii) applying a hash function on membership lists and request keys.
Generally, shopping cart applications are modeled such that the full contents of the cart are replicated and synchronized. A more flexible approach involves storing the actions of a website user modifying her shopping cart as individual events, with each distinct event containing an associated timestamp. FIG. 7a demonstrates an example where the customer adds a new item to her shopping cart. First, request key (701) and request timestamp (703) are used to determine the preference list. Then, the first node (704) in the preference list is contacted, but this node is temporarily unavailable. Therefore, the next two nodes in the preference list are contacted; nodes (706) and (708) are available, and data are stored on them.
In FIG. 7b, similar steps are performed to store request (712) on nodes (714) and (716). Consequently, the contents of the shopping cart are scattered across several member nodes. Later, when this customer (703) requests her shopping cart, data may need to be retrieved from at most four nodes. Once retrieved, data are then merged and deduplicated. Merging and deduplication are possible since each action of the customer is modeled as a separate request, and the timestamp for each request helps infer the request's time order and its uniqueness.
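A sketch of this merge-and-deduplicate step follows; cart events are assumed to be (timestamp, action, item) tuples, so sorting gives time order and set membership gives uniqueness.

```python
# A sketch of merging and deduplicating shopping cart events retrieved from
# several member nodes. The tuple layout is an assumption for illustration.
def merge_cart_events(*event_lists):
    """Each list holds (timestamp, action, item) tuples from one node."""
    seen = set()
    merged = []
    for event in sorted(e for events in event_lists for e in events):
        if event not in seen:   # duplicates arise from replicated writes
            seen.add(event)
            merged.append(event)
    return merged
```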
One potential drawback to this approach is that, if all member nodes that contain copies of data fail permanently and depart from the system, data may be lost. For this example, the probability of such an occurrence is notably low: annual rates for permanent node failures are around 4%, and shopping cart data are retained for short periods, such as three months.
Consequently, the probability of losing the contents of a shopping cart is approximately (0.04 × 3/12)^R = 0.01^R; when R=3 is set as the replication factor, the probability of data loss is 0.01^3, or about one in one million. In summary, the described method provides a practical approach for storing and retrieving shopping cart contents. Member nodes keep consistent views of membership lists, and avoid single points of failure. Furthermore, actions related to modifying the shopping cart are modeled as separate requests, request timestamps and keys are used to determine a preference list, and temporary failures are handled by writing to the next node in the preference list. Finally, these steps have the desirable property that any member node may carry them out without depending on the state of other nodes in the system. This property greatly reduces overall complexity. Comparatively, similar distributed data stores need to use sophisticated techniques for providing consistent views of shopping cart contents. For example, one state of the art data store relies on consistent hashing to provide incremental scalability, vector clocks with data reconciliation to provide data consistency, sloppy quorum and hinted handoff to handle temporary failures, and anti-entropy using Merkle trees to recover from permanent failures. Each one of these methods introduces dependencies between member nodes in the system, and therefore notably increases complexity.
Example: Financial Analysis
Another specific example for applying the described methods is stock investors performing financial analysis on stock prices. This example is different from the previous one in that the requests are primarily used to process data.
In this model, node arrivals are announced by the arriving nodes themselves, and messages are propagated through gossiping. For node departures, each member node actively monitors other nodes in the system to detect node failures; FIG. 3 illustrates an example method.
As displayed in the figure, each node selects a predetermined number of nodes to monitor, and continues monitoring these nodes until the membership of nodes changes or an external notification is received (steps 302, 304). Then, the node sends ping messages to monitored nodes at regular intervals (step 306). The monitored nodes have failure counters assigned to them; a successful ping reply resets a node's counter, and a failed ping reply increments it (steps 310, 312). Upon incrementing a failure counter, the monitoring node checks to see: (i) if the counter is above a predetermined failure threshold, and (ii) if counters for more than half of the monitored nodes are set to zero (steps 314, 316). If both conditions are satisfied, the node determines that the monitored node has failed, and generates and gossips a departure message for the failed node (step 318). Otherwise, the node continues monitoring. The intuition behind checking the failure counters of other nodes (step 316) is to avoid excessive departure messages. If more than half of the failure counters have values higher than zero, this indicates problems associated with the network or the monitoring node itself. In either case, departure messages for other nodes should not be sent.
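A sketch of this monitoring loop is given below; ping_node and gossip_departure are assumed callbacks, and the threshold and interval values are illustrative.

```python
# A sketch of the ping-based failure detector of FIG. 3. In practice the loop
# would also stop when membership changes or an external notification arrives
# (steps 302, 304).
import time

FAILURE_THRESHOLD = 3  # assumed predetermined failure threshold
PING_INTERVAL = 1.0    # assumed seconds between ping rounds

def monitor(nodes, ping_node, gossip_departure):
    counters = {node: 0 for node in nodes}  # one failure counter per node
    while True:
        for node in nodes:
            if ping_node(node):          # step 306: ping the monitored node
                counters[node] = 0       # step 310: a successful reply resets the counter
                continue
            counters[node] += 1          # step 312: a failed reply increments it
            healthy = sum(1 for c in counters.values() if c == 0)
            # Steps 314, 316: report failure only if this counter exceeds the
            # threshold AND more than half of the monitored nodes look healthy;
            # otherwise the network or this monitor itself is suspect.
            if counters[node] > FAILURE_THRESHOLD and healthy > len(nodes) / 2:
                gossip_departure(node)   # step 318: gossip a departure message
        time.sleep(PING_INTERVAL)
```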
Through autonomous arrival and departure messages sent by member nodes, consistent views for membership are established. Requests are then mapped to member nodes by selecting membership lists based on request timestamps, and then by applying a hash function to form replication groups.
FIG. 8a demonstrates an example where price trends for stock item (803) are analyzed. Stock price updates are forwarded to a replication group consisting of two nodes (806 and 810). These nodes perform analysis on incoming prices, and are programmed to generate an alert if the stock price has been increasing for the past ten minutes, followed by a three minute interval where the price is constant.
In this example, node (806) receives stock price update (802), observes the previously mentioned trend (804), and generates alert message (812).
However, before sending back the alert, the node compares its message with the replica node's message (814). The replica's message is generated from more stock updates, and contradicts message (812) about raising an alert. Consequently, as shown in FIG. 8b, the processing message based on the more accurate information is selected and sent back to the application.
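This reconciliation step might be sketched as follows. The message format and the rule that the message generated from more processed updates wins are assumptions drawn from this example rather than a prescribed implementation, and the update counts are made up.

```python
from dataclasses import dataclass

@dataclass
class ProcessingMessage:
    alert: bool          # whether this replica wants to raise an alert
    updates_seen: int    # how many stock price updates produced this message

def reconcile(mine, theirs):
    # The message generated from more stock updates is treated as more accurate.
    return mine if mine.updates_seen >= theirs.updates_seen else theirs

# As in FIG. 8b: node 806's alert (message 812) is overridden by the replica's
# message (814), which was generated from more updates.
message_812 = ProcessingMessage(alert=True, updates_seen=41)
message_814 = ProcessingMessage(alert=False, updates_seen=47)
print(reconcile(message_812, message_814))   # the replica's message wins
```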
The described methodology provides certain advantages. First, stock price updates contain explicit timestamps; this avoids potential issues resulting from clock skew between member nodes. Second, replication groups are formed such that replica nodes live in different data centers; this way, failures in one data center do not affect the distributed system's availability. Finally, replication groups allow replicas to easily compare their state with each other, and to generate responses using the most accurate and up-to-date information.

Scope
The invention is described above with reference to drawings. These drawings illustrate certain details of some embodiments, and should not be taken as limiting the scope of the invention. For example, those skilled in the art will recognize that some elements of the illustrated embodiments shown in software may be implemented in hardware or may be provided in the form of a software service. Therefore, the invention as described herein contemplates all such embodiments as may come within the scope of the following claims and equivalents thereof.
Claims
1. A method for distributing time series requests across nodes in a distributed system, the method comprising:
(a) generating and propagating arrival and departure messages for the nodes, wherein the arrival and departure times are set sufficiently in the future for the nodes to have a consistent view of membership;
(b) keeping a list of the messages on the nodes so that membership of nodes for any time point is deducible;
(c) maintaining a mapping from the time series requests to the nodes, the mapping using the list of the messages and at least one of the following to determine the nodes to map:
i. timestamps acquired from the requests, or
ii. time intervals acquired from the requests.
2. The method of claim 1 wherein (a) generating arrival and departure messages for the nodes is performed by sources logically external to the distributed system.
3. The method of claim 1 wherein (a) generating arrival and departure messages for the nodes is performed autonomously by member nodes.
4. The method of claim 1 further comprising generating and propagating additional information, wherein the information contains replication factors for the time series requests.
5. The method of claim 1 further comprising generating and propagating additional information, wherein the information describes resources available to the nodes.
6. The method of claim 1 further comprising generating and propagating additional information, wherein the information describes network locations of the nodes.
7. The method of claim 1 further comprising selecting the messages that are no longer required for distributing the time series requests and expiring them.
8. The method of claim 1 wherein (c) maintaining a mapping from the time series requests to the nodes includes selecting membership lists of the nodes at the given timestamp or the time interval.
9. The method of claim 8 further comprising migrating data upon node departures so that data are stored according to a replication factor.
10. The method of claim 1 wherein (c) maintaining a mapping from the time series requests to the nodes includes using a hash function.
11. The method of claim 1 wherein (c) maintaining a mapping from the time series requests to the nodes is performed in accordance with a preference list.
12. The method of claim 1 wherein (c) maintaining a mapping from the time series requests to the nodes is performed in accordance with a replication group.
13. A system for distributing time series requests across nodes in a distributed system, the system comprising:
(a) generating and propagating means for arrival and departure messages for the nodes, wherein the arrival and departure times are set sufficiently in the future for the nodes to have a consistent view of membership;
(b) keeping means for a list of the messages on the nodes so that membership of nodes for any time point is deducible;
(c) mapping means from the time series requests to the nodes, the mapping means using the list of the messages and at least one of the following to determine the nodes to map:
i. timestamps acquired from the requests, or
ii. time intervals acquired from the requests.
14. The system of claim 13 wherein (a) the generating means for arrival and departure messages for the nodes is performed by sources logically external to the distributed system.
15. The system of claim 13 wherein (a) the generating means for arrival and departure messages for the nodes is performed autonomously by member nodes.
16. The system of claim 13 further comprising generating and propagating means for additional information, wherein the information contains replication factors for the time series requests.
17. The system of claim 13 further comprising generating and propagating means for additional information, wherein the information describes resources available to the nodes.
18. The system of claim 13 further comprising generating and propagating means for additional information, wherein the information describes network locations of the nodes.
19. The system of claim 13 further comprising selecting and expiring means for the messages that are no longer required for distributing the time series requests.
20. The system of claim 13 wherein (c) the mapping means from the time series requests to the nodes includes selecting membership lists of the nodes at the given timestamp or the time interval.
21. The system of claim 20 further comprising migrating means for data upon node departures so that data are stored according to a replication factor.
22. The system of claim 13 wherein (c) the mapping means from the time series requests to the nodes includes using a hash function.
23. The system of claim 13 wherein (c) the mapping means from the time series requests to the nodes is performed in accordance with a preference list.
24. The system of claim 13 wherein (c) the mapping means from the time series requests to the nodes is performed in accordance with a replication group.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
PCT/TR2009/000085 WO2011005232A1 (en) | 2009-07-08 | 2009-07-08 | Mapping time series requests to member nodes |
Publications (1)
Publication Number | Publication Date |
---|---|
WO2011005232A1 (en) | 2011-01-13 |
Family
ID=41716240
Country Status (1)
Country | Link |
---|---|
WO (1) | WO2011005232A1 (en) |
Patent Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US20030140172A1 (en) * | 1998-05-26 | 2003-07-24 | Randy D. Woods | Distributed computing environment using real-time scheduling logic and time deterministic architecture |
US20030158936A1 (en) * | 2002-02-15 | 2003-08-21 | International Business Machines Corporation | Method for controlling group membership in a distributed multinode data processing system to assure mutually symmetric liveness status indications |
US20040010538A1 (en) * | 2002-07-11 | 2004-01-15 | International Business Machines Corporation | Apparatus and method for determining valid data during a merge in a computer cluster |
US20090037367A1 (en) * | 2007-07-30 | 2009-02-05 | Sybase, Inc. | System and Methodology Providing Workload Management in Database Cluster |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
| 121 | Ep: the epo has been informed by wipo that ep was designated in this application | Ref document number: 09788672; Country of ref document: EP; Kind code of ref document: A1 |
| NENP | Non-entry into the national phase | Ref country code: DE |
| 122 | Ep: pct application non-entry in european phase | Ref document number: 09788672; Country of ref document: EP; Kind code of ref document: A1 |